Routing Records

Control where records are sent when replicating.

K2K can seamlessly redirect records from a topic in one Kafka Cluster to an identically named topic in a target Kafka Cluster. Additionally, K2K provides advanced routing options for customized configurations. This guide explores the various options available for routing records where desired.

Requirements

This tutorial assumes the following files exist (See Setting up for more details):

name: "my-first-replication"
features:
  autoCreateControlTopics: enabled
  autoCreateTopics: enabled
source:
  kafka:
    common:
      "bootstrap.servers": "kafka-source:9092"
    consumer:
      "group.id": "k2k.my-first-k2k"
target:
  kafka:
    common:
      "bootstrap.servers": "kafka-target:9092"
replication:
  - source:
      topic: ".*"
  - sink:
      topic: source
      partition: source

To ensure a clean start, execute this command to reset any prior configurations from earlier tutorials.

docker compose down

Use the same topic name

K2K sends records to a topic that shares the same name as the source topic.

replication:
  - source:
     ....
  - sink:
      topic: source
      partition: source
k2k-pipeline.yml
name: "my-first-replication"
features:
  autoCreateControlTopics: enabled
  autoCreateTopics: enabled
source:
  kafka:
    connection:
      servers: "kafka-source:9092"
    consumer:
      "group.id": "k2k.my-first-k2k"
target:
  kafka:
    connection:
      servers: "kafka-target:9092"
coordination:
  kafka:
    commit:
      group: "k2k.my-first-k2k"
replication:
  - source:
      topic: ".*"
  - sink:
      topic: source
      partition: source
See it in action
#start two kafka instances
docker compose up -d kafka-source kafka-target
#create topic user-topic
docker compose exec kafka-source \
      ./opt/kafka/bin/kafka-topics.sh \
      --create \
      --topic user-topic \
      --partitions 5 \
      --bootstrap-server 127.0.0.1:9092 
#create topic transaction-topic
docker compose exec kafka-source \
      ./opt/kafka/bin/kafka-topics.sh \
      --create \
      --topic transaction-topic \
      --partitions 5 \
      --bootstrap-server 127.0.0.1:9092
#create topic transfers-eu      
docker compose exec kafka-source \
      ./opt/kafka/bin/kafka-topics.sh \
      --create \
      --topic transfers-eu \
      --partitions 5 \
      --bootstrap-server 127.0.0.1:9092 
      
#create topic transfers-us
docker compose exec kafka-source \
      ./opt/kafka/bin/kafka-topics.sh \
      --create \
      --topic transfers-us \
      --partitions 5 \
      --bootstrap-server 127.0.0.1:9092 
docker compose up -d k2
docker compose exec kafka-target \
./opt/kafka/bin/kafka-topics.sh \
--list \
--bootstrap-server 127.0.0.1:9092

Adding prefix/suffix

K2K allows you to map a source cluster topic to a target topic by adding prefixes and/or suffixes to the original topic name. You can configure these options individually or together:

  • Prefix: Attach a string to the start of the topic name.

  • Suffix: Append a string to the end of the topic name.

replication:
  - source:
      ...
  - sink:
      topic:
        prefix: "k2k."
        suffix: ".replicated"
      partition: source
k2k-pipeline.yml
name: "my-first-replication"
features:
  autoCreateControlTopics: enabled
  autoCreateTopics: enabled
source:
  kafka:
    connection:
      servers: "kafka-source:9092"
    consumer:
      "group.id": "k2k.my-first-k2k"
target:
  kafka:
    connection:
      servers: "kafka-target:9092"
coordination:
  kafka:
    commit:
      group: "k2k.my-first-k2k"
replication:
  - source:
      topic: ".*"
  - sink:
      topic:
        prefix: "k2k."
        suffix: ".replicated"
      partition: source
See it in action
#start two kafka instances
docker compose up -d kafka-source kafka-target
#create topic user-topic
docker compose exec kafka-source \
      ./opt/kafka/bin/kafka-topics.sh \
      --create \
      --topic user-topic \
      --partitions 5 \
      --bootstrap-server 127.0.0.1:9092 
#create topic transaction-topic
docker compose exec kafka-source \
      ./opt/kafka/bin/kafka-topics.sh \
      --create \
      --topic transaction-topic \
      --partitions 5 \
      --bootstrap-server 127.0.0.1:9092
#create topic transfers-eu      
docker compose exec kafka-source \
      ./opt/kafka/bin/kafka-topics.sh \
      --create \
      --topic transfers-eu \
      --partitions 5 \
      --bootstrap-server 127.0.0.1:9092 
      
#create topic transfers-us
docker compose exec kafka-source \
      ./opt/kafka/bin/kafka-topics.sh \
      --create \
      --topic transfers-us \
      --partitions 5 \
      --bootstrap-server 127.0.0.1:9092 
