Routing Records
Control where records are sent when replicating.
Found an issue? Feed it back to us at Github, on Slack, Ask Marios or email.
To execute K2K, you must agree to the EULA and secure a free license.
Accept the EULA by setting license.acceptEula to true .
K2K can seamlessly redirect records from a topic in one Kafka Cluster to an identically named topic in a target Kafka Cluster. Additionally, K2K provides advanced routing options for customized configurations. This guide explores the various options available for routing records where desired.
Requirements
This tutorial assumes the following files exist (See Setting up for more details):
name: "my-first-replication"
features:
autoCreateControlTopics: enabled
autoCreateTopics: enabled
source:
kafka:
common:
"bootstrap.servers": "kafka-source:9092"
consumer:
"group.id": "k2k.my-first-k2k"
target:
kafka:
common:
"bootstrap.servers": "kafka-target:9092"
replication:
- source:
topic: ".*"
- sink:
topic: source
partition: sourceservices:
k2k:
image: "lensesio/k2k:0.5.0"
volumes:
- ".:/pipelines"
environment:
OTEL_SERVICE_NAME: "k2k"
OTEL_METRICS_EXPORTER: none
OTEL_TRACES_EXPORTER: none
OTEL_LOGS_EXPORTER: none
#LENSES_K2K_ACCEPT_EULA: true
command:
- k2k
- start
- -f
- /pipelines/k2k-pipeline.yml
- -t
kafka-source:
image: "apache/kafka:3.8.0"
environment:
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_LISTENERS: INTERNAL://:9092,EXTERNAL://:9094,CONTROLLER://:9093
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-source:9092,EXTERNAL://127.0.0.1:9094
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_NUM_PARTITIONS: 3
ports:
- "9094:9094"
kafka-target:
image: "apache/kafka:3.8.0"
environment:
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_LISTENERS: INTERNAL://:9092,EXTERNAL://:9099,CONTROLLER://:9093
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-target:9092,EXTERNAL://127.0.0.1:9099
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_NUM_PARTITIONS: 3
ports:
- 9099:9099To ensure a clean start, execute this command to reset any prior configurations from earlier tutorials.
docker compose downUse the same topic name
K2K sends records to a topic that shares the same name as the source topic.
replication:
- source:
....
- sink:
topic: source
partition: sourceAdding prefix/suffix
K2K allows you to map a source cluster topic to a target topic by adding prefixes and/or suffixes to the original topic name. You can configure these options individually or together:
Prefix: Attach a string to the start of the topic name.
Suffix: Append a string to the end of the topic name.
replication:
- source:
...
- sink:
topic:
prefix: "k2k."
suffix: ".replicated"
partition: sourceReplicate multiple topics to one topic
To consolidate data from multiple source cluster topics to a single target cluster topic, use the following configuration:
replication:
- source:
...
- sink:
topic:
static: "all-messages-topic"
partition: sourceTopics Mapping
K2K routes records by matching topic names with a list of regular expressions. It processes these rules sequentially, routing the record when a match is found with the source topic.
replication:
- source:
...
- sink:
topic:
"user-.*": "user-topic-replicated"
"transfers-.*": "transaction-topic-unified"
"does-not-exist.*": "will-not-be-created"
.*: "dlq"
partition: sourceLast updated
Was this helpful?

