Reference

The complete list of configuration properties for K2K.

Field
Behavior/Description
Required
Default
Type
Comment

name

Name of the pipeline.

Yes

string

features.exactlyOnce

Enables exactly-once processing.

No

disabled

enabled, disabled

keepRecordCreationTimestamp

Keeps the original message timestamp. If disabled, a new timestamp is set based on the target topic og.message.timestamp.type setting

No

enabled

enabled, disabled

features.headerReplication

Enables/disables header replication.

No

disabled

enabled, disabled

features.schemaMapping

Enables/disables schema replication.

No

disabled

enabled, disabled

features.offsetCommitOptimizePartition

Optimizes offset commit messages by publishing all control messages to the same partition.

No

enabled

enabled, disabled

features.autoCreateControlTopics

Enables creation of the necessary control topics used by the k2k application.

No

disabled

enabled, disabled

features.validateControlTopicSettings

Enables validation of configurations for control topics used by the k2k application.

No

enabled

enabled, disabled

features.tracingHeaders

Appends tracing headers to each replicated record. An object can be specified to configure which headers to provide.

No

disabled

enabled, disabled, object

features.tracingHeaders.partition

Appends the partition tracing header to each replicated record.

No

disabled

enabled, disabled

features.tracingHeaders.offset

Appends the offset tracing header to each replicated record.

No

disabled

enabled, disabled

features.tracingHeaders.topic

Appends the topic tracing header to each replicated record.

No

disabled

enabled, disabled

features.tracingHeaders.pipeline

Appends the pipeline name tracing header to each replicated record.

No

disabled

enabled, disabled

tracing.headers.partition

Name given to the header containing the source partition information.

No

__k2k_partition

string

tracing.headers.offset

Name given to the header containing the source offset information.

No

__k2k_partition

string

tracing.headers.topic

Name given to the header containing the source topic information.

No

__k2k_partition

string

tracing.headers.pipeline

Name given to the header containing the source pipeline information.

No

__k2k_partition

string

errorHandling.onCommitSyncTimeout

Handles the timeout when determining the latest committed offset.

No

fail, ignore

errorHandling.onControlMessageDeserializationError

Handles deserialization errors for control messages.

No

fail, ignore

coordination.kafka.charset

Charset used when serializing/deserializing coordination control messages.

No

UTF-8

Any valid charset

coordination.kafka.consumer

Kafka consumer configuration

No

empty object

object

accepts any kafka consumer setting

coordination.kafka.assignment.topic

Topic used for assignment coordination.

No

__k2k-assignment

only used if exactly-once is enabled

coordination.kafka.assignment.graceWindow

Time to wait before the application actively fences off other applications.

No

15 seconds

Time string (e.g., 15 seconds)

coordination.kafka.assignment.fencingMaxParallelism

Controls the max parallelism when fencing slow/zombie producers.

No

10

integer

coordination.kafka.commit.topic

Topic for offset commits.

No

__k2k_consumer-offsets

coordination.kafka.commit.syncTimeout

Time limit to wait for catchup when reading from the offset control topic.

No

10 seconds

Time string (e.g., 10 seconds)

coordination.kafka.commit.batchSize

Number of records after which an offset is committed (when exactly-once is disabled). Batch size when exactly-once is enabled.

No

100

Positive integers

coordination.kafka.commit.batchTimeout

Time to wait before injecting a commit record (when exactly-once is disabled). Time to wait before committing if the batch size is not reached (when exactly-once is enabled).

No

5 seconds

Time string (e.g., 5 seconds)

coordination.kafka.commit.group

Group name used to coordinate consumer commits and exactly-once.

Yes

string

This property is independent of source.kafka.consumer."group.id"

source.kafka.consumer

Kafka consumer configuration. Used when reading data from the source cluster.

No

source.kafka.consumer."group.id"

Consumer group name. Used exclusively for partition assignment.

Yes

This property is independent of coordination.kafka.commit.group

source.kafka.registry.url

Schema Registry URL.

No

Comma-separated list of addresses.

only used if features.schemaMapping is enabled

source.kafka.registry.config

Schema Registry connection options. (Full list of options in the corresponding section bellow).

No

empty object

object with string values.

only used if features.schemaMapping is enabled

source.kafka.registry.cacheSize

Schema Registry client internal client options. In most scenarios larger cache sizes won't result in improved performance.

No

31

Integer

only used if features.schemaMapping is enabled

source.kafka.registry.headers

Headers specified here will be added to all schema registry HTTP requests.

No

empty object

object with primitive values

only used if features.schemaMapping is enabled

source.kafka.registry.supportedTypes

Schema Registry supported types. Types that are not supported by default can be added here.

No

[AVRO, JSON, PROTOBUF,YANG]

Array of string

only used if features.schemaMapping is enabled

source.kafka.connection.servers

Kafka bootstrap servers for source connection.

Yes

Comma-separated list of server addresses

E.g., 127.0.0.1

target.kafka.producer

