# Settings

The SET syntax allows customizing the behaviour for the underlying Kafka Consumer/Producer, Kafka Streams (including RocksDB parameters), topic creation and error handling.

The general syntax is:

```sql
SET <setting_name>=<setting_value>;
```

## Kafka topics <a href="#kafka-topics" id="kafka-topics"></a>

SQL processors can create topics that are not present. There are two levels of settings, generic (or default) applying to all target topics and specific (or topic-related) to allow distinct setups for a given topic. Maybe one of the output topics requires a different partition count or replication factor than the defaults.

To set the defaults follow this syntax:

```sql
SET defaults.topic.<topic_setting_key> = <value>;
```

<table><thead><tr><th width="196.33333333333331">Key</th><th width="137">Type</th><th>Description</th></tr></thead><tbody><tr><td>autocreate</td><td>BOOLEAN</td><td>Creates the topic if it does not exist already.</td></tr><tr><td>partitions</td><td>INT</td><td>Controls the target topic partitions count. If the topic already exists, this will not be applied.</td></tr><tr><td>replication</td><td>INT</td><td>Controls the topic replication factor. If the topic already exists, this will not be applied.</td></tr><tr><td></td><td>-</td><td>Each Kafka topics allows a set of parameters to be set. For example <code>cleanup.policy</code><br>can be set like this <code>SET defaults.topic.cleanup.policy='compact,delete';</code></td></tr><tr><td>key.avro.record</td><td>STRING</td><td>Controls the output record Key schema name.</td></tr><tr><td>key.avro.namespace</td><td>STRING</td><td>Controls the output record Key schema namespace.</td></tr><tr><td>value.avro.record</td><td>STRING</td><td>Controls the output record Key schema name.</td></tr><tr><td>value.avro.namespace</td><td>STRING</td><td>Controls the output record Key schema namespace.</td></tr></tbody></table>

All the keys applicable for defaults are valid for controlling the settings for a given topic. Controlling the settings for a specific topic can be done via:

```sql
SET topic.<topic_name>.<topic_setting_key>=<value>;
```

```sql
SET topic.market_risk.cleanup.policy='compact,delete';

--escaping the topic name if it contains . or - or other non-alpha numeric
SET topic.`market.risk`.cleanup.policy='compact,delete';
SET topic.`market-risk`.cleanup.policy='compact,delete';
```

## Error handling <a href="#error-handling" id="error-handling"></a>

The streaming engine allows users to define how errors are handled when writing to or reading from a topic.

Both sides can be set at once by doing:

```sql
SET error.policy= '<error_policy>';
```

or individually as described in the sections below.

### Reading Errors <a href="#reading-errors" id="reading-errors"></a>

Data being processed might be corrupted or not aligned with the topic format (maybe you expect an Avro payload but the raw bytes represent a JSON document). Setting what happens in these scenarios can be done like this:

```sql
SET error.policy.read= '<error_policy>';
```

### Writing Errors <a href="#writing-errors" id="writing-errors"></a>

While data is being written multiple errors can occur (maybe there were some network issues). Setting what happens in these scenarios can be done like this:

```sql
SET error.policy.write= '<error_policy>';
```

There are three possible values to control the behaviour.

| Value    | Description                                                                                                                                                                                       |
| -------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| continue | Allows the application to carry on. The problem will be logged.                                                                                                                                   |
| fail     | Stops the application. The application will be in a failed (error) state.                                                                                                                         |
| dlq      | Allows the application to continue but it will send the payload to a dead-letter-topic. It requires `dead.letter.queue` to be set. The default value for `dead.letter.queue` is `lenses.sql.dlq`. |

When `dlq` is used this setting is required. The value is the target topic where the problematic records will be sent to.

```sql
SET dead.letter.queue = '<dead_letter_topic>';
```

## Kafka Streams Consumer and Producer settings <a href="#kafka-streams-consumer-and-producer-settings" id="kafka-streams-consumer-and-producer-settings"></a>

Using the SET syntax, the underlying Kafka Streams and Kafka Producer and Consumer settings can be adjusted.

```sql
SET <setting_key>=<value>;
```

\\

