Configuration
Learn how to configure K2K, the Kafka to Kafka replicator.
This is an alpha release. Found an issue? Feed it back to us on GitHub, on Slack, via Ask Marios, or by email.
Fully control how K2K replicates topics across Kafka clusters by adjusting its configuration file.
The configuration file is in YAML and has 8 basic sections:

name: sets the name of the K2K app (required)
source: defines the source cluster details (required)
target: defines the target cluster details (required)
replication: defines the set of topics to replicate and how to replicate them (required)
coordination: defines the settings for the coordinator, for example, the offsets (required)
features: defines extra functionality, such as exactly-once delivery (optional)
errorHandling: defines how to handle errors (optional)
tracing: defines the tracing components, such as header names (optional)
The source and coordination sections allow you to set lower-level Kafka consumer properties, for example, authentication properties, schema registries, and deserializers. The target section allows for setting producer properties.
Here's a minimal example to replicate a single topic. It doesn't use any of the optional sections.
```yaml
name: "mini-k2k"
source:
  kafka:
    connection:
      servers: "sourceKafka:9092"
    consumer:
      "group.id": "k2k.mini-k2k.source"
target:
  kafka:
    connection:
      servers: "targetKafka:9092"
replication:
  - source:
      name: topic-source
      # choose to replicate the topic named my-topic
      topic: "my-topic"
  - sink:
      name: topic-sink
      # use the original topic name as the target topic
      topic: source
      # replicate each record to the same partition as the
      # original topic
      partition: source
coordination:
  kafka:
    commit:
      # set the consumer group name for reading
      # the K2K internal coordination topic.
      group: "k2k.mini-k2k.coordination"
```
Name
Required
Set the name of your K2K app. You can use any string for the name.

```yaml
name: K2K Prod to DR
```
Source
Required
Set the connection details for the source Kafka cluster. The source cluster is where K2K will be reading the topic data to replicate.
Specification
At minimum, you need:
The Kafka broker servers.
The group ID for the K2K consumer group that will read the source data.
Optionally, a Schema Registry.
```yaml
source:
  kafka:
    connection:
      servers: "prod.kafka:9092" #required
    consumer:
      "group.id": "k2k.prod-to-dr.source" #required
      …
    registry: #optional
      url: "prod.schema-registry:8081"
```
With the source Schema Registry
Connect K2K with the source Kafka Schema Registry to:
Translate records from the source Registry to the target Registry and sync the schemas.
Filter and obfuscate records that use Registry schemas.
```yaml
source:
  kafka:
    connection:
      servers: "prod.kafka:9092"
    consumer:
      "group.id": "k2k.prod-to-dr.source"
    registry:
      url: "prod.schema-registry:8081"
```
Customise the source consumer
You can override the consumer configuration for K2K's source consumer. Add the overrides under the consumer section:
```yaml
source:
  kafka:
    connection:
      servers: "prod.kafka:9092"
    consumer:
      "group.id": "k2k.prod-to-dr.source"
      # add any consumer config overrides here
      "fetch.min.bytes": 1024
      "session.timeout.ms": 60000
      …
```
Target
Required
Set the connection details for the target Kafka cluster. The target cluster is where K2K will be writing the topic data that it replicates.
The configuration is similar to the source section, but with controls for the producer that writes the replicated data.
Specification
At minimum, you need:
The Kafka broker servers.
Optionally, producer overrides.
Optionally, a Schema registry.
```yaml
target:
  kafka:
    connection:
      servers: "dr.kafka:9092" #required
    producer: #optional
      …
    registry: #optional
      url: "dr.schema-registry:8081"
```
With the target Schema Registry
Connect K2K with the target Kafka Schema Registry to:
Replicate data in Registry-compatible formats (AVRO, Protobuf).
Translate records from the source Registry to the target Registry and sync the schemas.
```yaml
target:
  kafka:
    connection:
      servers: "dr.kafka:9092"
    registry:
      url: "dr.schema-registry:8081"
```
Customise the target producer
You can override the producer configuration for K2K's target producer. Add the overrides under the optional producer section:
```yaml
target:
  kafka:
    connection:
      servers: "dr.kafka:9092"
    producer:
      # add any producer config overrides here
      "batch.size": 16384
      "buffer.memory": 33554432
      …
```
Replication
Required
Define the topics that will be replicated and choose how they will be routed to the target cluster topics.
Specification
```yaml
replication:
  - source:
      name: source #required
      topic: #required
        …
  - sink:
      name: sink #required
      topic: #required
        …
      partition: source #required
```
Select source topics
Choose what source topics to replicate.
Specific topics
Use this when replicating a single topic or a list of specific topics.
```yaml
replication:
  - source:
      name: source
      topic:
        - "prod-topic-1"
        - "prod-topic-2"
```
Topic patterns
Use this to capture topic name patterns, such as "all topics with the prefix customer".
Express the pattern as a regular expression.
```yaml
replication:
  - source:
      name: source
      topic: "prod.*"
```
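As an illustration (the topic names are hypothetical), here is a short Python sketch of how a pattern like "prod.*" selects topics, assuming the regex is matched against the full topic name:

```python
import re

# Hypothetical topic list; the pattern mirrors the YAML example above.
pattern = re.compile(r"prod.*")
topics = ["prod-topic-1", "prod-topic-2", "staging-topic", "reprod-x"]

# fullmatch: the whole topic name must match, so "reprod-x" is excluded.
selected = [t for t in topics if pattern.fullmatch(t)]
print(selected)  # ['prod-topic-1', 'prod-topic-2']
```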
Select target topics
Choose the destination topics for the replication and how they relate to the source topics.
1 to 1: keep the same topic names
The simplest option. Every selected source topic will be replicated to an equivalent topic on the target Kafka cluster with the same name.
```yaml
replication:
  - sink:
      name: sink
      topic: source
      partition: source
```

| Source topic | Target topic |
| --- | --- |
| prod-topic-1 | prod-topic-1 |
| prod-topic-2 | prod-topic-2 |
1 to 1: add prefix/suffix
Add a prefix and/or a suffix to the original topic name.
This is useful for avoiding topic overlaps, which happen in setups where the target cluster might already have topics with the same names as the source.
```yaml
replication:
  - sink:
      name: sink
      topic:
        prefix: "eu."
        suffix: ".copy"
      partition: source
```

| Source topic | Target topic |
| --- | --- |
| prod-topic-1 | eu.prod-topic-1.copy |
| prod-topic-2 | eu.prod-topic-2.copy |
All to 1: fixed target topic
All selected topics will be replicated to the same topic.
This is also useful when you're only replicating 1 topic and want to set the target topic to an arbitrary name.
```yaml
replication:
  - sink:
      name: sink
      topic:
        static: "landing-topic"
      partition: source
```

| Source topic | Target topic |
| --- | --- |
| prod-topic-1 | landing-topic |
| prod-topic-2 | landing-topic |
Custom mapping
The most flexible option. Use patterns to choose which source topics get mapped to what target topics.
You can map individual topics (1 to 1) or source topic patterns to individual target topics (many to 1).
```yaml
replication:
  - sink:
      name: sink
      topic:
        staticMapping:
          prod-topic-1: "copied-from-prod.1" #1 to 1
          prod-topic-2: "prod-two-copy" #1 to 1
          team-red.*: "team-red-combined-data" #many to 1
          team-blue.*: "team-blue-combined-data" #many to 1
          .*: "other-teams-combined-data" #many to 1
      partition: source
```

| Source topic | Target topic |
| --- | --- |
| prod-topic-1 | copied-from-prod.1 |
| prod-topic-2 | prod-two-copy |
| team-red-topic-1 | team-red-combined-data |
| team-red-topic-2 | team-red-combined-data |
| team-blue-topic-1 | team-blue-combined-data |
| team-blue-topic-2 | team-blue-combined-data |
| another-topic-1 | other-teams-combined-data |
| some-topic-2 | other-teams-combined-data |
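The mapping above can be pictured with a Python sketch. Note this is only an illustration: the assumption that entries are tried in order and the first full-regex match wins is ours, not confirmed K2K behaviour.

```python
import re

# The staticMapping from the YAML example, as an ordered dict
# (Python dicts preserve insertion order).
static_mapping = {
    "prod-topic-1": "copied-from-prod.1",
    "prod-topic-2": "prod-two-copy",
    "team-red.*": "team-red-combined-data",
    "team-blue.*": "team-blue-combined-data",
    ".*": "other-teams-combined-data",
}

def resolve_target(source_topic: str) -> str:
    """Hypothetical resolution: first entry whose regex matches the
    whole source topic name decides the target topic."""
    for pattern, target in static_mapping.items():
        if re.fullmatch(pattern, source_topic):
            return target
    raise ValueError(f"no mapping for {source_topic}")

print(resolve_target("team-red-topic-1"))  # team-red-combined-data
print(resolve_target("another-topic-1"))   # other-teams-combined-data
```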
Select a partitioning strategy
Choose how records are distributed to target topic partitions.
Same partition as the source topic
The most common approach. The record will be routed to the same partition number as it comes from the source topic. Useful to ensure that the target topic is the same as the source, partition by partition.
```yaml
replication:
  - sink:
      name: sink
      topic: source
      partition: source
```
Producer decides
Let the K2K producer decide based on its configuration settings. Useful when the target topic has a different number of partitions than the source.
You can let the producer use the default key-based strategy, round robin, or even plug your own custom strategy.
See the partitioner.class producer configuration. If you need to change it, add it in the target.kafka.producer section of the config.
```yaml
replication:
  - sink:
      name: sink
      topic: source
      partition: producer
```
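For example, if you wanted round-robin distribution, you could point partitioner.class at Kafka's built-in RoundRobinPartitioner (a sketch; any class implementing the producer Partitioner interface on the classpath could be set the same way):

```yaml
target:
  kafka:
    producer:
      "partitioner.class": "org.apache.kafka.clients.producer.RoundRobinPartitioner"
```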
Coordination
Required
Control how the K2K runners coordinate with each other to decide what source and target partitions to handle.
In simple mode, you set the consumer group name for the internal K2K coordination topic. In advanced mode, you can adjust how K2K commits its offsets and how it achieves exactly-once replication.
Specification
At minimum, you need:
The consumer group name for the internal K2K coordination topic. K2K uses the coordination topic to manage which source and target partitions each runner handles.
```yaml
coordination:
  kafka:
    commit:
      group: "k2k.prod-to-dr.coordination" #required
      topic: "__k2k_consumer-offsets" #optional
      syncTimeout: "10 seconds" #optional
      batchSize: 100 #optional
      batchTimeout: "5 seconds" #optional
    assignment:
      topic: "__k2k-assignment" #optional
      graceWindow: "15 seconds" #optional
      fencingMaxParallelism: 10 #optional
      charset: "UTF-8" #optional
    consumer: #optional
      …
```
(Advanced) Customise the general coordination
WIP
(Advanced) Customise exactly-once coordination
WIP
(Advanced) Customise coordination consumer
You can override the consumer configuration for K2K's internal coordination topic. Add the overrides under the consumer section:
```yaml
coordination:
  kafka:
    commit:
      group: "k2k.prod-to-dr.coordination"
    consumer:
      # add any consumer config overrides here
      "fetch.min.bytes": 1024
      "session.timeout.ms": 60000
      …
```
Features
Optional
Enable additional K2K capabilities:
Deliver records exactly once.
Replicate schemas between registries.
Optimise offset commits.
Add tracing headers.
Exactly once delivery
Enable exactly-once semantics. This ensures that each record is replicated only once, without any duplicates.
You'll need to set two additional properties, in the source and target sections.
```yaml
source:
  kafka:
    consumer:
      "isolation.level": read_committed #optional
      …
target:
  kafka:
    producer:
      "enable.idempotence": true #required
      …
features:
  exactlyOnce: enabled
```
Replicate schemas
Continuously replicate the relevant schemas from the source to the target Schema Registry. The relevant schemas are those that replicated records point at.
You'll need to provide the Schema Registry connection details in the source and target sections.
```yaml
source:
  kafka:
    registry:
      url: "prod.schema-registry:8081"
      …
target:
  kafka:
    registry:
      url: "dr.schema-registry:8081"
      …
features:
  schemaMapping: enabled
```
Optimise offset commits
Optimise offset commits by publishing all control records to the same partition.

```yaml
features:
  offsetCommitOptimizePartition: enabled
```
Tracing headers
Add tracing headers to each replicated record. Useful to add tracing metadata in your replication.
Choose to add standard metadata as headers:

partition - Add the source partition number as a header.
offset - Add the source offset number as a header.
topic - Add the source topic name as a header.
pipeline - Add the K2K app name (pipeline) as a header.
```yaml
features:
  tracingHeaders:
    partition: enabled #optional
    offset: enabled #optional
    topic: enabled #optional
    pipeline: enabled #optional
```
If you want to customise the header names, use the tracing section.
```yaml
features:
  tracingHeaders:
    partition: enabled
    offset: enabled
    topic: enabled
    pipeline: enabled
tracing:
  headers:
    partition: "k2k_partition" #optional
    offset: "k2k_offset" #optional
    topic: "k2k_topic" #optional
    pipeline: "k2k_pipeline" #optional
```
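On the consuming side, the default header names above could be read back with a sketch like this. Kafka clients expose record headers as key/value pairs with byte values; the helper function and sample values are hypothetical:

```python
def tracing_metadata(headers):
    """Collect K2K tracing headers (default names) from a record's
    header list, decoding the byte values to strings."""
    wanted = {"k2k_partition", "k2k_offset", "k2k_topic", "k2k_pipeline"}
    return {k: v.decode("utf-8") for k, v in headers if k in wanted}

# Hypothetical headers as they might appear on a replicated record.
headers = [
    ("k2k_partition", b"3"),
    ("k2k_offset", b"1042"),
    ("k2k_topic", b"prod-topic-1"),
    ("k2k_pipeline", b"K2K Prod to DR"),
    ("other-header", b"ignored"),
]
print(tracing_metadata(headers))
```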
Error handling
Optional
Choose how to deal with replication errors. For each kind of issue, you can tell K2K to either fail or ignore.
onCommitSyncTimeout - Timeout error when trying to determine the latest committed offset.
onControlMessageDeserializationError - Error when deserializing control messages from internal topics (the K2K coordination and assignment topics).
```yaml
errorHandling:
  onCommitSyncTimeout: fail|ignore #optional
  onControlMessageDeserializationError: fail|ignore #optional
```