Settings

The SET syntax allows customizing the behaviour for the underlying Kafka Consumer/Producer, Kafka Streams (including RocksDB parameters), topic creation and error handling.

The general syntax is:

SET <setting_name>=<setting_value>;

Kafka topics

SQL processors can create topics that are not present. There are two levels of settings: generic (or default) settings, which apply to all target topics, and specific (or topic-related) settings, which allow a distinct setup for a given topic. For example, one of the output topics might require a different partition count or replication factor than the defaults.

To set the defaults, follow this syntax:

SET defaults.topic.<topic_setting_key> = <value>;

| Key | Type | Description |
| --- | --- | --- |
| autocreate | BOOLEAN | Creates the topic if it does not already exist. |
| partitions | INT | Controls the partition count of the target topic. If the topic already exists, this will not be applied. |
| replication | INT | Controls the topic replication factor. If the topic already exists, this will not be applied. |
| - | | Each Kafka topic allows a set of parameters to be set. For example, cleanup.policy can be set like this: SET defaults.topic.cleanup.policy='compact,delete'; |
| key.avro.record | STRING | Controls the output record Key schema name. |
| key.avro.namespace | STRING | Controls the output record Key schema namespace. |
| value.avro.record | STRING | Controls the output record Value schema name. |
| value.avro.namespace | STRING | Controls the output record Value schema namespace. |
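
As an illustration, the defaults for a processor might be set as in the sketch below. The numeric values, the schema name and the namespace are placeholders chosen for this example rather than recommended settings, and writing the boolean and integer values unquoted is an assumption based on the types listed above.

```sql
-- Illustrative defaults applied to every topic the processor creates.
-- All values are placeholders; adjust them to the target cluster.
SET defaults.topic.autocreate = true;
SET defaults.topic.partitions = 3;
SET defaults.topic.replication = 2;

-- Hypothetical Avro schema name and namespace for the output record value.
SET defaults.topic.value.avro.record = 'MarketRisk';
SET defaults.topic.value.avro.namespace = 'com.example.risk';
```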

All the keys applicable for defaults are valid for controlling the settings for a given topic. Controlling the settings for a specific topic can be done via:

SET topic.<topic_name>.<topic_setting_key>=<value>;
SET topic.market_risk.cleanup.policy='compact,delete';

-- escape the topic name with backticks if it contains ., - or any other non-alphanumeric character
SET topic.`market.risk`.cleanup.policy='compact,delete';
SET topic.`market-risk`.cleanup.policy='compact,delete';
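
The two levels can be used together. The sketch below assumes that a topic-specific setting takes precedence over the default for that topic, which is what the description above suggests; the partition counts are illustrative and market_risk is the example topic name used above.

```sql
-- Topics created by the processor get 3 partitions by default (illustrative value).
SET defaults.topic.partitions = 3;

-- market_risk is given its own partition count instead of the default.
SET topic.market_risk.partitions = 12;
```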

Error handling

The streaming engine allows users to define how errors are handled when writing to or reading from a topic.

Both sides can be set at once by doing:

SET error.policy= '<error_policy>';

or individually as described in the sections below.

Reading Errors

Data being processed might be corrupted or not aligned with the topic format (for example, an Avro payload is expected but the raw bytes represent a JSON document). The behaviour in these scenarios can be set like this:

SET error.policy.read= '<error_policy>';

Writing Errors

While data is being written, errors can occur (for example, because of network issues). The behaviour in these scenarios can be set like this:

SET error.policy.write= '<error_policy>';

There are three possible values to control the behaviour.

| Value | Description |
| --- | --- |
| continue | Allows the application to carry on. The problem will be logged. |
| fail | Stops the application. The application will be in a failed (error) state. |
| dlq | Allows the application to continue but it will send the payload to a dead-letter-topic. It requires dead.letter.queue to be set. The default value for dead.letter.queue is lenses.sql.dlq. |

When dlq is used, the dead.letter.queue setting is required. Its value is the target topic to which the problematic records are sent.

SET dead.letter.queue = '<dead_letter_topic>';
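
For example, the read and write sides can be given different policies. This is a minimal sketch: the choice of policies is illustrative and market_risk_errors is a hypothetical topic name.

```sql
-- Log read problems and carry on.
SET error.policy.read = 'continue';

-- Send records that fail on write to a dead-letter topic.
SET error.policy.write = 'dlq';

-- Hypothetical target topic for the problematic records
-- (the documented default is lenses.sql.dlq).
SET dead.letter.queue = 'market_risk_errors';
```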

Kafka Streams, Consumer and Producer settings

Using the SET syntax, the underlying Kafka Streams and Kafka Producer and Consumer settings can be adjusted.

SET <setting_key>=<value>;

| Key | Type | Description |
| --- | --- | --- |
| processing.guarantee | STRING | The processing guarantee that should be used. Possible values are AT_LEAST_ONCE (default) and EXACTLY_ONCE. Exactly-once processing requires a cluster of at least three brokers by default, which is the recommended setting for production. |
| commit.interval.ms | LONG | The frequency with which to save the position of the processor. If processing.guarantee is set to EXACTLY_ONCE, the default value is 100; otherwise the default value is 30000. This setting directly impacts the behavior of Tables, as it controls how often they emit events downstream. An event is emitted only every commit.interval.ms, so intermediate events received by the table will not be directly visible downstream. |
| poll.ms | LONG | The amount of time in milliseconds to block waiting for input. |
| cache.max.bytes.buffering | LONG | Maximum number of memory bytes to be used for buffering across all threads. It has to be at least 0. Default value is 10 * 1024 * 1024. |
| client.id | STRING | An ID prefix string used for the client IDs of internal consumer, producer and restore-consumer, with pattern ‘<client.id>-StreamThread--<consumer |
| num.standby.replicas | INT | The number of standby replicas for each task. Default value is 0. |
| num.stream.threads | INT | The number of threads to execute stream processing. Default value is 1. |
| max.task.idle.ms | LONG | Maximum amount of time a stream task will stay idle when not all of its partition buffers contain records, to avoid potential out-of-order record processing across multiple input streams. |
| buffered.records.per.partition | INT | Maximum number of records to buffer per partition. Default is 1000. |
| connections.max.idle.ms | LONG | Close idle connections after the number of milliseconds specified by this config. |
| receive.buffer.bytes | LONG | The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default will be used. |
| reconnect.backoff.ms | LONG | The base amount of time to wait before attempting to reconnect to a given host. This avoids repeatedly connecting to a host in a tight loop. This backoff applies to all connection attempts by the client to a broker. |
| reconnect.backoff.max.ms | LONG | The maximum amount of time in milliseconds to wait when reconnecting to a broker that has repeatedly failed to connect. If provided, the backoff per host will increase exponentially for each consecutive connection failure, up to this maximum. After calculating the backoff increase, 20% random jitter is added to avoid connection storms. Default is 1000. |
| retries | INT | Setting a value greater than zero will cause the client to resend any request that fails with a potentially transient error. Default is 0. |
| retry.backoff.ms | LONG | The amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios. Default is 100. |
| send.buffer.bytes | LONG | The size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is -1, the OS default will be used. Default is 128 * 1024. |
| state.cleanup.delay.ms | LONG | The amount of time in milliseconds to wait before deleting state when a partition has migrated. |
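
A few of these keys could be combined as in the sketch below. The values are illustrative only, and quoting the string value is an assumption that follows the quoting used for other string settings on this page.

```sql
-- Illustrative values only; tune them for the actual workload.
SET processing.guarantee = 'EXACTLY_ONCE';

-- Emit table updates downstream roughly once per second.
SET commit.interval.ms = 1000;

-- Run two processing threads and keep one standby replica per task.
SET num.stream.threads = 2;
SET num.standby.replicas = 1;
```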

Alongside the keys above, the Kafka consumer and producer settings can also be tweaked.

SET session.timeout.ms=120000;
SET max.poll.records = 20000;

Some of the consumer and producer configurations have the same name. When they need to be distinguished, prefix the keys with consumer or producer.

SET consumer.<duplicate_config_key>=<value_1>;
SET producer.<duplicate_config_key>=<value_2>;
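
For instance, retry.backoff.ms exists in both the Kafka consumer and the Kafka producer configuration, so the prefix can give each side its own value. The values in this sketch are illustrative.

```sql
-- Consumer retries failed requests after 200 ms (illustrative value).
SET consumer.retry.backoff.ms = 200;

-- Producer waits longer between retries (illustrative value).
SET producer.retry.backoff.ms = 500;
```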

RocksDB

Stateful data flow applications might require, on rare occasions, some of the parameters for the underlying RocksDB to be tweaked.

To set the properties, use:

SET rocksdb.<key> = <value>;

| Key | Type | Description |
| --- | --- | --- |
| rocksdb.table.block.cache.size | LONG | Sets the amount of cache in bytes that will be used by RocksDB. If cacheSize is non-positive, then the cache will not be used. Default: 8M |
| rocksdb.table.block.size | LONG | Approximate size of user data packed per block. Default: 4K |
| rocksdb.table.block.cache.compressed.num.shard.bits | INT | Controls the number of shards for the block compressed cache. |
| rocksdb.table.block.cache.num.shard.bits | INT | Controls the number of shards for the block cache. |
| rocksdb.table.block.cache.compressed.size | LONG | Size of the compressed block cache. If 0, then block_cache_compressed is set to null. |
| rocksdb.table.block.restart.interval | INT | Sets the block restart interval. |
| rocksdb.table.block.cache.size.and.filter | BOOL | Indicates whether index/filter blocks are put in the block cache. If not specified, each 'table reader' object will pre-load the index/filter block during table initialization. |
| rocksdb.table.block.checksum.type | STRING | Sets the checksum type to be used with this table. Available values: kNoChecksum, kCRC32c, kxxHash. |
| rocksdb.table.block.hash.allow.collision | BOOL | Influences the behavior when kHashSearch is used. If false, stores a precise prefix to block range mapping; if true, does not store the prefix and allows prefix hash collisions (less memory consumption). |
| rocksdb.table.block.index.type | STRING | Sets the index type to be used with this table. Available values: kBinarySearch, kHashSearch. |
| rocksdb.table.block.no.cache | BOOL | Disables the block cache. If this is set to true, then no block cache will be used. Default: false |
| rocksdb.table.block.whole.key.filtering | BOOL | If true, places whole keys in the filter (not just prefixes). This must generally be true for gets to be efficient. Default: true |
| rocksdb.table.block.pinl0.filter | BOOL | Indicates whether L0 index/filter blocks are pinned to the block cache. If not specified, defaults to false. |
| rocksdb.total.threads | INT | The maximum number of threads RocksDB should use. |
| rocksdb.write.buffer.size | LONG | Sets the number of bytes the database will build up in memory (backed by an unsorted log on disk) before converting to a sorted on-disk file. |
| rocksdb.table.block.size.deviation | INT | Used to close a block before it reaches the configured 'block_size'. If the percentage of free space in the current block is less than this specified number and adding a new record to the block would exceed the configured block size, then this block is closed and the new record is written to the next block. Default is 10. |
| rocksdb.compaction.style | STRING | Available values: LEVEL, UNIVERSAL, FIFO |
| rocksdb.max.write.buffer | INT | |
| rocksdb.base.background.compaction | INT | |
| rocksdb.background.compaction.max | INT | |
| rocksdb.subcompaction.max | INT | |
| rocksdb.background.flushes.max | INT | |
| rocksdb.log.file.max | LONG | |
| rocksdb.log.fle.roll.time | LONG | |
| rocksdb.compaction.auto | BOOL | |
| rocksdb.compaction.level.max | INT | |
| rocksdb.files.opened.max | INT | |
| rocksdb.wal.ttl | LONG | |
| rocksdb.wal.size.limit | LONG | |
| rocksdb.memtable.concurrent.write | BOOL | |
| rocksdb.os.buffer | BOOL | |
| rocksdb.data.sync | BOOL | |
| rocksdb.fsync | BOOL | |
| rocksdb.log.dir | STRING | |
| rocksdb.wal.dir | STRING | |
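
As a rough sketch, a stateful processor might adjust a few of these parameters as shown below. The values are illustrative, and the keys are written exactly as listed in the table (including the rocksdb. prefix), which is an assumption about how the SET rocksdb.<key> syntax is meant to be read.

```sql
-- 16 MiB block cache (16 * 1024 * 1024 bytes); illustrative value.
SET rocksdb.table.block.cache.size = 16777216;

-- One of the listed compaction styles: LEVEL, UNIVERSAL, FIFO.
SET rocksdb.compaction.style = 'UNIVERSAL';

-- Cap the number of threads RocksDB uses (illustrative value).
SET rocksdb.total.threads = 4;
```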

2024 © Lenses.io Ltd. Apache, Apache Kafka, Kafka and associated open source project names are trademarks of the Apache Software Foundation.