Replicated topics configs

Control target cluster topic creation and their configuration.

K2K streamlines data transfer between environments by automatically creating target cluster topics when they do not exist. This automation enhances workflow efficiency and reduces manual setup. However, this feature is only active when the auto-create topics flag is enabled. Without it, users must manually ensure the necessary topics are available in the target cluster, which can be tedious and error-prone, especially with configuration differences like variations in partition count or replication factors.

Requirements

This tutorial assumes the following files exist (See Setting up for more details):

name: "my-first-replication"
features:
  autoCreateControlTopics: enabled
  autoCreateTopics: enabled
source:
  kafka:
    common:
      "bootstrap.servers": "kafka-source:9092"
    consumer:
      "group.id": "k2k.my-first-k2k"
target:
  kafka:
    common:
      "bootstrap.servers": "kafka-target:9092"
replication:
  - source:
      topic: ".*"
  - sink:
      topic: source
      partition: source

To ensure a clean start, execute this command to reset any prior configurations from earlier tutorials.

docker compose down
1

Start the Kafka clusters

To start two local Kafka clusters, use this command:

#start two kafka instances
docker compose up -d kafka-source kafka-target
2

Creating source cluster topics

Run the following commands to create these topics: user-topic, transaction-topic, transfers-eu and transfers-us:

#create topic user-topic
docker compose exec kafka-source \
      ./opt/kafka/bin/kafka-topics.sh \
      --create \
      --topic user-topic \
      --partitions 5 \
      --bootstrap-server 127.0.0.1:9092 
#create topic transaction-topic
docker compose exec kafka-source \
      ./opt/kafka/bin/kafka-topics.sh \
      --create \
      --topic transaction-topic \
      --partitions 5 \
      --bootstrap-server 127.0.0.1:9092
#create topic transfers-eu      
docker compose exec kafka-source \
      ./opt/kafka/bin/kafka-topics.sh \
      --create \
      --topic transfers-eu \
      --partitions 5 \
      --bootstrap-server 127.0.0.1:9092 
      
#create topic transfers-us
docker compose exec kafka-source \
      ./opt/kafka/bin/kafka-topics.sh \
      --create \
      --topic transfers-us \
      --partitions 5 \
      --bootstrap-server 127.0.0.1:9092 
3

Defining the replication rules

Rules are defined under topicCreation.replication. The final topic configuration merges three sources in order of precedence:

  • Matching rules from topicCreation.replication.rules (highest priority)

  • Common settings from topicCreation.replication.common

  • Source topic configuration (lowest priority)

Below are examples illustrating the topic creation process:

Topic: user-topic

  • Created with 20 partitions.

  • Applies topicCreation.replication.rules[0] that matches the topic name.

  • This rule takes precedence over both topicCreation.replication.common.partitions and the original topic's partition count.

Topic: transfers-us

  • Created without specifying a partition count.

  • Defaults to the Kafka instance's standard partition count.

  • Occurs because topicCreation.replication.common.partitions is set to null, overtaking the source topic's partition count.

topicCreation:
  replication:
    common:
      partitions: null
      replication: 1
      config:
        "delete.retention.ms": "1000"
    rules:
      - pattern: ".*-topic"
        properties:
          partitions: 20
          replication: 1
          config:
            "delete.retention.ms": "1000"
      - pattern: "transfers.*"
        config:
          "delete.retention.ms": "2000"
      - pattern: ".*"
        partition: 7
4

Run it

Use the following command to run the K2K replicator app:

#start k2k
docker compose up k2k
5

Validate the results

docker compose exec kafka-target \
      ./opt/kafka/bin/kafka-topics.sh \
      --describe \
      --topic user-topic \
      --bootstrap-server 127.0.0.1:9099

#there should be 20 partitions
#delete.retention.ms should be 1000
k2k-pipeline.yml
name: "my-first-replication"
features:
  autoCreateControlTopics: enabled
  autoCreateTopics: enabled
source:
  kafka:
    common:
      "bootstrap.servers": "kafka-source:9092"
    consumer:
      "group.id": "k2k.my-first-k2k"
target:
  kafka:
    common:
      "bootstrap.servers": "kafka-target:9092"
topicCreation:
  replication:
    common:
      partitions: 1
      replication: 1
      config:
        "delete.retention.ms": "1000"
    rules:
      - pattern: ".*-topic"
        properties:
          partitions: 20
          replication: 1
          config:
            "delete.retention.ms": "1000"
      - pattern: "transfers.*"
        properties:
          config:
            "delete.retention.ms": "2000"
      - pattern: ".*"
        properties:
          partition: 7
replication:
  - source:
      topic: ".*"
  - sink:
      topic: source
      partition: producer

Last updated

Was this helpful?