Selecting topics
Choose which topics should and should not be replicated.
To run K2K, you must accept the EULA and obtain a free license.
Accept the EULA by setting license.acceptEula to true.
K2K enables simultaneous replication of multiple topics. This section details the configuration process for replicating more than one topic at a time. We'll cover two methods for selecting topics to replicate: using regex and a fixed list.
Requirements
This tutorial assumes the following files exist (see Setting up for more details):
# k2k-pipeline.yml
name: "my-first-replication"
features:
  autoCreateControlTopics: enabled
  autoCreateTopics: enabled
source:
  kafka:
    common:
      "bootstrap.servers": "kafka-source:9092"
    consumer:
      "group.id": "k2k.my-first-k2k"
target:
  kafka:
    common:
      "bootstrap.servers": "kafka-target:9092"
replication:
  - source:
      topic: ".*"
  - sink:
      topic: source
      partition: source

# Docker Compose file
services:
  k2k:
    image: "lensesio/k2k:0.5.0"
    volumes:
      - ".:/pipelines"
    environment:
      OTEL_SERVICE_NAME: "k2k"
      OTEL_METRICS_EXPORTER: none
      OTEL_TRACES_EXPORTER: none
      OTEL_LOGS_EXPORTER: none
      #LENSES_K2K_ACCEPT_EULA: true
    command:
      - k2k
      - start
      - -f
      - /pipelines/k2k-pipeline.yml
      - -t
  kafka-source:
    image: "apache/kafka:3.8.0"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: INTERNAL://:9092,EXTERNAL://:9094,CONTROLLER://:9093
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-source:9092,EXTERNAL://127.0.0.1:9094
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_NUM_PARTITIONS: 3
    ports:
      - "9094:9094"
  kafka-target:
    image: "apache/kafka:3.8.0"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: INTERNAL://:9092,EXTERNAL://:9099,CONTROLLER://:9093
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-target:9092,EXTERNAL://127.0.0.1:9099
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_NUM_PARTITIONS: 3
    ports:
      - "9099:9099"

To ensure a clean start, run this command to reset any configuration left over from earlier tutorials:
docker compose down

Creating topics in the Source cluster
Run the following commands to create these topics: user-topic, transaction-topic, transfers-eu and transfers-us:
# Create topic user-topic
docker compose exec kafka-source \
  ./opt/kafka/bin/kafka-topics.sh \
  --create \
  --topic user-topic \
  --partitions 5 \
  --bootstrap-server 127.0.0.1:9092
# Create topic transaction-topic
docker compose exec kafka-source \
  ./opt/kafka/bin/kafka-topics.sh \
  --create \
  --topic transaction-topic \
  --partitions 5 \
  --bootstrap-server 127.0.0.1:9092
# Create topic transfers-eu
docker compose exec kafka-source \
  ./opt/kafka/bin/kafka-topics.sh \
  --create \
  --topic transfers-eu \
  --partitions 5 \
  --bootstrap-server 127.0.0.1:9092
# Create topic transfers-us
docker compose exec kafka-source \
  ./opt/kafka/bin/kafka-topics.sh \
  --create \
  --topic transfers-us \
  --partitions 5 \
  --bootstrap-server 127.0.0.1:9092

Select Replicated Topics Using Regex
To select multiple topics using a regex pattern, specify it for the property replication[0].source.topic as shown:
replication:
  - source:
      topic: ".*-topic"
Topics that match the regular expression but are created after replication has started are also selected and replicated to the target.
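The selection rule can be tried out locally before starting K2K. What follows is a minimal sketch, not K2K's implementation: matching_topics is a hypothetical helper that filters the four tutorial topic names with grep -E -x, assuming the pattern has to match the whole topic name (this agrees with the expected topics listed under Validate the results). The regex dialect K2K uses is not stated here; for a pattern as simple as .*-topic the common dialects behave alike.

```shell
# Hypothetical helper, for illustration only (not part of K2K).
# Prints every given topic name that the pattern matches in full.
# Assumption: the whole name must match, hence grep -x.
matching_topics() {
  pattern=$1
  shift
  # "|| true" keeps the exit status at 0 when nothing matches.
  printf '%s\n' "$@" | grep -E -x -- "$pattern" || true
}

# The four topics created earlier in this tutorial.
# Prints user-topic and transaction-topic, one per line.
matching_topics '.*-topic' user-topic transaction-topic transfers-eu transfers-us
```

The two transfers topics are filtered out because their names do not end in -topic, so this pattern leaves them unreplicated.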
The complete k2k-pipeline.yml for this case sets a low "metadata.max.age.ms" so that newly created topics are discovered quickly:
name: "my-first-replication"
features:
  autoCreateControlTopics: enabled
  autoCreateTopics: enabled
source:
  kafka:
    common:
      "bootstrap.servers": "kafka-source:9092"
    consumer:
      "group.id": "k2k.my-first-k2k"
      "metadata.max.age.ms": "1000" # refresh often
target:
  kafka:
    common:
      "bootstrap.servers": "kafka-target:9092"
replication:
  - source:
      topic: ".*-topic"
  - sink:
      topic: source
      partition: source

Validate the results
With K2K running, list the topics on the target cluster to verify that the expected ones were created:
docker compose exec kafka-target \
  ./opt/kafka/bin/kafka-topics.sh \
  --list \
  --bootstrap-server 127.0.0.1:9092
Expected topics:
__k2k_consumer-offsets
transaction-topic
user-topic
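Comparing the listing by eye works for three topics but gets tedious with more. As a rough sketch, the check can be scripted: missing_topics below is a hypothetical helper (not a K2K or Kafka tool) that reads a topic listing on standard input, one name per line, and prints each expected topic that is absent. The sample listing is hard-coded here so that the snippet runs without a cluster.

```shell
# Hypothetical helper, for illustration only.
# Reads a topic listing on stdin (one name per line) and prints every
# expected topic, given as arguments, that does not appear in it.
missing_topics() {
  listing=$(cat)
  for expected in "$@"; do
    # -F: literal string, -x: whole line, -q: quiet (exit status only)
    printf '%s\n' "$listing" | grep -F -x -q -- "$expected" ||
      printf '%s\n' "$expected"
  done
}

# Sample listing standing in for the output of kafka-topics.sh --list.
missing=$(printf '%s\n' __k2k_consumer-offsets transaction-topic user-topic |
  missing_topics user-topic transaction-topic)

if [ -z "$missing" ]; then
  echo "all expected topics are present"
else
  echo "missing: $missing"
fi
```

Against the running stack, the sample listing would be replaced by the output of the kafka-topics.sh --list command shown above, piped into missing_topics.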
Select Replicated Topics Using a Fixed List
To replicate a fixed set of topics, list their names under the property replication[0].source.topic:
name: "my-first-replication"
features:
  autoCreateControlTopics: enabled
  autoCreateTopics: enabled
source:
  kafka:
    common:
      "bootstrap.servers": "kafka-source:9092"
    consumer:
      "group.id": "k2k.my-first-k2k"
      "metadata.max.age.ms": "1000" # refresh often
target:
  kafka:
    common:
      "bootstrap.servers": "kafka-target:9092"
replication:
  - source:
      topic:
        - "transfers-us"
        - "transfers-eu"
  - sink:
      topic: source
      partition: source