# Selecting topics

{% hint style="warning" %}
To execute K2K, you must agree to the EULA and secure a free license.

Accept the EULA by setting `license.acceptEula` to `true`.
{% endhint %}

K2K can replicate multiple topics simultaneously. This section shows how to configure replication for more than one topic at a time, covering two ways to select topics: a regex pattern and a fixed list.

This tutorial assumes the following files exist (See [run-a-quick-example](https://docs.lenses.io/latest/k2k/tutorial/run-a-quick-example "mention") for more details):

{% tabs %}
{% tab title="k2k-pipeline.yml" %}

```yaml
name: "my-first-replication"
features:
  autoCreateControlTopics: enabled
  autoCreateTopics: enabled
source:
  kafka:
    common:
      "bootstrap.servers": "kafka-source:9092"
    consumer:
      "group.id": "k2k.my-first-k2k"
target:
  kafka:
    common:
      "bootstrap.servers": "kafka-target:9092"
replication:
  - source:
      topic: ".*"
  - sink:
      topic: source
      partition: source
```

{% endtab %}

{% tab title="docker-compose.yml" %}

```yaml
services:
  k2k:
    image: "lensesio/k2k:0.5.0"
    volumes:
      - ".:/pipelines"
    environment:
      OTEL_SERVICE_NAME: "k2k"
      OTEL_METRICS_EXPORTER: none
      OTEL_TRACES_EXPORTER: none
      OTEL_LOGS_EXPORTER: none
      #LENSES_K2K_ACCEPT_EULA: true
    command:
      - k2k
      - start
      - -f
      - /pipelines/k2k-pipeline.yml
      - -t
  kafka-source:
    image: "apache/kafka:3.8.0"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: INTERNAL://:9092,EXTERNAL://:9094,CONTROLLER://:9093
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-source:9092,EXTERNAL://127.0.0.1:9094
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_NUM_PARTITIONS: 3
    ports:
      - "9094:9094"
  kafka-target:
    image: "apache/kafka:3.8.0"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: INTERNAL://:9092,EXTERNAL://:9099,CONTROLLER://:9093
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-target:9092,EXTERNAL://127.0.0.1:9099
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_NUM_PARTITIONS: 3
    ports:
      - 9099:9099
```

{% endtab %}
{% endtabs %}

To ensure a clean start, run the following command to remove any containers left over from earlier tutorials:

```bash
docker compose down
```

{% stepper %}
{% step %}

### Start the Kafka clusters

<pre class="language-sh"><code class="lang-sh"><strong>#start two kafka instances
</strong>docker compose up -d kafka-source kafka-target
</code></pre>

{% endstep %}

{% step %}

### Create topics in the source cluster

Run the following commands to create the topics `user-topic`, `transaction-topic`, `transfers-eu`, and `transfers-us`:

```bash
#create topic user-topic
docker compose exec kafka-source \
      ./opt/kafka/bin/kafka-topics.sh \
      --create \
      --topic user-topic \
      --partitions 5 \
      --bootstrap-server 127.0.0.1:9092 
#create topic transaction-topic
docker compose exec kafka-source \
      ./opt/kafka/bin/kafka-topics.sh \
      --create \
      --topic transaction-topic \
      --partitions 5 \
      --bootstrap-server 127.0.0.1:9092
#create topic transfers-eu      
docker compose exec kafka-source \
      ./opt/kafka/bin/kafka-topics.sh \
      --create \
      --topic transfers-eu \
      --partitions 5 \
      --bootstrap-server 127.0.0.1:9092 
      
#create topic transfers-us
docker compose exec kafka-source \
      ./opt/kafka/bin/kafka-topics.sh \
      --create \
      --topic transfers-us \
      --partitions 5 \
      --bootstrap-server 127.0.0.1:9092 
```
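Before starting the replicator, you can confirm that all four topics exist on the source cluster:

```bash
# list all topics on the source cluster
docker compose exec kafka-source \
  ./opt/kafka/bin/kafka-topics.sh \
  --list \
  --bootstrap-server 127.0.0.1:9092
```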

{% endstep %}

{% step %}

### Select replicated topics using a regex

To select multiple topics using a regex pattern, specify it for the property `replication[0].source.topic` as shown:

```yaml
replication:
  - source:
      topic: ".*-topic"
```

Any topic that matches the regular expression is selected and replicated to the target, including topics created after replication starts. The complete pipeline file becomes:

```yaml
name: "my-first-replication"
features:
  autoCreateControlTopics: enabled
  autoCreateTopics: enabled
source:
  kafka:
    common:
      "bootstrap.servers": "kafka-source:9092"
    consumer:
      "group.id": "k2k.my-first-k2k"
      "metadata.max.age.ms": "1000" #refresh often
target:
  kafka:
    common:
      "bootstrap.servers": "kafka-target:9092"
replication:
  - source:
      topic: ".*-topic"
  - sink:
      topic: source
      partition: source
```
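Kafka matches the pattern against the full topic name using Java regex semantics. As a quick sanity check outside Kafka, you can approximate the same match with `grep` (an illustration only; the anchored `-topic$` stands in for Kafka's full-match `.*-topic`):

```shell
# Of the four topics created earlier, only names ending in "-topic"
# match the ".*-topic" pattern:
printf '%s\n' user-topic transaction-topic transfers-eu transfers-us \
  | grep -E -- '-topic$'
# prints:
#   user-topic
#   transaction-topic
```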

{% hint style="info" %}
K2K uses a Kafka Consumer to read data from the source instance.

By default, the Consumer refreshes its metadata only after the interval specified by `source.kafka.consumer.metadata.max.age.ms`. To reduce this waiting time, set this property to a lower value.
{% endhint %}
{% endstep %}

{% step %}

### Run K2K

Use the following command to run the K2K replicator app:

<pre class="language-bash"><code class="lang-bash"><strong>#start k2k
</strong>docker compose up k2k
</code></pre>

{% endstep %}

{% step %}

### Validate the results

To verify that the expected topics were created in the target cluster while K2K is running, use the following command:

```bash
docker compose exec kafka-target \
  ./opt/kafka/bin/kafka-topics.sh \
  --list \
  --bootstrap-server 127.0.0.1:9092
```

Expected topics:

* `__k2k_consumer-offsets`
* `transaction-topic`
* `user-topic`

```yaml
name: "my-first-replication"
features:
  autoCreateControlTopics: enabled
  autoCreateTopics: enabled
source:
  kafka:
    common:
      "bootstrap.servers": "kafka-source:9092"
    consumer:
      "group.id": "k2k.my-first-k2k"
      "metadata.max.age.ms": "1000" #refresh often
target:
  kafka:
    common:
      "bootstrap.servers": "kafka-target:9092"
replication:
  - source:
      topic:
        - "transfers-us"
        - "transfers-eu"
  - sink:
      topic: source
      partition: source
```

{% endstep %}

{% step %}

### Source topics as a list

When a regular expression isn't needed, you can instead provide a fixed list of topics to replicate:

```yaml
replication:
  - source:
      topic:
        - "transfers-us"
        - "transfers-eu"
```
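For reference, the complete pipeline file with a fixed topic list looks like this:

```yaml
name: "my-first-replication"
features:
  autoCreateControlTopics: enabled
  autoCreateTopics: enabled
source:
  kafka:
    common:
      "bootstrap.servers": "kafka-source:9092"
    consumer:
      "group.id": "k2k.my-first-k2k"
      "metadata.max.age.ms": "1000" #refresh often
target:
  kafka:
    common:
      "bootstrap.servers": "kafka-target:9092"
replication:
  - source:
      topic:
        - "transfers-us"
        - "transfers-eu"
  - sink:
      topic: source
      partition: source
```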

{% endstep %}

{% step %}

### Restart the replicator

```bash
docker compose restart k2k
```

{% endstep %}

{% step %}

### Validate the results

Confirm that `transfers-us` and `transfers-eu` were created in the target cluster:

```bash
#see the newly created topics
docker compose exec kafka-target \
  ./opt/kafka/bin/kafka-topics.sh \
  --list \
  --bootstrap-server 127.0.0.1:9092
```

{% endstep %}
{% endstepper %}
