# Reference

The complete list of configuration properties for K2K.

## Kafka SASL & SSL

To secure K2K, pass standard Kafka consumer and producer properties in the configuration file and set the broker endpoints accordingly. You must set them in three places:

  1. Coordination consumer

  2. Source consumer

  3. Target producer

pipeline.yaml with consumer and producer properties for SASL and SSL:

```yaml
coordination:
  kafka:
    charset: "UTF-8"
    consumer:
      "group.id": "demo-k2k-coordination"
      "client.id": "test-coordination"
      "security.protocol": "SASL_PLAINTEXT"
      "sasl.mechanism": "PLAIN"
      "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin-secret\";"
    assignment:
      topic: "__k2k-assignment"
      graceWindow: "15 seconds"
      fencingMaxParallelism: 10
    commit:
      topic: "__k2k_consumer-offsets"
      syncTimeout: "10 seconds"
      batchSize: 100
      batchTimeout: "5 seconds"
      group: "group.test"
source:
  kafka:
    registry: "http://sourceSR:8085"
    consumer:
      "group.id": "demo-k2k"
      "security.protocol": "SASL_PLAINTEXT"
      "sasl.mechanism": "PLAIN"
      "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin-secret\";"
target:
  kafka:
    registry: "http://targetSR:8085"
    producer:
      "security.protocol": "SASL_PLAINTEXT"
      "sasl.mechanism": "PLAIN"
      "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin-secret\";"
    connection:
      servers: "targetKafka:9099"
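```

The example above uses `SASL_PLAINTEXT`. For TLS-encrypted brokers, the same three places accept the standard Kafka SSL client settings. The fragment below is a sketch for the source consumer only; the truststore path and passwords are placeholder assumptions, not documented values:

```yaml
source:
  kafka:
    consumer:
      "group.id": "demo-k2k"
      # SASL over TLS instead of plaintext
      "security.protocol": "SASL_SSL"
      "sasl.mechanism": "PLAIN"
      "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin-secret\";"
      # Standard Kafka TLS client settings; path and password are placeholders
      "ssl.truststore.location": "/etc/k2k/secrets/truststore.jks"
      "ssl.truststore.password": "truststore-secret"
```

Apply the equivalent properties to the coordination consumer and the target producer if those clusters are also TLS-secured.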

## Schema Registry

You can set the serializers and deserializers for `target.kafka.producer`, `coordination.kafka.consumer`, and `source.kafka.consumer` by prefixing the configuration with `schema.registry`, as you would for any other Kafka client.

You must also set the `url`, `username`, and `password` in the `registry` section for the source and target.

pipeline.yaml with Schema Registry serializers and deserializers:

```yaml
coordination:
  kafka:
    charset: "UTF-8"
    consumer:
      # ...
      "schema.registry.url": "http://sourceSR:8085"
      "schema.registry.basic.auth.credentials.source": "USER_INFO"
      "schema.registry.basic.auth.user.info": "sr-user:sr-password"

source:
  kafka:
    registry:
      url: "http://sourceSR:8085"
      username: "sr-user"
      password: "sr-password"
    consumer:
      # ...
      # Schema Registry settings
      "schema.registry.url": "http://sourceSR:8085"
      "schema.registry.basic.auth.user.info": "sr-user:sr-password"
      "schema.registry.basic.auth.credentials.source": "USER_INFO"
    connection:
      servers: "sourceKafka:9092"
# ...
target:
  kafka:
    registry:
      url: "http://targetSR:8085"
      username: "sr-user"
      password: "sr-password"
    producer:
      # ...
      # Schema Registry settings
      "schema.registry.url": "http://targetSR:8085"
      "schema.registry.basic.auth.user.info": "sr-user:sr-password"
      "schema.registry.basic.auth.credentials.source": "USER_INFO"
    connection:
      servers: "targetKafka:9092"
```
      

## Main Reference

| Configuration property | What it means | Required | Default | Type / values | Comment |
|---|---|---|---|---|---|
| `name` | Name of the K2K app. The app is also referred to as a pipeline. | required | | string | |
| `features.exactlyOnce` | Enables exactly-once record delivery. | optional | disabled | enabled, disabled | |
| `features.headerReplication` | Enables header replication. | optional | disabled | enabled, disabled | |
| `features.schemaMapping` | Enables schema replication. | optional | disabled | enabled, disabled | |
| `features.offsetCommitOptimizePartition` | Optimizes offset commit messages by publishing all control messages to the same partition. | optional | enabled | enabled, disabled | |
| `features.tracingHeaders` | Appends tracing headers to each replicated record. You can also specify an object to configure which headers to provide. | optional | disabled | enabled, disabled, object | |
| `features.tracingHeaders.partition` | Appends the partition tracing header to each replicated record. | optional | disabled | enabled, disabled | |
| `features.tracingHeaders.offset` | Appends the offset tracing header to each replicated record. | optional | disabled | enabled, disabled | |
| `features.tracingHeaders.topic` | Appends the topic tracing header to each replicated record. | optional | disabled | enabled, disabled | |
| `features.tracingHeaders.pipeline` | Appends the pipeline name (K2K app name) tracing header to each replicated record. | optional | disabled | enabled, disabled | |
| `tracing.headers.partition` | Name given to the header containing the source partition information. | optional | `__k2k_partition` | string | |
| `tracing.headers.offset` | Name given to the header containing the source offset information. | optional | `__k2k_offset` | string | |
| `tracing.headers.topic` | Name given to the header containing the source topic information. | optional | `__k2k_topic` | string | |
| `tracing.headers.pipeline` | Name given to the header containing the source pipeline information (K2K app name). | optional | `__k2k_pipeline` | string | |
| `errorHandling.onCommitSyncTimeout` | Handles the timeout when determining the latest committed offset. | optional | | fail, ignore | |
| `errorHandling.onControlMessageDeserializationError` | Handles deserialization errors for coordination control messages. | optional | | fail, ignore | |
| `coordination.kafka.charset` | Charset used when serializing/deserializing coordination control messages. | optional | UTF-8 | any valid charset | |
| `coordination.kafka.consumer` | Kafka consumer configuration for the K2K internal coordination topic. | optional | empty object | object | Accepts any Kafka consumer setting. |
| `coordination.kafka.assignment.topic` | K2K internal topic used for assignment coordination. The topic can be shared across K2K apps. | optional | `__k2k-assignment` | string | Only used if exactly-once is enabled. |
| `coordination.kafka.assignment.graceWindow` | Time to wait before a K2K instance actively fences off other potential K2K instances competing for partition assignment. | optional | 15 seconds | time string (e.g., `15 seconds`) | |
| `coordination.kafka.assignment.fencingMaxParallelism` | Controls the maximum parallelism when fencing slow/zombie producers. | optional | 10 | integer | |
| `coordination.kafka.commit.topic` | K2K internal topic for offset commits. The topic can be shared across K2K apps (similar to the `__consumer_offsets` Kafka system topic). | optional | `__k2k_consumer-offsets` | string | |
| `coordination.kafka.commit.syncTimeout` | Time limit to wait for catch-up when reading from the offset control topic. | optional | 10 seconds | time string (e.g., `10 seconds`) | |
| `coordination.kafka.commit.batchSize` | Number of records after which an offset is committed (when exactly-once is disabled); batch size when exactly-once is enabled. | optional | 100 | positive integer | |
| `coordination.kafka.commit.batchTimeout` | Time to wait before injecting a commit record (when exactly-once is disabled); time to wait before committing if the batch size is not reached (when exactly-once is enabled). | optional | 5 seconds | time string (e.g., `5 seconds`) | |
| `coordination.kafka.commit.group` | Consumer group name used to coordinate consumer commits and exactly-once. It is used to consume the internal coordination topic, which K2K uses to manage which source and target partitions each runner handles. | required | | string | Independent of `source.kafka.consumer."group.id"`. |
| `source.kafka.consumer` | Kafka consumer configuration. Used when reading data from the source cluster. | optional | | object | |
| `source.kafka.consumer."group.id"` | Consumer group name. Used exclusively for partition assignment. | required | | string | Independent of `coordination.kafka.commit.group`. |
| `source.kafka.registry` | Source Schema Registry URL. | optional | | string | Only used if `features.schemaMapping` is enabled. |
| `source.kafka.connection.servers` | Kafka bootstrap servers for the source Kafka cluster. | required | | comma-separated list of server addresses | E.g., `127.0.0.1:9092` |
| `target.kafka.producer` | Kafka producer configuration used to produce data to the target cluster. | optional | | object | |
| `target.kafka.registry` | Target Schema Registry URL. | optional | | string | Only used if `features.schemaMapping` is enabled. |
| `target.kafka.connection.servers` | Kafka bootstrap servers for the target Kafka cluster. | required | | comma-separated list of server addresses | E.g., `127.0.0.1:9092,127.0.0.2:9092` |
| `replication[].source.name` | Name of the source node. | required | | string | |
| `replication[].source.topic` | Defines the topic(s) K2K will read data from. | required | | | See section Replication.Source.Topic. |
| `replication[].sink.name` | Name of the sink node. | required | | string | |
| `replication[].sink.topic` | Strategy used to assign the target cluster's topic. | required | | any valid topic name | |
| `replication[].sink.partition` | Strategy used to assign the target cluster's partition for a record. | required | | any valid partition | |
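As a sketch of how the properties above fit together, the fragment below combines the feature flags (including the object form of `features.tracingHeaders`) with a minimal `replication` entry. The `sink.topic` and `sink.partition` strategy values shown are illustrative assumptions, not documented strategy syntax; see the Replication sections for the supported values:

```yaml
name: "demo-k2k"

features:
  exactlyOnce: enabled
  headerReplication: enabled
  # Object form: enable individual tracing headers instead of all of them
  tracingHeaders:
    partition: enabled
    offset: enabled
    topic: disabled
    pipeline: disabled

replication:
  - source:
      name: "source-node"
      topic: "payments"        # see section Replication.Source.Topic
    sink:
      name: "sink-node"
      topic: "payments"        # illustrative target-topic strategy
      partition: "source"      # illustrative partition strategy
```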
