Configuration

Learn how to configure K2K (Kafka to Kafka).

Fully control how K2K replicates topics across Kafka clusters by adjusting its configuration file.

The configuration file is in YAML and has 8 basic sections:

  • name: sets the name of the K2K app (required)

  • source: defines the source cluster details (required)

  • target: defines the target cluster details (required)

  • replication: defines the set of topics to replicate and how to replicate them (required)

  • coordination: defines the settings for the coordinator, for example, the offsets (required)

  • features: defines extra functionality, such as exactly-once delivery (optional)

  • errorHandling: defines how to handle errors (optional)

  • tracing: defines the tracing header names (optional)

Here's a minimal example to replicate a single topic. It doesn't use any of the optional sections.

name: "mini-k2k"
source:
  kafka:
    connection:
      servers: "sourceKafka:9092"
    consumer:
      "group.id": "k2k.mini-k2k.source"
target:
  kafka:
    connection:
      servers: "targetKafka:9092"
replication:
  - source:
      name: topic-source
      # choose to replicate the topic named my-topic
      topic: "my-topic"
  - sink:
      name: topic-sink
      # use the original topic name as the target topic
      topic: source
      # replicate each record to the same partition as the
      # original topic 
      partition: source
coordination:
  kafka:
    commit:
      # set the consumer group name for reading 
      # the K2K internal coordination topic.
      group: "k2k.mini-k2k.coordination"  

Name

Required

Set the name of your K2K app.

name: "K2K Prod to DR"

You can use any string for the name.

Source

Required

Set the connection details for the source Kafka cluster. The source cluster is where K2K will be reading the topic data to replicate.

Specification

  1. The Kafka broker servers.

  2. The group ID for the K2K consumer group that will be reading the source data.

  3. Optionally, the Schema Registry connection.

source:
  kafka:
    connection:
      servers: "prod.kafka:9092"              #required
    consumer:
      "group.id": "k2k.prod-to-dr.source"     #required

    registry:                                 #optional
      url: "prod.schema-registry:8081"

With the source Schema Registry

Connect K2K with the source Kafka Schema Registry to:

  1. Translate records from the source Registry to the target Registry and sync the schemas.

  2. Filter and obfuscate records that use Registry schemas.

source:
  kafka:
    connection:
      servers: "prod.kafka:9092"
    consumer:
      "group.id": "k2k.prod-to-dr.source"
    registry: 
      url: "prod.schema-registry:8081"

Customise the source consumer

You can override the consumer configuration for K2K's source consumer. Add the overrides under the consumer section:

source:
  kafka:
    connection:
      servers: "prod.kafka:9092"
    consumer:
      "group.id": "k2k.prod-to-dr.source"
      # add any consumer config overrides here
      "fetch.min.bytes": 1024
      "session.timeout.ms": 60000

Target

Required

Set the connection details for the target Kafka cluster. The target cluster is where K2K will be writing the topic data that it replicates.

The configuration is similar to the source configuration, but with controls for the producer that writes the replicated data.

Specification

At minimum, you need the Kafka broker servers. You can also add:

  • Optionally, producer overrides.

  • Optionally, a Schema Registry.

target:
  kafka:
    connection:
      servers: "dr.kafka:9092"              #required
    producer:                               #optional

    registry:                               #optional
      url: "dr.schema-registry:8081"

With the target Schema Registry

Connect K2K with the target Kafka Schema Registry to:

  1. Replicate data in Registry-compatible formats (AVRO, Protobuf).

  2. Translate records from the source Registry to the target Registry and sync the schemas.

target:
  kafka:
    connection:
      servers: "dr.kafka:9092"
    registry: 
      url: "dr.schema-registry:8081"

Customise the target producer

You can override the producer configuration for K2K's target producer. Add the overrides under the optional producer section:

target:
  kafka:
    connection:
      servers: "dr.kafka:9092"
    producer:
      # add any producer config overrides here
      "batch.size": 16384
      "buffer.memory": 33554432

Replication

Required

Define the topics that will be replicated and choose how they will be routed to the target cluster topics.

Specification

replication:
  - source:
      name: source                             #required
      topic:                                   #required

  - sink:
      name: sink                               #required
      topic:                                   #required

      partition: source                        #required

Select source topics

Choose what source topics to replicate.

Option 1: Specific topics

Use this when replicating a single topic or a list of specific topics.

replication:
  - source:
      name: source
      topic:
        - "prod-topic-1"
        - "prod-topic-2"

Option 2: Topic patterns

Use this to capture topic name patterns, such as "all topics with the prefix customer".

Express the pattern as a regular expression.

replication:
  - source:
      name: source
      topic: "prod.*"
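
As a sketch, the "all topics with the prefix customer" case mentioned above could be expressed like this (the topic prefix is illustrative). Note that in a regular expression a bare `.` matches any character, so anchor or escape it if you need a literal dot:

```yaml
replication:
  - source:
      name: source
      # matches customer-orders, customer.events, and so on
      topic: "customer.*"
```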

Select target topics

Choose the destination topics for the replication and how they relate to the source topics.

Option 1 (1 to 1): Keep the same topic names

The simplest option. Every selected source topic will be replicated to an equivalent topic on the target Kafka cluster with the same name.

replication:
  - sink:
      name: sink
      topic: source
      partition: source

Source topic      Resulting target topic
------------      ----------------------
prod-topic-1      prod-topic-1
prod-topic-2      prod-topic-2

Option 2 (1 to 1): Add prefix/suffix

Add a prefix and/or a suffix to the original topic name.

This is useful to avoid topic name clashes, which can occur in setups where the target cluster already has topics with the same names as the source.

replication:
  - sink:
      name: sink
      topic:
        prefix: "eu."
        suffix: ".copy"  
      partition: source

Source topic      Resulting target topic
------------      ----------------------
prod-topic-1      eu.prod-topic-1.copy
prod-topic-2      eu.prod-topic-2.copy

Option 3 (all to 1): Fixed target topic

All selected topics will be replicated to the same target topic.

This is also useful when you're replicating only one topic and want to give the target topic an arbitrary name.

replication:
  - sink:
      name: sink
      topic:
        static: "landing-topic" 
      partition: source

Source topic      Resulting target topic
------------      ----------------------
prod-topic-1      landing-topic
prod-topic-2      landing-topic

Option 4: Custom mapping

The most flexible option. Use patterns to choose which source topics map to which target topics.

Define the source topics in the source section. Use this mapping to control how the already selected source topics map to target topics.

You can map individual topics (1 to 1) or source topic patterns to individual target topics (many to 1).

replication:
  - sink:
      name: sink
      topic:
        staticMapping:
          prod-topic-1: "copied-from-prod.1"     #1 to 1
          prod-topic-2: "prod-two-copy"          #1 to 1
          team-red.*: "team-red-combined-data"   #many to 1
          team-blue.*: "team-blue-combined-data" #many to 1
          .*: "other-teams-combined-data"        #many to 1
      partition: source

Source topic         Resulting target topic
------------         ----------------------
prod-topic-1         copied-from-prod.1
prod-topic-2         prod-two-copy
team-red-topic-1     team-red-combined-data
team-red-topic-2     team-red-combined-data
team-blue-topic-1    team-blue-combined-data
team-blue-topic-2    team-blue-combined-data
another-topic-1      other-teams-combined-data
some-topic-2         other-teams-combined-data

Select a partitioning strategy

Choose how records are distributed to target topic partitions.

Option 1: Same partition as the source topic

The most common approach. Each record is routed to the same partition number it came from in the source topic. Useful to ensure that the target topic matches the source, partition by partition.

replication:
  - sink:
      name: sink
      topic: source
      partition: source

Option 2: Producer decides

Let the K2K producer decide based on its configuration settings. Useful when the target topic has a different number of partitions than the source.

You can let the producer use the default key-based strategy, round robin, or even plug in your own custom strategy.

  • See Kafka's partitioner.class configuration.

  • If you need to change the partitioner.class configuration, add it under the target.kafka.producer section of the config (see the Target section above).

replication:
  - sink:
      name: sink
      topic: source
      partition: producer
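
Putting the pieces together, a producer-decides setup with a round-robin strategy might look like this sketch. The connection details are illustrative; the partitioner class is Kafka's standard org.apache.kafka.clients.producer.RoundRobinPartitioner:

```yaml
target:
  kafka:
    connection:
      servers: "dr.kafka:9092"
    producer:
      # distribute records round-robin across the target partitions
      "partitioner.class": "org.apache.kafka.clients.producer.RoundRobinPartitioner"
replication:
  - sink:
      name: sink
      topic: source
      # let the producer's partitioner decide the target partition
      partition: producer
```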

Coordination

Required

Control how the K2K runners coordinate with each other to decide what source and target partitions to handle.

In simple mode, you set the consumer group name for the internal K2K coordination topic. In advanced mode, you can adjust how K2K commits its offsets and how it achieves exactly-once replication.

Specification

At minimum, you need:

  • The consumer group name for the internal K2K coordination topic. K2K uses the coordination topic to manage which source and target partitions each runner handles.

The coordination consumer group is different from the source consumer group.

  • The source consumer group controls reading data from the source Kafka cluster.

  • The coordination consumer group is internal to K2K.

coordination:
  kafka:
    commit:
      group: "k2k.prod-to-dr.coordination"     #required
      topic: "__k2k_consumer-offsets"          #optional
      syncTimeout: "10 seconds"                #optional
      batchSize: 100                           #optional
      batchTimeout: "5 seconds"                #optional
    assignment:
      topic: "__k2k-assignment"                #optional
      graceWindow: "15 seconds"                #optional
      fencingMaxParallelism: 10                #optional
    charset: "UTF-8"                           #optional
    consumer:                                  #optional

(Advanced) Customise the general coordination

WIP

(Advanced) Customise exactly-once coordination

WIP

(Advanced) Customise coordination consumer

You can override the consumer configuration for K2K's internal coordination topic. Add the overrides under the consumer section:

coordination:
  kafka:
    commit:
      group: "k2k.prod-to-dr.coordination"
    consumer:
      # add any consumer config overrides here
      "fetch.min.bytes": 1024
      "session.timeout.ms": 60000

Features

Optional

Enable additional K2K capabilities:

  1. Deliver records exactly once.

  2. Replicate schemas between registries.

  3. Optimise offset commits.

  4. Add tracing headers.

Exactly once delivery

Enable exactly-once semantics. This ensures that each record is replicated only once, without any duplicates.

You'll need to set two additional properties in the source and target sections.

source:
  kafka:
    consumer:
      "isolation.level": read_committed #optional

target:
  kafka:
    producer:
      "enable.idempotence": true        #required

features:
  exactlyOnce: enabled
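
Putting the pieces above together, a minimal exactly-once configuration might look like this sketch (server addresses and group name are illustrative):

```yaml
source:
  kafka:
    connection:
      servers: "prod.kafka:9092"
    consumer:
      "group.id": "k2k.prod-to-dr.source"
      # only read records from committed transactions
      "isolation.level": read_committed
target:
  kafka:
    connection:
      servers: "dr.kafka:9092"
    producer:
      # required for exactly-once delivery
      "enable.idempotence": true
features:
  exactlyOnce: enabled
```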

Replicate schemas

Continuously replicate the relevant schemas from the source to the target Schema Registry. The relevant schemas are those that the replicated records reference.

You'll need to provide the Schema Registry connection details in the source and target sections.

source:
  kafka:
    registry: 
      url: "prod.schema-registry:8081"

target:
  kafka:
    registry: 
      url: "dr.schema-registry:8081"

features:
  schemaMapping: enabled

Optimise offset commits

Optimise offset commits by publishing all control records to the same partition.

features:
  offsetCommitOptimizePartition: enabled

Tracing headers

Add tracing headers to each replicated record, so that every record carries tracing metadata about its origin.

Choose to add standard metadata as headers:

  • partition - Add the source partition number as a header.

  • offset - Add the source offset number as a header.

  • topic - Add the source topic name as a header.

  • pipeline - Add the K2K app name (pipeline) as a header.

features:
  tracingHeaders:
    partition: enabled                  #optional
    offset: enabled                     #optional
    topic: enabled                      #optional
    pipeline: enabled                   #optional

If you want to customize the header names, use the tracing section.

features:
  tracingHeaders:
    partition: enabled
    offset: enabled
    topic: enabled
    pipeline: enabled
tracing:
  headers:
    partition: "k2k_partition"          #optional
    offset: "k2k_offset"                #optional
    topic: "k2k_topic"                  #optional
    pipeline: "k2k_pipeline"            #optional

Error handling

Optional

Choose how to deal with replication errors. You can tell K2K to either fail or ignore errors for different issues.

  • onCommitSyncTimeout - Timeout error when trying to determine the latest committed offset.

  • onControlMessageDeserializationError - Error when deserializing control messages from internal topics (K2K coordination and assignment topics).

errorHandling:
  onCommitSyncTimeout: fail|ignore                  #optional
  onControlMessageDeserializationError: fail|ignore #optional
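
For example, to keep running through commit-sync timeouts but stop on control-message deserialization errors (an illustrative policy, not a recommendation):

```yaml
errorHandling:
  # keep running if determining the latest committed offset times out
  onCommitSyncTimeout: ignore
  # stop the app if internal control messages can't be deserialized
  onControlMessageDeserializationError: fail
```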
