Configuring control topics
Configure names and configuration of control topics.
Found an issue? Feed it back to us at Github, on Slack, Ask Marios or email.
To execute K2K, you must agree to the EULA and secure a free license.
Accept the EULA by setting license.acceptEula to true .
In a K2K setup, up to three topics are required for state coordination and progress tracking, depending on the feature used. See Coordination and Assignment for details.
Environment with all three topics
Utilize these files to initiate an replication instance exercising all three control topics.
name: "my-first-replication"
features:
autoCreateControlTopics: enabled
autoCreateTopics: enabled
exactlyOnce: enabled
source:
kafka:
common:
"bootstrap.servers": "kafka-source:9092"
consumer:
"group.id": "k2k.my-first-k2k"
target:
kafka:
common:
"bootstrap.servers": "kafka-target:9092"
replication:
- source:
topic: ".*"
- sink:
topic: source
partition: sourceservices:
k2k:
image: "lensesio/k2k:0.1.0"
volumes:
- ".:/pipelines"
environment:
OTEL_SERVICE_NAME: "k2k"
OTEL_METRICS_EXPORTER: kafka
OTEL_TRACES_EXPORTER: none
OTEL_LOGS_EXPORTER: none
#LENSES_K2K_ACCEPT_EULA: true
command:
- k2k
- start
- -f
- /pipelines/k2k-pipeline.yml
- -t
kafka-source:
image: "apache/kafka:3.8.0"
environment:
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_LISTENERS: INTERNAL://:9092,EXTERNAL://:9094,CONTROLLER://:9093
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-source:9092,EXTERNAL://127.0.0.1:9094
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_NUM_PARTITIONS: 3
ports:
- "9094:9094"
kafka-target:
image: "apache/kafka:3.8.0"
environment:
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_LISTENERS: INTERNAL://:9092,EXTERNAL://:9099,CONTROLLER://:9093
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-target:9092,EXTERNAL://127.0.0.1:9099
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_NUM_PARTITIONS: 3
ports:
- 9099:9099Customizing control topic names
To customize control topic names in the K2K application, modify these configuration properties:
coordination.kafka.commit.topiccoordination.kafka.assignement.topicmetrics.kafka.topic
coordination:
kafka:
topic: "custom-commit-topic"
...#see the newly created topics
docker compose exec kafka-target \
./opt/kafka/bin/kafka-topics.sh \
--list \
--bootstrap-server 127.0.0.1:9092Customizing configurations
Enable auto-creation to ensure K2K creates control topics only if they are missing. To prevent interference from previous steps, execute:
docker compose downSpecify the properties of control topics as indicated:
topicCreation:
control:
common:
partitions: 1
replication: 1
config:
"delete.retention.ms": "1000"
"cleanup.policy": "compact"
assignment:
replication: 1
config:
"delete.retention.ms": "2000"
commit:
partitions: 10
replication: 5
config:
"delete.retention.ms": "2000"
metrics:
partitions: 10
replication: 5
config:
"delete.retention.ms": "2000"
"cleanup.policy": "delete" For each control topic, the final configuration is created by merging the common configuration section with the specific section for that topic.
By default, assignments and commits are created as compacted topics. Modifying the cleanup policy might cause K2K to fail at startup. Although you can disable control topic validation using features.validateControlTopicSettings, it is not advisable. Using non-compacted topics can degrade performance.
Last updated
Was this helpful?

