The SET syntax lets you customize the behavior of the underlying Kafka Consumer/Producer, Kafka Streams (including RocksDB parameters), topic creation and error handling.

The general syntax is:

SET <setting_name>=<setting_value>;

Kafka topics 

The computed result is written back to a Kafka topic. The SQL processor can create the target topics if they are not present. There are two levels of settings: generic (or default) settings, which apply to all target topics, and specific (or topic-related) settings, which allow a distinct setup for a given topic. For example, one of the output topics might require a different partition count or replication factor than the defaults.

To set the defaults follow this syntax:

SET defaults.topic.<topic_setting_key> = <value>;

autocreate (BOOLEAN): Creates the topic if it does not already exist.
partitions (INT): Controls the partition count of the target topic. If the topic already exists, this will not be applied.
replication (INT): Controls the replication factor of the target topic. If the topic already exists, this will not be applied.
Any other Kafka topic configuration: Each Kafka topic allows a set of parameters to be set. For example, cleanup.policy can be set like this: SET defaults.topic.cleanup.policy='compact,delete';
key.avro.record (STRING): Controls the output record Key schema name.
key.avro.namespace (STRING): Controls the output record Key schema namespace.
value.avro.record (STRING): Controls the output record Value schema name.
value.avro.namespace (STRING): Controls the output record Value schema namespace.
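
As an illustration, the defaults could be set as in the sketch below. The keys are the ones listed above; the values are assumptions chosen for the example, not recommendations, and the unquoted boolean literal is assumed to be accepted.

```sql
-- Hypothetical defaults applied to every target topic the processor creates
SET defaults.topic.autocreate=true;
SET defaults.topic.partitions=3;
SET defaults.topic.replication=2;
-- A regular Kafka topic configuration passed through the same prefix
SET defaults.topic.cleanup.policy='compact';
```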

All the keys that apply to the defaults are also valid for an individual topic. To control the settings for a specific topic, use:

SET topic.<topic_name>.<topic_setting_key>=<value>;

SET topic.market_risk.cleanup.policy='compact,delete';

-- escape the topic name with backticks if it contains . or - or other non-alphanumeric characters
SET topic.`market.risk`.cleanup.policy='compact,delete';
SET topic.`market-risk`.cleanup.policy='compact,delete';
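
Because every default key is also valid per topic, a default and a topic-specific setting can be combined, for instance when one output topic needs a different partition count than the rest. The sketch below uses illustrative topic names and values.

```sql
-- Default partition count for all target topics (example value)
SET defaults.topic.partitions=3;
-- Distinct setup for a single topic that needs more partitions
SET topic.market_risk.partitions=12;
-- The same, for a topic whose name must be escaped
SET topic.`market-risk`.replication=3;
```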

Error handling 

The streaming engine allows users to define how errors are handled when writing to or reading from a topic.

Both sides can be set at once by doing:

SET error.policy= '<error_policy>';

or individually as described in the sections below.

Reading Errors 

Data being processed might be corrupted or not aligned with the topic format (maybe you expect an Avro payload but the raw bytes represent a JSON document). Setting what happens in these scenarios can be done like this:

SET error.policy.read= '<error_policy>';

Writing Errors 

While data is being written, errors can occur (for example, because of network issues). Setting what happens in these scenarios can be done like this:

SET error.policy.write= '<error_policy>';

There are three possible values to control the behavior.

continue: Allows the application to carry on. The problem will be logged.
fail: Stops the application. The application will be in a failed (error) state.
dlq: Allows the application to continue, but it will send the payload to a dead-letter topic. It requires dead.letter.queue to be set. The default value for dead.letter.queue is lenses.sql.dlq.

When dlq is used, the dead.letter.queue setting is required. Its value is the target topic to which the problematic records are sent.

SET dead.letter.queue = '<dead_letter_topic>';
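
For example, a processor that should keep running when a write fails, while preserving the failed records, might combine the two settings as sketched below. The dead-letter topic name is hypothetical.

```sql
-- Keep running on write errors and route the failed payloads to a dead-letter topic
SET error.policy.write='dlq';
-- Hypothetical target topic for the problematic records
SET dead.letter.queue='orders_processor_dlq';
```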

Kafka Streams Consumer and Producer settings 

Using the SET syntax, the underlying Kafka Streams and Kafka Producer and Consumer settings can be adjusted.

SET <setting_key>=<value>;

processing.guarantee (STRING): The processing guarantee that should be used. Possible values are AT_LEAST_ONCE (default) and EXACTLY_ONCE. Exactly-once processing requires a cluster of at least three brokers by default, which is the recommended setting for production.
commit.interval.ms (LONG): The frequency, in milliseconds, with which to save the position of the processor. If processing.guarantee is set to EXACTLY_ONCE, the default value is 100; otherwise the default value is 30000. This setting directly impacts the behavior of Tables, as it controls how often they emit events downstream. An event is emitted only once every commit.interval.ms, so intermediate events received by the table are not immediately visible downstream.
poll.ms (LONG): The amount of time in milliseconds to block waiting for input.
cache.max.bytes.buffering (LONG): Maximum number of memory bytes to be used for buffering across all threads. It has to be at least 0. Default value is 10 * 1024 * 1024.
client.id (STRING): An ID prefix string used for the client IDs of the internal consumer, producer and restore-consumer, with pattern ‘<client.id>-StreamThread--<consumer
num.standby.replicas (INT): The number of standby replicas for each task. Default value is 0.
num.stream.threads (INT): The number of threads to execute stream processing. Default value is 1.
max.task.idle.ms (LONG): Maximum amount of time a stream task will stay idle when not all of its partition buffers contain records, to avoid potential out-of-order record processing across multiple input streams.
buffered.records.per.partition (INT): Maximum number of records to buffer per partition. Default is 1000.
connections.max.idle.ms (LONG): Close idle connections after the number of milliseconds specified by this config.
receive.buffer.bytes (LONG): The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default will be used.
reconnect.backoff.ms (LONG): The base amount of time to wait before attempting to reconnect to a given host. This avoids repeatedly connecting to a host in a tight loop. This backoff applies to all connection attempts by the client to a broker.
reconnect.backoff.max.ms (LONG): The maximum amount of time in milliseconds to wait when reconnecting to a broker that has repeatedly failed to connect. If provided, the backoff per host will increase exponentially for each consecutive connection failure, up to this maximum. After calculating the backoff increase, 20% random jitter is added to avoid connection storms. Default is 1000.
retries (INT): Setting a value greater than zero will cause the client to resend any request that fails with a potentially transient error. Default is 0.
retry.backoff.ms (LONG): The amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios. Default is 100.
send.buffer.bytes (LONG): The size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is -1, the OS default will be used. Default is 128 * 1024.
state.cleanup.delay.ms (LONG): The amount of time in milliseconds to wait before deleting state when a partition has migrated.
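
A sketch of how a few of these keys might be combined is shown below. The values are assumptions for illustration only, and string values are assumed to be quoted in the same way as in the cleanup.policy example.

```sql
-- Example values only; tune them for your workload
SET processing.guarantee='EXACTLY_ONCE';
-- Tables emit downstream at most once per commit interval (milliseconds)
SET commit.interval.ms=1000;
SET num.standby.replicas=1;
-- 20 * 1024 * 1024 bytes of buffering across all threads
SET cache.max.bytes.buffering=20971520;
```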

Alongside the keys above, the Kafka consumer and producer settings can also be adjusted.

SET max.poll.records = 20000;

Some consumer and producer configuration keys share the same name. When they need to be set to different values, prefix the key with consumer or producer:

SET consumer.<duplicate_config_key>=<value_1>;
SET producer.<duplicate_config_key>=<value_2>;
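
For instance, receive.buffer.bytes exists on both the Kafka consumer and the Kafka producer, so it could be set separately for each as sketched below. The byte values are illustrative.

```sql
-- Larger TCP receive buffer for the consumer only (example value)
SET consumer.receive.buffer.bytes=262144;
-- Smaller TCP receive buffer for the producer (example value)
SET producer.receive.buffer.bytes=65536;
```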


RocksDB

Stateful data flow applications might, on rare occasions, require some of the parameters of the underlying RocksDB to be adjusted.

To set the properties, use:

SET rocksdb.<key> = <value>;

rocksdb.table.block.cache.size (LONG): Sets the amount of cache, in bytes, that will be used by RocksDB. If the cache size is non-positive, the cache will not be used. Default: 8M
rocksdb.table.block.size (LONG): Approximate size of user data packed per block. Default: 4K
rocksdb.table.block.cache.compressed.num.shard.bits (INT): Controls the number of shards for the compressed block cache.
rocksdb.table.block.cache.num.shard.bits (INT): Controls the number of shards for the block cache.
rocksdb.table.block.cache.compressed.size (LONG): Size of the compressed block cache. If 0, then block_cache_compressed is set to null.
rocksdb.table.block.restart.interval (INT): Sets the block restart interval.
rocksdb.table.block.cache.size.and.filter (BOOL): Indicates whether index/filter blocks are put in the block cache. If not specified, each ‘table reader’ object will pre-load the index/filter block during table initialization.
rocksdb.table.block.checksum.type (STRING): Sets the checksum type to be used with this table. Available values: kNoChecksum, kCRC32c, kxxHash.
rocksdb.table.block.hash.allow.collision (BOOL): Influences the behavior when kHashSearch is used. If false, stores a precise prefix-to-block-range mapping; if true, does not store the prefix and allows prefix hash collisions (less memory consumption).
rocksdb.table.block.index.type (STRING): Sets the index type to use with this table. Available values: kBinarySearch, kHashSearch block cache. If this is set to true, then no block cache should be used. Default: false
rocksdb.table.block.whole.key.filtering (BOOL): If true, places whole keys in the filter (not just prefixes). This must generally be true for gets to be efficient. Default: true
rocksdb.table.block.pinl0.filter (BOOL): Indicates whether L0 index/filter blocks are pinned to the block cache. If not specified, defaults to false. max threads RocksDB should use
rocksdb.write.buffer.size (LONG): Sets the number of bytes the database will build up in memory (backed by an unsorted log on disk) before converting to a sorted on-disk file.
rocksdb.table.block.size.deviation (INT): This is used to close a block before it reaches the configured ‘block_size’. If the percentage of free space in the current block is less than this specified number and adding a new record to the block would exceed the configured block size, then this block will be closed and the new record will be written to the next block. Default is 10.
rocksdb.compaction.style (STRING): Available values: LEVEL, UNIVERSAL, FIFO
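
A minimal sketch of tuning RocksDB for a stateful processor follows. It uses keys from the list above; the values are assumptions for illustration, and sizes are assumed to be given in bytes.

```sql
-- Example values only
-- 16 * 1024 * 1024 bytes of block cache
SET rocksdb.table.block.cache.size=16777216;
-- 8 * 1024 bytes per block
SET rocksdb.table.block.size=8192;
SET rocksdb.compaction.style='UNIVERSAL';
```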
Last modified: July 3, 2024