<table data-full-width="false"><thead><tr><th width="264.3333333333333">Key</th><th width="118">Type</th><th>Description</th></tr></thead><tbody><tr><td>processing.guarantee</td><td>STRING</td><td>The processing guarantee that should be used. Possible values are AT_LEAST_ONCE (default) and EXACTLY_ONCE. Exactly-once processing requires a cluster of at least three brokers by default what is the recommended setting for production.</td></tr><tr><td>commit.interval.ms</td><td>LONG</td><td>The frequency with which to save the position of the processor. If <code>processing.guarantee</code> is set to <code>EXACTLY_ONCE</code>, the default value is 100, otherwise the default value is 30000. This setting directly impacts the behavior of Tables, as it controls how often they will emit events downstream. An event will be emitted only every <code>commit.interval.ms</code>, so every intermediate event that is received by the table will not be visible downstream directly.</td></tr><tr><td>poll.ms</td><td>LONG</td><td>The amount of time in milliseconds to block waiting for input.</td></tr><tr><td>cache.max.bytes.buffering</td><td>LONG</td><td>Maximum number of memory bytes to be used for buffering across all threads. It has to be at least 0. Default value is: 10 * 1024 * 1024.</td></tr><tr><td>client.id</td><td>STRING</td><td>An ID prefix string used for the client IDs of internal consumer, producer and restore-consumer, with pattern ‘&#x3C;client.d>-StreamThread--&#x3C;consumer</td></tr><tr><td>num.standby.replicas</td><td>INT</td><td>The number of standby replicas for each task. Default value is 0.</td></tr><tr><td>num.stream.threads</td><td>INT</td><td>The number of threads to execute stream processing. Default values is 1.</td></tr><tr><td>max.task.idle.ms</td><td>LONG</td><td>Maximum amount of time a stream task will stay idle when not all of its partition buffers contain records, to avoid potential out-of-order record processing across multiple input streams.</td></tr><tr><td>buffered.records.per.partition</td><td>INT</td><td>Maximum number of records to buffer per partition. Default is 1000.</td></tr><tr><td>buffered.records.per.partition</td><td>INT</td><td>Maximum number of records to buffer per partition. Default is 1000.</td></tr><tr><td>connections.max.idle.ms</td><td>LONG</td><td>Close idle connections after the number of milliseconds specified by this config.</td></tr><tr><td>receive.buffer.bytes</td><td>LONG</td><td>The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default will be used.</td></tr><tr><td>reconnect.backoff.ms</td><td>LONG</td><td>The base amount of time to wait before attempting to reconnect to a given host. This avoids repeatedly connecting to a host in a tight loop. This backoff applies to all connection attempts by the client to a broker.</td></tr><tr><td>reconnect.backoff.max.ms</td><td>LONG</td><td>The maximum amount of time in milliseconds to wait when reconnecting to a broker that has repeatedly failed to connect. If provided, the backoff per host will increase exponentially for each consecutive connection failure, up to this maximum. After calculating the backoff increase, 20% random jitter is added to avoid connection storms. Default is 1000.</td></tr><tr><td>retries</td><td>INT</td><td>Setting a value greater than zero will cause the client to resend any request that fails with a potentially transient error. Default is 0</td></tr><tr><td>retry.backoff.ms</td><td>LONG</td><td>The amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios. Default is 100.</td></tr><tr><td>send.buffer.bytes</td><td>LONG</td><td>The size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is -1, the OS default will be used. Default is 128 * 1024.</td></tr><tr><td>state.cleanup.delay.ms</td><td>LONG</td><td>The amount of time in milliseconds to wait before deleting state when a partition has migrated.</td></tr></tbody></table>

Alongside the keys above, the Kafka consumer and producer settings can be also tweaked.

```sql
SET session.timeout.ms=120000;
SET max.poll.record = 20000;
```

Some of the configurations for the consumer and producer have the same name. At times, maybe there is a requirement to distinguish them. To do that the keys have to be prefixed with: consumer or producer.

```sql
SET consumer.<duplicate_config_key>=<value_1>;
SET producer.<duplicate_config_key>=<value_2>;
```

## RocksDB <a href="#rocksdb" id="rocksdb"></a>

Stateful data flow applications might require, on rare occasions, some of the parameters for the underlying RocksDB to be tweaked.

To set the properties, use:

```sql
SET rocksdb.<key> = <value>;
```

