Connections
K2K Kafka and Schema Registry connection details
K2K uses configurable Kafka Consumers, Producers, Admin Clients and Schema Registry Clients to provide seamless Kafka-to-Kafka replication. This section details the source cluster connection and the target cluster connection.
Configuration Parameters
K2K provides both the flexibility to configure each module individually (Consumers, Producers, Admin Clients and Schema Registry Clients) and the ability to provide common configuration only once. To achieve this, both source.kafka and target.kafka have a common key, which allows settings to be specified once for all of that cluster's Consumers, Producers (if applicable) and Admin Clients.
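As a minimal sketch of this inheritance (the broker addresses and tuning values are illustrative placeholders, and it is assumed, as is typical for layered configuration, that a key set on a specific client overrides the same key under common):

source:
  kafka:
    # Shared by every source-side client.
    common:
      "bootstrap.servers": "broker-1:9092,broker-2:9092"
      "security.protocol": "PLAINTEXT"
    consumer:
      # Inherits bootstrap.servers and security.protocol from common;
      # only consumer-specific settings are added here.
      "group.id": "k2k.example"
    admin:
      # Also inherits from common; repeating a key here would
      # override the value defined under common.
      "request.timeout.ms": 30000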
The full list of available configuration keys is listed in the table below.
source.kafka.common
A set of properties that will be inherited by source.kafka.consumer and source.kafka.admin. It is advisable to define "bootstrap.servers" and any other common properties here to avoid repetition.

source.kafka.consumer
Allows any standard Apache Kafka Consumer property to be provided. It inherits all properties provided in source.kafka.common. If not defined under common, the following properties must be provided: "group.id", "bootstrap.servers".

source.kafka.admin
Allows any standard Apache Kafka Admin Client property to be provided. It inherits all properties from source.kafka.common. If not defined under common, the following property must be provided: "bootstrap.servers".

source.registry.config
Accepts any Confluent Schema Registry compatible property. When Schema Mapping is enabled, the following property must be provided: "schema.registry.url".

source.registry.headers
Any HTTP headers that should be added to all requests made to the Schema Registry server. Optional.

target.kafka.common
A set of properties that will be inherited by target.kafka.producer, target.kafka.consumer and target.kafka.admin. It is advisable to define "bootstrap.servers" and any other common properties here to avoid repetition.

target.kafka.producer
Allows any standard Apache Kafka Producer property to be provided. It inherits all properties provided in target.kafka.common. If not defined under common, the following property must be provided: "bootstrap.servers".

target.kafka.consumer
Allows any standard Apache Kafka Consumer property to be provided. It inherits all properties provided in target.kafka.common. If not defined under common, the following property must be provided: "bootstrap.servers".

target.kafka.admin
Allows any standard Apache Kafka Admin Client property to be provided. It inherits all properties from target.kafka.common. If not defined under common, the following property must be provided: "bootstrap.servers".

target.registry.config
Accepts any Confluent Schema Registry compatible property. When Schema Mapping is enabled, the following property must be provided: "schema.registry.url".

target.registry.headers
Any HTTP headers that should be added to all requests made to the Schema Registry server. Optional.
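For illustration, a hypothetical use of registry headers to pass an authentication token to a proxied Schema Registry, assuming headers are given as a key/value map (the header name and token are placeholders, not part of K2K's defaults):

source:
  registry:
    config:
      "schema.registry.url": "http://prod.schema-registry:8081"
    # Added to every request made to the Schema Registry server.
    headers:
      "Authorization": "Bearer <token>"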
Connection Types
Below are connection settings for configuring your application. The examples include both target and source settings for completeness; since the source and target clusters may come from different Kafka providers, mix and match the snippets to suit your deployment.
Confluent Cloud
target:
  kafka:
    common:
      "bootstrap.servers": "<Confluent Cloud Kafka Bootstrap Brokers>"
      "security.protocol": "SASL_SSL"
      "sasl.jaas.config": "<Confluent Cloud Kafka client jaas configuration>"
      "sasl.mechanism": "PLAIN"
      "acks": "all"
source:
  kafka:
    common:
      "bootstrap.servers": "<Confluent Cloud Kafka Bootstrap Brokers>"
      "security.protocol": "SASL_SSL"
      "sasl.jaas.config": "<Confluent Cloud Kafka client jaas configuration>"
      "sasl.mechanism": "PLAIN"
      "client.dns.lookup": "use_all_dns_ips"
AWS MSK
...
source:
  kafka:
    common:
      "bootstrap.servers": "<AWS MSK serverless/standard Kafka bootstrap brokers list>"
      "sasl.mechanism": "AWS_MSK_IAM"
      "sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required awsProfileName=<Your MSK Profile Name>;"
      "security.protocol": "SASL_SSL"
      "sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
target:
  kafka:
    common:
      "bootstrap.servers": "<AWS MSK serverless/standard Kafka bootstrap brokers list>"
      "sasl.mechanism": "AWS_MSK_IAM"
      "sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required awsProfileName=<Your MSK Profile Name>;"
      "security.protocol": "SASL_SSL"
      "sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
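If credentials should instead come from the AWS default provider chain (environment variables, EC2/EKS instance roles, and so on) rather than a named profile, the IAM login module is normally declared without awsProfileName:

      "sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;"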
Google Managed Kafka
...
target:
  kafka:
    common:
      "bootstrap.servers": "<Google Managed Kafka bootstrap brokers list>"
      "sasl.mechanism": "PLAIN"
      "sasl.jaas.config": "<Google Managed Kafka jaas config>"
      "security.protocol": "SASL_SSL"
source:
  kafka:
    common:
      "bootstrap.servers": "<Google Managed Kafka bootstrap brokers list>"
      "sasl.mechanism": "PLAIN"
      "sasl.jaas.config": "<Google Managed Kafka jaas config>"
      "security.protocol": "SASL_SSL"
When replicating to a Google Managed Kafka cluster with automatic topic creation enabled, ensure the setting below is included. Google Managed Kafka does not allow some default topic settings to be changed, and the Kafka Admin Client does not clearly indicate the topic-setting policy when it rejects a topic configuration change, which causes the application to fail and stop. Setting these properties to null keeps K2K from attempting to change them.
topicCreation:
  replication:
    common:
      config:
        "segment.bytes": null
        "local.retention.ms": null
SSL
source:
  kafka:
    common:
      "bootstrap.servers": "<Kafka Bootstrap Brokers>"
      "security.protocol": "SSL"
      "ssl.truststore.location": "<Path to truststore jks, e.g. /pipelines/truststore.jks>"
      "ssl.truststore.password": "changeit"
      "ssl.keystore.location": "<Path to keystore, e.g. /pipelines/keystore.jks>"
      "ssl.keystore.password": "changeit"
target:
  kafka:
    common:
      "bootstrap.servers": "<Kafka Bootstrap Brokers>"
      "security.protocol": "SSL"
      "ssl.truststore.location": "<Path to truststore jks, e.g. /pipelines/truststore.jks>"
      "ssl.truststore.password": "changeit"
      "ssl.keystore.location": "<Path to keystore, e.g. /pipelines/keystore.jks>"
      "ssl.keystore.password": "changeit"
Plain Text
source:
  kafka:
    common:
      "bootstrap.servers": "<Kafka Bootstrap Brokers>"
      "security.protocol": "PLAINTEXT"
target:
  kafka:
    common:
      "bootstrap.servers": "<Kafka Bootstrap Brokers>"
      "security.protocol": "PLAINTEXT"
Example
The following example demonstrates how these properties are structured in the configuration file. This single block shows the required settings, an optional Schema Registry connection, and custom consumer and producer properties for performance tuning.
# ---------------------------------------------------
# K2K Source Cluster Configuration
# ---------------------------------------------------
source:
  kafka:
    common:
      # The initial list of broker servers to connect to the source cluster.
      "bootstrap.servers": "prod.kafka:9092,prod.kafka-2:9092"
    consumer:
      # A unique ID for the consumer group reading from the source.
      "group.id": "k2k.prod-to-dr.source"
      # Add any standard Kafka consumer properties here for tuning.
      # These settings override the K2K defaults. For a full list of available
      # properties, see the official Apache Kafka consumer configurations documentation.
      "fetch.min.bytes": 1024
      "session.timeout.ms": 60000
      # ... and so on
  # Configuration for the source cluster's Schema Registry.
  registry:
    config:
      "schema.registry.url": "http://prod.schema-registry:8081"

target:
  kafka:
    common:
      # The initial list of broker servers to connect to the target cluster.
      "bootstrap.servers": "dr.kafka:9092,dr.kafka-2:9092"
    producer:
      # Any standard Kafka producer properties here for tuning.
      # These settings override the K2K defaults. For a full list of available
      # properties, see the official Apache Kafka producer configurations documentation.
      "acks": "all"
      "batch.size": 32768
      "linger.ms": 20
      "compression.type": "zstd"
  # Configuration for the target cluster's Schema Registry.
  registry:
    config:
      "schema.registry.url": "http://dr.schema-registry:8081"