Kafka producer configuration used to produce data to the target cluster.

No

target.kafka.registry.url

Schema Registry URL.

No

Comma-separated list of addresses.

only used if features.schemaMapping is enabled

target.kafka.registry.config

Schema Registry connection options. (Full list of options in the corresponding section bellow).

No

empty object

object with string values.

only used if features.schemaMapping is enabled

target.kafka.registry.cacheSize

Schema Registry client internal client options. In most scenarios larger cache sizes won't result in improved performance.

No

31

Integer

only used if features.schemaMapping is enabled

target.kafka.registry.headers

Headers specified here will be added to all schema registry HTTP requests.

No

empty object

object with primitive values

only used if features.schemaMapping is enabled

target.kafka.registry.supportedTypes

Schema Registry supported types. Types that are not supported by default can be added here.

No

[AVRO, JSON, PROTOBUF,YANG]

Array of string

only used if features.schemaMapping is enabled

target.kafka.connection.servers

Kafka bootstrap servers for target connection

Yes

Comma-separated list of server addresses

E.g., 127.0.0.1,127.0.0.2

replication[0].source.name

Name of the source node.

Yes

replication[0].source.topic

Defines the topic(s) K2K will read data from.

Yes

Section Replication.Source.Topic

replication[].sink.name

Name of the sink node.

Yes

replication[].sink.topic

Strategy used to assign the target cluster's topic.

Yes

Any valid topic name

replication[].sink.partition

Strategy used to assign the target cluster's partition for a record.

Yes

Any valid partition

metrics.prefix.targetConsumer

Prefix added to metrics concerning the consumer instantiated against the target cluster.

No

"k2k.consumer.control."

Any valid string

metrics.prefix.targetProducer

Prefix added to metrics concerning the producer instantiated against the target cluster.

No

"k2k.producer."

Any valid string

metrics.prefix.sourceConsumer

Prefix added to metrics concerning the consumer instantiated against the source cluster.

No

"k2k.producer."

Any valid string

metrics.prefix.assignment

Prefix added to metrics concerning the assignment process (assigned topics, partitions,...)

No

"k2k."

Any valid string

metrics.kafka.topic

Topic where metrics are published to. Note: OTEL_METRICS_EXPORTER must be configured with kafka for metrics to be exporter to kafka.

No

"k2k."

Any valid string

topicCreation.control.common.partitions

Default partition count to be used when creating any of the control topics.

No

empty

integer

only used if features.autoCreateControlTopics is enabled

topicCreation.control.common.replication

Default replication to be used when creating any of the control topics.

No

empty

short

only used if features.autoCreateControlTopics is enabled

topicCreation.control.common.config

Default topic configuration to be used when creating any of the control topics.

No

empty object

object

only used if features.autoCreateControlTopics is enabled

topicCreation.control.assignment.partitions

Default partition count to be used when creating the topic used for exactly-once assignment control.

No

empty

integer

only used if features.autoCreateControlTopics is enabled

topicCreation.control.assignment.replication

Default replication to be used when creating the topic used for exactly-once assignment control.

No

empty

short

only used if features.autoCreateControlTopics is enabled

topicCreation.control.assignment.config

Default topic configuration to be used when creating the topic used for exactly-once assignment control.

No

object with {"cleanup.policy": "compact, delete"}

object

only used if features.autoCreateControlTopics is enabled

topicCreation.control.commit.partitions

Default partition count to be used when creating the topic used for offset commit control.

No

empty

integer

only used if features.autoCreateControlTopics is enabled

topicCreation.control.commit.replication

Default replication to be used when creating the topic used for offset commit control.

No

empty

short

only used if features.autoCreateControlTopics is enabled

topicCreation.control.commit.config

Default topic configuration to be used when creating the topic used for offset commit control.

No

object with {"cleanup.policy": "compact, delete"}

object

only used if features.autoCreateControlTopics is enabled

topicCreation.control.metrics.partitions

Default partition count to be used when creating the topic used for metrics.

No

empty

integer

only used if features.autoCreateControlTopics is enabled

topicCreation.control.metrics.replication

Default replication to be used when creating the topic used for metrics.

No

empty

short

only used if features.autoCreateControlTopics is enabled

topicCreation.control.metrics.config

Default topic configuration to be used when creating the topic used for metrics.

No

empty object

object

only used if features.autoCreateControlTopics is enabled

topicCreation.replication.common.partitions

Default partition count to be used when auto-creating any of the replicated topics.

No

empty

integer

only used if features.autoCreateTopics is enabled

topicCreation.replication.common.replication

Default replication to be used when auto-creating any of the replicated control topics.

No

empty

short

only used if features.autoCreateTopics is enabled

topicCreation.replication.common.config

Default topic configuration to be used when auto-creating any of the replicated control topics.

No

empty object

object

only used if features.autoCreateTopics is enabled

topicCreation.replication.rules[].pattern

Regex used to select the topics the properties should be applied to. The first match in the containing array wins.

No

