Reference
The complete list of configuration properties for K2K.
name
Name of the pipeline.
Yes
string
features.exactlyOnce
Enables exactly-once processing.
No
disabled
enabled
, disabled
keepRecordCreationTimestamp
Keeps the original message timestamp. If disabled, a new timestamp is set based on the target topic og.message.timestamp.type
setting
No
enabled
enabled
, disabled
features.headerReplication
Enables/disables header replication.
No
disabled
enabled
, disabled
features.schemaMapping
Enables/disables schema replication.
No
disabled
enabled
, disabled
features.offsetCommitOptimizePartition
Optimizes offset commit messages by publishing all control messages to the same partition.
No
enabled
enabled
, disabled
features.autoCreateControlTopics
Enables creation of the necessary control topics used by the k2k application.
No
disabled
enabled
, disabled
features.validateControlTopicSettings
Enables validation of configurations for control topics used by the k2k application.
No
enabled
enabled
, disabled
features.tracingHeaders
Appends tracing headers to each replicated record. An object can be specified to configure which headers to provide.
No
disabled
enabled
, disabled
, object
features.tracingHeaders.partition
Appends the partition tracing header to each replicated record.
No
disabled
enabled
, disabled
features.tracingHeaders.offset
Appends the offset tracing header to each replicated record.
No
disabled
enabled
, disabled
features.tracingHeaders.topic
Appends the topic tracing header to each replicated record.
No
disabled
enabled
, disabled
features.tracingHeaders.pipeline
Appends the pipeline name tracing header to each replicated record.
No
disabled
enabled
, disabled
tracing.headers.partition
Name given to the header containing the source partition information.
No
__k2k_partition
string
tracing.headers.offset
Name given to the header containing the source offset information.
No
__k2k_partition
string
tracing.headers.topic
Name given to the header containing the source topic information.
No
__k2k_partition
string
tracing.headers.pipeline
Name given to the header containing the source pipeline information.
No
__k2k_partition
string
errorHandling.onCommitSyncTimeout
Handles the timeout when determining the latest committed offset.
No
fail
, ignore
errorHandling.onControlMessageDeserializationError
Handles deserialization errors for control messages.
No
fail
, ignore
coordination.kafka.charset
Charset used when serializing/deserializing coordination control messages.
No
UTF-8
Any valid charset
coordination.kafka.consumer
Kafka consumer configuration
No
empty object
object
accepts any kafka consumer setting
coordination.kafka.assignment.topic
Topic used for assignment coordination.
No
__k2k-assignment
only used if exactly-once
is enabled
coordination.kafka.assignment.graceWindow
Time to wait before the application actively fences off other applications.
No
15 seconds
Time string (e.g., 15 seconds
)
coordination.kafka.assignment.fencingMaxParallelism
Controls the max parallelism when fencing slow/zombie producers.
No
10
integer
coordination.kafka.commit.topic
Topic for offset commits.
No
__k2k_consumer-offsets
coordination.kafka.commit.syncTimeout
Time limit to wait for catchup when reading from the offset control topic.
No
10 seconds
Time string (e.g., 10 seconds
)
coordination.kafka.commit.batchSize
Number of records after which an offset is committed (when exactly-once
is disabled). Batch size when exactly-once
is enabled.
No
100
Positive integers
coordination.kafka.commit.batchTimeout
Time to wait before injecting a commit record (when exactly-once
is disabled). Time to wait before committing if the batch size is not reached (when exactly-once
is enabled).
No
5 seconds
Time string (e.g., 5 seconds
)
coordination.kafka.commit.group
Group name used to coordinate consumer commits and exactly-once
.
Yes
string
This property is independent of source.kafka.consumer."group.id"
source.kafka.consumer
Kafka consumer configuration. Used when reading data from the source cluster.
No
source.kafka.consumer."group.id"
Consumer group name. Used exclusively for partition assignment.
Yes
This property is independent of coordination.kafka.commit.group
source.kafka.registry.url
Schema Registry URL.
No
Comma-separated list of addresses.
only used if features.schemaMapping
is enabled
source.kafka.registry.config
Schema Registry connection options. (Full list of options in the corresponding section bellow).
No
empty object
object with string values.
only used if features.schemaMapping
is enabled
source.kafka.registry.cacheSize
Schema Registry client internal client options. In most scenarios larger cache sizes won't result in improved performance.
No
31
Integer
only used if features.schemaMapping
is enabled
source.kafka.registry.headers
Headers specified here will be added to all schema registry HTTP requests.
No
empty object
object with primitive values
only used if features.schemaMapping
is enabled
source.kafka.registry.supportedTypes
Schema Registry supported types. Types that are not supported by default can be added here.
No
[AVRO, JSON, PROTOBUF,YANG]
Array of string
only used if features.schemaMapping
is enabled
source.kafka.connection.servers
Kafka bootstrap servers for source connection.
Yes
Comma-separated list of server addresses
E.g., 127.0.0.1
target.kafka.producer
Kafka producer configuration used to produce data to the target cluster.
No
target.kafka.registry.url
Schema Registry URL.
No
Comma-separated list of addresses.
only used if features.schemaMapping
is enabled
target.kafka.registry.config
Schema Registry connection options. (Full list of options in the corresponding section bellow).
No
empty object
object with string values.
only used if features.schemaMapping
is enabled
target.kafka.registry.cacheSize
Schema Registry client internal client options. In most scenarios larger cache sizes won't result in improved performance.
No
31
Integer
only used if features.schemaMapping
is enabled
target.kafka.registry.headers
Headers specified here will be added to all schema registry HTTP requests.
No
empty object
object with primitive values
only used if features.schemaMapping
is enabled
target.kafka.registry.supportedTypes
Schema Registry supported types. Types that are not supported by default can be added here.
No
[AVRO, JSON, PROTOBUF,YANG]
Array of string
only used if features.schemaMapping
is enabled
target.kafka.connection.servers
Kafka bootstrap servers for target connection
Yes
Comma-separated list of server addresses
E.g., 127.0.0.1,127.0.0.2
replication[0].source.name
Name of the source node.
Yes
replication[0].source.topic
Defines the topic(s) K2K will read data from.
Yes
Section Replication.Source.Topic
replication[].sink.name
Name of the sink node.
Yes
replication[].sink.topic
Strategy used to assign the target cluster's topic.
Yes
Any valid topic name
replication[].sink.partition
Strategy used to assign the target cluster's partition for a record.
Yes
Any valid partition
metrics.prefix.targetConsumer
Prefix added to metrics concerning the consumer instantiated against the target cluster.
No
"k2k.consumer.control."
Any valid string
metrics.prefix.targetProducer
Prefix added to metrics concerning the producer instantiated against the target cluster.
No
"k2k.producer."
Any valid string
metrics.prefix.sourceConsumer
Prefix added to metrics concerning the consumer instantiated against the source cluster.
No
"k2k.producer."
Any valid string
metrics.prefix.assignment
Prefix added to metrics concerning the assignment process (assigned topics, partitions,...)
No
"k2k."
Any valid string
metrics.kafka.topic
Topic where metrics are published to. Note: OTEL_METRICS_EXPORTER must be configured with kafka
for metrics to be exporter to kafka.
No
"k2k."
Any valid string
topicCreation.control.common.partitions
Default partition count to be used when creating any of the control topics.
No
empty
integer
only used if features.autoCreateControlTopics
is enabled
topicCreation.control.common.replication
Default replication to be used when creating any of the control topics.
No
empty
short
only used if features.autoCreateControlTopics
is enabled
topicCreation.control.common.config
Default topic configuration to be used when creating any of the control topics.
No
empty object
object
only used if features.autoCreateControlTopics
is enabled
topicCreation.control.assignment.partitions
Default partition count to be used when creating the topic used for exactly-once assignment control.
No
empty
integer
only used if features.autoCreateControlTopics
is enabled
topicCreation.control.assignment.replication
Default replication to be used when creating the topic used for exactly-once assignment control.
No
empty
short
only used if features.autoCreateControlTopics
is enabled
topicCreation.control.assignment.config
Default topic configuration to be used when creating the topic used for exactly-once assignment control.
No
object with {"cleanup.policy": "compact, delete"}
object
only used if features.autoCreateControlTopics
is enabled
topicCreation.control.commit.partitions
Default partition count to be used when creating the topic used for offset commit control.
No
empty
integer
only used if features.autoCreateControlTopics
is enabled
topicCreation.control.commit.replication
Default replication to be used when creating the topic used for offset commit control.
No
empty
short
only used if features.autoCreateControlTopics
is enabled
topicCreation.control.commit.config
Default topic configuration to be used when creating the topic used for offset commit control.
No
object with {"cleanup.policy": "compact, delete"}
object
only used if features.autoCreateControlTopics
is enabled
topicCreation.control.metrics.partitions
Default partition count to be used when creating the topic used for metrics.
No
empty
integer
only used if features.autoCreateControlTopics
is enabled
topicCreation.control.metrics.replication
Default replication to be used when creating the topic used for metrics.
No
empty
short
only used if features.autoCreateControlTopics
is enabled
topicCreation.control.metrics.config
Default topic configuration to be used when creating the topic used for metrics.
No
empty object
object
only used if features.autoCreateControlTopics
is enabled
topicCreation.replication.common.partitions
Default partition count to be used when auto-creating any of the replicated topics.
No
empty
integer
only used if features.autoCreateTopics
is enabled
topicCreation.replication.common.replication
Default replication to be used when auto-creating any of the replicated control topics.
No
empty
short
only used if features.autoCreateTopics
is enabled
topicCreation.replication.common.config
Default topic configuration to be used when auto-creating any of the replicated control topics.
No
empty object
object
only used if features.autoCreateTopics
is enabled
topicCreation.replication.rules[].pattern
Regex used to select the topics the properties should be applied to. The first match in the containing array wins.
No
regex expression
only used if features.autoCreateTopics
is enabled
topicCreation.replication.rules[].properties.partitions
Partition count to be used when auto-creating replicated topics with a name that matches the specified pattern.
No
integer
only used if features.autoCreateTopics
is enabled
topicCreation.replication.rules[].properties.replication
Replication to be used when auto-creating replicated topics with a name that matches the specified pattern.
No
short
only used if features.autoCreateTopics
is enabled
topicCreation.replication.rules[].properties.config
Topic configuration to be used when auto-creating replicated topics with a name that matches the specified pattern
No
object
only used if features.autoCreateTopics
is enabled
license.acceptEula
It needs to be set to true to run the K2K application
Yes
boolean
Topic Creation
If any of the properties
topicCreation.*.replication
andtopicCreation.*.partitions
is left blank (or set to null), K2K will default to the cluster default values for those properties.For control topics, the final topic configuration will be the result of merging the default/common value with the one specified in
topicCreation.control.common.config
and the one for the specific topic e.g:topicCreation.control.commit.config
.For replicated topics, the final topic configuration will be the result of overriding the original topic's configuration with the result of merging the default/common value specified in
topicCreation.replication.common
with the one specified in the first matching rule intopicCreation.replication.rules[]
.
Registry config
The full list of options and their documentation can be found here.
Note however that all options should be prefixed with "schema.registry" like in the table below:
schema.registry.max.retries
Maximum number of client retries.
schema.registry.retries.wait.ms
Initial delay before first retry.
schema.registry.retries.max.wait.ms
Maximum delay between retries.
schema.registry.schema.registry.url.randomize
Randomize starting URL index for load balancing.
schema.registry.basic.auth.credentials.source
How to source Basic auth credentials (URL, USER_INFO, SASL_INHERIT).
schema.registry.basic.auth.user.info
User credentials for Basic auth in user:password
.
schema.registry.http.connect.timeout.ms
HTTP connection timeout.
schema.registry.http.read.timeout.ms
HTTP read timeout.
schema.registry.bearer.auth.token
Static bearer token for authentication.
schema.registry.proxy.host
Hostname of HTTP proxy.
schema.registry.proxy.port
Port of HTTP proxy.
schema.registry.bearer.auth.credentials.source
Method to source bearer token (e.g., OAUTHBEARER).
schema.registry.bearer.auth.issuer.endpoint.url
OAuth/OIDC issuer token endpoint URL.
schema.registry.bearer.auth.client.id
OAuth client ID for client credentials grant.
schema.registry.bearer.auth.client.secret
OAuth client secret.
schema.registry.bearer.auth.scope
Requested OAuth token scope.
schema.registry.bearer.auth.scope.claim.name
JWT claim name for scope (default “scope”).
schema.registry.bearer.auth.sub.claim.name
JWT claim name for subject (default “sub”).
schema.registry.bearer.auth.logical.cluster
Logical cluster identifier for token header.
schema.registry.bearer.auth.identity.pool.id
Identity pool ID for token header.
schema.registry.bearer.auth.cache.expiry.buffer.seconds
Token cache expiry buffer before actual expiry.
schema.registry.bearer.auth.custom.provider.class
Class to implement custom token retrieval.
schema.registry.ssl.protocol
SSLProtocol (e.g. TLSv1.2, TLSv1.3).
schema.registry.ssl.provider
Java security provider name.
schema.registry.ssl.cipher.suites
Enabled SSL cipher suites.
schema.registry.ssl.enabled.protocols
Enabled SSL/TLS protocol versions list.
schema.registry.ssl.keystore.type
Keystore format (JKS, PKCS12).
schema.registry.ssl.keystore.key
Keystore private key.
schema.registry.ssl.keystore.certificate.chain
Certificate chain for keystore.
schema.registry.ssl.truststore.certificates
Trusted CA certificates.
schema.registry.ssl.keystore.location
Path to keystore file.
schema.registry.ssl.keystore.password
Password to access keystore.
schema.registry.ssl.key.password
Password for key inside keystore.
schema.registry.ssl.truststore.type
Truststore format.
schema.registry.ssl.truststore.location
Path to truststore file.
schema.registry.ssl.truststore.password
Password to access truststore.
schema.registry.ssl.keymanager.algorithm
KeyManagerFactory algorithm (e.g. SunX509).
schema.registry.ssl.trustmanager.algorithm
TrustManagerFactory algorithm (e.g. PKIX).
schema.registry.ssl.endpoint.identification.algorithm
Hostname verification algorithm (e.g. HTTPS).
schema.registry.ssl.secure.random.implementation
SecureRandom implementation class.
schema.registry.ssl.engine.factory.class
Custom SSLEngineFactory implementation.
Registry Supported Types
If your schema registry uses a format/type not listed as a default, you can add them here. That should be enough for K2K to be able to replicate your schemas.
Last updated
Was this helpful?