In this guide you will learn about SQL Processor apps in Lenses: how to use them to quickly create streaming flows with streaming SQL, and how to deploy and scale them.
SQL Processors continuously process data in Kafka topics. Under the hood, the Kafka Streams APIs are combined with the internal Lenses application deployment framework. A user gets a seamless experience for creating scalable processing components to filter, aggregate, join, and transform Kafka topics, which can scale natively to Kubernetes.
SQL Processors are continuous queries: they process data as it arrives and react to events, e.g. a payment made: $5 to Coffeeshop. The data is active, the query is passive.
SQL Processors offer:
A no-code, stand-alone application executing a given Lenses SQL query on current and future data
Query graph visualisation
Fully integrated experience within Lenses
ACLs and Monitoring functionality out-of-the-box
Ability to scale up and down workflows via Kubernetes
SQL Processors are long-lived applications that continuously process data. When used in Kubernetes mode (recommended), they are deployed via Lenses as Kubernetes deployments, separate from the Lenses instance.
To create SQL Processors go to Workspace->Apps->New App->SQL Processor.
Enter a name
Enter your SQL statement; the editor will help you with IntelliSense
Optionally specify a description, tag and ProcessorID (consumer group ID)
Select the deployment target; in Kubernetes mode this will be a Kubernetes cluster and namespace.
If the configuration is valid the SQL Processor will be created. Processors are not started automatically: to deploy and start a Processor, select Start from the Actions menu.
Select the SQL processor. Lenses will show an overview of the health of the processor.
Selecting a Runner will display further information about each individual runner.
The metadata in the Summary tab also contains the consumer group ID (Processor ID).
The SQL tab shows the SQL Streaming statement the processor is running. A visual representation is shown in the Data Flow tab.
The Configuration tab shows the low-level settings.
To scale a processor, select Scale from the Actions menu and enter the desired number of runners. In Kubernetes mode, the runners are pods.
To stop a processor, select Stop from the Actions menu.
Lenses provides helpful snippets for common scenarios. Select the snippet from the Help section.
ProcessorID is the public unique identifier of an SQL processor. It is customizable: you, as a user, have control over it and can set it to any arbitrary string.
Restrictions on custom ProcessorIDs:
They have to be unique across all processors
They must match the following regex: ^[a-zA-Z0-9\-\._]+ (only letters, numbers and the characters -, _ and . are allowed).
They have to start with a letter or a number.
They cannot be empty.
One important aspect of the ProcessorID is that it is used as the Kafka consumer group identifier. That means that, in practice, this is the value that allows an SQL processor to build its consumer group and coordinate record ingestion from Kafka between all Processor replicas. Consequently, if the ProcessorID of a given SQL processor is changed, that processor will restart consuming messages from the beginning of the existing records in the topic.
The ApplicationID is the Lenses unique identifier; it is automatically created by Lenses and cannot be customized.
It is unique across all application types; it does not matter if it's an SQL processor or a different (new) sort of future application.
Lenses uses the ApplicationID to manage applications. This means that, when Starting, Stopping or Scaling an application, Lenses will use this attribute to pick the right instance.
This page describes handling null values in Kafka data with Lenses SQL Processors.
Null values are used as a way to express a value that isn’t yet known.
Null values can be found in the data present in existing sources, or they can be the product of joining data using non-inner joins.
The schema of nullable types is represented as a union of the field type and a null value:
Working with null values can create situations where it’s not clear what the outcome of an operation is.
One example of this would be the following:
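The original expressions are not shown here; the following is an illustrative reconstruction of the kind of ambiguity being described (the operands are assumptions):

```
null * 3       = ?
null + 2       = ?
(null + 2) * 3 = ?
```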
Looking at the first two expressions, one may be tempted to solve the problem above by saying "null is 1 when multiplying and 0 when summing", meaning the following would be the evaluation result:
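Continuing the illustrative expressions above, that rule would give:

```
null * 3       = 1 * 3       = 3
null + 2       = 0 + 2       = 2
(null + 2) * 3 = (0 + 2) * 3 = 6
```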
Rewriting the third expression by applying the distributive property of multiplication, however, shows that the rule creates inconsistencies:
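Again purely as an illustration:

```
(null + 2) * 3 = null * 3 + 2 * 3 = 3 + 6 = 9   -- not the 6 obtained above
```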
With the intent of avoiding scenarios like the above, where a computation may have different results depending on the evaluation approach taken, most operations in Lenses do not allow the use of nullable types.
Lenses provides the following tools to address nullability in a flexible way:
Coalesce: A function that allows specifying a list of fields to be tried until the first non-null value is found.
Note: the coalesce function won't verify whether a non-nullable field is provided, so an error may still be thrown if all the provided fields are null. E.g. COALESCE(nullable_fieldA, nullable_fieldB, 0)
AS_NON_NULLABLE: a function that changes the type of a property from nullable to non-nullable.
Note: This function is unsafe and will throw an error if a null value is passed. It should only be used if there’s a guarantee that the value won’t ever be null (for instance if used in a CASE branch where the null case has been previously handled or if the data has previously been filtered and the null values removed).
e.g: AS_NON_NULLABLE(nullable_field)
AS_NON_NULLABLE with CASE: a type-checked construct equivalent to using coalesce, e.g. the sketch below.
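A minimal sketch of such a construct (illustrative; the field name, fallback value and topic name are assumptions):

```sql
SELECT STREAM
    CASE
        WHEN nullable_field IS NULL THEN 0
        ELSE AS_NON_NULLABLE(nullable_field)
    END AS safe_field
FROM input_topic
```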
The AS_NULLABLE function is the inverse transformation of the AS_NON_NULLABLE version: it allows a non-nullable field type to be transformed into a nullable type. It can be used to insert data into existing topics where the schema of the target field is nullable.
This page describes how to aggregate data in Kafka with Lenses SQL Processors.
Aggregations are stateful transformations that allow grouping an unbounded set of inputs into sub-sets and then aggregating each of these sub-sets into a single output; they are stateful because they need to maintain the current state of the computation between the application of each input.
To group a given input dataset into sub-sets, a key function needs to be specified; the result of applying this key function to an input record will be used as a discriminator (sometimes called a pivot) to determine in what sub-set each input record is to be bucketed.
The specific transformation that each aggregation performs is described by the Aggregated Functions used in the input query.
Notice that the behaviour described above is precisely what a Table does: for any given key, the state will continuously be updated as new events with that key are received. In the case of Aggregations, new events are input records in the original dataset that map to a given key, therefore ending up in one bucket or another.
Whenever Aggregations are used, the result will be a Table. Each entry will have the key set to the grouping discriminator, and the value set to the current state of computation for all input records matching the key.
The complete syntax for aggregations is:
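A sketch of the overall shape, assembled from the elements described below (placement of optional clauses follows the order in which they are listed; exact formatting may differ):

```sql
SELECT (STREAM | TABLE)
    <aggregated projection1>
    [, <aggregated projection2>] ... [, <aggregated projectionN>]
    [, <projection1>] ... [, <projectionM>]
FROM <source>
[WINDOW BY <window description>]
GROUP BY <expression>
```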
The specific syntactical elements of the above are:
(STREAM | TABLE): specifies if the <source> is to be interpreted as a stream or a table.
<aggregated projection1>: a projection is aggregated when its source contains an Aggregated Function (e.g. COUNT(*) as x, CAST(COUNT(*) as STRING) as stringed).
[, aggregated projection2] ... [, aggregated projectionN]: a query can contain any number of additional aggregated projections after the first mandatory one.
[, projection1] ... [, projectionM]: a query can contain any number of common, non-aggregated projections. Streaming only supports full GROUP BY mode: fields that are not part of the GROUP BY clause cannot be referenced by non-aggregated projections.
<source>: a normal source, like a topic or the result of a WITH statement.
[WINDOW BY <window description>]: this optional section can only be specified if STREAM is used. It describes the windowing that will be applied to the aggregation.
GROUP BY <expression>: the result of evaluating <expression> will be used to divide the input values into different groups. These groups will be the input for each aggregated projection specified. The <expression>'s result will become the key of the table resulting from the query.
Most of the rules and syntax described for Projections apply to aggregated projections as well, but there are some additional syntactical rules due to the specific nature of Aggregations.
Aliasing rules mostly work the same, but it is not possible to project on the key facet; COUNT(*) as _key.a or SUM(x) as _key are therefore not allowed.
At least one aggregated projection must be specified in the query.
Projections using an unqualified key facet as the source are not allowed: _key.a or COUNT(_key.b) are forbidden because _key is unqualified, but <source>._key.a and COUNT(<source>._key.b) are supported.
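As an illustration of the GROUP BY behaviour discussed next, consider a query along these lines (a sketch; the aggregated projection and topic names are assumptions):

```sql
INSERT INTO `target-topic`
SELECT STREAM COUNT(*) AS total
FROM `input-topic`
GROUP BY field1
```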
As previously mentioned, the GROUP BY is used to determine the key of the query's result; the above query will group all records in input-topic by the value of field1 in each record, and target-topic's key will have the schema of field1.
Just like in the case of the Projections, the Streaming mode takes an opinionated approach here and will simplify the result schema and Storage Format in the case of single field structures.
In the case above, assuming for example that field1 is an integer, target-topic's key will not be a structure with a single integer field field1, but rather just the value of field1; the resulting storage format is going to be INT, and the label field1 will simply be dropped.
In case the above behaviour is not desirable, specifying an explicit alias allows us to override it:
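A sketch of what that could look like (illustrative; the exact place where the alias is specified may differ in the actual syntax):

```sql
INSERT INTO `target-topic`
SELECT STREAM COUNT(*) AS total
FROM `input-topic`
GROUP BY field1 AS keep_me
```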
This will result in target-topic's key being a structure with a single field keep_me, with the same schema as field1. The corresponding Storage Format will match the input format of input-topic, AVRO or JSON.
An example will help clarify how aggregations work, as well as how they behave depending on the semantics of the input dataset they are being applied to.
Assume that we have a Kafka topic (gaming-sessions) containing these records:

| Offset | Key | Value |
|---|---|---|
| 0 | billy | {points: 50, country: "uk"} |
| 1 | billy | {points: 90, country: "uk"} |
| 2 | willy | {points: 70, country: "uk"} |
| 3 | noel | {points: 82, country: "uk"} |
| 4 | john | {points: 50, country: "usa"} |
| 5 | dave | {points: 30, country: "usa"} |
| 6 | billy | {points: 90, country: "spain"} |

What this data describes is a series of gaming sessions performed by a player. Each gaming session records the player (used as Key), the points achieved, and the country where the game took place.
Let’s now assume that what we want to calculate is the total points achieved by players in a given country, as well as the average points per game. One way to achieve the desired behaviour is to build a Stream from the input topic. Remember that this means that each event will be considered in isolation.
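A sketch of such a query (illustrative; the output topic name is an assumption):

```sql
INSERT INTO `target-topic`
SELECT STREAM
    SUM(points) AS total_points,
    AVG(points) AS average_points
FROM `gaming-sessions`
GROUP BY country
```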
Explanations for each element of this syntax can be found below but, very briefly, this builds a Stream from gaming-sessions, grouping all events by country (e.g. all records with the same country will be aggregated together) and finally calculating the total (total_points) and the average (average_points) of all points for a given group.
The final result in target-topic will be (disregarding intermediate events):

| Offset | Key | Value |
|---|---|---|
| 3 | uk | {total_points: 292, average_points: 73} |
| 5 | usa | {total_points: 80, average_points: 40} |
| 6 | spain | {total_points: 90, average_points: 90} |
The results are calculated from the totality of the input results because in a Stream, each event is independent and unrelated to any other.
We now want to calculate something similar to what we obtained before, but we want to keep track only of the last session played by a player, as it might give us a better snapshot of both the performances and locations of players worldwide. The statistics we want to gather are the same as before: total and average of points per country.
The way to achieve the above requirement is simply by reading gaming-sessions into a Table, rather than a Stream, and aggregating it.
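A sketch of the Table-based version (illustrative; only the STREAM/TABLE keyword changes compared to the previous sketch):

```sql
INSERT INTO `target-topic`
SELECT TABLE
    SUM(points) AS total_points,
    AVG(points) AS average_points
FROM `gaming-sessions`
GROUP BY country
```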
The final result in target-topic will be (disregarding intermediate events):

| Key | Value |
|---|---|
| uk | {total_points: 152, average_points: 76} |
| usa | {total_points: 80, average_points: 40} |
| spain | {total_points: 90, average_points: 90} |
Compare this with the behaviour from the previous scenario; the key difference is that the value for uk includes only willy and noel, and that's because the last event moved billy to the spain bucket, removing all data regarding him from his original group.
The previous section described the behaviour of aggregations when applied to Tables, and highlighted how aggregations not only need to be able to add the latest values received to the current state of a group, but also need to be able to subtract an obsolete value that might have just been assigned to a new group. As we saw above, it is easy to do this in the case of SUM and AVG.
However, consider what would happen if we wanted to add new statistics to the ones calculated above: the maximum points achieved by a player in a given country.
In the Stream scenario, this can be achieved by simply adding MAXK(points, 1) as max_points to the query, which yields:

| Offset | Key | Value |
|---|---|---|
| 3 | uk | {total_points: 292, average_points: 73, max_points: 90} |
| 5 | usa | {total_points: 80, average_points: 40, max_points: 50} |
| 6 | spain | {total_points: 90, average_points: 90, max_points: 90} |
In the Table scenario, however, things are different. We know that the final event moves billy from uk to spain, so we need to subtract from uk all information related to billy. In the case of SUM and AVG that's possible, because subtracting billy's points from the current value of the aggregation will return the correct result.
But that's not possible for MAXK. MAXK(points, 1) only keeps track of one value, the highest seen so far, and if that's removed, what value should take its place? The aggregation function cannot inspect the entire topic data to search for the correct answer; the state it has access to is that single number, which is now invalid.
This problem explains why some aggregated functions can be used on both Streams and Tables (e.g. SUM), while others can be used only on Streams (e.g. MAXK). The key factor is usually whether a hypothetical subtraction operation would need access to all previous inputs to calculate its new value (like MAXK) or just the aggregated state (like SUM).
A common scenario that arises in the context of aggregations is the idea of adding a time dimension to the grouping logic expressed in the query. For example, one might want to group all input records by a given field that were received within 1 hour of each other.
To express the above, Lenses SQL Streaming supports windowed aggregations, by adding a WINDOW BY clause to the query. Given their semantics, tables cannot be aggregated using a window: a table represents the latest state of a set of (Key, Value) pairs, not a series of events interspersed over a time continuum, so trying to window it is not a sensible operation.
Filtering the input of aggregated queries is similar to filtering non-aggregated ones: when using a WHERE <expression> statement, where <expression> is a valid SQL boolean expression, all records that do not match the predicate will be left out.
However, aggregated functions add a further dimension to what it might be desirable to filter.
We might be interested in filtering based on some condition on the groups themselves; for example, we might want to count all input records that have a given value of field1, but only if the total is greater than 3. In this case, WHERE would not help, because it has no access to the groups nor to the results of the aggregated projections. The query below is what is needed.
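A sketch of such a query (illustrative; topic names are assumptions):

```sql
INSERT INTO `target-topic`
SELECT STREAM COUNT(*) AS total
FROM `input-topic`
GROUP BY field1
HAVING COUNT(*) > 3
```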
The above query uses the HAVING clause to express a filter at the grouping level. Using this feature it is possible to express a predicate on the result of aggregated projections and filter out the output records that do not satisfy it. Only aggregated projections specified in the SELECT clause can be used within the HAVING clause.
This page describes how to join data in Kafka with Lenses SQL Processors.
Joins allow rows from different sources to be combined.
Lenses allows two sources of data to be combined based either on the equality of their _key facet or using a user-provided expression.
A query using joins looks like a regular query apart from the definition of its source and in some cases the need to specify a window expression:
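A sketch of that shape, assembled from the elements described below (illustrative; the WITHIN keyword for the window is an assumption):

```sql
SELECT <projection>
FROM <sourceA>
    [INNER | LEFT | RIGHT | OUTER] JOIN <sourceB>
        [ON <joinExpression>]
    [WITHIN <window>]
[WHERE <filterExpression>]
```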
projection: the projection of a join expression differs very little from a regular projection. The only important consideration is that, since data is selected from two sources, some fields may be common to both; the syntax table.field is recommended to avoid this type of problem.
sourceA/sourceB : the two sources of data to combine.
window: only used if two streams are joined. Specifies the interval of time to search matching results.
joinExpression: a boolean expression that specifies how the combination of the two sources is calculated.
filterExpression: a filter expression specifying which records should be filtered.
When two sources of data are combined it is possible to control which records to keep when a match is not found:
Disclaimer: The following examples do not take into consideration windowing and/or table materialization concerns.
Customers

| _key | id | name |
|---|---|---|
| 1 | 1 | John |
| 2 | 2 | Frank |
Orders

| _key | customer_id | item |
|---|---|---|
| 1 | 1 | Computer |
| 2 | 1 | Mouse |
| 3 | 3 | Keyboard |
This join type will only emit records where a match has occurred:

| name | item |
|---|---|
| John | Computer |
| John | Mouse |

(Notice there's no item with customer.id = 2, nor is there a customer with id = 3, so these two rows are not present in the result.)
This join type selects all the records from the left side of the join regardless of a match:

| name | item |
|---|---|
| John | Computer |
| John | Mouse |
| null | Keyboard |

(Notice all the rows from orders are present but, since there is no customer with id = 3, no name can be set for the Keyboard row.)
A right join can be seen as a mirror of a LEFT JOIN. It selects all the records from the right side of the join regardless of a match:

| name | item |
|---|---|
| John | Computer |
| John | Mouse |
| null | Keyboard |
An outer join can be seen as the union of left and right joins. It selects all records from the left and right side of the join regardless of a match happening:

| name | item |
|---|---|
| John | Computer |
| John | Mouse |
| null | Keyboard |
| Frank | null |
By default, if no ON expression is provided, the join will be evaluated based on the equality of the _key facet. This means that the following queries are equivalent:
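For example (a sketch; topic and field names are assumptions):

```sql
-- implicit join on the key facet
SELECT STREAM o.item, c.name
FROM orders AS o INNER JOIN customers AS c;

-- explicit, equivalent form
SELECT STREAM o.item, c.name
FROM orders AS o INNER JOIN customers AS c
    ON o._key = c._key;
```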
When an expression is provided, however, there are limitations regarding what kind of expressions can be evaluated.
Currently, the following expression types are supported:
Equality expressions using equality (=) with one table on each side:
customers.id = order.user_id
customers.id - 1 = order.user_id - 1
substr(customers.name, 5) = order.item
Any boolean expression which references only one table:
len(customers.name) > 10
substr(customer.name,1) = "J"
len(customer.name) > 10 OR customer_key > 1
Allowed expressions mixed together using an AND operator:
customers._key = order.user_id AND len(customers.name) > 10
len(customers.name) > 10 AND substr(customer.name,1) = "J"
substr(customers.name, 5) = order.item AND len(customer.name) > 10 OR customer_key > 1
Any expressions not following the rules above will be rejected:
More than one table is referenced on each side of the equality operator
concat(customer.name, item.name) = "John"
customer._key - order.customer_id = 0
a boolean expression not separated by an AND references more than one table:
customer._key = 1 OR customer._key = order.customer_id
When two streams are joined Lenses needs to know how far away in the past and in the future to look for a matching record.
This approach is called a “Sliding Window” and works like this:
Customers

| arrival | id | name |
|---|---|---|
| t = 6s | 1 | Frank |
| t = 20s | 2 | John |
Purchases

| arrival | item | customer_id |
|---|---|---|
| t = 10s | Computer | 1 |
| t = 11s | Keyboard | 2 |
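A sketch of the join being discussed (illustrative; the output topic name, the LEFT join type and the placement of the 5s window clause are assumptions):

```sql
INSERT INTO purchases_with_names
SELECT STREAM c.name, p.item
FROM purchases AS p LEFT JOIN customers AS c
    ON p.customer_id = c.id
WITHIN 5s
```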
At t=10, when both the Computer and the Keyboard records arrive, only one customer can be found within the given time window (the specified window is 5s, thus the window will be [10-5, 10+5]s). This means that the following would be the result of running the query:

| name | item |
|---|---|
| Frank | Computer |
| null | Keyboard |
Note: John will not match the Keyboard purchase since t=20s is not within the window interval [10-5,10+5]s.
When streaming data, records can be produced at different rates and even out of order. This means that often a match may not be found because a record hasn’t arrived yet.
The following example shows a join between a stream and a table, where the purchase information arrives before the corresponding customer information does.
(Notice that the purchase of a "Keyboard" by customer_id = 2 is produced before the record with the customer details.)
Customers

| arrival | id | name |
|---|---|---|
| t = 0s | 1 | Frank |
| t = 10s | 1 | John |
Purchases

| arrival | item | customer_id |
|---|---|---|
| t = 0s | Computer | 1 |
| t = 1s | Keyboard | 2 |
Running the following query:
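A sketch of such a query (illustrative; the output topic name and the LEFT join type are assumptions):

```sql
INSERT INTO purchases_with_names
SELECT STREAM c.name, p.item
FROM purchases AS p LEFT JOIN customers AS c
    ON p.customer_id = c.id
```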
would result in the following:

| name | item |
|---|---|
| Frank | Computer |
| null | Keyboard |
If, later, the record for customer_id = 2 is available:

| arrival | id | name |
|---|---|---|
| t = 10s | 2 | John |
a record would be emitted, with the result now looking like the following:

| name | item |
|---|---|
| Frank | Computer |
| null | Keyboard |
| John | Keyboard |
Notice that “Keyboard” appears twice, once for the situation where the data is missing and another for when the data is made available.
This scenario will happen whenever a Stream is joined with a Table using a non-inner join.
The following table shows which combinations of table/stream joins are available:

| Left | Right | Allowed types | Window | Result |
|---|---|---|---|---|
| Stream | Stream | All | Required | Stream |
| Table | Table | All | No | Table |
| Table | Stream | RIGHT JOIN | No | Stream |
| Stream | Table | INNER, LEFT JOIN | No | Stream |
In order to evaluate a join between two sources, the key facet for both sources has to share the same initial format.
If the formats are not the same, the join can't be evaluated. To address this issue, an intermediate topic can be created with the correct format using a STORE AS statement. This newly created topic can then be used as the new source.
In addition to the aforementioned constraint, when joining it is required that the partition counts of both sources are the same.
When a mismatch is found, an additional step will be added to the join evaluation in order to guarantee an equal number of partitions between the two sources. This step will write the data from the source topic with a smaller count of partitions into an intermediate one.
This newly created topic will match the partition count of the source with the highest partition count.
In the topology view, this step will show up as a Repartition Node.
Joining two topics is only possible if the two sources used in the join share the same key shape and decoder.
When an ON statement is specified, the original key facet will have to change so that it matches the expression provided in the ON statement. Lenses will do this calculation automatically. As a result, the key schema of the result will not be the same as either one of the sources. It will be a Lenses calculated object equivalent to the join expression specified in the query.
As discussed when addressing join types, some values may have null values when non-inner joins are used.
Due to this fact, fields that may have null values will be typed as the union of null and their original type.
Within the same query, joins may only be evaluated between two sources.
When a join between more than two sources is required, multiple queries can be combined using a WITH statement:
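A sketch of that pattern (illustrative; names are assumptions, and join details such as windows are omitted):

```sql
WITH ordersWithCustomers AS (
    SELECT STREAM o.item, c.name, o.address_id
    FROM orders AS o INNER JOIN customers AS c
        ON o.customer_id = c.id
);

INSERT INTO enriched_orders
SELECT STREAM oc.item, oc.name, a.city
FROM ordersWithCustomers AS oc INNER JOIN addresses AS a
    ON oc.address_id = a.id;
```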
In order to group the results of a join, one just has to provide a GROUP BY expression after the join expression:
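A sketch (illustrative; names are assumptions):

```sql
INSERT INTO purchases_per_customer
SELECT STREAM c.name, COUNT(*) AS purchases
FROM purchases AS p INNER JOIN customers AS c
    ON p.customer_id = c.id
GROUP BY c.name
```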
Customers

| emitted | processed | id | name |
|---|---|---|---|
| t = 0s | t = 20s | 1 | Frank |

Purchases

| arrival | processed | item | customer_id |
|---|---|---|---|
| t = 0s | t = 10s | Computer | 1 |
When a join between a table and a stream is processed, Lenses will, for each stream input record (the purchases in the example above), look for a matching record in the specified table (customers).
Notice that the record with Frank's purchase information is processed at t = 10s, at which point Frank's Customer information hasn't yet been processed. This means that no match will be found for this record. At t = 20s, however, the record with Frank's customer information is processed; this will only trigger the emission of a new record if an Outer Join is used.
There are some cases where filter expressions can help optimize a query. A filter can be broken down into multiple steps so that some can be applied before the join node is evaluated. This type of optimization will reduce the number of records going into the join node and consequentially increase its speed.
For this reason, in some cases, filters will show up before the join node in the topology.
This page describes the concepts behind Lenses SQL Processors for joining, aggregating and filtering data in Kafka.
SQL Processors see data as an independent sequence of infinite events. An Event in this context is a datum of information; the smallest element of information that the underlying system uses to communicate. In Kafka’s case, this is a Kafka record/message.
Two parts of the Kafka record are relevant:
Key
Value
These are referred to as facets by the engine. These two components can hold any type of data and Kafka itself is agnostic on the actual storage format for either of these two fields. SQL Processors interpret records as (key, value) pairs, and it exposes ways to manipulate these pairs in several ways.
As mentioned above, queries that are meant to be run on streaming data are treated as stand-alone applications. These applications, in the context of the Lenses platform, are referred to as SQL Processors.
A SQL Processor encapsulates a specific Lenses SQL query, its details and everything else Lenses needs to be able to run the query continuously.
To support features like:
Inference of output schemas
Creation-time validation of input query
Selections
Expressions
Lenses SQL Engine Streaming mode needs to have up-to-date schema information for all structured topics that are used as input in a given query. In this context, structured means topics that are using complex storage formats like AVRO or JSON.
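A sketch of the kind of query being referred to (illustrative; the target topic name is an assumption):

```sql
INSERT INTO total_purchases
SELECT STREAM itemId, price * quantity AS total
FROM purchases
```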
For the above query, for example, the purchases topic will need to have its value set to a structured format, and a valid schema will need to already have been configured in Lenses. In such a schema, the fields itemId, price and quantity must be defined, the latter two being of a numerical type.
These requirements ensure the Engine will always be in a position to know what kind of data it will be working with, guaranteeing at the same time that all obvious errors are caught before a query is submitted.
The UI allows us to visualise any SQL Processor out of the box. For the example:
This visualisation helps to highlight that Lenses SQL fully supports M-N topologies.
What this means is that multiple input topics can be read at the same time, their data manipulated in different ways and then the corresponding results stored in several output topics, all as part of the same Processor’s topology.
This means that all processing can be done in one go, without having to split parts of a topology into different Processors (which could result in more data being stored and shuffled by Kafka).
An expression is any part of a Lenses SQL query that can be evaluated to a concrete value (not to be confused with a record value).
In a query like the following:
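An illustrative reconstruction of such a query (aliases and topic names are assumptions; the expressions match the ones discussed below):

```sql
INSERT INTO `target-topic`
SELECT STREAM
    CONCAT('a', 'b') AS result1,
    (1 + field1) AS result2,
    field2
FROM `input-topic`
WHERE LENGTH(field2) > 5
```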
CONCAT('a', 'b'), (1 + field1) and field2 are all expressions whose values will be projected onto the output topic, whereas LENGTH(field2) > 5 is an expression whose values will be used to filter out input records.
SQL Processors are built on top of Kafka Streams, enriching it with an implementation of Lenses SQL that fits well with the architecture and design of Kafka Streams. When executed, they run a Kafka Streams instance.
Each SQL Processor has an application ID which uniquely identifies it within Lenses. The application ID is used as the Kafka Streams application ID which in turn becomes the underlying Kafka Consumer(s) group identifier.
Scaling up or down the number of runners automatically adapts and rebalances the underlying Kafka Streams application in line with the Kafka group semantics.
The advantages of using Kafka Streams as the underlying technology for SQL Processors are several:
Kafka Streams is an enterprise-ready, widely adopted and understood technology that integrates natively with Kafka
Using consumer group semantics allows leveraging Kafka’s distribution of workload, fault tolerance and replication out of the box
A stream is probably the most fundamental abstraction that SQL Processors provide, and it represents an unbounded sequence of independent events over a continuously changing dataset.
Let’s clarify the key terms in the above definition:
event: an event, as explained earlier, is a datum, that is a (key, value) pair. In Kafka, it is a record.
continuously changing dataset: the dataset is the totality of all data described by every event received so far. As such, it is changed every time a new event is received.
unbounded: this means that the number of events changing the dataset is unknown and it could even be infinite
independent: events don’t relate to each other and, in a stream, they are to be considered in isolation
The main implication of this is that stream transformations (e.g. operations that preserve the stream semantics) are stateless because the only thing they need to take into account is the single event being transformed. Most Projections fall within this category.
To illustrate the meaning of the above definition, imagine that the following two events are received by a stream:
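For instance (illustrative values, chosen to match the result described below):

```
("key1", 10)
("key1", 20)
```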
Now, if the desired operation on this stream was to sum the values of all events with the same key (this is called an Aggregation), the result for "key1" would be 30, because each event is taken in isolation.
Finally, compare this behaviour with that of tables, as explained below, to get an intuition of how these two abstractions are related but different.
Lenses SQL Streaming supports reading a data source (e.g. a Kafka topic) into a stream by using SELECT STREAM.
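A minimal sketch (the output topic name is an assumption):

```sql
INSERT INTO `output-topic`
SELECT STREAM *
FROM `input-topic`
```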
The above example will create a stream that will emit an event for each record, including future ones.
While a stream is useful to have visibility to every change in a dataset, sometimes it is necessary to hold a snapshot of the most current state of the dataset at any given time.
This is a familiar use-case for a database and the Streaming abstraction for this is aptly called table.
For each key, a table holds the latest version received of its value, which means that upon receiving events for keys that already have an associated value, such values will be overridden.
A table is sometimes referred to as a changelog stream, to highlight the fact that each event in the stream is interpreted as an update.
Given its nature, a table is intrinsically a stateful construct, because it needs to keep track of what it has already seen. The main implication of this is that table transformations will consequently also be stateful, which in this context means that they will require local storage and data being copied.
Additionally, tables support delete semantics: an input event with a given key and a null value will be interpreted as a signal to delete the (key, value) pair from the table. Finally, a table needs the key of all input events to not be null. To avoid issues, tables will ignore and discard input events that have a null key.
To illustrate the above definition, imagine that the following two events are received by a table:
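For instance (the same illustrative values as in the stream example above):

```
("key1", 10)
("key1", 20)
```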
Now, if the desired operation on this table was to sum the values of all events with the same key (this is called an Aggregation), the result for key1 would be 20, because (key1, 20) is interpreted as an update.
Finally, compare this behaviour with that of streams, as explained above, to get an intuition of how these two abstractions are related but different.
Lenses SQL Streaming supports reading a data source (e.g. a Kafka topic) into a table by using SELECT TABLE.
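A minimal sketch (the output topic name is an assumption):

```sql
INSERT INTO `output-topic`
SELECT TABLE *
FROM `input-topic`
```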
The above example will create a table that will treat each event on input-topic, including future ones, as an update.
Given the semantics of tables, and the mechanics of how Kafka stores data, Lenses SQL Streaming will set the cleanup.policy of every new topic that is created from a table to compact, unless explicitly specified otherwise. What this means is that the data on the topic will be stored with semantics more closely aligned to those of a table (in fact, tables in Kafka Streams use compacted topics internally). For further information regarding the implications of this, it is advisable to read the official Kafka documentation about cleanup.policy.
Streams and tables have significantly different semantics and use cases, but one interesting observation is that they are strongly related nonetheless.
This relationship is known as stream-table duality. It is described by the fact that every stream can be interpreted as a table, and similarly, a table can be interpreted as a stream.
Stream as Table: A stream can be seen as the changelog of a table. Each event in the stream represents a state change in the table. As such, a table can always be reconstructed by replaying all events of a stream, in order.
Table as Stream: A table can be seen as a snapshot, at a point in time, of the latest value received for each key in a stream. As such, a stream can always be reconstructed by iterating over each (Key, Value) pair and emitting it as an event.
To clarify the above duality, let’s use a chess game as an example.
On the left side of the above image, a chessboard at a specific point in time during a game is shown. This can be seen as a table where the key is a given piece and the value is its position. Also, on the right-hand side, there is the list of moves that culminated in the positioning described on the left; it should be obvious that this can be seen as a stream of events.
The idea formalised by the stream-table duality is that, as it should be clear from the above picture, we can always build a table from a stream (by applying all moves in order).
It is also always possible to build a stream from a table. In the case of the chess example, a stream could be made where each element represents the current state of a single piece (e.g. w: Q h3).
This duality is very important because it is actively used by Kafka (as well as several other storage technologies), for example, to replicate data and data stores and to guarantee fault tolerance. It is also used to translate table and stream nodes within different parts of a query.
One of the main goals of SQL Processors is to use all the information available when a SQL Processor is created to catch problems, suggest improvements and prevent errors. It's more efficient and less frustrating to have an issue come up during registration rather than at some unpredictable moment in the future, at runtime, possibly generating corrupted data.
The SQL engine will actively check the following during the registration of a processor:
Validation of all user inputs
Query lexical correctness
Query semantics correctness
Existence of the input topics used within the query
User permissions to all input and output topics
Schema alignment between fields and topics used within the query
Format alignment between data written and output topics, if the latter already exist
When all the above checks pass, the Engine will:
Generate a SQL Processor able to execute the user’s query
Generate and save valid schemas for all output topics to be created
Monitor the processor and make such metrics available to Lenses
The Engine takes a principled and opinionated approach to schemas and typing information. What this means is that, for example, where there is no schema information for a given topic, that topic's fields will not be available to the Engine, even if they are present in the data; also, if a field in a topic is a string, it will not be possible to use it as, say, a number without explicitly CASTing it.
The Engine's approach allows it to support naming and reusing parts of a query multiple times. This can be achieved using the dedicated WITH statement. WITH statements allow whole sections of the query to be reused and manipulated independently by successive statements, all while maintaining schema and format alignment and correctness. This is useful because it allows queries to split their processing flow without having to redefine parts of the topology which, in turn, means that less data needs to be read and written to Kafka, improving performance.
This is just an example of what SQL Processors can offer because of the design choices taken and the strict rules implemented at query registration.
This page describes the time and windowing of data in Kafka with Lenses SQL Processors.
A data stream is a sequence of events ordered by time. Each entry contains a timestamp component, which aligns it on the time axis.
Kafka provides the source for the data streams, and the Kafka message comes with the timestamp built in. This is used by the Push Engines by default. One thing to consider is that, from a time perspective, the stream records can be out of order: two Kafka records R1 and R2 do not necessarily respect the rule that R1's timestamp is smaller than R2's.
Timestamps are required to perform time-dependent operations for streams - like aggregations and joins.
A record timestamp value can have three distinct meanings. Kafka allows configuring a topic's timestamp meaning via the log.message.timestamp.type setting; the two supported values are CreateTime and LogAppendTime.
When a record is created at the source, the producer is responsible for setting its timestamp. The Kafka producer provides this automatically, and this is aligned with the CreateTime configuration mentioned earlier.
At times, the data source timestamp is not available. When the topic timestamp type is set to LogAppendTime, the Kafka broker will attach the timestamp at the moment it writes the record to the topic.
The timestamp will be set to the time the record was read by the engine, ignoring any previously set timestamps.
Sometimes, when the data source is not under direct control, it might be that the record’s timestamp is actually embedded in the payload, either in the key or the value.
Lenses SQL Streaming allows specifying where to extract the timestamp from the record by using EVENTTIME BY <selection>, where <selection> is a valid selection.
Here are a few examples on how to use the syntax to use the timestamp from the record value facet:
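A sketch of what this could look like (illustrative; the field and topic names, as well as the clause placement, are assumptions):

```sql
INSERT INTO `output-topic`
SELECT STREAM *
FROM `input-topic`
EVENTTIME BY created_at
```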
For those scenarios when the timestamp value lives within the record key, the syntax is similar:
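Again as an illustrative sketch (same assumptions as above):

```sql
INSERT INTO `output-topic`
SELECT STREAM *
FROM `input-topic`
EVENTTIME BY _key.created_at
```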
All records produced by the Lenses SQL Streaming will have a timestamp set and its value will be one of the following:
For direct transformations, where the output record is a straightforward transformation of the input, the input record timestamp will be used.
For aggregations, the timestamp of the latest input record being aggregated will be used.
In all other scenarios, the timestamp at which the output record is generated will be used.
Some stream processing operations, like joins or aggregations, require distinct time boundaries which are called windows. For each time window, there is a start and an end, and as a result a duration. Performing aggregations over a time window, means only the records which fall within the time window boundaries are aggregated together. It might happen for the records to be out-of-order and arrive after the window end has passed, but they will be associated with the correct window.
There are three time windows available at the moment: hopping, tumbling and session.
When defining a time window size, the following types are available:

| Duration | Description |
|---|---|
| ms | time in milliseconds |
| s | time in seconds |
| m | time in minutes |
| h | time in hours |
These are fixed-size and overlapping windows. They are characterised by duration and the hop interval. The hop interval specifies how far a window moves forward in time relative to the previous window.
Since the windows can overlap, a record can be associated with more than one window.
Use this syntax to define a hopping window:
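A sketch of a hopping-window aggregation (illustrative; the HOP keyword and the duration/hop-interval order are assumptions):

```sql
INSERT INTO `target-topic`
SELECT STREAM COUNT(*) AS sessions
FROM `gaming-sessions`
WINDOW BY HOP 10m, 5m
GROUP BY country
```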
They are a particularisation of hopping windows, where the duration and hop interval are equal. This means that two windows can never overlap, therefore a record can only be associated with one window.
Duration time takes the same unit types as described earlier for hopping windows.
Unlike the other two window types, this window size is dynamic and driven by the data. Similar to tumbling windows, these are non-overlapping windows.
A session window is defined as a period of activity separated by a specified gap of inactivity. Any records with timestamps that occur within the boundaries of the inactivity interval are considered part of the existing sessions. When a record arrives and its timestamp is outside of the session gap, a new session window is created and the record will belong to that.
A new session window starts if the last record that arrived is further back in time than the specified inactivity gap. Additionally, different session windows might be merged into a single one if an event is received that falls in between two existing windows, and the resulting windows would then overlap.
To define a session window the following syntax should be used:
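A sketch of a session-window aggregation (illustrative; the SESSION keyword is an assumption, and 10m is the inactivity gap):

```sql
INSERT INTO `target-topic`
SELECT STREAM COUNT(*) AS events
FROM `input-topic`
WINDOW BY SESSION 10m
GROUP BY field1
```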
The inactivity interval can take the time unit type seen earlier for the hopping window.
Session windows are tracked on a per-key basis. This means windows for different keys will likely have different durations. Even for the same key, the window duration can vary.
User behaviour analysis is an example of when to use session windows. They allow metrics like counting user visits, customer conversion funnel or event flows.
It is quite common to see records belonging to one window arriving late, that is after the window end time has passed. To accept these records the notion of a grace period is supported. This means that if a record timestamp falls within a window W and it arrives within W + G (where G is the grace interval) then the record will be processed and the aggregations or joins will update. If, however, the record comes after the grace period then it is discarded.
To control the grace interval use this syntax:
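A sketch (illustrative; the TUMBLE and GRACE BY keywords are assumptions):

```sql
INSERT INTO `target-topic`
SELECT STREAM COUNT(*) AS events
FROM `input-topic`
WINDOW BY TUMBLE 1h GRACE BY 10m
GROUP BY field1
```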
The default grace period is 24 hours. Until the grace period elapses, the window is not actually closed.
This page describes how to use lateral joins for data in Kafka with Lenses SQL Processors.
With Lateral Joins you can combine a data source with any array expression. As a result, you will get a new data source, where every record of the original one will be joined with the values of the lateral array expression.
Assume you have a source where elements is an array field:
| field1 | field2 | elements |
|---|---|---|
| a | 1 | [1, 2] |
| b | 2 | [3, 4, 5] |
| c | 3 | [6] |
Then a Lateral Join of source with elements is a new table, where every record of source will be joined with each single item of the value of elements for that record:
| field1 | field2 | elements | element |
|---|---|---|---|
| a | 1 | [1, 2] | 1 |
| a | 1 | [1, 2] | 2 |
| b | 2 | [3, 4, 5] | 3 |
| b | 2 | [3, 4, 5] | 4 |
| b | 2 | [3, 4, 5] | 5 |
| c | 3 | [6] | 6 |
In this way, the single elements of the array become available and can be used as a normal field in the query.
A query using lateral joins looks like a regular query apart from the definition of its source:
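A sketch of that shape, assembled from the elements described below (illustrative):

```sql
SELECT STREAM <projection>
FROM <source>
    LATERAL <lateralArrayExpression> AS <lateralAlias>
[WHERE <filterExpression>]
```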
projection: as in a single-table select, all the fields from <source> will be available in the projection. In addition to that, the special field <lateralAlias> will be available.
source: the source of data. Note: it is not possible to specify a normal join as a source of a lateral join. This limitation will be removed in the future.
lateralArrayExpression: any expression that evaluates to an array. Fields of <source> are available for defining this expression.
filterExpression: a filter expression specifying which records should be filtered.
Assume you have a topic batched_readings populated with the following records:

batched_readings

| _key | meter_id | readings |
|---|---|---|
| a | 1 | [100, 80, 95, 91] |
| b | 2 | [87, 93, 100] |
| c | 1 | [88, 89, 92, 94] |
| d | 2 | [81] |
As you can see, readings is a field containing arrays of integers.
We define a processor like this:
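A sketch of such a processor (illustrative; the output topic name is an assumption, the rest follows the behaviour described below):

```sql
INSERT INTO readings_over_90
SELECT STREAM meter_id, reading
FROM batched_readings
    LATERAL readings AS reading
WHERE reading > 90
```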
The processor will emit the following records:

| _key | meter_id | reading |
|---|---|---|
| a | 1 | 100 |
| a | 1 | 95 |
| a | 1 | 91 |
| b | 2 | 93 |
| c | 1 | 92 |
| c | 1 | 94 |
Things to notice:
We used the aliased lateral expression reading both in the projection and in the WHERE clause.
The _key for each emitted record is the one of the original record. As usual, you can change this behaviour by projecting on the key with a projection like expression AS _key.
batched_readings records with keys a and b have been split into multiple records. That's because they contain multiple readings greater than 90.
Record d disappeared, because it has no readings greater than 90.
It is possible to use multiple LATERAL joins in the same FROM clause.
Assume you have a topic batched_nested_readings populated with the following records:

batched_nested_readings

| _key | meter_id | nested_readings |
|---|---|---|
| a | 1 | [[100, 80], [95, 91]] |
| b | 2 | [[87], [93, 100]] |
| c | 1 | [[88, 89], [92, 94]] |
| d | 2 | [[81]] |
Notice how nested_readings contains arrays of arrays of integers.
To get the same results as in the previous example, we use a first lateral join to unpack the first level of nested_readings into an array that we call readings. We then define a second lateral join on readings to extract the single values:
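A sketch of such a processor (illustrative, mirroring the previous example):

```sql
INSERT INTO readings_over_90
SELECT STREAM meter_id, reading
FROM batched_nested_readings
    LATERAL nested_readings AS readings
    LATERAL readings AS reading
WHERE reading > 90
```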
In the previous example we used a simple field as the <lateralArrayExpression>. In this section we will see how any array expression can be used instead.
Assume you have a topic day_night_readings populated with the following records:

day_night_readings

| _key | meter_id | readings_day | readings_night |
|---|---|---|---|
| a | 1 | [100, 80] | [95, 91] |
| b | 2 | [87, 93] | [100] |
| c | 1 | [88] | [89, 92, 94] |
| d | 2 | [81] | [] |
We can make use of Array Functions to lateral join day_night_readings on the concatenation of the two readings fields, which produces:

| _key | meter_id | reading |
|---|---|---|
| a | 1 | 100 |
| a | 1 | 95 |
| a | 1 | 91 |
| b | 2 | 93 |
| c | 1 | 92 |
| c | 1 | 94 |
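A query of roughly this shape could produce the result above (illustrative sketch; FLATTEN is used here only as a stand-in for whichever array-concatenation function is available, and the output topic name is an assumption):

```sql
INSERT INTO readings_over_90
SELECT STREAM meter_id, reading
FROM day_night_readings
    LATERAL FLATTEN(readings_day, readings_night) AS reading
WHERE reading > 90
```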
This page describes using projections for data in Kafka with Lenses SQL Processors.
A projection represents the ability to project a given input value onto a target location in an output record. Projections are the main building block of SELECT statements.
A projection is composed of several parts. Projections have a source section that allows one to select a specific value and to ensure that it will be present in the output, in the desired structure and with the desired name, as described by the target section.
In the below query:
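An illustrative reconstruction of such a query (topic names and the CASE branches are assumptions; the projections match the list below):

```sql
INSERT INTO `target-topic`
SELECT STREAM
    CONCAT('a', 'b') AS result1,
    field4,
    (1 + field1) AS _key.a,
    _key.field2 AS result3,
    5 + 7 AS constantField,
    CASE
        WHEN field3 = 'bob' THEN 'it is bob'
        ELSE 'someone else'
    END AS who_is_it
FROM `input-topic`
```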
These are projections:
CONCAT(‘a’, ‘b’) as result1
field4
(1 + field1) as _key.a
_key.field2 as result3
5 + 7 as constantField
CASE … as who_is_it
It is worth highlighting that projections themselves are stateless. While the calculation of the source value could be stateful (depending on the type of expression being evaluated), the act of making the value available in the output is not a stateful operation.
The precise syntax of a projection is:
In the above, [] indicates optional sections and | is to be read as OR.
Source:
<expression>: the source section must consist of a valid SQL expression, which will be evaluated to the value to be projected.
Target:
this section is optional. When missing, the output target facet will be _value and the <alias> will default to the string representation of the source expression.
as: this is the Lenses SQL keyword that makes specifying the target of the projection explicit.
[<facet>.]<alias>]|[<facet>]: this nested section, which must be specified whenever as is used, is composed of two mutually exclusive sub-sections.
[<facet>.]<alias>]: this optional section is a string that will be used as the field name in the output record. Optionally, the output facet where the field will be projected (e.g. _key or _value) can also be specified.
[<facet>]: this optional section specifies that the result of the projection will make up the whole of the indicated facet of the output record (e.g. the whole key or the whole value).
The above syntax highlights something important about the relationship between projections and facets: a projection can have a source facet and will always have a target facet and while they are related, they might not always be the same.
In the above example query:
field4 is a projection from the value facet to the value facet
(1 + field1) as _key.a is a projection from the value facet to the key facet
_key.field2 as result3 is a projection from the key facet to the value facet
5 + 7 as constantField is a projection to the value facet but with no source facet, as the expression does not depend on the input record
This makes SQL projections very expressive, but it also means that attention needs to be paid when facets are manipulated explicitly. Details, edge cases and implications of this will be discussed below.
Projecting a _value field is the default type of projection when a field is selected within the source expression: unless otherwise specified, the source facet of a projection will always be _value.
In light of the above, the following query contains only projections that read from the _value facet of the source:
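For example (a sketch; topic names are assumptions):

```sql
INSERT INTO `target-topic`
SELECT STREAM
    field1,
    _value.field1 AS aliased
FROM `input-topic`
```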
Also, notice that field1 and _value.field1 as aliased are reading the exact same input field; in the former case the _value facet is implicit.
A projection can also access a selected field on the key facet.
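For example (a sketch; topic and field names are assumptions):

```sql
INSERT INTO `target-topic`
SELECT STREAM
    _key.field1,
    _key.field2 AS aliased
FROM `input-topic`
```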
The above query contains only projections that read from the _key facet of the source. This kind of projection behaves exactly the same as any other projection but, because of specific interactions with other mechanics in the Lenses SQL Engine Streaming mode, it can't be used in the same query as Aggregations or Joins.
All examples of projections described until now focused on selecting a field from either the key or the value of an input record. However, a projection can also read a whole facet and project it to the output.
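For example (a sketch; the aliases match the ones discussed below):

```sql
INSERT INTO `target-topic`
SELECT STREAM
    _key AS `old-key`,
    _value AS `old-value`
FROM `input-topic`
```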
In the above query, there are two projections:
_key as old-key: this is projecting the whole key of input-topic onto a field old-key on target-topic.
_value as old-value: this is projecting the whole value of input-topic onto a field old-value on target-topic.
See the alias rules below for more details about using aliases (as done in the above example).
This can be useful when the input source uses a primitive storage format for either one or both facets, but it is desirable to map such facets to named fields within a more complex output structure, as could be the case in the above query. That said, projections from whole facets are supported for all storage formats, not only the primitive ones.
As should be clear from all the examples on this page so far, projections can be freely mixed within a single SELECT statement; the same query can have many projections, some of which could be reading from the key of the input record, some others from the value, and yet others returning literal constants.
Lenses SQL is designed to support this mixed usage and to calculate the appropriate resulting structure given the schemas of all the projections’ inputs.
Lenses SQL assigns a special meaning to * when used as a projection.
Unqualified Wildcard projection
When * is used without any further qualification, it is interpreted as an instruction to project all fields from _key and _value to the output.
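For example (a sketch; topic names are assumptions):

```sql
INSERT INTO `target-topic`
SELECT STREAM *
FROM `input-topic`
```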
The result of this query is that target-topic will have exactly the same fields, schema and data as input-topic.
When * is explicitly qualified, the meaning becomes more precise: it will limit the fields to be selected to only the ones belonging to the qualified source (and, optionally, facet).
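A sketch of what that could look like (illustrative; the sources, aliases and join condition are assumptions):

```sql
INSERT INTO `target-topic`
SELECT STREAM
    i1.*,
    i2.field1
FROM `input-topic-1` AS i1 INNER JOIN `input-topic-2` AS i2
    ON i1._key = i2._key
```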
The above shows how a qualified wildcard projection can be used to target all the fields of a specific source. Additionally, a qualified wildcard can be used in addition to other normal projections (e.g. i2.field1).
The target of a projection is the location within the output record where the result of the projection's expression is going to be mapped. As previously mentioned, Lenses SQL uses the keyword as to explicitly control this.
Using as, it is possible to:
Assign an alias to the projected field in the result. For example, field1 as aliased-field1 is reading field1 and projecting its value to a field called aliased-field1. Notice that, as no facet information is specified, this will be targeting the value of the output record.
Project directly to nested fields within structures
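For example (a sketch; topic and field names are assumptions):

```sql
INSERT INTO `target-topic`
SELECT STREAM
    field1 AS x.a,
    field2 AS x.b
FROM `input-topic`
```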
The above query will result in a field x that is a structure containing two fields, a and b.
Control the facet to which the field is projected. For example, field1 as _key.field1 is reading field1 (from the value facet) and projecting to a field with the same name on the key of the output. Depending on the source being projected, doing this might have important implications.
Project over the whole target value or key. For example, field1 as _value is reading field1 and projecting it over the whole value of the output. One important thing about this is that Lenses SQL allows only one projection of this kind per facet per query; this means the following query would be invalid, because two projections target _value within the same query:
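For example (a sketch):

```sql
-- invalid: more than one projection targets the whole _value facet
INSERT INTO `target-topic`
SELECT STREAM
    field1 AS _value,
    field2 AS _value
FROM `input-topic`
```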
In order to avoid potential errors, Lenses defines the following rules for defining aliases:
Aliases can't add new fields to non-struct properties. E.g. an_int AS foo.bar, field1 AS foo.bar.field1 (since bar will be an INT, we can't define a property field1 under it).
Aliases cannot override previously defined properties. E.g. field1 AS foo.bar.field1, field2 AS foo.bar (setting foo.bar with the contents of field2 would override the value of field1).
Fields cannot have duplicated names. E.g. field1 AS foo.bar, field2 AS foo.bar (setting foo.bar with the contents of field2 would override its previous content).
Projecting on Key is a feature that can be useful in situations where it is desirable to quickly change the key of a Table or a Stream, maybe in preparation for further operations (e.g. joins etc…). This feature is sometimes referred to as re-keying within the industry.
However, one important implication of using this feature is that Kafka uses the key to determine in what partition a record must be stored; by changing the key of the record, the resulting partitioning of the output topic might differ from that of the input topic. While there is nothing wrong with this, it is something that must be understood clearly when using this feature.
Sometimes it is desirable to limit the input records to be projected based on some predicate.
For example, we might want to project field1 and field2 of input-topic onto output-topic, but only if field3 contains a specific value.
This is what the WHERE clause is used for: to filter the input dataset by some predicate, applying the rest of the query only to records that match the predicate.
The syntax for this clause is simply WHERE <expression>, where <expression> is a valid, arbitrarily nested Lenses SQL boolean expression.
Projections have a close relationship with the storage format of their target.
By default, if a query contains more than one projection for the same facet, then that facet’s storage format will be a structure (which type of structure exactly depends on other factors that are not relevant here).
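For example (a sketch; the expression used for result1 is an assumption):

```sql
INSERT INTO `target-topic`
SELECT STREAM
    CONCAT('a', 'b') AS result1,
    field2
FROM `input-topic`
```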
The above query will make target-topic's value storage format a structure (e.g. AVRO or JSON) with two fields named result1 and field2. The storage format for target-topic's key will be the same as input-topic's, as there are no projections targeting that facet.
The storage format of the output can be explicitly changed by a projection, however. This will often be the case when a projection on a whole facet is used. Consider the following query:
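For example (a sketch):

```sql
INSERT INTO `target-topic`
SELECT STREAM
    CONCAT('a', 'b') AS result1,
    field2 AS _key
FROM `input-topic`
```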
In this case, target-topic's value storage format will still be a structure, but its key will depend on field2's schema. For example, if field2 is a string, then target-topic's key will be changed to STRING (assuming it was not STRING already). The same behaviour applies to the _value facet.
One example where this can be relevant is when a projection is used to map the result of a single field in the target topic. Consider the following query:
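For example (a sketch):

```sql
INSERT INTO `target-topic`
SELECT STREAM field1 AS _value
FROM `input-topic`
```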
This query will project field1 onto the whole value facet, and this will result in a change of storage format as well. This behaviour is quite common when a single projection is used because, more often than not, the desired output in such a scenario is the content of the field rather than a structure with a single field.
This page describes the storage formats of data in Kafka supported by Lenses SQL Processors.
The output storage format depends on the sources. For example, if the incoming data is stored as JSON, then the output will be JSON as well. The same applies when Avro is involved.
When using a custom storage format, the output will be JSON.
At times, it is required to control the resulting Key and/or Value storage. If the input is JSON, for example, the output for the streaming computation can be set to Avro.
Another scenario involves Avro sources and a result that projects the Key as a primitive type. Rather than using the Avro storage format to store the primitive, it might be required to use the actual primitive format.
Controlling the storage format can be done using the following syntax:
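The following is a sketch of the shape of that syntax, with assumed topic and field names and the `STORE` clause placed between the target topic and the `SELECT`:

```sql
-- Sketch: set both the key and the value storage formats explicitly.
INSERT INTO `target-topic`
STORE KEY AS STRING VALUE AS AVRO
SELECT STREAM
    field1
  , field2
FROM `input-topic`;
```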
There is no requirement to always set both the Key and the Value. Maybe only the Key or maybe only the Value needs to be changed. For example:
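For instance, only the value storage might be set (a sketch with assumed names):

```sql
-- Sketch: only the value storage is changed; the key keeps the source format.
INSERT INTO `target-topic`
STORE VALUE AS AVRO
SELECT STREAM field1, field2
FROM `input-topic`;
```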
Consider a scenario where the input data is stored as Avro and an aggregation on a field yields an INT. To use the primitive INT storage rather than the Avro INT storage, set the Key format to INT:
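A sketch of what this could look like, with assumed topic and field names:

```sql
-- Sketch: the aggregation key (the GROUP BY field) is stored as a
-- primitive INT rather than an Avro-wrapped INT.
INSERT INTO `target-topic`
STORE KEY AS INT
SELECT STREAM COUNT(*) AS total
FROM `input-topic`
GROUP BY field1;
```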
Here is an example of the scenario of having JSON input sources but an Avro-stored output:
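A sketch, assuming a JSON source topic:

```sql
-- Sketch: JSON input, Avro output for both the key and the value.
INSERT INTO `target-topic`
STORE KEY AS AVRO VALUE AS AVRO
SELECT STREAM
    field1
  , field2
FROM `json-input-topic`;
```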
Changing the storage format is guarded by a set of rules. The following table describes how storage formats can be converted for the output.

From \ To | INT | LONG | STRING | JSON | AVRO | XML | Custom/Protobuf |
---|---|---|---|---|---|---|---|
INT | = | yes | yes | no | yes | no | no |
LONG | no | = | yes | no | yes | no | no |
STRING | no | no | = | no | yes | no | no |
JSON | If the Json storage contains integer only | If the Json storage contains integer or long only | yes | = | yes | no | no |
AVRO | If the Avro storage contains integer only | If the Avro storage contains integer or long only | yes | yes | = | no | no |
XML | no | no | no | yes | yes | no | no |
Custom (includes Protobuf) | no | no | no | yes | yes | no | no |
Time windowed formats follow similar rules to the ones described above, with the additional constraint that Session Windows (SW) cannot be converted into Time Windows (TW), nor vice-versa.

From \ To | SW[B] | TW[B] |
---|---|---|
SW[A] | yes, if format A is compatible with format B | no |
TW[A] | no | yes, if format A is compatible with format B |
Example: Changing the storage format from TWAvro to TWJson is possible since they’re both TW formats and Avro can be converted to JSON.
Example: Changing the storage format from TWString to TWJson is not possible since, even though they’re both TW formats, String formats can’t be written as JSON.
XML, as well as any custom format, is only supported as an input format. By default, Lenses will process these formats by translating them to JSON and writing the output as JSON (AVRO is also supported if a store is explicitly set).
This page describes how to use settings in Lenses SQL Processors to process data in Kafka.
The SET syntax allows customizing the behaviour for the underlying Kafka Consumer/Producer, Kafka Streams (including RocksDB parameters), topic creation and error handling.
The general syntax is:
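A sketch of the general shape; the `SET` statements precede the streaming query, and the keys shown here follow the patterns and tables later in this page:

```sql
-- Sketch: one or more SET statements customise the processor,
-- then the streaming query follows.
SET defaults.topic.autocreate = true;
SET processing.guarantee = 'EXACTLY_ONCE';

INSERT INTO `target-topic`
SELECT STREAM *
FROM `input-topic`;
```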
SQL processors can create topics that are not present. There are two levels of settings: generic (or default) settings that apply to all target topics, and specific (or topic-related) settings that allow a distinct setup for a given topic, for example when one of the output topics requires a different partition count or replication factor than the defaults.
To set the defaults follow this syntax:
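For example (the `cleanup.policy` line is the form shown in the table below; the remaining lines apply the same assumed `defaults.topic.` prefix to the other keys):

```sql
-- Defaults applied to every topic the processor may create.
SET defaults.topic.cleanup.policy = 'compact,delete';
SET defaults.topic.autocreate = true;
SET defaults.topic.partitions = 3;
SET defaults.topic.replication = 2;
```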
Key | Type | Description |
---|---|---|
`autocreate` | BOOLEAN | Creates the topic if it does not exist already. |
`partitions` | INT | Controls the target topic partitions count. If the topic already exists, this will not be applied. |
`replication` | INT | Controls the topic replication factor. If the topic already exists, this will not be applied. |
`key.avro.record` | STRING | Controls the output record Key schema name. |
`key.avro.namespace` | STRING | Controls the output record Key schema namespace. |
`value.avro.record` | STRING | Controls the output record Value schema name. |
`value.avro.namespace` | STRING | Controls the output record Value schema namespace. |

Each Kafka topic allows a set of parameters to be set. For example, `cleanup.policy` can be set like this: `SET defaults.topic.cleanup.policy='compact,delete';`
All the keys applicable for defaults are valid for controlling the settings for a given topic. Controlling the settings for a specific topic can be done via:
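A sketch, assuming a target topic named `sensor_readings` and a `topic.<topic name>.<key>` form that mirrors the defaults syntax above (the prefix is an assumption, not a confirmed key):

```sql
-- Sketch: override the defaults for one specific target topic.
SET topic.sensor_readings.partitions = 6;
SET topic.sensor_readings.replication = 3;
```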
The streaming engine allows users to define how errors are handled when writing to or reading from a topic.
Both sides can be set at once, or individually as described in the sections below.
Data being processed might be corrupted or not aligned with the topic format (maybe you expect an Avro payload but the raw bytes represent a JSON document). Setting what happens in these scenarios can be done like this:
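A sketch; the setting key used here is a placeholder rather than a confirmed Lenses key, while the values (`continue`, `fail`, `dlq`) are the documented ones listed below:

```sql
-- Sketch: placeholder key name for the read/deserialisation error policy.
SET error.policy.read = 'dlq';
```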
While data is being written multiple errors can occur (maybe there were some network issues). Setting what happens in these scenarios can be done like this:
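Again a sketch with a placeholder key name for the write side; only the values come from the table below:

```sql
-- Sketch: placeholder key name for the write/produce error policy.
SET error.policy.write = 'continue';
```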
There are three possible values to control the behaviour.

Value | Description |
---|---|
`continue` | Allows the application to carry on. The problem will be logged. |
`fail` | Stops the application. The application will be in a failed (error) state. |
`dlq` | Allows the application to continue, but it will send the payload to a dead-letter topic. It requires `dead.letter.queue` to be set. The default value for `dead.letter.queue` is `lenses.sql.dlq`. |
When `dlq` is used, the `dead.letter.queue` setting is required. Its value is the target topic where the problematic records will be sent.
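For example (the topic name is illustrative):

```sql
-- Route problematic records to a custom dead-letter topic instead of
-- the default lenses.sql.dlq.
SET dead.letter.queue = 'my-processor-dlq';
```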
Using the SET syntax, the underlying Kafka Streams and Kafka Producer and Consumer settings can be adjusted.

Key | Type | Description |
---|---|---|
`processing.guarantee` | STRING | The processing guarantee that should be used. Possible values are `AT_LEAST_ONCE` (default) and `EXACTLY_ONCE`. Exactly-once processing requires a cluster of at least three brokers by default, which is the recommended setting for production. |
`commit.interval.ms` | LONG | The frequency with which to save the position of the processor. If `processing.guarantee` is set to `EXACTLY_ONCE`, the default value is 100, otherwise the default value is 30000. This setting directly impacts the behaviour of Tables, as it controls how often they will emit events downstream. An event will be emitted only every `commit.interval.ms`, so every intermediate event that is received by the table will not be visible downstream directly. |
`poll.ms` | LONG | The amount of time in milliseconds to block waiting for input. |
`cache.max.bytes.buffering` | LONG | Maximum number of memory bytes to be used for buffering across all threads. It has to be at least 0. Default value is 10 * 1024 * 1024. |
`client.id` | STRING | An ID prefix string used for the client IDs of the internal consumer, producer and restore-consumer, with pattern `<client.id>-StreamThread-<threadSequenceNumber>`, followed by `-consumer`, `-producer`, or `-restore-consumer`. |
`num.standby.replicas` | INT | The number of standby replicas for each task. Default value is 0. |
`num.stream.threads` | INT | The number of threads to execute stream processing. Default value is 1. |
`max.task.idle.ms` | LONG | Maximum amount of time a stream task will stay idle when not all of its partition buffers contain records, to avoid potential out-of-order record processing across multiple input streams. |
`buffered.records.per.partition` | INT | Maximum number of records to buffer per partition. Default is 1000. |
`connections.max.idle.ms` | LONG | Close idle connections after the number of milliseconds specified by this config. |
`receive.buffer.bytes` | LONG | The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default will be used. |
`reconnect.backoff.ms` | LONG | The base amount of time to wait before attempting to reconnect to a given host. This avoids repeatedly connecting to a host in a tight loop. This backoff applies to all connection attempts by the client to a broker. |
`reconnect.backoff.max.ms` | LONG | The maximum amount of time in milliseconds to wait when reconnecting to a broker that has repeatedly failed to connect. If provided, the backoff per host will increase exponentially for each consecutive connection failure, up to this maximum. After calculating the backoff increase, 20% random jitter is added to avoid connection storms. Default is 1000. |
`retries` | INT | Setting a value greater than zero will cause the client to resend any request that fails with a potentially transient error. Default is 0. |
`retry.backoff.ms` | LONG | The amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios. Default is 100. |
`send.buffer.bytes` | LONG | The size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is -1, the OS default will be used. Default is 128 * 1024. |
`state.cleanup.delay.ms` | LONG | The amount of time in milliseconds to wait before deleting state when a partition has migrated. |
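For instance, a couple of the keys above could be set like this (the values are illustrative):

```sql
-- Illustrative values; the keys come from the table above.
SET processing.guarantee = 'EXACTLY_ONCE';
SET commit.interval.ms = 1000;
```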
Alongside the keys above, the Kafka consumer and producer settings can also be tweaked.
Some of the configurations for the consumer and producer have the same name. When it is necessary to distinguish them, the keys have to be prefixed with consumer or producer.
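For example, the same idle-connection setting can be given different values for the consumer and the producer (a sketch with illustrative values):

```sql
-- Prefix the key to target the consumer or the producer specifically.
SET consumer.connections.max.idle.ms = 540000;
SET producer.connections.max.idle.ms = 180000;
```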
Stateful data flow applications might require, on rare occasions, some of the parameters for the underlying RocksDB to be tweaked.
To set the properties, use:
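A sketch using two of the RocksDB keys from the table below, with illustrative values:

```sql
-- Tune the RocksDB block cache size (bytes) and the number of threads.
SET rocksdb.table.block.cache.size = 16777216;
SET rocksdb.total.threads = 2;
```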
Key | Type | Description |
---|---|---|
`rocksdb.table.block.cache.size` | LONG | Set the amount of cache in bytes that will be used by RocksDB. If cacheSize is non-positive, then the cache will not be used. Default: 8M. |
`rocksdb.table.block.size` | LONG | Approximate size of user data packed per block. Default: 4K. |
`rocksdb.table.block.cache.compressed.num.shard.bits` | INT | Controls the number of shards for the block compressed cache. |
`rocksdb.table.block.cache.num.shard.bits` | INT | Controls the number of shards for the block cache. |
`rocksdb.table.block.cache.compressed.size` | LONG | Size of the compressed block cache. If 0, then block_cache_compressed is set to null. |
`rocksdb.table.block.restart.interval` | INT | Set block restart interval. |
`rocksdb.table.block.cache.size.and.filter` | BOOL | Indicates whether index/filter blocks are put into the block cache. If not specified, each 'table reader' object will pre-load the index/filter block during table initialization. |
`rocksdb.table.block.checksum.type` | STRING | Sets the checksum type to be used with this table. Available values: `kNoChecksum`, `kCRC32c`, `kxxHash`. |
`rocksdb.table.block.hash.allow.collision` | BOOL | Influences the behaviour when kHashSearch is used. If false, stores a precise prefix-to-block range mapping; if true, does not store the prefix and allows prefix hash collisions (less memory consumption). |
`rocksdb.table.block.index.type` | STRING | Sets the index type to be used with this table. Available values: `kBinarySearch`, `kHashSearch`. |
`rocksdb.table.block.no.cache` | BOOL | Disable the block cache. If this is set to true, then no block cache should be used. Default: false. |
`rocksdb.table.block.whole.key.filtering` | BOOL | If true, place whole keys in the filter (not just prefixes). This must generally be true for gets to be efficient. Default: true. |
`rocksdb.table.block.pinl0.filter` | BOOL | Indicates whether L0 index/filter blocks are pinned to the block cache. If not specified, defaults to false. |
`rocksdb.total.threads` | INT | The max threads RocksDB should use. |
`rocksdb.write.buffer.size` | LONG | Sets the number of bytes the database will build up in memory (backed by an unsorted log on disk) before converting to a sorted on-disk file. |
`rocksdb.table.block.size.deviation` | INT | This is used to close a block before it reaches the configured block_size. If the percentage of free space in the current block is less than this specified number, and adding a new record to the block will exceed the configured block size, then this block will be closed and the new record will be written to the next block. Default is 10. |
`rocksdb.compaction.style` | STRING | Available values: `LEVEL`, `UNIVERSAL`, `FIFO`. |
`rocksdb.max.write.buffer` | INT | |
`rocksdb.base.background.compaction` | INT | |
`rocksdb.background.compaction.max` | INT | |
`rocksdb.subcompaction.max` | INT | |
`rocksdb.background.flushes.max` | INT | |
`rocksdb.log.file.max` | LONG | |
`rocksdb.log.fle.roll.time` | LONG | |
`rocksdb.compaction.auto` | BOOL | |
`rocksdb.compaction.level.max` | INT | |
`rocksdb.files.opened.max` | INT | |
`rocksdb.wal.ttl` | LONG | |
`rocksdb.wal.size.limit` | LONG | |
`rocksdb.memtable.concurrent.write` | BOOL | |
`rocksdb.os.buffer` | BOOL | |
`rocksdb.data.sync` | BOOL | |
`rocksdb.fsync` | BOOL | |
`rocksdb.log.dir` | STRING | |
`rocksdb.wal.dir` | STRING | |