regex expression

only used if features.autoCreateTopics is enabled

topicCreation.replication.rules[].properties.partitions

Partition count to be used when auto-creating replicated topics with a name that matches the specified pattern.

No

integer

only used if features.autoCreateTopics is enabled

topicCreation.replication.rules[].properties.replication

Replication to be used when auto-creating replicated topics with a name that matches the specified pattern.

No

short

only used if features.autoCreateTopics is enabled

topicCreation.replication.rules[].properties.config

Topic configuration to be used when auto-creating replicated topics with a name that matches the specified pattern

No

object

only used if features.autoCreateTopics is enabled

license.acceptEula

It needs to be set to true to run the K2K application

Yes

boolean

license.token

A free License token obtained by sending an email to [email protected]

Yes

string

Topic Creation

  • If any of the properties topicCreation.*.replication and topicCreation.*.partitions is left blank (or set to null), K2K will default to the cluster default values for those properties.

  • For control topics, the final topic configuration will be the result of merging the default/common value with the one specified in topicCreation.control.common.config and the one for the specific topic e.g: topicCreation.control.commit.config.

  • For replicated topics, the final topic configuration will be the result of overriding the original topic's configuration with the result of merging the default/common value specified in topicCreation.replication.common with the one specified in the first matching rule in topicCreation.replication.rules[] .

Registry config

The full list of options and their documentation can be found here.

Note however that all options should be prefixed with "schema.registry" like in the table below:

Field Name
Behavior

schema.registry.max.retries

Maximum number of client retries.

schema.registry.retries.wait.ms

Initial delay before first retry.

schema.registry.retries.max.wait.ms

Maximum delay between retries.

schema.registry.schema.registry.url.randomize

Randomize starting URL index for load balancing.

schema.registry.basic.auth.credentials.source

How to source Basic auth credentials (URL, USER_INFO, SASL_INHERIT).

schema.registry.basic.auth.user.info

User credentials for Basic auth in user:password.

schema.registry.http.connect.timeout.ms

HTTP connection timeout.

schema.registry.http.read.timeout.ms

HTTP read timeout.

schema.registry.bearer.auth.token

Static bearer token for authentication.

schema.registry.proxy.host

Hostname of HTTP proxy.

schema.registry.proxy.port

Port of HTTP proxy.

schema.registry.bearer.auth.credentials.source

Method to source bearer token (e.g., OAUTHBEARER).

schema.registry.bearer.auth.issuer.endpoint.url

OAuth/OIDC issuer token endpoint URL.

schema.registry.bearer.auth.client.id

OAuth client ID for client credentials grant.

schema.registry.bearer.auth.client.secret

OAuth client secret.

schema.registry.bearer.auth.scope

Requested OAuth token scope.

schema.registry.bearer.auth.scope.claim.name

JWT claim name for scope (default “scope”).

schema.registry.bearer.auth.sub.claim.name

JWT claim name for subject (default “sub”).

schema.registry.bearer.auth.logical.cluster

Logical cluster identifier for token header.

schema.registry.bearer.auth.identity.pool.id

Identity pool ID for token header.

schema.registry.bearer.auth.cache.expiry.buffer.seconds

Token cache expiry buffer before actual expiry.

schema.registry.bearer.auth.custom.provider.class

Class to implement custom token retrieval.

schema.registry.ssl.protocol

SSLProtocol (e.g. TLSv1.2, TLSv1.3).

schema.registry.ssl.provider

Java security provider name.

schema.registry.ssl.cipher.suites

Enabled SSL cipher suites.

schema.registry.ssl.enabled.protocols

Enabled SSL/TLS protocol versions list.

schema.registry.ssl.keystore.type

Keystore format (JKS, PKCS12).

schema.registry.ssl.keystore.key

Keystore private key.

schema.registry.ssl.keystore.certificate.chain

Certificate chain for keystore.

schema.registry.ssl.truststore.certificates

Trusted CA certificates.

schema.registry.ssl.keystore.location

Path to keystore file.

schema.registry.ssl.keystore.password

Password to access keystore.

schema.registry.ssl.key.password

Password for key inside keystore.

schema.registry.ssl.truststore.type

Truststore format.

schema.registry.ssl.truststore.location

Path to truststore file.

schema.registry.ssl.truststore.password

Password to access truststore.

schema.registry.ssl.keymanager.algorithm

KeyManagerFactory algorithm (e.g. SunX509).

schema.registry.ssl.trustmanager.algorithm

TrustManagerFactory algorithm (e.g. PKIX).

schema.registry.ssl.endpoint.identification.algorithm

Hostname verification algorithm (e.g. HTTPS).

schema.registry.ssl.secure.random.implementation

SecureRandom implementation class.

schema.registry.ssl.engine.factory.class

Custom SSLEngineFactory implementation.

Registry Supported Types

If your schema registry uses a format/type not listed as a default, you can add them here. That should be enough for K2K to be able to replicate your schemas.

Last updated

Was this helpful?