docker compose up -d k2
docker compose exec kafka-target \
./opt/kafka/bin/kafka-topics.sh \
--list \
--bootstrap-server 127.0.0.1:9092

Replicate multiple topics to one topic

To consolidate data from multiple source cluster topics to a single target cluster topic, use the following configuration:

replication:
  - source:
      ...
  - sink:
      topic:
        static: "all-messages-topic"
      partition: source
k2k-pipeline.yml
name: "my-first-replication"
features:
  autoCreateControlTopics: enabled
  autoCreateTopics: enabled
source:
  kafka:
    connection:
      servers: "kafka-source:9092"
    consumer:
      "group.id": "k2k.my-first-k2k"
target:
  kafka:
    connection:
      servers: "kafka-target:9092"
coordination:
  kafka:
    commit:
      group: "k2k.my-first-k2k"
replication:
  - source:
      topic: ".*"
  - sink:
      topic:
        prefix: "k2k."
        suffix: ".replicated"
      partition: source
See it in action
#start two kafka instances
docker compose up -d kafka-source kafka-target
#create topic user-topic
docker compose exec kafka-source \
      ./opt/kafka/bin/kafka-topics.sh \
      --create \
      --topic user-topic \
      --partitions 5 \
      --bootstrap-server 127.0.0.1:9092 
#create topic transaction-topic
docker compose exec kafka-source \
      ./opt/kafka/bin/kafka-topics.sh \
      --create \
      --topic transaction-topic \
      --partitions 5 \
      --bootstrap-server 127.0.0.1:9092
#create topic transfers-eu      
docker compose exec kafka-source \
      ./opt/kafka/bin/kafka-topics.sh \
      --create \
      --topic transfers-eu \
      --partitions 5 \
      --bootstrap-server 127.0.0.1:9092 
      
#create topic transfers-us
docker compose exec kafka-source \
      ./opt/kafka/bin/kafka-topics.sh \
      --create \
      --topic transfers-us \
      --partitions 5 \
      --bootstrap-server 127.0.0.1:9092 
docker compose up -d k2
docker compose exec kafka-target \
./opt/kafka/bin/kafka-topics.sh \
--list \
--bootstrap-server 127.0.0.1:9092

Topics Mapping

K2K routes records by matching topic names with a list of regular expressions. It processes these rules sequentially, routing the record when a match is found with the source topic.

replication:
  - source:
      ...
  - sink:
      topic:
        "user-.*": "user-topic-replicated"
        "transfers-.*": "transaction-topic-unified"
        "does-not-exist.*": "will-not-be-created"
        .*: "dlq"   
    partition: source

Target cluster topics are created, if they don't exist, only when they match a rule present in the source cluster. If there is no matching topic, none will be created.

k2k-pipeline.yml
name: "my-first-replication"
features:
  autoCreateControlTopics: enabled
  autoCreateTopics: enabled
source:
  kafka:
    common:
      "bootstrap.servers": "kafka-source:9092"
    consumer:
      "group.id": "k2k.my-first-k2k"
target:
  kafka:
    common:
      "bootstrap.servers": "kafka-target:9092"
replication:
  - source:
      topic: ".*"
  - sink:
      topic:
        "user-.*": "user-topic-replicated"
        "transfers-.*": "transaction-topic-unified"
        "does-not-exist.*": "will-not-be-created"
        .*: "dlq"      
      partition: source
See it in action
#start two kafka instances
docker compose up -d kafka-source kafka-target
#create topic user-topic
docker compose exec kafka-source \
      ./opt/kafka/bin/kafka-topics.sh \
      --create \
      --topic user-topic \
      --partitions 5 \
      --bootstrap-server 127.0.0.1:9092 
#create topic transaction-topic
docker compose exec kafka-source \
      ./opt/kafka/bin/kafka-topics.sh \
      --create \
      --topic transaction-topic \
      --partitions 5 \
      --bootstrap-server 127.0.0.1:9092
#create topic transfers-eu      
docker compose exec kafka-source \
      ./opt/kafka/bin/kafka-topics.sh \
      --create \
      --topic transfers-eu \
      --partitions 5 \
      --bootstrap-server 127.0.0.1:9092 
      
#create topic transfers-us
docker compose exec kafka-source \
      ./opt/kafka/bin/kafka-topics.sh \
      --create \
      --topic transfers-us \
      --partitions 5 \
      --bootstrap-server 127.0.0.1:9092 
docker compose up -d k2
docker compose exec kafka-target \
./opt/kafka/bin/kafka-topics.sh \
--list \
--bootstrap-server 127.0.0.1:9092

Last updated

Was this helpful?