<table data-full-width="false"><thead><tr><th width="308">Key</th><th width="116.33333333333331">Type</th><th>Description</th></tr></thead><tbody><tr><td>rocksdb.table.block.cache.size</td><td>LONG</td><td>Set the amount of cache in bytes that will be used by RocksDB. If cacheSize is non-positive, then cache will not be used. DEFAULT: 8M</td></tr><tr><td>rocksdb.table.block.size</td><td>LONG</td><td>Approximate size of user data packed per lock. Default: 4K</td></tr><tr><td>rocksdb.table.block.cache.compressed.num.shard.bits</td><td>INT</td><td>Controls the number of shards for the block compressed cache</td></tr><tr><td>rocksdb.table.block.cache.num.shard.bits</td><td>INT</td><td>Controls the number of shards for the block cache</td></tr><tr><td>rocksdb.table.block.cache.compressed.size</td><td>LONG</td><td>Size of compressed block cache. If 0,then block_cache_compressed is set to null</td></tr><tr><td>rocksdb.table.block.restart.interval</td><td>INT</td><td>Set block restart interval</td></tr><tr><td>rocksdb.table.block.cache.size.and.filter</td><td>BOOL</td><td>Indicating if we’d put index/filter blocks to the block cache. If not specified, each ’table reader’ object will pre-load index/filter block during table initialization</td></tr><tr><td>rocksdb.table.block.checksum.type</td><td>STRING</td><td>Sets the checksum type to be used with this table. Available values: <code>kNoChecksum</code>, <code>kCRC32c</code>, <code>kxxHash</code>.</td></tr><tr><td>rocksdb.table.block.hash.allow.collision</td><td>BOOL</td><td>Influence the behavior when kHashSearch is used. If false, stores a precise prefix to block range mapping if true, does not store prefix and allows prefix hash collision(less memory consumption)</td></tr><tr><td>rocksdb.table.block.index.type</td><td>STRING</td><td>Sets the index type to used with this table. Available values: <code>kBinarySearch</code>, <code>kHashSearch</code></td></tr><tr><td>rocksdb.table.block.no.cache</td><td>BOOL</td><td>Disable block cache. If this is set to true, then no block cache should be used. Default: false</td></tr><tr><td>rocksdb.table.block.whole.key.filtering</td><td>BOOL</td><td>If true, place whole keys in the filter (not just prefixes).This must generally be true for gets to be efficient. Default: true</td></tr><tr><td>rocksdb.table.block.pinl0.filter</td><td>BOOL</td><td>Indicating if we’d like to pin L0 index/filter blocks to the block cache. If not specified, defaults to false.</td></tr><tr><td>rocksdb.total.threads</td><td>INT</td><td>The max threads RocksDB should use</td></tr><tr><td>rocksdb.write.buffer.size</td><td>LONG</td><td>Sets the number of bytes the database will build up in memory (backed by an unsorted log on disk) before converting to a sorted on-disk file</td></tr><tr><td>rocksdb.table.block.size.deviation</td><td>INT</td><td>This is used to close a block before it reaches the configured ‘block_size’. If the percentage of free space in the current block is less than this specified number and adding a new record to the block will exceed the configured block size, then this block will be closed and thenew record will be written to the next block. Default is 10.</td></tr><tr><td>rocksdb.compaction.style</td><td>STRING</td><td>Available values: <code>LEVEL</code>, <code>UNIVERSAL</code>, <code>FIFO</code></td></tr><tr><td>rocksdb.max.write.buffer</td><td>INT</td><td></td></tr><tr><td>rocksdb.base.background.compaction</td><td>INT</td><td></td></tr><tr><td>rocksdb.background.compaction.max</td><td>INT</td><td></td></tr><tr><td>rocksdb.subcompaction.max</td><td>INT</td><td></td></tr><tr><td>rocksdb.background.flushes.max</td><td>INT</td><td></td></tr><tr><td>rocksdb.log.file.max</td><td>LONG</td><td></td></tr><tr><td>rocksdb.log.fle.roll.time</td><td>LONG</td><td></td></tr><tr><td>rocksdb.compaction.auto</td><td>BOOL</td><td></td></tr><tr><td>rocksdb.compaction.level.max</td><td>INT</td><td></td></tr><tr><td>rocksdb.files.opened.max</td><td>INT</td><td></td></tr><tr><td>rocksdb.wal.ttl</td><td>LONG</td><td></td></tr><tr><td>rocksdb.wal.size.limit</td><td>LONG</td><td></td></tr><tr><td>rocksdb.memtable.concurrent.write</td><td>BOOL</td><td></td></tr><tr><td>rocksdb.os.buffer</td><td>BOOL</td><td></td></tr><tr><td>rocksdb.data.sync</td><td>BOOL</td><td></td></tr><tr><td>rocksdb.fsync</td><td>BOOL</td><td></td></tr><tr><td>rocksdb.log.dir</td><td>STRING</td><td></td></tr><tr><td>rocksdb.wal.dir</td><td>STRING</td><td></td></tr></tbody></table>


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://docs.lenses.io/latest/devx/6.1/user-guide/applications/sql-processors/settings.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
