4.3
Settings
The SET syntax allows to customize the behaviour for the underling Kafka Consumer/Producer, Kafka Streams (inlcuding RocksDB parameters), topic creation and error handling.
The general syntax is:
SET <setting_name>=<setting_value>;
Kafka topics
The computed result is written back to a Kafka topic. SQL processor can create the topics are not present. There are two levels of settings, generic (or default) applying to all target topics and specific (or topic related) to allow distinct setup for a given topic. Maybe one of the output topics requires a different partition count or replication factor than the defaults.
To set the defaults follow this syntax:
SET defaults.topic.<topic_setting_key> = <value>;
Key | Type | Description |
---|---|---|
autocreate | BOOLEAN | Creates the topic if it does not exist already. |
partitions | INT | Controls the target topic partitions count. If the topic already exists, this will not be applied. |
replication | INT | Controls the topic replication factor. If the topic already exists, this will not be applied. |
- | Each Kafka topics allows a set of parameters to be set. For example cleanup.policy can be set like this SET defaults.topic.cleanup.policy='compact,delete'; | |
key.avro.record | STRING | Controls the output record Key schema name. |
key.avro.namespace | STRING | Controls the output record Key schema namespace. |
value.avro.record | STRING | Controls the output record Key schema name. |
value.avro.namespace | STRING | Controls the output record Key schema namespace. |
All the keys applicable for defaults are valid for controlling the settings for a given topic. Controlling the settings for a specific topic can be done via:
SET topic.<topic_name>.<topic_setting_key>=<value>;
SET topic.market_risk.cleanup.policy='compact,delete';
--escaping the topic name if it contains . or - or other non-alpha numeric
SET topic.`market.risk`.cleanup.policy='compact,delete';
SET topic.`market-risk`.cleanup.policy='compact,delete';
Error handling
The streaming engine allows users to define how errors are handled when writing to or reading from a topic.
Both sides can be set at once by doing:
SET error.policy= '<error_policy>';
or individually as described in the sections below.
###Reading Errors Data being processed might be corrupted or not aligned with the topic format (maybe you expect an Avro payload but the raw bytes represent a JSON document). Setting what happens in these scenarios can be done like this:
SET error.policy.read= '<error_policy>';
###Writing Errors While data is being written multiple errors can occur (maybe there were some network issues). Setting what happens in these scenarios can be done like this:
SET error.policy.write= '<error_policy>';
There are three possible values to control the behaviour.
Value | Description |
---|---|
continue | Allows the application to carry on. The problem will be logged. |
fail | Stops the application. The application will be in a failed (error) state. |
dlq | Allows the application to continue but it will send the payload to a dead-letter-topic. It requires dead.letter.queue to be set. The default value for dead.letter.queue is lenses.sql.dlq . |
When dlq
is used this setting is required. The value is the target topic where the problematic records will be sent to.
SET dead.letter.queue = '<dead_letter_topic>';
Kafka Streams Consumer and Producer settings
Using the SET syntax, the underlying Kafka Streams and Kafka Producer and Consumer settings can be adjusted.
SET <setting_key>=<value>;
Key | Type | Description |
---|---|---|
processing.guarantee | STRING | The processing guarantee that should be used. Possible values are AT_LEAST_ONCE (default) and EXACTLY_ONCE. Exactly-once processing requires a cluster of at least three brokers by default what is the recommended setting for production. |
commit.interval.ms | LONG | The frequency with which to save the position of the processor. If processing.guarantee is set to EXACTLY_ONCE , the default value is 100, otherwise the default value is 30000. This setting directly impacts the behaviour of
Tables
, as it controls how often they will emit events downstream. An event will be emitted only every commit.interval.ms , so every intermediate event that is received by the table will not be visible downstream directly. |
poll.ms | LONG | The amount of time in milliseconds to block waiting for input. |
cache.max.bytes.buffering | LONG | Maximum number of memory bytes to be used for buffering across all threads. It has to be at least 0. Default value is: 10 * 1024 * 1024. |
client.id | STRING | An ID prefix string used for the client IDs of internal consumer, producer and restore-consumer, with pattern ‘<client.d>-StreamThread- |
num.standby.replicas | INT | The number of standby replicas for each task. Default value is 0. |
num.stream.threads | INT | The number of threads to execute stream processing. Default values is 1. |
max.task.idle.ms | LONG | Maximum amount of time a stream task will stay idle when not all of its partition buffers contain records, to avoid potential out-of-order record processing across multiple input streams. |
buffered.records.per.partition | INT | Maximum number of records to buffer per partition. Default is 1000. |
buffered.records.per.partition | INT | Maximum number of records to buffer per partition. Default is 1000. |
connections.max.idle.ms | LONG | Close idle connections after the number of milliseconds specified by this config. |
receive.buffer.bytes | LONG | The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default will be used. |
reconnect.backoff.ms | LONG | The base amount of time to wait before attempting to reconnect to a given host. This avoids repeatedly connecting to a host in a tight loop. This backoff applies to all connection attempts by the client to a broker. |
reconnect.backoff.max.ms | LONG | The maximum amount of time in milliseconds to wait when reconnecting to a broker that has repeatedly failed to connect. If provided, the backoff per host will increase exponentially for each consecutive connection failure, up to this maximum. After calculating the backoff increase, 20% random jitter is added to avoid connection storms. Default is 1000. |
retries | INT | Setting a value greater than zero will cause the client to resend any request that fails with a potentially transient error. Default is 0 |
retry.backoff.ms | LONG | The amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios. Default is 100. |
send.buffer.bytes | LONG | The size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is -1, the OS default will be used. Default is 128 * 1024. |
state.cleanup.delay.ms | LONG | The amount of time in milliseconds to wait before deleting state when a partition has migrated. |
Alongside the keys above, the Kafka consumer and producer settings can be also tweaked.
SET session.timeout.ms=120000;
SET max.poll.record = 20000;
Some of the configuration for the consumer and producer have the same name. At times, maybe there is a requirement to distinguish them. To do that the keys have to be prefixed with: consumer or producer.
SET consumer.<duplicate_config_key>=<value_1>;
SET producer.<duplicate_config_key>=<value_2>;
RocksDB
Stateful data flow applications, might require, on rare occassion, some of the parameters for the underlying RocksDB to be tweaked.
To set the properties, use:
SET rocksdb.<key> = <value>;
Key | Type | Description |
---|---|---|
rocksdb.table.block.cache.size | LONG | Set the amount of cache in bytes that will be used by RocksDB. If cacheSize is non-positive, then cache will not be used. DEFAULT: 8M |
rocksdb.table.block.size | LONG | Approximate size of user data packed per lock. Default: 4K |
rocksdb.table.block.cache.compressed.num.shard.bits | INT | Controls the number of shards for the block compressed cache |
rocksdb.table.block.cache.num.shard.bits | INT | Controls the number of shards for the block cache |
rocksdb.table.block.cache.compressed.size | LONG | Size of compressed block cache. If 0,then block_cache_compressed is set to null |
rocksdb.table.block.restart.interval | INT | Set block restart interval |
rocksdb.table.block.cache.size.and.filter | BOOL | Indicating if we’d put index/filter blocks to the block cache. If not specified, each ’table reader’ object will pre-load index/filter block during table initialization |
rocksdb.table.block.checksum.type | STRING | Sets the checksum type to be used with this table. Available values: kNoChecksum , kCRC32c , kxxHash . |
rocksdb.table.block.hash.allow.collision | BOOL | Influence the behavior when kHashSearch is used. If false, stores a precise prefix to block range mapping if true, does not store prefix and allows prefix hash collision(less memory consumption) |
rocksdb.table.block.index.type | STRING | Sets the index type to used with this table. Available values: kBinarySearch , kHashSearch |
rocksdb.table.block.no.cache | BOOL | Disable block cache. If this is set to true, then no block cache should be used. Default: false |
rocksdb.table.block.whole.key.filtering | BOOL | If true, place whole keys in the filter (not just prefixes).This must generally be true for gets to be efficient. Default: true |
rocksdb.table.block.pinl0.filter | BOOL | Indicating if we’d like to pin L0 index/filter blocks to the block cache. If not specified, defaults to false. |
rocksdb.total.threads | INT | The max threads RocksDB should use |
rocksdb.write.buffer.size | LONG | Sets the number of bytes the database will build up in memory (backed by an unsorted log on disk) before converting to a sorted on-disk file |
rocksdb.table.block.size.deviation | INT | This is used to close a block before it reaches the configured ‘block_size’. If the percentage of free space in the current block is less than this specified number and adding a new record to the block will exceed the configured block size, then this block will be closed and thenew record will be written to the next block. Default is 10. |
rocksdb.compaction.style | STRING | Available values: LEVEL , UNIVERSAL , FIFO |
rocksdb.max.write.buffer | INT | |
rocksdb.base.background.compaction | INT | |
rocksdb.background.compaction.max | INT | |
rocksdb.subcompaction.max | INT | |
rocksdb.background.flushes.max | INT | |
rocksdb.log.file.max | LONG | |
rocksdb.log.fle.roll.time | LONG | |
rocksdb.compaction.auto | BOOL | |
rocksdb.compaction.level.max | INT | |
rocksdb.files.opened.max | INT | |
rocksdb.wal.ttl | LONG | |
rocksdb.wal.size.limit | LONG | |
rocksdb.memtable.concurrent.write | BOOL | |
rocksdb.os.buffer | BOOL | |
rocksdb.data.sync | BOOL | |
rocksdb.fsync | BOOL | |
rocksdb.log.dir | STRING | |
rocksdb.wal.dir | STRING |