Settings

When defining a data stream flow with the SQL like syntax, the engine offers the syntax constructs to tweak a few parameters. These parameters can be set via the SET operator.

SET `auto.offset.reset` = 'earliest';
SET `processing.guarantee`= 'exactly_once';  //this is for Kafka 0.11+ enabling exactly once semantics
SET `commit.interval.ms` = 1000;             //The frequency with which to save the position of the processor.

Since the Lenses data flow maps to a Kafka Streams flow, the user can tweak the underlying Kafka producer/consumer settings as well as processing settings. The example provided above is a clear sample of how-to do it.

A user can also set the target topic configurations. Follow the Apache Kafka documentation here for a full list of topic-specific configuration options. To set the configuration for the flow result topic you need to prefix the key with topic.. For example to set the cleanup policy to compact ` and to `flush.messages every 5 messages the following SQL code needs to be used:

SET `topic.cleanup.policy`='compact';
SET `topic.flush.messages`= 5;
...

Apart from the topic, producer/consumer or Kafka stream configs, LSQL allows you to set the following:

Setting Description Type Example
autocreate
If the target topic does not exist it will create it. **If the
Kafka setup does not allow for auto topic creation the flow will fail!**
BOOLEAN SET autocreate=true
partitions
The number of partitions to create for the target topic.
Applies only when autocreate is set to true. By default is false.
INTEGER SET partitions=2
replication
How many replicas to create for the target topic. Applies
only when autocreate is set to true. By default is false.
INTEGER SET replication=3
decimal.scale
When working with AVRO records where decimal type is
involved it specifies the decimal scale.
INTEGER SET `decimal.scale`=18
decimal.precision
When working with AVRO records where decimal type is
involved it specifies the decimal precision.
INTEGER SET `decimal.precision`=38

Important

Each SET .. instruction needs to be followed by a semicolon:;.

Here is an example of setting the commit interval to 5 seconds and enabling exactly-once semantics (Apache Kafka 0.11+):

SET `commit.interval.ms` = 5000;
SET `processing.guarantee`= 'exactly_once';

INSERT INTO `hot_sensors`
SELECT
    ip
    , lat
    , `long`
    , (temp * 1.8 + 32) as metric
FROM  `sensors`
WHERE _ktype = 'LONG'
AND _vtype = AVRO
AND temp > 30

RocksDB Settings

As discussed in the table-stream duality, a table is materialized on the machine running the streaming flow by an instance of RocksDb <https://rocksdb.org/> instance. Being able to set some of the runtime configuration used by the key-value database, is required in very few occasions. The keys to reference by the user when using the SET operator can be found in the table below:

Key Type Description  
rocksdb.table.block.cache.size LONG
Set the amount of cache in bytes
that will be used by RocksDB.
If cacheSize is non-positive, then cache will not be used.
DEFAULT: 8M
 
rocksdb.table.block.size LONG Approximate size of user data packed per block. Default: 4K  
rocksdb.table.block.cache.compressed.num.shard.bits INT TableFormatConfig.setBlockCacheCompressedNumShardBits
Controls the number of shards for the
block compressed cache
rocksdb.table.block.cache.num.shard.bits INT Controls the number of shards for the block cache  
rocksdb.table.block.cache.compressed.size LONG
Size of compressed block cache. If 0,
then block_cache_compressed is set to null
 
rocksdb.table.block.restart.interval INT Set block restart interval  
rocksdb.table.block.cache.size.and.filter BOOL
Indicating if we’d put index/filter blocks to the block cache.
If not specified, each ‘table reader’ object will pre-load index/filter
block during table initialization
 
rocksdb.table.block.checksum.type STRING
Sets the checksum type to be used with this table.
Available values: kNoChecksum, kCRC32c, kxxHash.
 
rocksdb.table.block.hash.allow.collision BOOL
Influence the behavior when kHashSearch is used.
if false, stores a precise prefix to block range mapping
if true, does not store prefix and allows prefix hash collision
(less memory consumption)
 
rocksdb.table.block.index.type STRING
Sets the index type to used with this table.
Available values: kBinarySearch, kHashSearch
 
rocksdb.table.block.no.cache BOOL
Disable block cache. If this is set to true,
then no block cache should be used. Default: false
 
rocksdb.table.block.whole.key.filtering BOOL
If true, place whole keys in the filter (not just prefixes).
This must generally be true for gets to be efficient.
Default: true
 
rocksdb.table.block.pinl0.filter BOOL
Indicating if we’d like to pin L0 index/filter blocks to the block cache.
If not specified, defaults to false.
 
rocksdb.total.threads INT The max threads RocksDB should use  
rocksdb.write.buffer.size LONG
Sets the number of bytes the database will build up in memory
(backed by an unsorted log on disk) before converting to a
sorted on-disk file
 
rocksdb.table.block.size.deviation INT
This is used to close a block before it reaches the configured
‘block_size’. If the percentage of free space in the current block is less
than this specified number and adding a new record to the block will
exceed the configured block size, then this block will be closed and the
new record will be written to the next block.
Default is 10.
 
rocksdb.table.block.format.version INT
We currently have three versions:

0 - This version is currently written out by all RocksDB’s versions by default.
Can be read by really old RocksDB’s. Doesn’t support changing
checksum (default is CRC32).

1 - Can be read by RocksDB’s versions since 3.0.
Supports non-default checksum, like xxHash. It is written by RocksDB when
BlockBasedTableOptions::checksum is something other than kCRC32c. (version
0 is silently upconverted)

2 - Can be read by RocksDB’s versions since 3.10.
Changes the way we encode compressed blocks with LZ4, BZip2, and Zlib
compression. If you don’t plan to run RocksDB before version 3.10,
you should probably use this.

This option only affects newly written tables. When reading existing
tables, the information about version is read from the footer.
 
rocksdb.compaction.style STRING Available values: LEVEL, UNIVERSAL, FIFO  
rocksdb.max.write.buffer INT    
rocksdb.base.background.compaction INT    
rocksdb.background.compaction.max INT    
rocksdb.subcompaction.max INT    
rocksdb.background.flushes.max INT    
rocksdb.log.file.max LONG    
rocksdb.log.fle.roll.time LONG    
rocksdb.compaction.auto BOOL    
rocksdb.compaction.level.max INT    
rocksdb.files.opened.max INT    
rocksdb.wal.ttl LONG    
rocksdb.wal.size.limit LONG    
rocksdb.memtable.concurrent.write BOOL    
rocksdb.os.buffer BOOL    
rocksdb.data.sync BOOL    
rocksdb.fsync BOOL    
rocksdb.log.dir STRING    
rocksdb.wal.dir STRING