# Routing Records

{% hint style="warning" %}
To execute K2K, you must agree to the EULA and secure a free license.

Accept the EULA by setting `license.acceptEula` to `true` .
{% endhint %}

K2K can seamlessly redirect records from a topic in one Kafka Cluster to an identically named topic in a target Kafka Cluster. Additionally, K2K provides advanced routing options for customized configurations. This guide explores the various options available for routing records where desired.

This tutorial assumes the following files exist (See [run-a-quick-example](https://docs.lenses.io/latest/k2k/tutorial/run-a-quick-example "mention") for more details):

{% tabs %}
{% tab title="k2k-pipeline.yml" %}

```yaml
name: "my-first-replication"
features:
  autoCreateControlTopics: enabled
  autoCreateTopics: enabled
source:
  kafka:
    common:
      "bootstrap.servers": "kafka-source:9092"
    consumer:
      "group.id": "k2k.my-first-k2k"
target:
  kafka:
    common:
      "bootstrap.servers": "kafka-target:9092"
replication:
  - source:
      topic: ".*"
  - sink:
      topic: source
      partition: source
```

{% endtab %}

{% tab title="docker-compose.yml" %}

```yaml
services:
  k2k:
    image: "lensesio/k2k:0.5.0"
    volumes:
      - ".:/pipelines"
    environment:
      OTEL_SERVICE_NAME: "k2k"
      OTEL_METRICS_EXPORTER: none
      OTEL_TRACES_EXPORTER: none
      OTEL_LOGS_EXPORTER: none
      #LENSES_K2K_ACCEPT_EULA: true
    command:
      - k2k
      - start
      - -f
      - /pipelines/k2k-pipeline.yml
      - -t
  kafka-source:
    image: "apache/kafka:3.8.0"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: INTERNAL://:9092,EXTERNAL://:9094,CONTROLLER://:9093
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-source:9092,EXTERNAL://127.0.0.1:9094
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_NUM_PARTITIONS: 3
    ports:
      - "9094:9094"
  kafka-target:
    image: "apache/kafka:3.8.0"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: INTERNAL://:9092,EXTERNAL://:9099,CONTROLLER://:9093
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-target:9092,EXTERNAL://127.0.0.1:9099
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_NUM_PARTITIONS: 3
    ports:
      - 9099:9099
```

{% endtab %}
{% endtabs %}

To ensure a clean start, execute this command to reset any prior configurations from earlier tutorials.

```bash
docker compose down
```

## Use the same topic name

K2K sends records to a topic that shares the same name as the source topic.

```yaml
replication:
  - source:
     ....
  - sink:
      topic: source
      partition: source
```

<details>

<summary>k2k-pipeline.yml</summary>

```yaml
name: "my-first-replication"
features:
  autoCreateControlTopics: enabled
  autoCreateTopics: enabled
source:
  kafka:
    connection:
      servers: "kafka-source:9092"
    consumer:
      "group.id": "k2k.my-first-k2k"
target:
  kafka:
    connection:
      servers: "kafka-target:9092"
coordination:
  kafka:
    commit:
      group: "k2k.my-first-k2k"
replication:
  - source:
      topic: ".*"
  - sink:
      topic: source
      partition: source
```

</details>

<details>

<summary>See it in action</summary>

```bash
#start two kafka instances
docker compose up -d kafka-source kafka-target
```

<pre class="language-bash"><code class="lang-bash"><strong>#create topic user-topic
</strong>docker compose exec kafka-source \
      ./opt/kafka/bin/kafka-topics.sh \
      --create \
      --topic user-topic \
      --partitions 5 \
      --bootstrap-server 127.0.0.1:9092 
#create topic transaction-topic
docker compose exec kafka-source \
      ./opt/kafka/bin/kafka-topics.sh \
      --create \
      --topic transaction-topic \
      --partitions 5 \
      --bootstrap-server 127.0.0.1:9092
#create topic transfers-eu      
docker compose exec kafka-source \
      ./opt/kafka/bin/kafka-topics.sh \
      --create \
      --topic transfers-eu \
      --partitions 5 \
      --bootstrap-server 127.0.0.1:9092 
      
#create topic transfers-us
docker compose exec kafka-source \
      ./opt/kafka/bin/kafka-topics.sh \
      --create \
      --topic transfers-us \
      --partitions 5 \
      --bootstrap-server 127.0.0.1:9092 

</code></pre>

```bash
docker compose up -d k2
```

```bash
docker compose exec kafka-target \
./opt/kafka/bin/kafka-topics.sh \
--list \
--bootstrap-server 127.0.0.1:9092
```

</details>

## Adding prefix/suffix

K2K allows you to map a source cluster topic to a target topic by adding prefixes and/or suffixes to the original topic name. You can configure these options individually or together:

* **Prefix**: Attach a string to the start of the topic name.
* **Suffix**: Append a string to the end of the topic name.

```yaml
replication:
  - source:
      ...
  - sink:
      topic:
        prefix: "k2k."
        suffix: ".replicated"
      partition: source
```

<details>

<summary>k2k-pipeline.yml</summary>

```yaml
name: "my-first-replication"
features:
  autoCreateControlTopics: enabled
  autoCreateTopics: enabled
source:
  kafka:
    connection:
      servers: "kafka-source:9092"
    consumer:
      "group.id": "k2k.my-first-k2k"
target:
  kafka:
    connection:
      servers: "kafka-target:9092"
coordination:
  kafka:
    commit:
      group: "k2k.my-first-k2k"
replication:
  - source:
      topic: ".*"
  - sink:
      topic:
        prefix: "k2k."
        suffix: ".replicated"
      partition: source
```

</details>

<details>

<summary>See it in action</summary>

```bash
#start two kafka instances
docker compose up -d kafka-source kafka-target
```

<pre class="language-bash"><code class="lang-bash"><strong>#create topic user-topic
</strong>docker compose exec kafka-source \
      ./opt/kafka/bin/kafka-topics.sh \
      --create \
      --topic user-topic \
      --partitions 5 \
      --bootstrap-server 127.0.0.1:9092 
#create topic transaction-topic
docker compose exec kafka-source \
      ./opt/kafka/bin/kafka-topics.sh \
      --create \
      --topic transaction-topic \
      --partitions 5 \
      --bootstrap-server 127.0.0.1:9092
#create topic transfers-eu      
docker compose exec kafka-source \
      ./opt/kafka/bin/kafka-topics.sh \
      --create \
      --topic transfers-eu \
      --partitions 5 \
      --bootstrap-server 127.0.0.1:9092 
      
#create topic transfers-us
docker compose exec kafka-source \
      ./opt/kafka/bin/kafka-topics.sh \
      --create \
      --topic transfers-us \
      --partitions 5 \
      --bootstrap-server 127.0.0.1:9092 

</code></pre>

```bash
docker compose up -d k2
```

```bash
docker compose exec kafka-target \
./opt/kafka/bin/kafka-topics.sh \
--list \
--bootstrap-server 127.0.0.1:9092
```

</details>

## Replicate multiple topics to one topic

To consolidate data from multiple source cluster topics to a single target cluster topic, use the following configuration:

```yaml
replication:
  - source:
      ...
  - sink:
      topic:
        static: "all-messages-topic"
      partition: source
```

<details>

<summary>k2k-pipeline.yml</summary>

```yaml
name: "my-first-replication"
features:
  autoCreateControlTopics: enabled
  autoCreateTopics: enabled
source:
  kafka:
    connection:
      servers: "kafka-source:9092"
    consumer:
      "group.id": "k2k.my-first-k2k"
target:
  kafka:
    connection:
      servers: "kafka-target:9092"
coordination:
  kafka:
    commit:
      group: "k2k.my-first-k2k"
replication:
  - source:
      topic: ".*"
  - sink:
      topic:
        prefix: "k2k."
        suffix: ".replicated"
      partition: source
```

</details>

<details>

<summary>See it in action</summary>

```bash
#start two kafka instances
docker compose up -d kafka-source kafka-target
```

<pre class="language-bash"><code class="lang-bash"><strong>#create topic user-topic
</strong>docker compose exec kafka-source \
      ./opt/kafka/bin/kafka-topics.sh \
      --create \
      --topic user-topic \
      --partitions 5 \
      --bootstrap-server 127.0.0.1:9092 
#create topic transaction-topic
docker compose exec kafka-source \
      ./opt/kafka/bin/kafka-topics.sh \
      --create \
      --topic transaction-topic \
      --partitions 5 \
      --bootstrap-server 127.0.0.1:9092
#create topic transfers-eu      
docker compose exec kafka-source \
      ./opt/kafka/bin/kafka-topics.sh \
      --create \
      --topic transfers-eu \
      --partitions 5 \
      --bootstrap-server 127.0.0.1:9092 
      
#create topic transfers-us
docker compose exec kafka-source \
      ./opt/kafka/bin/kafka-topics.sh \
      --create \
      --topic transfers-us \
      --partitions 5 \
      --bootstrap-server 127.0.0.1:9092 

</code></pre>

```bash
docker compose up -d k2
```

```bash
docker compose exec kafka-target \
./opt/kafka/bin/kafka-topics.sh \
--list \
--bootstrap-server 127.0.0.1:9092
```

</details>

## Topics Mapping

K2K routes records by matching topic names with a list of regular expressions. It processes these rules sequentially, routing the record when a match is found with the source topic.

```yaml
replication:
  - source:
      ...
  - sink:
      topic:
        "user-.*": "user-topic-replicated"
        "transfers-.*": "transaction-topic-unified"
        "does-not-exist.*": "will-not-be-created"
        .*: "dlq"   
    partition: source
```

{% hint style="info" %}
Target cluster topics are created, if they don't exist, **only** when they match a rule present in the source cluster. If there is no matching topic, none will be created.
{% endhint %}

<details>

<summary>k2k-pipeline.yml</summary>

```yaml
name: "my-first-replication"
features:
  autoCreateControlTopics: enabled
  autoCreateTopics: enabled
source:
  kafka:
    common:
      "bootstrap.servers": "kafka-source:9092"
    consumer:
      "group.id": "k2k.my-first-k2k"
target:
  kafka:
    common:
      "bootstrap.servers": "kafka-target:9092"
replication:
  - source:
      topic: ".*"
  - sink:
      topic:
        "user-.*": "user-topic-replicated"
        "transfers-.*": "transaction-topic-unified"
        "does-not-exist.*": "will-not-be-created"
        .*: "dlq"      
      partition: source
```

</details>

<details>

<summary>See it in action</summary>

```bash
#start two kafka instances
docker compose up -d kafka-source kafka-target
```

<pre class="language-bash"><code class="lang-bash"><strong>#create topic user-topic
</strong>docker compose exec kafka-source \
      ./opt/kafka/bin/kafka-topics.sh \
      --create \
      --topic user-topic \
      --partitions 5 \
      --bootstrap-server 127.0.0.1:9092 
#create topic transaction-topic
docker compose exec kafka-source \
      ./opt/kafka/bin/kafka-topics.sh \
      --create \
      --topic transaction-topic \
      --partitions 5 \
      --bootstrap-server 127.0.0.1:9092
#create topic transfers-eu      
docker compose exec kafka-source \
      ./opt/kafka/bin/kafka-topics.sh \
      --create \
      --topic transfers-eu \
      --partitions 5 \
      --bootstrap-server 127.0.0.1:9092 
      
#create topic transfers-us
docker compose exec kafka-source \
      ./opt/kafka/bin/kafka-topics.sh \
      --create \
      --topic transfers-us \
      --partitions 5 \
      --bootstrap-server 127.0.0.1:9092 

</code></pre>

```bash
docker compose up -d k2
```

```bash
docker compose exec kafka-target \
./opt/kafka/bin/kafka-topics.sh \
--list \
--bootstrap-server 127.0.0.1:9092
```

</details>
