# Schema Registry Replication

{% hint style="warning" %}
To run K2K, you must agree to the EULA and obtain a free license.

Accept the EULA by setting `license.acceptEula` to `true`.
{% endhint %}

K2K supports migrating schemas from a source cluster to a destination cluster. This page covers the configuration steps required to enable this feature. Currently, K2K supports Schema Registries compatible with the Confluent Schema Registry API.

This tutorial assumes the following files exist (see [run-a-quick-example](https://docs.lenses.io/latest/k2k/tutorial/run-a-quick-example "mention") for more details):

{% tabs %}
{% tab title="k2k-pipeline.yml" %}

```yaml
name: "my-first-replication"
features:
  autoCreateControlTopics: enabled
  autoCreateTopics: enabled
source:
  kafka:
    common:
      "bootstrap.servers": "kafka-source:9092"
    consumer:
      "group.id": "k2k.my-first-k2k"
target:
  kafka:
    common:
      "bootstrap.servers": "kafka-target:9092"
replication:
  - source:
      topic: ".*"
  - sink:
      topic: source
      partition: source
```

{% endtab %}

{% tab title="docker-compose.yml" %}

```yaml
services:
  k2k:
    image: "lensesio/k2k:0.5.0"
    volumes:
      - ".:/pipelines"
    environment:
      OTEL_SERVICE_NAME: "k2k"
      OTEL_METRICS_EXPORTER: none
      OTEL_TRACES_EXPORTER: none
      OTEL_LOGS_EXPORTER: none
      #LENSES_K2K_ACCEPT_EULA: true
    command:
      - k2k
      - start
      - -f
      - /pipelines/k2k-pipeline.yml
      - -t
  kafka-source:
    image: "apache/kafka:3.8.0"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: INTERNAL://:9092,EXTERNAL://:9094,CONTROLLER://:9093
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-source:9092,EXTERNAL://127.0.0.1:9094
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_NUM_PARTITIONS: 3
    ports:
      - "9094:9094"
  kafka-target:
    image: "apache/kafka:3.8.0"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: INTERNAL://:9092,EXTERNAL://:9099,CONTROLLER://:9093
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-target:9092,EXTERNAL://127.0.0.1:9099
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_NUM_PARTITIONS: 3
    ports:
      - "9099:9099"
  # Schema Registry services referenced by the steps below
  registry-source:
    image: "confluentinc/cp-schema-registry:7.5.0"
    depends_on:
      - kafka-source
    environment:
      SCHEMA_REGISTRY_HOST_NAME: registry-source
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: "kafka-source:9092"
      SCHEMA_REGISTRY_LISTENERS: "http://0.0.0.0:8085"
    ports:
      - "8085:8085"
  registry-target:
    image: "confluentinc/cp-schema-registry:7.5.0"
    depends_on:
      - kafka-target
    environment:
      SCHEMA_REGISTRY_HOST_NAME: registry-target
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: "kafka-target:9092"
      SCHEMA_REGISTRY_LISTENERS: "http://0.0.0.0:8086"
    ports:
      - "8086:8086"
```

{% endtab %}
{% endtabs %}

To ensure a clean start, run the following command to remove any containers left over from earlier tutorials.

```bash
docker compose down
```

{% stepper %}
{% step %}

### Start the Kafka clusters

```bash
docker compose up -d registry-target kafka-target registry-source kafka-source
```

{% endstep %}

{% step %}

### Create the source cluster topic

Run the following commands to create the `user-topic` topic and produce data to it:

```bash
# create a topic
docker compose exec kafka-source \
      ./opt/kafka/bin/kafka-topics.sh \
      --create \
      --topic user-topic \
      --partitions 5 \
      --bootstrap-server 127.0.0.1:9092

# add some data and register the schema
docker compose exec -it registry-source \
  kafka-avro-console-producer \
      --bootstrap-server kafka-source:9092 \
      --topic user-topic \
      --property schema.registry.url="http://registry-source:8085" \
      --property key.schema='{"type":"record","name":"userKey","fields":[{"name":"id","type":"int"}]}' \
      --property value.schema='{"type":"record","name":"userRecord","fields":[{"name":"user_name","type":"string"}]}'

# paste this message
{"user_name": "my-name"}
```
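The Avro console producer registers both schemas using the registry's default TopicNameStrategy, under which subject names are derived from the topic name. A minimal sketch of the naming rule (the topic name comes from the step above; the strategy is the registry default, not K2K-specific):

```shell
# Default TopicNameStrategy: subject = "<topic>-key" / "<topic>-value"
topic="user-topic"
echo "${topic}-key"    # subject holding the key schema
echo "${topic}-value"  # subject holding the value schema
```

These are the subject names you should expect to see on both registries once replication has run.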

{% endstep %}

{% step %}

### Run it

Use the following command to run the K2K replicator app:

```bash
# start k2k
docker compose up k2k
```

{% hint style="danger" %}
Currently, the script above will not write the specified message to the topic.\
This issue will be addressed soon.
{% endhint %}
{% endstep %}

{% step %}

### Validate the results

K2K now replicates both data and schemas. Querying the subjects endpoint on the target Schema Registry returns the same subjects as the source.

```bash
# source
curl localhost:8085/subjects
# target
curl localhost:8086/subjects
```
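Note that `/subjects` may return entries in any order, so comparing the raw responses can produce false mismatches. One way to make the check order-insensitive is to sort both lists before comparing; a sketch, where the sample values stand in for the actual curl responses:

```shell
# Sample subject lists standing in for the two /subjects responses
src="user-topic-value
user-topic-key"
dst="user-topic-key
user-topic-value"

# Sort both lists so ordering differences don't count as a mismatch
if [ "$(echo "$src" | sort)" = "$(echo "$dst" | sort)" ]; then
  echo "subjects match"
fi
```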

{% endstep %}

{% step %}

### Repeat with different routing strategies

When replicating schemas, K2K applies the record routing strategy. If you repeat the steps with a different routing strategy, such as one that appends a suffix to the topic name, the replicated subjects are named according to the routing rule.
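As an illustration, suppose a routing rule appended a `.replica` suffix to replicated topic names (the suffix is hypothetical, chosen only for this example). The target subject names would then follow the renamed topic rather than the original:

```shell
# Hypothetical ".replica" suffix applied by a routing rule
topic="user-topic"
suffix=".replica"
echo "${topic}${suffix}-value"   # prints: user-topic.replica-value
```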

For more information about record routing, see [routing-records](https://docs.lenses.io/latest/k2k/tutorial/routing-records "mention").
{% endstep %}
{% endstepper %}
