Settings
The SET syntax allows customizing the behaviour for the underlying Kafka Consumer/Producer, Kafka Streams (including RocksDB parameters), topic creation and error handling.
The general syntax is:
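As a rough sketch of the general shape, assuming each statement is a setting key, an equals sign, a value and a terminating semicolon (the form used by the cleanup.policy example on this page). `<setting_key>` and `<setting_value>` are placeholders, not literal syntax:

```sql
-- General shape (placeholders, not literal syntax):
--   SET <setting_key> = <setting_value>;

-- Concrete instance taken from this page:
SET defaults.topic.cleanup.policy='compact,delete';
```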
Kafka topics
SQL processors can create topics that are not present. There are two levels of settings: generic (or default), applying to all target topics, and specific (or topic-related), allowing a distinct setup for a given topic. For example, one of the output topics may require a different partition count or replication factor than the defaults.
To set the defaults follow this syntax:
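A minimal sketch, assuming every default uses the `defaults.topic.` prefix seen in the cleanup.policy example on this page. The partition and replication values are illustrative only, and leaving booleans and integers unquoted is an assumption:

```sql
-- Pattern inferred from the cleanup.policy example:
--   SET defaults.topic.<key> = <value>;
SET defaults.topic.autocreate=true;
SET defaults.topic.partitions=3;     -- illustrative value
SET defaults.topic.replication=2;    -- illustrative value
SET defaults.topic.cleanup.policy='compact,delete';
```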
autocreate (BOOLEAN): Creates the topic if it does not already exist.
partitions (INT): Controls the partition count of the target topic. Not applied if the topic already exists.
replication (INT): Controls the replication factor of the topic. Not applied if the topic already exists.
Other topic parameters: Each Kafka topic accepts a set of configuration parameters. For example, cleanup.policy can be set like this: SET defaults.topic.cleanup.policy='compact,delete';
key.avro.record (STRING): Controls the output record Key schema name.
key.avro.namespace (STRING): Controls the output record Key schema namespace.
value.avro.record (STRING): Controls the output record Value schema name.
value.avro.namespace (STRING): Controls the output record Value schema namespace.
All the keys applicable for defaults are valid for controlling the settings for a given topic. Controlling the settings for a specific topic can be done via:
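The statement itself is missing from this page. As a hedged sketch, assuming the per-topic form mirrors the defaults form with a `topic.<topic_name>.` prefix (an assumption, not confirmed here), and using a hypothetical topic called `my_output_topic`:

```sql
-- Assumed pattern (not confirmed by this page):
--   SET topic.<topic_name>.<key> = <value>;
-- my_output_topic is a hypothetical topic name.
SET topic.my_output_topic.partitions=6;              -- illustrative value
SET topic.my_output_topic.cleanup.policy='compact';
```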
Error handling
The streaming engine allows users to define how errors are handled when writing to or reading from a topic.
Both sides can be set at once, or individually as described in the sections below.
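For setting both sides at once, a minimal sketch, assuming the key is named `error.policy` (the key name does not appear on this page) and takes one of the policy values described further down:

```sql
-- Assumed key name: error.policy (covers both reading and writing).
SET error.policy='continue';
```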
Reading Errors
Data being processed might be corrupted or not aligned with the topic format (for example, you expect an Avro payload but the raw bytes represent a JSON document). Setting what happens in these scenarios can be done like this:
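A hedged sketch of the read-side setting, assuming the key is named `error.policy.read` (an assumption; the key name is not shown on this page):

```sql
-- Assumed key name: error.policy.read
-- Keep processing when a record cannot be deserialized; the problem is logged.
SET error.policy.read='continue';
```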
Writing Errors
Errors can also occur while data is being written (for example, because of network issues). Setting what happens in these scenarios can be done like this:
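A hedged sketch of the write-side setting, assuming the key is named `error.policy.write` (an assumption; the key name is not shown on this page):

```sql
-- Assumed key name: error.policy.write
-- Stop the application when a record cannot be written.
SET error.policy.write='fail';
```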
There are three possible values to control the behaviour.
continue: Allows the application to carry on. The problem will be logged.
fail: Stops the application. The application will be in a failed (error) state.
dlq: Allows the application to continue, but sends the payload to a dead-letter topic. It requires dead.letter.queue to be set. The default value for dead.letter.queue is lenses.sql.dlq.
When dlq is used, the dead.letter.queue setting is required. Its value is the target topic to which the problematic records are sent.
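To illustrate the dlq option, a minimal sketch that routes problematic records to a dedicated topic. The `dead.letter.queue` key comes from this page; the `error.policy` key name and the topic name `my_processor_dlq` are assumptions made for illustration:

```sql
-- error.policy is an assumed key name; my_processor_dlq is a hypothetical topic.
SET error.policy='dlq';
SET dead.letter.queue='my_processor_dlq';
```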
Kafka Streams Consumer and Producer settings
Using the SET syntax, the underlying Kafka Streams and Kafka Producer and Consumer settings can be adjusted.
processing.guarantee (STRING): The processing guarantee that should be used. Possible values are AT_LEAST_ONCE (default) and EXACTLY_ONCE. Exactly-once processing requires a cluster of at least three brokers by default, which is the recommended setting for production.
commit.interval.ms (LONG): The frequency with which to save the position of the processor. If processing.guarantee is set to EXACTLY_ONCE, the default value is 100; otherwise the default value is 30000. This setting directly impacts the behavior of Tables, as it controls how often they emit events downstream. An event is emitted only every commit.interval.ms, so intermediate events received by the table are not directly visible downstream.
poll.ms (LONG): The amount of time in milliseconds to block waiting for input.
cache.max.bytes.buffering (LONG): Maximum number of memory bytes to be used for buffering across all threads. It has to be at least 0. Default value is 10 * 1024 * 1024.
client.id (STRING): An ID prefix string used for the client IDs of internal consumer, producer and restore-consumer, with pattern ‘<client.d>-StreamThread--<consumer
num.standby.replicas (INT): The number of standby replicas for each task. Default value is 0.
num.stream.threads (INT): The number of threads to execute stream processing. Default value is 1.
max.task.idle.ms (LONG): Maximum amount of time a stream task will stay idle when not all of its partition buffers contain records, to avoid potential out-of-order record processing across multiple input streams.
buffered.records.per.partition (INT): Maximum number of records to buffer per partition. Default is 1000.
connections.max.idle.ms (LONG): Close idle connections after the number of milliseconds specified by this config.
receive.buffer.bytes (LONG): The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default will be used.
reconnect.backoff.ms (LONG): The base amount of time to wait before attempting to reconnect to a given host. This avoids repeatedly connecting to a host in a tight loop. This backoff applies to all connection attempts by the client to a broker.
reconnect.backoff.max.ms (LONG): The maximum amount of time in milliseconds to wait when reconnecting to a broker that has repeatedly failed to connect. If provided, the backoff per host will increase exponentially for each consecutive connection failure, up to this maximum. After calculating the backoff increase, 20% random jitter is added to avoid connection storms. Default is 1000.
retries (INT): Setting a value greater than zero will cause the client to resend any request that fails with a potentially transient error. Default is 0.
retry.backoff.ms (LONG): The amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios. Default is 100.
send.buffer.bytes (LONG): The size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is -1, the OS default will be used. Default is 128 * 1024.
state.cleanup.delay.ms (LONG): The amount of time in milliseconds to wait before deleting state when a partition has migrated.
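As an illustration of adjusting the Kafka Streams keys above, a minimal sketch, assuming each key is passed to SET unprefixed. Both values are illustrative, and quoting numeric values is an assumption:

```sql
-- Emit table updates downstream roughly once per second (illustrative value).
SET commit.interval.ms='1000';
-- Run two stream threads instead of the default of one (illustrative value).
SET num.stream.threads='2';
```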
Alongside the keys above, the Kafka consumer and producer settings can also be tweaked.
Some consumer and producer configurations share the same name. When they need to be distinguished, prefix the key with consumer or producer.
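A hedged sketch of the prefixing, assuming the prefix is joined to the key with a dot (the separator is not stated on this page). `retry.backoff.ms` is one of the keys listed above that exists for both clients, and the values are illustrative:

```sql
-- Assumed pattern: SET consumer.<key> = <value>;  SET producer.<key> = <value>;
SET consumer.retry.backoff.ms='200';
SET producer.retry.backoff.ms='500';
```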
RocksDB
Stateful data flow applications might require, on rare occasions, some of the parameters for the underlying RocksDB to be tweaked.
To set the properties, use:
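A minimal sketch, assuming the RocksDB keys listed below are passed to SET like any other setting. The values are illustrative, and whether numeric values need quoting is an assumption:

```sql
-- Pattern: SET rocksdb.<key> = <value>;
SET rocksdb.table.block.size=8192;          -- illustrative value, in bytes
SET rocksdb.compaction.style='UNIVERSAL';   -- one of LEVEL, UNIVERSAL, FIFO
```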
rocksdb.table.block.cache.size (LONG): Sets the amount of cache in bytes that will be used by RocksDB. If cacheSize is non-positive, then the cache will not be used. Default: 8M
rocksdb.table.block.size (LONG): Approximate size of user data packed per block. Default: 4K
rocksdb.table.block.cache.compressed.num.shard.bits (INT): Controls the number of shards for the compressed block cache.
rocksdb.table.block.cache.num.shard.bits (INT): Controls the number of shards for the block cache.
rocksdb.table.block.cache.compressed.size (LONG): Size of the compressed block cache. If 0, then block_cache_compressed is set to null.
rocksdb.table.block.restart.interval (INT): Sets the block restart interval.
rocksdb.table.block.cache.size.and.filter (BOOL): Indicates whether index/filter blocks are put into the block cache. If not specified, each 'table reader' object will pre-load the index/filter block during table initialization.
rocksdb.table.block.checksum.type (STRING): Sets the checksum type to be used with this table. Available values: kNoChecksum, kCRC32c, kxxHash.
rocksdb.table.block.hash.allow.collision (BOOL): Influences the behavior when kHashSearch is used. If false, stores a precise prefix-to-block-range mapping; if true, does not store the prefix and allows prefix hash collisions (less memory consumption).
rocksdb.table.block.index.type (STRING): Sets the index type to be used with this table. Available values: kBinarySearch, kHashSearch.
rocksdb.table.block.no.cache (BOOL): Disables the block cache. If this is set to true, then no block cache should be used. Default: false
rocksdb.table.block.whole.key.filtering (BOOL): If true, places whole keys in the filter (not just prefixes). This must generally be true for gets to be efficient. Default: true
rocksdb.table.block.pinl0.filter (BOOL): Indicates whether L0 index/filter blocks are pinned to the block cache. If not specified, defaults to false.
rocksdb.total.threads (INT): The maximum number of threads RocksDB should use.
rocksdb.write.buffer.size (LONG): Sets the number of bytes the database will build up in memory (backed by an unsorted log on disk) before converting to a sorted on-disk file.
rocksdb.table.block.size.deviation (INT): Used to close a block before it reaches the configured 'block_size'. If the percentage of free space in the current block is less than this specified number and adding a new record to the block will exceed the configured block size, then this block will be closed and the new record will be written to the next block. Default is 10.
rocksdb.compaction.style (STRING): Available values: LEVEL, UNIVERSAL, FIFO.
rocksdb.max.write.buffer (INT)
rocksdb.base.background.compaction (INT)
rocksdb.background.compaction.max (INT)
rocksdb.subcompaction.max (INT)
rocksdb.background.flushes.max (INT)
rocksdb.log.file.max (LONG)
rocksdb.log.fle.roll.time (LONG)
rocksdb.compaction.auto (BOOL)
rocksdb.compaction.level.max (INT)
rocksdb.files.opened.max (INT)
rocksdb.wal.ttl (LONG)
rocksdb.wal.size.limit (LONG)
rocksdb.memtable.concurrent.write (BOOL)
rocksdb.os.buffer (BOOL)
rocksdb.data.sync (BOOL)
rocksdb.fsync (BOOL)
rocksdb.log.dir (STRING)
rocksdb.wal.dir (STRING)