# Connections

{% hint style="warning" %}
In order for the consumer offset mapping functionality to work, both of the following requirements must be met:

* K2K must be configured to publish offset mappings.
* The `k2k-offset-mapping` application must be configured and deployed.

Without both components active, consumer group offsets on the target cluster will not be translated.
{% endhint %}

### **Configuration Parameters**

K2K Offset Mapping lets you configure each client module (Consumers, Producers, Admin Clients) individually, or define shared configuration once. To share configuration, specify a `common` key under `source.kafka` or `target.kafka`; its settings apply to all Consumers, Producers (if applicable) and Admin Clients for that cluster.

The available configuration keys are listed in the table below.

| Key                     | Description                                                                                                                          | Notes                                                                                                                                     |
| ----------------------- | ------------------------------------------------------------------------------------------------------------------------------------ | ----------------------------------------------------------------------------------------------------------------------------------------- |
| `source.kafka.common`   | A set of properties that will be inherited by `source.kafka.consumer` and `source.kafka.admin`.                                      | It's advisable to define `"bootstrap.servers"` and any other common properties here to avoid repetition.                                  |
| `source.kafka.consumer` | Allows any standard Apache Kafka Consumer property to be provided. It will inherit all properties from `source.kafka.common`.        | <p>If not defined in <code>common</code>, the following property must be provided:</p><ul><li><code>"bootstrap.servers"</code></li></ul>  |
| `source.kafka.admin`    | Allows any standard Apache Kafka Admin Client property to be provided. It will inherit all properties from `source.kafka.common`.    | <p>If not defined in <code>common</code>, the following property must be provided:</p><ul><li><code>"bootstrap.servers"</code></li></ul>  |
| `target.kafka.common`   | A set of properties that will be inherited by `target.kafka.producer`, `target.kafka.consumer` and `target.kafka.admin`.             | It's advisable to define `"bootstrap.servers"` and any other common properties here to avoid repetition.                                  |
| `target.kafka.producer` | Allows any standard Apache Kafka Producer property to be provided. It will inherit all properties from `target.kafka.common`.        | <p>If not defined in <code>common</code>, the following property must be provided:</p><ul><li><code>"bootstrap.servers"</code></li></ul>  |
| `target.kafka.consumer` | Allows any standard Apache Kafka Consumer property to be provided. It will inherit all properties from `target.kafka.common`.        | <p>If not defined in <code>common</code>, the following property must be provided:</p><ul><li><code>"bootstrap.servers"</code></li></ul>  |
| `target.kafka.admin`    | Allows any standard Apache Kafka Admin Client property to be provided. It will inherit all properties from `target.kafka.common`.    | <p>If not defined in <code>common</code>, the following property must be provided:</p><ul><li><code>"bootstrap.servers"</code></li></ul>  |
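
As a minimal sketch of how the inheritance described in the table might be laid out, the snippet below defines connection settings once under `common` and adds client-specific properties under `admin` and `consumer`. The broker hostnames and property values are placeholders chosen for illustration and are not taken from this page. The sketch only relies on clients inheriting from `common`; how a key defined in both places is resolved is not covered here.

```yaml
source:
  kafka:
    common:
      # Shared by every source-side client (hypothetical hosts).
      "bootstrap.servers": "source-broker-1:9092,source-broker-2:9092"
      "client.dns.lookup": "use_all_dns_ips"
    admin:
      # Admin-only tuning; connection settings come from `common`.
      "request.timeout.ms": 30000
target:
  kafka:
    common:
      # Shared by every target-side client (hypothetical hosts).
      "bootstrap.servers": "target-broker-1:9092,target-broker-2:9092"
      "client.dns.lookup": "use_all_dns_ips"
    consumer:
      # Consumer-only tuning; connection settings come from `common`.
      "session.timeout.ms": 60000
    admin:
      "request.timeout.ms": 30000
```

Keeping `"bootstrap.servers"` in `common` means no individual client block has to repeat it.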

### **Connection Types**

The following connection settings cover common Kafka providers and security setups. \
In the examples, both source and target clusters are configured using the same provider (Confluent, AWS, ...). K2K Offset Mapping also supports replication pipelines where the source and target clusters use different providers.
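
For instance, a mixed-provider pipeline could be sketched by pairing the `source` block from one provider example with the `target` block from another. The snippet below assumes a source on AWS MSK with IAM authentication and a target on Confluent Cloud; it only recombines settings that appear in the per-provider examples on this page, and the placeholders in angle brackets must be replaced with your own values.

```yaml
# Sketch: source on AWS MSK (IAM auth), target on Confluent Cloud.
source:
  kafka:
    common:
      "bootstrap.servers": "<AWS MSK serverless/standard Kafka bootstrap brokers list>"
      "security.protocol": "SASL_SSL"
      "sasl.mechanism": "AWS_MSK_IAM"
      "sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required awsProfileName=<Your MSK Profile Name>;"
      "sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
target:
  kafka:
    common:
      "bootstrap.servers": "<Confluent Cloud Kafka Bootstrap Brokers>"
      "security.protocol": "SASL_SSL"
      "sasl.mechanism": "PLAIN"
      "sasl.jaas.config": "<Confluent Cloud Kafka client jaas configuration>"
```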

#### **Confluent Cloud**

```yaml
target:
  kafka:
    common:
      "bootstrap.servers": "<Confluent Cloud Kafka Bootstrap Brokers>"
      "security.protocol": "SASL_SSL"
      "sasl.jaas.config": "<Confluent Cloud Kafka client jaas configuration>" 
      "sasl.mechanism": "PLAIN"
      "acks": "all"
      
source:
  kafka:
    common:
      "bootstrap.servers": "<Confluent Cloud Kafka Bootstrap Brokers>"
      "security.protocol": "SASL_SSL"
      "sasl.jaas.config": "<Confluent Cloud Kafka client jaas configuration>"
      "sasl.mechanism": "PLAIN"
      "client.dns.lookup": "use_all_dns_ips"
```

#### **AWS MSK**

```yaml
# ...
source:
  kafka:
    common:
      "bootstrap.servers": "<AWS MSK serverless/standard Kafka bootstrap brokers list>"
      "sasl.mechanism": "AWS_MSK_IAM"
      "sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required awsProfileName=<Your MSK Profile Name>;"
      "security.protocol": "SASL_SSL"
      "sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
target:
  kafka:
    common:
      "bootstrap.servers": "<AWS MSK serverless/standard Kafka bootstrap brokers list>"
      "sasl.mechanism": "AWS_MSK_IAM"
      "sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required awsProfileName=<Your MSK Profile Name>;"
      "security.protocol": "SASL_SSL"
      "sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
```

#### **Google Managed Kafka**

```yaml
# ...
target:
  kafka:
    common:
      "bootstrap.servers": "<Google Managed Kafka bootstrap brokers list>"
      "sasl.mechanism": "PLAIN"
      "sasl.jaas.config": '<Google Managed Kafka jaas config>'
      "security.protocol": "SASL_SSL"
source: 
  kafka:
    common:
      "bootstrap.servers": "<Google Managed Kafka bootstrap brokers list>"
      "sasl.mechanism": "PLAIN"
      "sasl.jaas.config": '<Google Managed Kafka jaas config>'
      "security.protocol": "SASL_SSL"
```

When replicating to Google Managed Kafka with automatic topic creation enabled, ensure this setting is included. It prevents errors caused by Google Managed Kafka's restriction on changing default topic settings. The Kafka Admin Client does not clearly report this policy when it responds to topic configuration changes, so the application fails and stops.

#### **SSL**

```yaml
source:
  kafka:
    common:
      "bootstrap.servers": "<Kafka Bootstrap Brokers>"
      "security.protocol": "SSL"
      "ssl.truststore.location": "<Path to truststore JKS, e.g. /pipelines/truststore.jks>"
      "ssl.truststore.password": "changeit"
      "ssl.keystore.location": "<Path to keystore, e.g. /pipelines/keystore.jks>"
      "ssl.keystore.password": "changeit"
target:
  kafka:
    common:
      "bootstrap.servers": "<Kafka Bootstrap Brokers>"
      "security.protocol": "SSL"
      "ssl.truststore.location": "<Path to truststore JKS, e.g. /pipelines/truststore.jks>"
      "ssl.truststore.password": "changeit"
      "ssl.keystore.location": "<Path to keystore, e.g. /pipelines/keystore.jks>"
      "ssl.keystore.password": "changeit"
```

#### **Plain Text**

```yaml
source:
  kafka:
    common:
      "bootstrap.servers": "<Kafka Bootstrap Brokers>"
      "security.protocol": "PLAINTEXT"
target:
  kafka:
    common:
      "bootstrap.servers": "<Kafka Bootstrap Brokers>"
      "security.protocol": "PLAINTEXT"
```

### **Example**

The following example demonstrates how these properties are structured in the configuration file. This single block shows the required settings, together with custom consumer and producer properties for performance tuning.

```yaml
# ---------------------------------------------------
# K2K Source Cluster Configuration
# ---------------------------------------------------
source:
  kafka:
    common:
      # The initial list of broker servers to connect to the source cluster.
      "bootstrap.servers": "prod.kafka:9092,prod.kafka-2:9092"
    consumer:
      # A unique ID for the consumer group reading from the source.
      "group.id": "k2k.prod-to-dr.source"

      # Add any standard Kafka consumer properties here for tuning.
      # These settings override the K2K defaults. For a full list of available
      # properties, see the official Apache Kafka consumer configurations documentation.
      "fetch.min.bytes": 1024
      "session.timeout.ms": 60000
      # ... and so on

target:
  kafka:
    common:
      # The initial list of broker servers to connect to the target cluster.
      "bootstrap.servers": "dr.kafka:9092,dr.kafka-2:9092"
    producer:
      # Any standard Kafka producer properties here for tuning.
      # These settings override the K2K defaults. For a full list of available
      # properties, see the official Apache Kafka producer configurations documentation.
      "acks": "all"
      "batch.size": 32768
      "linger.ms": 20
      "compression.type": "zstd"
```

