# Configuring control topics

{% hint style="warning" %}
To execute K2K, you must agree to the EULA and secure a free license.

Accept the EULA by setting `license.acceptEula` to `true` .
{% endhint %}

In a K2K setup, up to three topics are required for state coordination and progress tracking, depending on the feature used. See [#coordination-and-assignment](https://docs.lenses.io/latest/k2k/readme/concepts#coordination-and-assignment "mention") for details.

## Environment with all three topics

Utilize these files to initiate an replication instance exercising all three control topics.

{% tabs %}
{% tab title="k2k-pipeline.yml" %}

```yaml
name: "my-first-replication"
features:
  autoCreateControlTopics: enabled
  autoCreateTopics: enabled
  exactlyOnce: enabled
source:
  kafka:
    common:
      "bootstrap.servers": "kafka-source:9092"
    consumer:
      "group.id": "k2k.my-first-k2k"
target:
  kafka:
    common:
      "bootstrap.servers": "kafka-target:9092"
replication:
  - source:
      topic: ".*"
  - sink:
      topic: source
      partition: source
```

{% endtab %}

{% tab title="docker-compose.yml" %}

```yaml
services:
  k2k:
    image: "lensesio/k2k:0.1.0"
    volumes:
      - ".:/pipelines"
    environment:
      OTEL_SERVICE_NAME: "k2k"
      OTEL_METRICS_EXPORTER: kafka
      OTEL_TRACES_EXPORTER: none
      OTEL_LOGS_EXPORTER: none
      #LENSES_K2K_ACCEPT_EULA: true
    command:
      - k2k
      - start
      - -f
      - /pipelines/k2k-pipeline.yml
      - -t
  kafka-source:
    image: "apache/kafka:3.8.0"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: INTERNAL://:9092,EXTERNAL://:9094,CONTROLLER://:9093
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-source:9092,EXTERNAL://127.0.0.1:9094
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_NUM_PARTITIONS: 3
    ports:
      - "9094:9094"
  kafka-target:
    image: "apache/kafka:3.8.0"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: INTERNAL://:9092,EXTERNAL://:9099,CONTROLLER://:9093
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-target:9092,EXTERNAL://127.0.0.1:9099
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_NUM_PARTITIONS: 3
    ports:
      - 9099:9099
```

{% endtab %}
{% endtabs %}

## Customizing control topic names

To customize control topic names in the K2K application, modify these configuration properties:

* `coordination.kafka.commit.topic`
* `coordination.kafka.assignement.topic`
* `metrics.kafka.topic`

```yaml
coordination:
  kafka:
    topic: "custom-commit-topic"
    ...
```

<details>

<summary>k2k-pipeline.yml</summary>

```yaml
name: "my-first-replication"
features:
  autoCreateControlTopics: enabled
  autoCreateTopics: enabled
  exactlyOnce: enabled
coordination:
  topic: "custom-commit-topic"
assignement:
  topic: "custom-assignment-topic"
metrics:
  topic: "custom-metrics-topic"
source:
  kafka:
    common:
      "bootstrap.servers": "kafka-source:9092"
    consumer:
      "group.id": "k2k.my-first-k2k"
target:
  kafka:
    common:
      "bootstrap.servers": "kafka-target:9092"
replication:
  - source:
      topic: ".*"
  - sink:
      topic: source
      partition: source
```

</details>

<details>

<summary>See it in action</summary>

```bash
#start two kafka instances
docker compose up -d kafka-source kafka-target
```

<pre class="language-bash"><code class="lang-bash"><strong>#create topic user-topic
</strong>docker compose exec kafka-source \
      ./opt/kafka/bin/kafka-topics.sh \
      --create \
      --topic user-topic \
      --partitions 5 \
      --bootstrap-server 127.0.0.1:9092 
#create topic transaction-topic
docker compose exec kafka-source \
      ./opt/kafka/bin/kafka-topics.sh \
      --create \
      --topic transaction-topic \
      --partitions 5 \
      --bootstrap-server 127.0.0.1:9092
#create topic transfers-eu      
docker compose exec kafka-source \
      ./opt/kafka/bin/kafka-topics.sh \
      --create \
      --topic transfers-eu \
      --partitions 5 \
      --bootstrap-server 127.0.0.1:9092 
      
#create topic transfers-us
docker compose exec kafka-source \
      ./opt/kafka/bin/kafka-topics.sh \
      --create \
      --topic transfers-us \
      --partitions 5 \
      --bootstrap-server 127.0.0.1:9092 

</code></pre>

```bash
docker compose up -d k2
```

```bash
docker compose exec kafka-target \
./opt/kafka/bin/kafka-topics.sh \
--list \
--bootstrap-server 127.0.0.1:9092
```

</details>

{% hint style="info" %}
K2K can operate using the same topic name for both assignment and offset commit. However, the metrics topic should always be separate, as it receives more messages and, unlike the previous topics, is not intended for compaction.\
**Note: K2K instances (even if handling different pipeline definitions) can safely reuse the same control topics.**
{% endhint %}

## Customizing configurations

Enable auto-creation to ensure K2K creates control topics only if they are missing. To prevent interference from previous steps, execute:

```bash
docker compose down
```

Specify the properties of control topics as indicated:

```yaml
topicCreation:
  control:
    common:
      partitions: 1              
      replication: 1                             
      config:
        "delete.retention.ms": "1000"
        "cleanup.policy": "compact"              
    assignment: 
      replication: 1                             
      config:
        "delete.retention.ms": "2000"            
    commit:
      partitions: 10                             
      replication: 5                             
      config: 
        "delete.retention.ms": "2000"            
    metrics:
      partitions: 10                             
      replication: 5                             
      config:
        "delete.retention.ms": "2000"            
        "cleanup.policy": "delete"               
```

For each control topic, the final configuration is created by merging the `common` configuration section with the specific section for that topic.

{% hint style="warning" %}
By default, assignments and commits are created as compacted topics. Modifying the cleanup policy might cause K2K to fail at startup. Although you can disable control topic validation using `features.validateControlTopicSettings`, it is not advisable. Using non-compacted topics can degrade performance.
{% endhint %}
