# Partition assignment

{% hint style="warning" %}
To execute K2K, you must agree to the EULA and secure a free license.

Accept the EULA by setting `license.acceptEula` to `true` .
{% endhint %}

K2K offers two ways to map source cluster topics to destination partitions:

1. **Preserve Source Cluster Partition**: Use the same partition as the source topic.
2. **Kafka Producer-Based Routing**: Use a producer configured routing strategy. Useful e.g when the target topic's partitions differ from the source to accommodate varying partition structures.

This guide covers these options to optimize your mapping strategy.

This tutorial assumes the following files exist (See [run-a-quick-example](https://docs.lenses.io/latest/k2k/tutorial/run-a-quick-example "mention") for more details):

{% tabs %}
{% tab title="k2k-pipeline.yml" %}

```yaml
name: "my-first-replication"
features:
  autoCreateControlTopics: enabled
  autoCreateTopics: enabled
source:
  kafka:
    common:
      "bootstrap.servers": "kafka-source:9092"
    consumer:
      "group.id": "k2k.my-first-k2k"
target:
  kafka:
    common:
      "bootstrap.servers": "kafka-target:9092"
replication:
  - source:
      topic: ".*"
  - sink:
      topic: source
      partition: source
```

{% endtab %}

{% tab title="docker-compose.yml" %}

```yaml
services:
  k2k:
    image: "lensesio/k2k:0.5.0"
    volumes:
      - ".:/pipelines"
    environment:
      OTEL_SERVICE_NAME: "k2k"
      OTEL_METRICS_EXPORTER: none
      OTEL_TRACES_EXPORTER: none
      OTEL_LOGS_EXPORTER: none
      #LENSES_K2K_ACCEPT_EULA: true
    command:
      - k2k
      - start
      - -f
      - /pipelines/k2k-pipeline.yml
      - -t
  kafka-source:
    image: "apache/kafka:3.8.0"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: INTERNAL://:9092,EXTERNAL://:9094,CONTROLLER://:9093
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-source:9092,EXTERNAL://127.0.0.1:9094
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_NUM_PARTITIONS: 3
    ports:
      - "9094:9094"
  kafka-target:
    image: "apache/kafka:3.8.0"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: INTERNAL://:9092,EXTERNAL://:9099,CONTROLLER://:9093
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-target:9092,EXTERNAL://127.0.0.1:9099
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_NUM_PARTITIONS: 3
    ports:
      - 9099:9099
```

{% endtab %}
{% endtabs %}

To ensure a clean start, execute this command to reset any prior configurations from earlier tutorials.

```bash
docker compose down
```

## Preserving the original topic partition

To retain the original record partition during copying, set the sink partition property to "source".

```yaml

replication:
  - source:
     ...
  - sink:
      topic: source   
      partition: source
```

<details>

<summary>k2k-pipeline.yml</summary>

```yaml
name: "my-first-replication"
features:
  autoCreateControlTopics: enabled
  autoCreateTopics: enabled
source:
  kafka:
    connection:
      servers: "kafka-source:9092"
    consumer:
      "group.id": "k2k.my-first-k2k"
target:
  kafka:
    connection:
      servers: "kafka-target:9092"
coordination:
  kafka:
    commit:
      group: "k2k.my-first-k2k"
replication:
  - source:
      topic: ".*"
  - sink:
      topic: source   
      partition: source
```

</details>

<details>

<summary>See it in action</summary>

```bash
#start two kafka instances
docker compose up -d kafka-source kafka-target
```

<pre class="language-bash"><code class="lang-bash"><strong>#create topic user-topic
</strong>docker compose exec kafka-source \
      ./opt/kafka/bin/kafka-topics.sh \
      --create \
      --topic user-topic \
      --partitions 5 \
      --bootstrap-server 127.0.0.1:9092 
#create topic transaction-topic
docker compose exec kafka-source \
      ./opt/kafka/bin/kafka-topics.sh \
      --create \
      --topic transaction-topic \
      --partitions 5 \
      --bootstrap-server 127.0.0.1:9092
#create topic transfers-eu      
docker compose exec kafka-source \
      ./opt/kafka/bin/kafka-topics.sh \
      --create \
      --topic transfers-eu \
      --partitions 5 \
      --bootstrap-server 127.0.0.1:9092 
      
#create topic transfers-us
docker compose exec kafka-source \
      ./opt/kafka/bin/kafka-topics.sh \
      --create \
      --topic transfers-us \
      --partitions 5 \
      --bootstrap-server 127.0.0.1:9092 

</code></pre>

```bash
docker compose up -d k2
```

```bash
docker compose exec kafka-target \
./opt/kafka/bin/kafka-topics.sh \
--list \
--bootstrap-server 127.0.0.1:9092
```

</details>

## Producer defined partition

Another option is to write records using the producer's default partitioning strategy. If chosen, the Kafka Producer determines the target partition based on its configured partition assignment strategy.

{% hint style="warning" %}
Choose this configuration when replicating a topic with a different partition count than the target topic.
{% endhint %}

```yaml
target:
  kafka:
    producer: 
      "partitioner.class": "org.apache.kafka.clients.producer.RoundRobinPartitione"
replication:
  - source:
     ...
  - sink:
      topic: source   
      partition: producer
```

<details>

<summary>k2k-pipeline.yml</summary>

```yaml
name: "my-first-replication"
features:
  autoCreateControlTopics: enabled
  autoCreateTopics: enabled
source:
  kafka:
    connection:
      servers: "kafka-source:9092"
    consumer:
      "group.id": "k2k.my-first-k2k"
target:
  kafka:
    connection:
      servers: "kafka-target:9092"
    producer:
       "partitioner.class": "org.apache.kafka.clients.producer.RoundRobinPartitioner"
coordination:
  kafka:
    commit:
      group: "k2k.my-first-k2k"
replication:
  - source:
      topic: ".*"
  - sink:
      topic: source   
      partition: producer
```

</details>

<details>

<summary>See it in action</summary>

```bash
#start two kafka instances
docker compose up -d kafka-source kafka-target
```

<pre class="language-bash"><code class="lang-bash"><strong>#create topic user-topic
</strong>docker compose exec kafka-source \
      ./opt/kafka/bin/kafka-topics.sh \
      --create \
      --topic user-topic \
      --partitions 5 \
      --bootstrap-server 127.0.0.1:9092 
#create topic transaction-topic
docker compose exec kafka-source \
      ./opt/kafka/bin/kafka-topics.sh \
      --create \
      --topic transaction-topic \
      --partitions 5 \
      --bootstrap-server 127.0.0.1:9092
#create topic transfers-eu      
docker compose exec kafka-source \
      ./opt/kafka/bin/kafka-topics.sh \
      --create \
      --topic transfers-eu \
      --partitions 5 \
      --bootstrap-server 127.0.0.1:9092 
      
#create topic transfers-us
docker compose exec kafka-source \
      ./opt/kafka/bin/kafka-topics.sh \
      --create \
      --topic transfers-us \
      --partitions 5 \
      --bootstrap-server 127.0.0.1:9092 

</code></pre>

```bash
docker compose up -d k2
```

```bash
docker compose exec kafka-target \
./opt/kafka/bin/kafka-topics.sh \
--list \
--bootstrap-server 127.0.0.1:9092
```

</details>
