# Helm

### Prerequisites <a href="#prerequisites" id="prerequisites"></a>

* Kubernetes 1.23+
* Helm 3.8.0+
* Accessible source and target Kafka clusters

### Introduction

The K2K Helm chart deploys two modules onto your Kubernetes cluster:

**K2K Replicator** -- the core component that continuously replicates Kafka topics from a source cluster to a target cluster. It supports exactly-once delivery, schema replication via Confluent-compatible Schema Registries, flexible topic routing, and automatic topic creation on the target cluster.

**Offset Mapper** -- an optional companion module that maps consumer group offsets between the source and target clusters. This enables consumers to resume from the correct position on the target cluster after a migration or failover. The Offset Mapper shares the base `replicationConfig` from K2K and can optionally override specific values (e.g. its own consumer group ID).

Both modules are independently configurable under the `k2k` and `offsetMapper` sections in `values.yaml`. When the Offset Mapper is enabled, the chart enforces that both `offsetMapper.enabled: true` and `k2k.replicationConfig.features.offsetMapping: "enabled"` are set together -- omitting either one will cause the chart to fail with a clear error message.

### Configure K2K <a href="#configure-hq" id="configure-hq"></a>

To configure Lenses K2K properly, you need to understand the parameter groups the chart offers. Under the **k2k** parameter there are three key parameter groups used to set up K2K:

1. **licence**
   * Configures EULA acceptance and the license token.
2. **otelConfig**
   * Defines metrics, traces and log exporters.
3. **replicationConfig**
   * Defines the core K2K configuration file, which includes:
     * connections to the source and target Kafka clusters / Schema Registries
     * replication semantics, replication options and more

You can configure your Helm chart by working through these groups in the same order.

{% stepper %}
{% step %}

#### Configure licence

Before using K2K as a standalone application, you must agree to the End User License Agreement (EULA) and request a free license token by contacting <k2k@lenses.io>. Ensure this section is included in the `replicationConfig` yaml values:

{% code title="values.yaml" %}

```yaml
k2k:
  replicationConfig:
    license:
      acceptEULA: true
      token: <license token>
```

{% endcode %}
{% endstep %}

{% step %}

#### Configure OTEL options

To monitor your K2K application by exporting logs and metrics, configure the following block:

{% code title="values.yaml" %}

```yaml
k2k:
  otelConfig:
    serviceName: "k2k"
    metricsExporter: "prometheus"
    tracesExporter: "none"
    logsExporter: "none"
    prometheusHost: "0.0.0.0"
    prometheusPort: 9090
```

{% endcode %}

> **Note:** The export functionality for warning logs and traces is currently unavailable.

{% endstep %}

{% step %}

#### Replication Configuration

The configuration file is in YAML and has eight basic sections:

* **source**: defines the source cluster details (required)
* **target**: defines the target cluster details (required)
* **replication**: defines the set of topics to replicate and how to replicate them (required)
* **coordination**: defines the settings for the coordinator, for example, the offsets (required)
* **features**: defines extra functionality, such as exactly-once delivery (optional)
* **errorHandling**: defines how to handle errors (optional)
* **tracing**: defines the open tracing components (optional)
* **license**: defines EULA acceptance and the license token (required)

{% hint style="info" %}
For more about the configuration blocks and their descriptions, see [K2K Replicator](/latest/k2k/configuration.md).
{% endhint %}

In the Helm chart, the `replicationConfig` parameter is defined as a free-form object:

```yaml
k2k:
  replicationConfig: {}
```

Therefore, any YAML parameters from the configuration document above can be copied directly into it.

{% hint style="success" %}
Secrets can be exposed as environment variables via the **k2k.additionalEnv** property and referenced in the following ways:

```yaml
foo: ${env:string:MY_ENV}
bar: ${env:number:MY_ENV}
bar2: ${env:base64:MY_ENV}
bar3: ${file:MY_ENV}
```

{% endhint %}
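For example, pairing a pre-created Kubernetes secret with an environment-variable reference might look like the following sketch. The secret name `my-kafka-secret`, its key `password`, and the `ssl.key.password` property are illustrative assumptions, not values from this chart:

```yaml
# Hypothetical example: assumes a pre-created secret named "my-kafka-secret"
# containing a key "password".
k2k:
  additionalEnv:
    # Expose the secret value as an environment variable in the K2K pod
    - name: KAFKA_PASSWORD
      valueFrom:
        secretKeyRef:
          name: my-kafka-secret
          key: password
  replicationConfig:
    source:
      kafka:
        consumer:
          # Resolved from the KAFKA_PASSWORD environment variable at runtime
          ssl.key.password: ${env:string:KAFKA_PASSWORD}
```

With this pattern the secret value never has to appear in plain text in `values.yaml`.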

Examples of Kafka2Kafka `replicationConfig` values for common connection types:

{% tabs %}
{% tab title="Plaintext connection" %}
{% code title="values.yaml" %}

```yaml
k2k:
  replicationConfig:
    name: "k2k-demo-env"
    features:
      exactlyOnce: disabled
      headerReplication: disabled
      schemaMapping: disabled
      optimizeOffsetCommitPartition: enabled
      tracingHeaders: disabled
      autoCreateControlTopics: enabled
      autoCreateTopics: enabled
    coordination:
      kafka:
        assignment:
          topic: "__k2k-app-eot-assignment"
        commit:
          topic: "__k2k-app-eot-consumer-offsets"
          group: "k2k.eot"
    source:
      kafka:
        common:
          "bootstrap.servers": "source-kafka:9092"
        consumer:
          "group.id": "k2k.eot"
    target:
      kafka:
        common:
          "bootstrap.servers": "target-kafka:9092"
    replication:
      - source:
          name: source
          topic:
            - "topic1"
            - "topic2"
      - sink:
          name: sink-source-topic
          topic:
            prefix: "k2k.eot."
          partition: source
```

{% endcode %}
{% endtab %}

{% tab title="AWS\_MSK\_IAM" %}
{% code title="values.yaml" %}

```yaml
serviceAccount:
  create: true
  name: msk-serverless-sa
  annotations:
    eks.amazonaws.com/role-arn: arn:aws:iam::<AccountId>:role/MSKAccessRole

k2k:
  replicationConfig:
    name: "aws-k2k"
    coordination:
      kafka:
        commit:
          group: "k2k.prod-to-dr.coordination"     #required
          topic: "__k2k_consumer-offsets"          #optional
          syncTimeout: "10 seconds"                #optional
          batchSize: 100                           #optional
          batchTimeout: "5 seconds"                #optional
        consumer:
          group.id: "demo-k2k-coordination"
          client.id: "test-coordination"
          security.protocol: "SASL_SSL"
          sasl.mechanism: "AWS_MSK_IAM"
          sasl.jaas.config: "software.amazon.msk.auth.iam.IAMLoginModule required;"
          sasl.client.callback.handler.class: "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
        connection:
          servers: "boot.c1.kafka.eu-west-3.amazonaws.com:9098"
    source:
      kafka:
        consumer:
          client.id: "demo-k2k"
          security.protocol: "SASL_SSL"
          sasl.mechanism: "AWS_MSK_IAM"
          sasl.jaas.config: "software.amazon.msk.auth.iam.IAMLoginModule required;"
          sasl.client.callback.handler.class: "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
        connection:
          servers: "boot.c1.kafka.eu-west-3.amazonaws.com:9098"
    target:
      kafka:
        producer:
          security.protocol: "SASL_SSL"
          sasl.mechanism: "AWS_MSK_IAM"
          sasl.jaas.config: "software.amazon.msk.auth.iam.IAMLoginModule required;"
          sasl.client.callback.handler.class: "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
        connection:
          servers: "boot.c2.kafka.eu-west-3.amazonaws.com:9098"
    replication:
      - source:
          name: source                             #required
          topic:                                   #required
            - "mysource-topic-1"
      - sink:
          name: sink                               #required
          partition: source                        #required
          topic:
            prefix: "aws."
            suffix: ".copy"
```

{% endcode %}
{% endtab %}

{% tab title="SASL (with secrets)" %}
Prerequisites:

* A secret containing ***sasl-jaas.conf*** must be created beforehand.

{% code title="values.yaml" %}

```yaml
k2k:
  additionalEnv:
    - name: SASL_JAAS_CONFIG
      valueFrom:
        secretKeyRef:
          name: kafka-jaas-secret
          key: sasl-jaas.conf
  replicationConfig:
    name: "demo-k2k"
    features:
      exactlyOnce: disabled
      headerReplication: disabled
      schemaMapping: disabled
      offsetCommitOptimizePartition: enabled
      tracingHeaders: disabled
      autoCreateControlTopics: enabled
      autoCreateTopics: enabled
    coordination:
      kafka:
        commit:
          group: "k2k.prod-to-dr.coordination"     #required
          topic: "__k2k_consumer-offsets"          #optional
          syncTimeout: "10 seconds"                #optional
          batchSize: 100                           #optional
          batchTimeout: "5 seconds"                #optional
        consumer:
          group.id: "demo-k2k-coordination"
          client.id: "demo-k2k"
          security.protocol: "SASL_PLAINTEXT"
          sasl.mechanism: "SCRAM-SHA-512"
          sasl.jaas.config: ${env:string:SASL_JAAS_CONFIG}
        connection:
          servers: "kafka-us-dev-1.domain.io:9093"
    source:
      kafka:
        consumer:
          group.id: "demo-k2k-consumer"
          client.id: "demo-k2k"
          security.protocol: "SASL_PLAINTEXT"
          sasl.mechanism: "SCRAM-SHA-512"
          sasl.jaas.config: ${env:string:SASL_JAAS_CONFIG}
        connection:
          servers: "kafka-us-dev-1.domain.io:9093"
    target:
      kafka:
        producer:
          group.id: "demo-k2k-producer"
          client.id: "demo-k2k"
          security.protocol: "SASL_PLAINTEXT"
          sasl.mechanism: "SCRAM-SHA-512"
          sasl.jaas.config: ${env:string:SASL_JAAS_CONFIG}
        connection:
          servers: "kafka-us-dev-2.domain.io:9093"
    replication:
      - source:
          name: source                             #required
          topic:                                   #required
            - "airline-customers"
      - sink:
          name: sink                               #required
          partition: source                        #required
          topic:
            prefix: "demo."
            suffix: ".copy"
```

{% endcode %}
{% endtab %}

{% tab title="SSL" %}
Prerequisites:

* A secret containing *caroot.pem* must be created beforehand;
* A secret containing *all.pem* (certificate + private key) must be created beforehand.

{% code title="values.yaml" %}

```yaml
additionalVolumeMounts:
  - name: external-kafka-ca-cert
    mountPath: "/etc/cacert/caroot.pem"
    subPath: "caroot.pem"
  - name: external-kafka-certs-all
    mountPath: "/etc/clientcert/all.pem"
    subPath: "all.pem"

additionalVolumes:
  - name: external-kafka-ca-cert
    secret:
      secretName: external-kafka-ca-cert
  - name: external-kafka-certs-all
    secret:
      secretName: external-kafka-certs-all

k2k:
  acceptEULA: true
  otelConfig:
    serviceName: "k2k"
    metricsExporter: "prometheus"
    tracesExporter: "none"
    logsExporter: "none"
    prometheusHost: "0.0.0.0"
    prometheusPort: "9090"
  replicationConfig:
    name: "k2k-demo-env"
    features:
      exactlyOnce: disabled
      headerReplication: disabled
      schemaMapping: disabled
      offsetCommitOptimizePartition: enabled
      tracingHeaders: disabled
      autoCreateControlTopics: enabled
      autoCreateTopics: enabled
    coordination:
      kafka:
        assignment:
          topic: "__k2k-app-eot-assignment"
        commit:
          topic: "__k2k-app-eot-consumer-offsets"
          group: "k2k.eot"
    source:
      kafka:
        consumer:
          "group.id": "k2k.eot"
          "security.protocol": "SSL"
          "ssl.truststore.type": "PEM"
          "ssl.keystore.type": "PEM"
          "ssl.truststore.location": "/etc/cacert/caroot.pem"
          "ssl.keystore.location": "/etc/clientcert/all.pem"
        connection:
          servers: "kafka-us-dev-1.domain.io:9093"
    target:
      kafka:
        producer:
          "security.protocol": "SSL"
          "ssl.truststore.type": "PEM"
          "ssl.keystore.type": "PEM"
          "ssl.truststore.location": "/etc/cacert/caroot.pem"
          "ssl.keystore.location": "/etc/clientcert/all.pem"
        connection:
          servers: "kafka-us-dev-1.domain.io:9093"
    replication:
      - source:
          name: source
          topic:
            - "airline-customers"
            - "airline-customers-name"
      - sink:
          name: sink-source-topic
          topic:
            prefix: "k2k.eot."
          partition: source
```

{% endcode %}
{% endtab %}
{% endtabs %}
{% endstep %}

{% step %}

### (Optional) Configure Service Accounts

Lenses Kafka2Kafka, by default, uses the **default** Kubernetes service account, but you can choose to use a specific one.

If the user defines the following:

{% code title="values.yaml" %}

```yaml
# serviceAccount is the Service account to be used by Lenses to deploy apps
k2k:
  serviceAccount:
    create: true
    annotations: {}
    name: lenses-k2k
```

{% endcode %}

The chart will create a new service account in the defined namespace for **Kafka2Kafka** to use.
{% endstep %}
{% endstepper %}

### Configure Offset Mapper

To configure the Lenses K2K Offset Mapper properly, you need to understand the parameter groups the chart offers. Under the **offsetMapper** parameter there are some key parameter groups used to set up the Offset Mapper:

{% stepper %}
{% step %}

### Enablement

To enable the K2K Offset Mapper, both of the following fields must be set:

{% code title="values.yaml" %}

```yaml
k2k:
  replicationConfig:
    features:
      offsetMapping: enabled

offsetMapper:
  enabled: true
```

{% endcode %}
{% endstep %}

{% step %}

### Core Configuration

Because the Offset Mapper is an optional companion module to K2K, it reuses the base `k2k.replicationConfig` (source/target Kafka connection details, replication settings, etc.) and allows overriding specific values via `offsetMapper.overrideConfig`.

Therefore, if you have already configured the K2K connections for the source and target, you do not have to do it again. See the [Configuration Reference](/latest/k2k/k2k-offset-mapping/configuration-reference.md) page for additional overrides or additions, such as the replication configuration override in the example below. Only the values you specify in `overrideConfig` are overridden; everything else is inherited from the base K2K replication config.

{% code title="values.yaml" %}

```yaml
offsetMapper:  
  overrideConfig:
    groups:
      consumerGroups: "k2k.test"
    target:
      kafka:
        common:
          group.id: "offset-mapping-k2k-test-pipeline"
```

{% endcode %}
{% endstep %}

{% step %}

### Configure OTEL options

If you would like to monitor your Offset Mapper application, configure the following block:

{% code title="values.yaml" %}

```yaml
offsetMapper:
  otelConfig:
    serviceName: "k2k-offset-mapping"
    metricsExporter: "prometheus"
    tracesExporter: "none"
    logsExporter: "none"
    prometheusHost: "0.0.0.0"
    prometheusPort: 9091
```

{% endcode %}
{% endstep %}

{% step %}

### (Optional) Configure Service Account

The Offset Mapper, by default, uses the `default` Kubernetes service account. To use a specific one, define:

{% code title="values.yaml" %}

```yaml
offsetMapper:
  serviceAccount:
    create: true
    annotations: {}
    name: k2k-offset-mapper
```

{% endcode %}
{% endstep %}

{% step %}

### Configure Deployment Options

Adjust the Offset Mapper deployment resources based on your workload:

{% code title="values.yaml" %}

```yaml
offsetMapper:
  deployment:
    replicas: 1
    resources:
      requests:
        memory: 512Mi
      limits:
        memory: 1Gi
```

{% endcode %}
{% endstep %}
{% endstepper %}

<details>

<summary>Example of K2K + Offset Mapper deployment via Helm Chart</summary>

{% code title="values.yaml" %}

```yaml
nameOverride: ""
fullnameOverride: ""

commonLabels:
  team: data-platform
  environment: test

k2k:
  annotations:
    description: "K2K test deployment"
  labels:
    app.kubernetes.io/component: replicator
  service:
    enabled: true
    type: ClusterIP
  serviceAccount:
    create: true
    name: msk-sa
  deployment:
    replicas: 1
    resources:
      requests:
        cpu: 500m
        memory: 1Gi
      limits:
        cpu: 1
        memory: 2Gi
  image:
    repository: lensesio/k2k
    tag: "2.0"
    pullPolicy: IfNotPresent
  otelConfig:
    serviceName: "k2k"
    metricsExporter: "prometheus"
    tracesExporter: "none"
    logsExporter: "none"
    prometheusHost: "0.0.0.0"
    prometheusPort: 9090
  livenessProbe:
    enabled: true
  replicationConfig:
    name: "k2k-test-pipeline"
    features:
      exactlyOnce: disabled
      headerReplication: disabled
      schemaMapping: disabled
      optimizeOffsetCommitPartition: enabled
      tracingHeaders: disabled
      autoCreateControlTopics: enabled
      autoCreateTopics: enabled
      offsetMapping: enabled
    license:
      acceptEULA: true
      token: "<YOUR_LICENSE>"
    source:
      kafka:
        common:
          sasl.mechanism: "AWS_MSK_IAM"
          sasl.jaas.config: "software.amazon.msk.auth.iam.IAMLoginModule required awsProfileName=msk-sa;"
          security.protocol: "SASL_SSL"
          sasl.client.callback.handler.class: "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
          bootstrap.servers: ""
        consumer:
          group.id: "k2k.test"
    target:
      kafka:
        common:
          sasl.mechanism: "AWS_MSK_IAM"
          sasl.jaas.config: "software.amazon.msk.auth.iam.IAMLoginModule required awsProfileName=msk-sa;"
          security.protocol: "SASL_SSL"
          sasl.client.callback.handler.class: "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
          bootstrap.servers: ""
    replication:
      - source:
          name: source
          topic:
            - "inventory-events"
      - sink:
          name: sink
          topic:
            prefix: "k2k.test."
          partition: source

offsetMapper:
  enabled: true
  annotations:
    description: "Offset Mapper test deployment"
  labels:
    app.kubernetes.io/component: offset-mapper
  service:
    enabled: true
    type: ClusterIP
  serviceAccount:
    create: true
    name: msk-sa-om
  deployment:
    replicas: 1
    resources:
      requests:
        cpu: 250m
        memory: 512Mi
      limits:
        cpu: 500m
        memory: 1Gi

  image:
    repository: lensesio/k2k-offset-mapping
    tag: "2.0"
    pullPolicy: IfNotPresent
  otelConfig:
    serviceName: "k2k-offset-mapping"
    metricsExporter: "prometheus"
    tracesExporter: "none"
    logsExporter: "none"
    prometheusHost: "0.0.0.0"
    prometheusPort: 9091
  livenessProbe:
    enabled: false
  overrideConfig:
    groups:
      consumerGroups: "k2k.test"
    target:
      kafka:
        common:
          "group.id": "offset-mapping-k2k-test-pipeline"
```

{% endcode %}

</details>

## Add chart repository

First, add the Helm Chart repository using the Helm command line:

```bash
helm repo add lensesio https://helm.repo.lenses.io/
helm repo update
```

{% embed url="https://github.com/lensesio/k2k-helm-charts" %}

## Installing K2K & Offset Mapper

{% hint style="info" %}
Be aware that, for the time being (alpha), the `--version` flag is mandatory when deploying the Helm chart from the Helm repository.
{% endhint %}

{% code title="terminal" %}

```bash
helm install lenses-k2k lensesio/lenses-k2k \
   --values values.yaml \
   --create-namespace --namespace lenses-k2k \
   --version 2.0.0
```

{% endcode %}

