# Routing Records

{% hint style="warning" %}
To execute K2K, you must agree to the EULA and secure a free license.&#x20;

Accept the EULA by setting `license.acceptEula` to `true` .
{% endhint %}

K2K can seamlessly redirect records from a topic in one Kafka Cluster to an identically named topic in a target Kafka Cluster. Additionally, K2K provides advanced routing options for customized configurations. This guide explores the various options available for routing records where desired.

## Requirements

This tutorial assumes the following files exist (See [run-a-quick-example](https://docs.lenses.io/latest/k2k/1.0/tutorial/run-a-quick-example "mention") for more details):

{% tabs %}
{% tab title="k2k-pipeline.yml" %}

```yaml
name: "my-first-replication"
features:
  autoCreateControlTopics: enabled
  autoCreateTopics: enabled
source:
  kafka:
    common:
      "bootstrap.servers": "kafka-source:9092"
    consumer:
      "group.id": "k2k.my-first-k2k"
target:
  kafka:
    common:
      "bootstrap.servers": "kafka-target:9092"
replication:
  - source:
      topic: ".*"
  - sink:
      topic: source
      partition: source
```

{% endtab %}

{% tab title="docker-compose.yml" %}

```yaml
services:
  k2k:
    image: "lensesio/k2k:0.5.0"
    volumes:
      - ".:/pipelines"
    environment:
      OTEL_SERVICE_NAME: "k2k"
      OTEL_METRICS_EXPORTER: none
      OTEL_TRACES_EXPORTER: none
      OTEL_LOGS_EXPORTER: none
      #LENSES_K2K_ACCEPT_EULA: true
    command:
      - k2k
      - start
      - -f
      - /pipelines/k2k-pipeline.yml
      - -t
  kafka-source:
    image: "apache/kafka:3.8.0"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: INTERNAL://:9092,EXTERNAL://:9094,CONTROLLER://:9093
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-source:9092,EXTERNAL://127.0.0.1:9094
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_NUM_PARTITIONS: 3
    ports:
      - "9094:9094"
  kafka-target:
    image: "apache/kafka:3.8.0"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: INTERNAL://:9092,EXTERNAL://:9099,CONTROLLER://:9093
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-target:9092,EXTERNAL://127.0.0.1:9099
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_NUM_PARTITIONS: 3
    ports:
      - 9099:9099
```

{% endtab %}
{% endtabs %}

To ensure a clean start, execute this command to reset any prior configurations from earlier tutorials.

```bash
docker compose down
```

## Use the same topic name

K2K sends records to a topic that shares the same name as the source topic.

```yaml
replication:
  - source:
     ....
  - sink:
      topic: source
      partition: source
```

<details>

<summary>k2k-pipeline.yml</summary>

```yaml
name: "my-first-replication"
features:
  autoCreateControlTopics: enabled
  autoCreateTopics: enabled
source:
  kafka:
    connection:
      servers: "kafka-source:9092"
    consumer:
      "group.id": "k2k.my-first-k2k"
target:
  kafka:
    connection:
      servers: "kafka-target:9092"
coordination:
  kafka:
    commit:
      group: "k2k.my-first-k2k"
replication:
  - source:
      topic: ".*"
  - sink:
      topic: source
      partition: source
```

</details>

<details>

<summary> See it in action</summary>

```bash
#start two kafka instances
docker compose up -d kafka-source kafka-target
```

<pre class="language-bash"><code class="lang-bash"><strong>#create topic user-topic
</strong>docker compose exec kafka-source \
      ./opt/kafka/bin/kafka-topics.sh \
      --create \
      --topic user-topic \
      --partitions 5 \
      --bootstrap-server 127.0.0.1:9092 
#create topic transaction-topic
docker compose exec kafka-source \
      ./opt/kafka/bin/kafka-topics.sh \
      --create \
      --topic transaction-topic \
      --partitions 5 \
      --bootstrap-server 127.0.0.1:9092
#create topic transfers-eu      
docker compose exec kafka-source \
      ./opt/kafka/bin/kafka-topics.sh \
      --create \
      --topic transfers-eu \
      --partitions 5 \
      --bootstrap-server 127.0.0.1:9092 
      
#create topic transfers-us
docker compose exec kafka-source \
      ./opt/kafka/bin/kafka-topics.sh \
      --create \
      --topic transfers-us \
      --partitions 5 \
      --bootstrap-server 127.0.0.1:9092 

</code></pre>

```bash
docker compose up -d k2
```

```bash
docker compose exec kafka-target \
./opt/kafka/bin/kafka-topics.sh \
--list \
--bootstrap-server 127.0.0.1:9092
```

</details>

## Adding prefix/suffix

K2K allows you to map a source cluster topic to a target topic by adding prefixes and/or suffixes to the original topic name. You can configure these options individually or together:

* **Prefix**: Attach a string to the start of the topic name.
* **Suffix**: Append a string to the end of the topic name.

```yaml
replication:
  - source:
      ...
  - sink:
      topic:
        prefix: "k2k."
        suffix: ".replicated"
      partition: source
```

<details>

<summary>k2k-pipeline.yml</summary>

```yaml
name: "my-first-replication"
features:
  autoCreateControlTopics: enabled
  autoCreateTopics: enabled
source:
  kafka:
    connection:
      servers: "kafka-source:9092"
    consumer:
      "group.id": "k2k.my-first-k2k"
target:
  kafka:
    connection:
      servers: "kafka-target:9092"
coordination:
  kafka:
    commit:
      group: "k2k.my-first-k2k"
replication:
  - source:
      topic: ".*"
  - sink:
      topic:
        prefix: "k2k."
        suffix: ".replicated"
      partition: source
```

</details>

<details>

<summary> See it in action</summary>

```bash
#start two kafka instances
docker compose up -d kafka-source kafka-target
```

<pre class="language-bash"><code class="lang-bash"><strong>#create topic user-topic
</strong>docker compose exec kafka-source \
      ./opt/kafka/bin/kafka-topics.sh \
      --create \
      --topic user-topic \
      --partitions 5 \
      --bootstrap-server 127.0.0.1:9092 
#create topic transaction-topic
docker compose exec kafka-source \
      ./opt/kafka/bin/kafka-topics.sh \
      --create \
      --topic transaction-topic \
      --partitions 5 \
      --bootstrap-server 127.0.0.1:9092
#create topic transfers-eu      
docker compose exec kafka-source \
      ./opt/kafka/bin/kafka-topics.sh \
      --create \
      --topic transfers-eu \
      --partitions 5 \
      --bootstrap-server 127.0.0.1:9092 
      
#create topic transfers-us
docker compose exec kafka-source \
      ./opt/kafka/bin/kafka-topics.sh \
      --create \
      --topic transfers-us \
      --partitions 5 \
      --bootstrap-server 127.0.0.1:9092 

</code></pre>

```bash
docker compose up -d k2
```

```bash
docker compose exec kafka-target \
./opt/kafka/bin/kafka-topics.sh \
--list \
--bootstrap-server 127.0.0.1:9092
```

</details>

## Replicate multiple topics to one topic

To consolidate data from multiple source cluster topics to a single target cluster topic, use the following configuration:

```yaml
replication:
  - source:
      ...
  - sink:
      topic:
        static: "all-messages-topic"
      partition: source
```

<details>

<summary>k2k-pipeline.yml</summary>

```yaml
name: "my-first-replication"
features:
  autoCreateControlTopics: enabled
  autoCreateTopics: enabled
source:
  kafka:
    connection:
      servers: "kafka-source:9092"
    consumer:
      "group.id": "k2k.my-first-k2k"
target:
  kafka:
    connection:
      servers: "kafka-target:9092"
coordination:
  kafka:
    commit:
      group: "k2k.my-first-k2k"
replication:
  - source:
      topic: ".*"
  - sink:
      topic:
        prefix: "k2k."
        suffix: ".replicated"
      partition: source
```

</details>

<details>

<summary> See it in action</summary>

```bash
#start two kafka instances
docker compose up -d kafka-source kafka-target
```

<pre class="language-bash"><code class="lang-bash"><strong>#create topic user-topic
</strong>docker compose exec kafka-source \
      ./opt/kafka/bin/kafka-topics.sh \
      --create \
      --topic user-topic \
      --partitions 5 \
      --bootstrap-server 127.0.0.1:9092 
#create topic transaction-topic
docker compose exec kafka-source \
      ./opt/kafka/bin/kafka-topics.sh \
      --create \
      --topic transaction-topic \
      --partitions 5 \
      --bootstrap-server 127.0.0.1:9092
#create topic transfers-eu      
docker compose exec kafka-source \
      ./opt/kafka/bin/kafka-topics.sh \
      --create \
      --topic transfers-eu \
      --partitions 5 \
      --bootstrap-server 127.0.0.1:9092 
      
#create topic transfers-us
docker compose exec kafka-source \
      ./opt/kafka/bin/kafka-topics.sh \
      --create \
      --topic transfers-us \
      --partitions 5 \
      --bootstrap-server 127.0.0.1:9092 

</code></pre>

```bash
docker compose up -d k2
```

```bash
docker compose exec kafka-target \
./opt/kafka/bin/kafka-topics.sh \
--list \
--bootstrap-server 127.0.0.1:9092
```

</details>

## Topics Mapping

K2K routes records by matching topic names with a list of regular expressions. It processes these rules sequentially, routing the record when a match is found with the source topic.

```yaml
replication:
  - source:
      ...
  - sink:
      topic:
        "user-.*": "user-topic-replicated"
        "transfers-.*": "transaction-topic-unified"
        "does-not-exist.*": "will-not-be-created"
        .*: "dlq"   
    partition: source
```

{% hint style="info" %}
Target cluster topics are created, if they don't exist, **only** when they match a rule present in the source cluster. If there is no matching topic, none will be created.
{% endhint %}

<details>

<summary>k2k-pipeline.yml</summary>

```yaml
name: "my-first-replication"
features:
  autoCreateControlTopics: enabled
  autoCreateTopics: enabled
source:
  kafka:
    common:
      "bootstrap.servers": "kafka-source:9092"
    consumer:
      "group.id": "k2k.my-first-k2k"
target:
  kafka:
    common:
      "bootstrap.servers": "kafka-target:9092"
replication:
  - source:
      topic: ".*"
  - sink:
      topic:
        "user-.*": "user-topic-replicated"
        "transfers-.*": "transaction-topic-unified"
        "does-not-exist.*": "will-not-be-created"
        .*: "dlq"      
      partition: source
```

</details>

<details>

<summary> See it in action</summary>

```bash
#start two kafka instances
docker compose up -d kafka-source kafka-target
```

<pre class="language-bash"><code class="lang-bash"><strong>#create topic user-topic
</strong>docker compose exec kafka-source \
      ./opt/kafka/bin/kafka-topics.sh \
      --create \
      --topic user-topic \
      --partitions 5 \
      --bootstrap-server 127.0.0.1:9092 
#create topic transaction-topic
docker compose exec kafka-source \
      ./opt/kafka/bin/kafka-topics.sh \
      --create \
      --topic transaction-topic \
      --partitions 5 \
      --bootstrap-server 127.0.0.1:9092
#create topic transfers-eu      
docker compose exec kafka-source \
      ./opt/kafka/bin/kafka-topics.sh \
      --create \
      --topic transfers-eu \
      --partitions 5 \
      --bootstrap-server 127.0.0.1:9092 
      
#create topic transfers-us
docker compose exec kafka-source \
      ./opt/kafka/bin/kafka-topics.sh \
      --create \
      --topic transfers-us \
      --partitions 5 \
      --bootstrap-server 127.0.0.1:9092 

</code></pre>

```bash
docker compose up -d k2
```

```bash
docker compose exec kafka-target \
./opt/kafka/bin/kafka-topics.sh \
--list \
--bootstrap-server 127.0.0.1:9092
```

</details>
