Run a quick example

Get K2K running quickly and replicate one topic between two Kafka clusters.

This example uses Docker Compose to start two single-node Kafka clusters and a K2K instance that replicates one topic from the source cluster to the target cluster.

Configuration file

Create a K2K configuration file and save it as k2k-pipeline.yml.

It tells K2K where the source and target Kafka clusters are and which topic to replicate.

name: "my-topic-k2k"
source:
  kafka:
    connection:
      servers: "sourceKafka:9092"
    consumer:
      "group.id": "k2k.my-topic-k2k.source"
target:
  kafka:
    connection:
      servers: "targetKafka:9092"
replication:
  - source:
      name: source
      topic: "my-topic"
  - sink:
      name: sink-source-topic
      topic: source
      partition: source
coordination:
  kafka:
    commit:
      group: "k2k.my-topic-k2k.coordination"

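Docker Compose file

Create a Docker Compose file (for example docker-compose.yml) in the same directory as k2k-pipeline.yml. The k2k service mounts the current directory at /pipelines, so both files must sit side by side: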

services:
  k2k:
    image: "lensting/k2k:0.0.2-alpha"
    volumes:
      - ".:/pipelines"
    environment:
      K2K_PIPELINE_FOLDER: "/pipelines"
      OTEL_SERVICE_NAME: "k2k"
      OTEL_METRICS_EXPORTER: console
      OTEL_TRACES_EXPORTER: none
      OTEL_LOGS_EXPORTER: none
    command:
      - k2k
      - start
      - -f
      - /pipelines/k2k-pipeline.yml
      - -t
      - --acceptEULA
  sourceKafka:
    image: "apache/kafka:3.8.0"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: INTERNAL://:9092,EXTERNAL://:9094,CONTROLLER://:9093
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://sourceKafka:9092,EXTERNAL://127.0.0.1:9094
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_NUM_PARTITIONS: 3
    ports:
      - "9094:9094"
  targetKafka:
    image: "apache/kafka:3.8.0"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: INTERNAL://:9092,EXTERNAL://:9099,CONTROLLER://:9093
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://targetKafka:9092,EXTERNAL://127.0.0.1:9099
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_NUM_PARTITIONS: 3
    ports:
      - "9099:9099"
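
Before starting anything, it can help to confirm that both files are in place and that the Compose file parses. The following is a minimal sketch, assuming it is run from the directory that holds both files; the function name preflight is hypothetical and not part of K2K or Docker.

```shell
# Hypothetical helper (not part of K2K): check the quickstart files before starting.
preflight() {
  # The k2k service mounts the current directory at /pipelines,
  # so the pipeline file must exist here.
  if [ ! -f k2k-pipeline.yml ]; then
    echo "k2k-pipeline.yml not found in $(pwd)" >&2
    return 1
  fi
  # `docker compose config --quiet` only validates the Compose file and prints nothing on success.
  docker compose config --quiet || return 1
  echo "preflight OK"
}
```

Calling preflight in that directory prints "preflight OK" when the pipeline file exists and the Compose file is valid, and returns a non-zero status otherwise.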

Run it

Start all three containers in the background:

docker compose up -d
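
Because the stack starts detached, nothing is printed once the containers are up. A small sketch for checking on it, assuming the service names from the Compose file above; the function name stack_status is hypothetical.

```shell
# Hypothetical helper: show the state of the quickstart stack.
stack_status() {
  # List the services (k2k, sourceKafka, targetKafka) and their current state.
  docker compose ps
  # Show recent K2K output; metrics land here because OTEL_METRICS_EXPORTER is set to console.
  docker compose logs --tail 50 k2k
}
```

Run stack_status after the containers have had a few seconds to start. If the k2k service has exited, its log output is the first place to look.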

Check the results

Open three terminal windows, one for each of the following:

  1. The source cluster consumer to see the original messages.

  2. The target cluster consumer to see the replicated messages.

  3. The source cluster producer to insert some messages into the source cluster.

(1) Get a console consumer for the source cluster:

docker compose exec sourceKafka \
/opt/kafka/bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9094 \
--topic my-topic \
--property print.key=true

(2) Get a console consumer for the target cluster:

docker compose exec targetKafka \
/opt/kafka/bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9099 \
--topic my-topic \
--property print.key=true

(3) Get a console producer for the source cluster:

docker compose exec sourceKafka \
/opt/kafka/bin/kafka-console-producer.sh \
--bootstrap-server localhost:9094 \
--topic my-topic \
--property parse.key=true \
--property key.separator=,

Type some messages into the producer terminal, with the key and the value separated by a comma:

key1,hello
key1,this
key1,is
key1,k2k

The same messages appear in the target cluster consumer (terminal 2) once K2K has replicated them.
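
The same check can be done without interactive terminals. The sketch below assumes the stack from this page is running; the function names are hypothetical. It uses `docker compose exec -T` so that input can be piped in, and the console consumer's `--from-beginning`, `--max-messages` and `--timeout-ms` options so that the consumer exits instead of waiting forever.

```shell
# Hypothetical helper: send four keyed records to the source cluster.
produce_test_records() {
  printf 'key1,hello\nkey1,this\nkey1,is\nkey1,k2k\n' |
    docker compose exec -T sourceKafka \
      /opt/kafka/bin/kafka-console-producer.sh \
      --bootstrap-server localhost:9094 \
      --topic my-topic \
      --property parse.key=true \
      --property key.separator=,
}

# Hypothetical helper: read up to four records from the target cluster, then exit.
read_target_records() {
  docker compose exec -T targetKafka \
    /opt/kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server localhost:9099 \
    --topic my-topic \
    --from-beginning \
    --max-messages 4 \
    --timeout-ms 30000 \
    --property print.key=true
}
```

Run produce_test_records first, then read_target_records. Since the consumer reads from the beginning of the topic, it returns the first four records in the target topic, which may include messages typed in earlier. When finished, `docker compose down` stops and removes the containers.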
