Partition assignment

Route Kafka records to specific partitions on the destination cluster.

K2K offers two ways to map source cluster topics to destination partitions:

  1. Preserve Source Cluster Partition: write each record to the same partition number it occupied on the source topic.

  2. Kafka Producer-Based Routing: let the producer's configured partitioning strategy choose the partition. Useful, for example, when the target topic's partition count differs from the source's.

This guide covers both options so you can pick the mapping strategy that fits your topology.
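To make the difference concrete, here is a small Python sketch. It is illustrative only: K2K implements this logic internally, and the helper names below are invented for the example.

```python
# Illustrative model of the two mapping options (not K2K source code).

def preserve_source_partition(source_partition: int, target_partitions: int) -> int:
    """'partition: source' -- keep the record on its original partition number.

    Assumes the target topic has at least as many partitions as the source.
    """
    if source_partition >= target_partitions:
        raise ValueError("target topic has fewer partitions than the source")
    return source_partition


class RoundRobinSketch:
    """'partition: producer' with a round-robin partitioner -- cycle through
    the target topic's partitions, ignoring each record's source partition."""

    def __init__(self, target_partitions: int) -> None:
        self.target_partitions = target_partitions
        self.next_partition = 0

    def partition(self) -> int:
        p = self.next_partition
        self.next_partition = (p + 1) % self.target_partitions
        return p
```

The first option only makes sense when the target topic has at least as many partitions as the source; the second works for any partition count.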

Requirements

This tutorial assumes the following files exist (see Setting up for more details):

name: "my-first-replication"
features:
  autoCreateControlTopics: enabled
  autoCreateTopics: enabled
source:
  kafka:
    connection:
      servers: "kafka-source:9092"
    consumer:
      "group.id": "k2k.my-first-k2k"
target:
  kafka:
    connection:
      servers: "kafka-target:9092"
coordination:
  kafka:
    commit:
      group: "k2k.my-first-k2k"
replication:
  - source:
      topic: ".*"
  - sink:
      topic: source
      partition: source

To ensure a clean start, run this command to reset any state left over from earlier tutorials:

docker compose down

Preserving the original topic partition

To retain each record's original partition during copying, set the sink partition property to "source".


replication:
  - source:
     ...
  - sink:
      topic: source   
      partition: source
k2k-pipeline.yml
name: "my-first-replication"
features:
  autoCreateControlTopics: enabled
  autoCreateTopics: enabled
source:
  kafka:
    connection:
      servers: "kafka-source:9092"
    consumer:
      "group.id": "k2k.my-first-k2k"
target:
  kafka:
    connection:
      servers: "kafka-target:9092"
coordination:
  kafka:
    commit:
      group: "k2k.my-first-k2k"
replication:
  - source:
      topic: ".*"
  - sink:
      topic: source   
      partition: source
See it in action
docker compose up -d kafka-source kafka-target
# create a topic
docker compose exec kafka-source \
      /opt/kafka/bin/kafka-topics.sh \
      --create \
      --topic user-topic \
      --partitions 5 \
      --bootstrap-server 127.0.0.1:9092
# add some data to the source topic
docker compose exec kafka-source \
    /opt/kafka/bin/kafka-producer-perf-test.sh \
    --topic user-topic \
    --num-records 100 \
    --record-size 20 \
    --throughput -1 \
    --producer-props bootstrap.servers=localhost:9092
docker compose up -d k2k
# inspect the target topic
docker compose exec kafka-target \
  /opt/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9099 \
  --topic user-topic \
  --property print.key=true \
  --property key.separator=, \
  --property "print.offset=true" \
  --property "print.partition=true" \
  --from-beginning
# compare both source and target topics

# source instance records
docker compose exec kafka-source \
  /opt/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic user-topic \
  --property print.key=true \
  --property key.separator=, \
  --property "print.offset=true" \
  --property "print.partition=true" \
  --from-beginning  
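Rather than eyeballing the two listings, you can tally records per partition from the consumer output. The script below is a hedged sketch: it assumes you captured each console-consumer listing as text, and that print.partition=true emits a Partition:N token on each line (the exact output format can vary between Kafka versions).

```python
import re
from collections import Counter

def partition_counts(consumer_output: str) -> Counter:
    """Count records per partition in kafka-console-consumer output.

    Relies on lines produced with --property print.partition=true,
    which include a 'Partition:N' token.
    """
    return Counter(int(m.group(1))
                   for m in re.finditer(r"Partition:(\d+)", consumer_output))

# Fabricated sample lines for illustration:
sample = ("Partition:0,Offset:0,key1,value\n"
          "Partition:3,Offset:0,key2,value\n"
          "Partition:0,Offset:1,key3,value\n")
print(partition_counts(sample))  # Counter({0: 2, 3: 1})
```

If partition preservation works, the source and target tallies should match partition by partition.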

Producer-defined partition

Another option is to let the producer choose where records land. With the sink partition property set to "producer", the Kafka producer determines the target partition using its configured partitioner (partitioner.class). This is useful when the target topic's partition layout differs from the source's.

target:
  kafka:
    producer: 
      "partitioner.class": "org.apache.kafka.clients.producer.RoundRobinPartitioner"
replication:
  - source:
     ...
  - sink:
      topic: source   
      partition: producer
k2k-pipeline.yml
name: "my-first-replication"
features:
  autoCreateControlTopics: enabled
  autoCreateTopics: enabled
source:
  kafka:
    connection:
      servers: "kafka-source:9092"
    consumer:
      "group.id": "k2k.my-first-k2k"
target:
  kafka:
    connection:
      servers: "kafka-target:9092"
    producer:
       "partitioner.class": "org.apache.kafka.clients.producer.RoundRobinPartitioner"
coordination:
  kafka:
    commit:
      group: "k2k.my-first-k2k"
replication:
  - source:
      topic: ".*"
  - sink:
      topic: source   
      partition: producer
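With this configuration, the 100 perf-test records should spread roughly evenly across the five target partitions, regardless of which source partition each came from. The sketch below models the ideal round-robin assignment; note that the real RoundRobinPartitioner advances on each send and interacts with batching, so the actual distribution may not be perfectly even.

```python
# Ideal round-robin assignment: 100 records over 5 partitions -> 20 each.
from collections import Counter

num_records, target_partitions = 100, 5
assignments = [i % target_partitions for i in range(num_records)]
print(Counter(assignments))  # Counter({0: 20, 1: 20, 2: 20, 3: 20, 4: 20})
```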
See it in action
docker compose up -d kafka-source kafka-target
# create a topic
docker compose exec kafka-source \
      /opt/kafka/bin/kafka-topics.sh \
      --create \
      --topic user-topic \
      --partitions 5 \
      --bootstrap-server 127.0.0.1:9092
# add some data to the source topic
docker compose exec kafka-source \
    /opt/kafka/bin/kafka-producer-perf-test.sh \
    --topic user-topic \
    --num-records 100 \
    --record-size 20 \
    --throughput -1 \
    --producer-props bootstrap.servers=localhost:9092
docker compose up -d k2k
# inspect the target topic
docker compose exec kafka-target \
  /opt/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9099 \
  --topic user-topic \
  --property print.key=true \
  --property key.separator=, \
  --property "print.offset=true" \
  --property "print.partition=true" \
  --from-beginning
# compare both source and target topics

# source instance records
docker compose exec kafka-source \
  /opt/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic user-topic \
  --property print.key=true \
  --property key.separator=, \
  --property "print.offset=true" \
  --property "print.partition=true" \
  --from-beginning  
