This page describes how to use functions in Lenses SQL Processors.
This section describes how to use AGGREGATE functions in Lenses SQL.
This page describes how to use ARRAY functions in Lenses SQL Processors.
This page describes how to use HEADER functions in Lenses SQL Processors.
This page describes the JSON functions in Lenses SQL.
A declarative SQL interface for querying, transforming and manipulating data at rest and data in motion. It works with Apache Kafka topics and other data sources, and helps developers and Kafka users explore and process their data.
The Lenses SQL Snapshot engine accesses the data at the point in time the query is executed. This means that, for Apache Kafka, data added just after the query was initiated will not be processed.
Typical use cases include, but are not limited to:
Identifying a specific message.
Identifying a particular payment transaction that your system has processed.
Identifying all thermostat readings for a specific customer, if you are working for an energy provider.
Counting transactions processed within a given time window.
The Snapshot engine presents a familiar SQL interface, but remember that it queries Kafka with no indexes. Use Kafka's metadata (partition, offset, timestamp) to improve query performance.
Go to Workspace->SQL Studio, enter your query, and click run.
This page describes common filtering of data in Kafka with Lenses SQL Studio.
The WHERE clause allows you to define a set of logical predicates the data needs to match in order to be returned. Standard comparison operators are supported (>, >=, <, <=, =, and !=), as well as calling functions.
We are going to use the groceries table created earlier. Select all items purchased where the price is greater than or equal to 2.00:
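(A sketch; the groceries field names are assumptions.)

```sql
SELECT name, price
FROM groceries
WHERE price >= 2.00;
```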
Select all customers whose last name length equals 5:
Search for all customers containing Ana in their first name:
Keep in mind that text search is case-sensitive. To use case insensitive text search, you can write:
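A sketch of both searches; the field name is assumed, and the case-insensitive variant assumes a LOWER function is available (check the SQL Reference for the exact text functions):

```sql
-- case-sensitive search
SELECT * FROM customers WHERE first_name LIKE '%Ana%';

-- case-insensitive variant (assumes a LOWER function)
SELECT * FROM customers WHERE LOWER(first_name) LIKE '%ana%';
```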
Sometimes data can contain explicit NULL values, or it can omit fields entirely. Using IS [NOT] NULL or the EXISTS function allows you to check for these situations.
EXISTS is a keyword in the Lenses SQL grammar, so it needs to be escaped; the escape character is the backtick (`).
Lenses supports JSON. JSON does not enforce a schema, allowing you to insert null values.
Create the following table named customers_json:
Query this table for all its entries:
The middle_name is only present on the mikejones record.
Write a query which filters out records where middle_name is not present:
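(A sketch using IS NOT NULL.)

```sql
SELECT *
FROM customers_json
WHERE middle_name IS NOT NULL;
```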
This can also be written as:
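(A sketch of the alternative form, assuming the escaped exists function mentioned above.)

```sql
SELECT *
FROM customers_json
WHERE `exists`(middle_name);
```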
When a field is actually NULL or is missing entirely, both checks above have the same outcome.
You can use AND/OR to specify complex conditions for filtering your data.
To filter the purchased items where more than one item has been bought for a given product, and the unit price is greater than 2:
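(A sketch, assuming quantity and price fields on the groceries table.)

```sql
SELECT *
FROM groceries
WHERE quantity > 1
  AND price > 2;
```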
Now try changing the AND logical operand to OR and see the differences in output.
To filter the entries returned from a grouping query, use the HAVING clause. It plays the same role for grouped queries that the WHERE clause plays for regular ones.
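A sketch of HAVING applied to a grouped query (field names assumed):

```sql
SELECT country, COUNT(*) AS total
FROM customers
GROUP BY country
HAVING COUNT(*) > 1;
```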
To select data from a specific partition, access the metadata of the topic.
In the following example, a table is created with three partitions; the message key is hashed and the remainder HashValue % partitions determines the table partition the record is sent to.
Next, run the following query:
As you can see from the results (your timestamps will be different), the records span the three partitions. Now query specific partitions:
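(A sketch of a partition-targeted query via the _meta facet.)

```sql
SELECT *
FROM customers_partitioned
WHERE _meta.partition = 1;
```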
Kafka reads are non-deterministic over multiple partitions. The Snapshot engine may reach its max.size before it finds your record in one run; the next time it might.
If we specify in our query that we are only interested in partition 1, and for the sake of example the above Kafka topic has 50 partitions, then Lenses will automatically push this predicate down, meaning that we only need to scan 1 GB instead of 50 GB of data.
If we specify the offset range and the partition, we only need to scan the specific range of 100K messages, resulting in scanning roughly 5 MB.
Time-traveling
The above will query only the data added to the topic up to 1 hour ago. Thus we would query just 10 MB.
The above will query only the data that has been added to the Kafka topic on a specific day. If we are storing 1,000 days of data, we would query just 50 MB.
This page describes the best practices when using Lenses SQL Studio to query data in Kafka.
Does Apache Kafka have indexing?
No. Apache Kafka does not index the payload (indexes typically come at a high cost, even in an RDBMS or a system like Elasticsearch); however, Kafka does index the metadata.
The only filters Kafka supports are topic, partition and offsets or timestamps.
When querying Kafka topic data with SQL that filters only on payload fields, such as the query sketched below, a full scan will be executed: the query processes the entire data on that topic to identify all records that match the transaction id.
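A hypothetical example of such a full-scan query (topic and field names are assumptions):

```sql
SELECT *
FROM payments
WHERE transaction_id = 'tx-1234';
```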
If the Kafka topic contains a billion 50-byte messages, that would require querying 50 GB of data. Depending on your network capabilities, the brokers' performance, any quotas on your account, and other parameters, fetching 50 GB of data could take some time! Even more so if the data is compressed; in that case, the client has to decompress it before parsing the raw bytes into a structure the query can be applied to.
When Lenses can’t read (deserialize) your topic’s messages, it classifies them as “bad records”. This happens for one of the following reasons:
Kafka records are corrupted. On an AVRO topic, a rogue producer might have published a different format.
Lenses topic settings do not match the payload data. Maybe a topic was incorrectly given the AVRO format when it is JSON, or vice versa.
If an AVRO payload is involved, maybe the Schema Registry is down or not accessible from the machine running Lenses.
By default, Lenses skips them and displays the records' metadata in the Bad Records tab. If you want to force stop the query in such a case, use:
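(A sketch using the show.bad.records flag documented in the settings table below.)

```sql
SET show.bad.records=false;
```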
Querying a table can take a long time if it contains a lot of records. The underlying Kafka topic has to be read, the filter conditions applied, and the projections made.
Additionally, the SELECT statement could end up bringing a large amount of data to the client. To constrain the resources involved, Lenses allows for context customization, which drives the execution and gives control to the user. The full list of context parameters to overwrite is shown in the table further below.
All the above values can be given a default value via the configuration file. Using lenses.sql.settings as a prefix, the format.timestamp setting can be set like this:
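(A sketch of the configuration entry, derived from the prefix rule above.)

```
lenses.sql.settings.format.timestamp=true
```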
Lenses SQL uses the Kafka Consumer to read the data. This means that an advanced user with knowledge of Kafka could tweak the consumer properties to achieve better throughput, although this should only be needed on very rare occasions. The query context can receive Kafka consumer settings. For example, the max.poll.records consumer setting can be set as:
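(A sketch; the value shown is arbitrary.)

```sql
SET max.poll.records = 50000;
```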
Streaming SQL operates on unbounded streams of events: a query would normally be a never-ending query. In order to bring query termination semantics into Apache Kafka, we introduced four controls:
LIMIT = 10000 - Force the query to terminate when 10,000 records are matched.
max.bytes = 20000000 - Force the query to terminate once 20 MBytes have been retrieved.
max.time = 60000 - Force the query to terminate after 60 seconds.
max.zero.polls = 8 - Force the query to terminate after 8 consecutive polls are empty, indicating we have exhausted a topic.
Thus, when retrieving data, you can set a limit of 1 GB on the maximum number of bytes retrieved and a maximum query time of one hour like this:
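(A sketch using the context parameters documented in the table below; the payments topic is an assumption.)

```sql
SET max.size = '1g';
SET max.query.time = '1h';
SELECT * FROM payments;
```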
Name | Description | Example |
---|---|---|
max.size | The maximum amount of Kafka data to scan. This is to avoid a full topic scan over large topics. It can be expressed as bytes (1024), kilobytes (1024k), megabytes (10m) or gigabytes (5g). Default is 20MB. | SET max.size = '1g'; |
max.query.time | The maximum amount of time the query is allowed to run. It can be specified as milliseconds (2000ms), hours (2h), minutes (10m) or seconds (60s). Default is 1 hour. | SET max.query.time = '60000ms'; |
max.idle.time | The amount of time to wait when no more records are read from the source before the query is completed. Default is 5 seconds. | SET max.idle.time = '5s'; |
LIMIT N | The maximum number of records to return. Default is 10000. | SELECT * FROM payments LIMIT 100; |
show.bad.records | Flag to drive the behavior of handling topic records when their payload does not correspond with the table storage format. Default is true: bad records are processed and displayed separately in the Bad Records section. Set it to false to stop the query when a bad record is encountered. | SET show.bad.records=false; |
format.timestamp | Flag to control the values for Avro date time. Avro encodes date time via Long values. Set the value to true if you want the values to be returned as text in a human-readable format. | SET format.timestamp=true; |
format.decimal | Flag to control the formatting of decimal types. Use it to specify how many decimal places are shown. | SET format.decimal=2; |
format.uppercase | Flag to control the formatting of string types. Use it to specify if strings should all be made uppercase. Default is false. | SET format.uppercase=true; |
live.aggs | Flag to control whether aggregation queries should be allowed to run. Since they accumulate data, they require more memory to retain the state. | SET live.aggs=true; |
max.group.records | When an aggregation is calculated, this config defines the maximum number of records over which the aggregation is computed. Default is 10,000,000. | SET max.group.records=10000000; |
optimize.kafka.partition | When enabled, the primitive used for the _key filter determines the partition the same way the default Kafka partitioner logic does. Queries like SELECT * FROM trips WHERE _key='customer_id_value'; on multi-partition topics will therefore only read one partition as opposed to the entire topic. To disable it, set the flag to false. | SET optimize.kafka.partition=false; |
query.parallel | When used, it will parallelize the query. The number provided is capped by the target topic's partition count. | SET query.parallel=2; |
query.buffer | Internal buffer when processing messages. A higher number might yield better performance when coupled with max.poll.records. | SET query.buffer=50000; |
kafka.offset.timeout | Timeout for retrieving the target topic start/end offsets. | SET kafka.offset.timeout=20000; |
This page describes the concepts of the Lenses SQL snapshot engine that drives the SQL Studio allowing you to query data in Kafka.
Escape topic names with backticks if they contain non-alphanumeric characters.
Snapshot queries on streaming data provide answers to a direct question, e.g. The current balance is $10. The query is active, the data is passive.
A single entry in a Kafka topic is called a message.
The engine considers a message to have four distinct components: key, value, headers and metadata.
Currently, the Snapshot Engine supports four facets: _key, _value, _headers and _metadata. These strings can be used to reference properties of each of the aforementioned message components and build a query that way.
By default, unqualified properties are assumed to belong to the _value facet:
In order to reference a different facet, a facet qualifier can be added:
When more than one source/topic is specified in a query (as happens when two topics are joined), a table reference can be added to the selection to resolve the ambiguity:
The same can be done for any of the other facets (_key, _meta, _headers).
Note: Using a wildcard selection statement SELECT * provides only the value component of a message.
Headers are interpreted as a simple mapping of strings to strings. This means that if a header is a JSON, XML or any other structured type, the snapshot engine will still read it as a string value.
Messages can contain nested elements and embedded arrays. The . operator is used to refer to children, and the [] operator is used to refer to an element in an array.
You can use a combination of these two operators to access data of any depth.
You can explicitly reference the key, value and metadata. For the key use _key, for the value use _value, and for metadata use _meta. When there is no prefix, the engine will resolve the field(s) as being part of the message value. For example, the following two queries are identical:
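(A sketch; topic and field names are assumptions.)

```sql
SELECT amount FROM payments;
SELECT _value.amount FROM payments;
```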
When the key or the value content is a primitive data type, use the prefix on its own to address it. For example, if messages contain a device identifier as the key and the temperature as the value, the SQL code would be:
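(A sketch; the topic name is an assumption.)

```sql
SELECT _key AS device_id, _value AS temperature
FROM device_readings;
```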
Use the _meta keyword to address the metadata. For example:
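(A sketch; the topic name is an assumption.)

```sql
SELECT _meta.partition, _meta.offset, _meta.timestamp
FROM payments;
```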
When projecting a field into a target record, Lenses allows complex structures to be built. This can be done by using a nested alias like below:
The result would be a struct with the following shape:
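A sketch of such a nested-alias query and, in the trailing comment, the shape it produces (field names are assumptions):

```sql
SELECT
    first_name AS name.first,
    last_name  AS name.last
FROM customers;
-- resulting value shape (illustrative):
-- { "name": { "first": "...", "last": "..." } }
```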
When two alias names clash, the snapshot engine does not “override” that field. Lenses will instead generate a new name by appending a unique integer. This means that a query like the following:
will generate a structure like the following:
Queries can be nested. Let us take the query in the previous section and say we are only interested in those entries where there is more than one customer per country.
Run the query, and you will only see those entries for which there is more than one person registered per country.
Functions can be used directly in queries. For example, the ROUND function allows you to round numeric values:
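(A sketch; field names are assumptions.)

```sql
SELECT name, ROUND(price) AS rounded_price
FROM groceries;
```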
For a full list of functions see SQL Reference.
This page describes how to limit return and sample data in Kafka with Lenses SQL Studio.
To limit the output of the query you can use two approaches:
use the LIMIT clause
set the max size of the data to be returned
To restrict the time to run the query, use SET max.query.time:
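(A sketch; the topic name is an assumption.)

```sql
SET max.query.time = '30s';
SELECT * FROM payments;
```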
To sample data and discard the first rows:
This statement instructs Lenses to skip the first record matched and then sample the next two.
This page describes how to insert and delete data into Kafka with Lenses SQL Studio.
Lenses SQL allows you to use the ANSI SQL INSERT command to store new records into a table.
Single or multi-record inserts are supported:
$Table - The name of the table to insert the data into.
Columns - The target columns to populate with data. Adding a record does not require you to fill all the available columns. In the case of Avro-stored Key/Value pairs, the user needs to make sure that a value is specified for all the required Avro fields.
VALUES - The set of values to insert. It has to match the list of columns provided, including their data types. You can use simple constants or more complex expressions as values, like 1 + 1 or NOW().
Example:
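(A sketch; the customer columns are assumptions.)

```sql
INSERT INTO customer (id, first_name, last_name)
VALUES ('id1', 'Ana', 'Jones');
```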
Records can be inserted from the result of SELECT statement.
The syntax is:
For example, to copy all the records from the customer table into the customer_avro one:
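(A sketch.)

```sql
INSERT INTO customer_avro
SELECT * FROM customer;
```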
There are scenarios where a record key is a complex type. Regardless of the storage format, JSON or Avro, the SQL engine allows the insertion of such entries:
There are two ways to delete data:
If the topic is not compacted, then DELETE expects an offset to delete records up to.
If the topic is compacted, then DELETE expects the record Key to be provided. For a compacted topic, a delete translates to inserting a record with the existing Key and a null Value. For the customer_avro topic (which has the compacted flag on), a delete operation for a specific customer identifier would look like this:
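(A sketch, assuming a primitive string key; the exact key predicate depends on the key structure.)

```sql
DELETE FROM customer_avro WHERE _key = 'id1';
```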
Deleting is an insert operation: until compaction takes place, there will be at least one record with the Key used earlier, and the latest record will have its Value set to null.
To remove all records from a table:
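(A sketch of the statement form; the TABLE keyword is an assumption.)

```sql
TRUNCATE TABLE $Table;
```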
where $Table is the name of the table to delete all records from. This operation is only supported on non-compacted topics, which is a Kafka design restriction. To remove the data from a compacted topic, you have two options: either drop and recreate the topic, or insert a null-Value record for each unique Key on the topic.
After rebuilding the customer table to be non-compacted, perform the truncate:
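(A sketch.)

```sql
TRUNCATE TABLE customer;
```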
Truncating a compacted Kafka topic is not supported. This is an Apache Kafka restriction. You can drop and recreate the table, or insert a record with a null Value for each unique key in the topic.
This page describes how to create and delete topics in the Lenses SQL Studio.
Lenses supports the typical SQL commands supported by a relational database:
CREATE
DROP
TRUNCATE
DELETE
SHOW TABLES
DESCRIBE TABLE
DESCRIBE FORMATTED
The CREATE statement has the following parts:
CREATE TABLE - Instructs the construction of a table.
$Table - The actual name given to the table.
Schema - Constructed as a list of (field, type) tuples, it describes the data each record in the table contains.
FORMAT - Defines the storage format. Since it is an Apache Kafka topic, both the Key and the Value formats are required. Valid values are STRING, INT, LONG, JSON, AVRO.
PROPERTIES - Specifies the number of partitions the final Kafka topic should have, the replication factor to ensure high availability (it cannot be higher than the current number of Kafka brokers), and whether the topic should be compacted.
A Kafka topic which is compacted is a special type of topic with a finer-grained retention mechanism that retains the last update record for each key.
A compacted topic (once the compaction has been completed) contains a full snapshot of the final record values for every record key and not just the recently changed keys. They are useful for in-memory services, persistent data stores, reloading caches, etc.
For more details on the subject, you should look at Kafka Documentation.
Example:
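(A sketch following the parts described above; the exact property names are assumptions.)

```sql
CREATE TABLE customer (
    id string,
    first_name string,
    last_name string
)
FORMAT (string, json)
PROPERTIES (partitions=1, replication=1, compacted=false);
```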
Best practice dictates using Avro as the storage format over other formats. In this case, the key can still be stored as STRING, but the value can be Avro.
To list all tables:
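Using the SHOW TABLES command listed above:

```sql
SHOW TABLES;
```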
To examine the schema and metadata for a topic:
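Using the DESCRIBE TABLE command listed above:

```sql
DESCRIBE TABLE $tableName;
```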
The $tableName should contain the name of the table to describe.
Given the two tables created earlier, a user can run the following SQL to get the information on each table:
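(A sketch for the two tables used in this guide.)

```sql
DESCRIBE TABLE customer;
DESCRIBE TABLE customer_avro;
```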
The following information will be displayed:
To drop a table:
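(A sketch.)

```sql
DROP TABLE customer_avro;
```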
Dropping a table results in the underlying Kafka topics being removed.
Lenses provides a set of virtual tables that contain information about all the fields in all the tables.
Using the virtual table, you can quickly search for a table name but also see the table type.
The __table virtual table has a table_name column containing the table name, and a table_type column describing the table type (system, user, etc).
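(A sketch.)

```sql
SELECT table_name, table_type
FROM __table;
```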
To see all the table fields, select from the _fields virtual table.
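(A sketch.)

```sql
SELECT * FROM _fields;
```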
Each Kafka message contains information related to partition, offset, timestamp, and topic. Additionally, the engine adds the key and value raw byte size.
Create a topic and insert a few entries.
Now we can query for specific metadata related to the records.
To query metadata such as the underlying Kafka topic offset, partition and timestamp, prefix your desired fields with _meta.
Run the following query to see each tutorial name along with its metadata information:
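(A sketch; the tutorials table and its name field are assumptions.)

```sql
SELECT name, _meta.partition, _meta.offset, _meta.timestamp
FROM tutorials;
```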
This page describes how to aggregate Kafka data in Lenses SQL Studio.
For a full list of aggregation functions see the SQL Reference.
Using the COUNT aggregate function you can count the records in a table. Run the following SQL to see how many records we have on customers_partitioned:
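For example:

```sql
SELECT COUNT(*) FROM customers_partitioned;
```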
Using the SUM function you can sum records in a table.
To group data, use the GROUP BY clause:
Let’s see how many customers there are from each country. Here is the code which computes that:
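(A sketch; the country field is an assumption.)

```sql
SELECT country, COUNT(*) AS total
FROM customers
GROUP BY country;
```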
In this guide you will learn about SQL Processor apps in Lenses, how to use them to quickly create streaming flows using SQL streaming, and how to deploy and scale them.
SQL Processors continuously process data in Kafka topics. Under the hood, the Kafka Streams APIs are combined with the internal Lenses application deployment framework. A user gets a seamless experience for creating scalable processing components to filter, aggregate, join, and transform Kafka topics, which can scale natively to Kubernetes.
SQL Processors are continuous queries, they process data as it arrives and react to events, e.g. payment made, $5 to Coffeshop. The data is active, the query is passive.
SQL Processors offer:
A no-code, stand-alone application executing a given Lenses SQL query on current and future data
Query graph visualisation
Fully integrated experience within Lenses
ACLs and Monitoring functionality out-of-the-box
Ability to scale up and down workflows via Kubernetes
SQL Processors are long-lived applications that continuously process data. When used in Kubernetes mode (recommended), they are deployed via Lenses as Kubernetes deployments, separate from the Lenses instance.
To create SQL Processors go to Workspace->Apps->New App->SQL Processor.
Enter a name.
Enter your SQL statement; the editor will help you with IntelliSense.
Optionally specify a description, tag and ProcessorID (consumer group ID).
Select the deployment target; for Kubernetes this will be a Kubernetes cluster and namespace.
If the configuration is valid, the SQL Processor will be created. Processors are not started automatically; click Start in the Actions menu to deploy and start the Processor.
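A sketch of the kind of statement one might enter in step 2 (the topic names, the currency field and the defaults.topic.autocreate setting are assumptions):

```sql
SET defaults.topic.autocreate=true;

INSERT INTO payments_gbp
SELECT STREAM *
FROM payments
WHERE currency = 'GBP';
```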
To start a Processor, select Start from the Actions menu.
Select the SQL processor. Lenses will show an overview of the health of the processor.
Selecting a Runner will display further information about each individual runner.
The metadata in the Summary tab also contains the consumer group ID (Processor ID).
The SQL tab shows the SQL Streaming statement the processor is running. A visual representation is shown in the Data Flow tab.
The Configuration tab shows the low-level settings.
To scale a processor, select Scale from the Actions menu and enter the desired number of runners. In Kubernetes mode, the runners are pods.
Select Stop in the Actions menu.
Lenses provides helpful snippets for common scenarios. Select the snippet from the Help section.
ProcessorID is the public unique identifier for an SQL processor. It is customizable, meaning that you, as a user, have control over it and can set this identifier to any arbitrary string.
Restrictions on custom ProcessorIDs:
They have to be unique across all processors.
They must match the following regex: ^[a-zA-Z0-9\-\._]+ (only letters, numbers and the characters -, _ and . are allowed).
They have to start with a letter or a number.
They cannot be empty.
One important aspect of the ProcessorID is that it is used as the Kafka consumer group identifier. That means that, in practice, this is the value that allows an SQL processor to build its consumer group and coordinate record ingestion from Kafka between all Processor replicas. Consequently, if the ProcessorID of a given SQL processor is changed, that processor will restart consuming messages from the beginning of the existing records in the topic.
The ApplicationID is the Lenses unique identifier; it is automatically created by Lenses and cannot be customized.
This is unique among all application types; it does not matter whether it is an SQL processor or a different (new) sort of future application.
Lenses uses the ApplicationID to manage applications. This means that, when Starting, Stopping or Scaling an application, Lenses will use this attribute to pick the right instance.
This page describes how to use views and synonyms in Lenses SQL Studio to query Kafka.
Lenses supports the typical SQL commands supported by a relational database:
CREATE
DROP
TRUNCATE
DELETE
SHOW VIEWS
A view is a virtual table, generated dynamically based on the results of a SELECT statement.
A view looks and acts just like a real table, but is always created on the fly as required, so it is always up to date.
A synonym is an alias for a table. This is useful if you have a topic with a long, unwieldy name like customer_reports_emea_april_2018_to_march_2018 and you want to access it as customer_reports.
To create a view:
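(A sketch; the CREATE VIEW ... AS form is assumed based on standard SQL.)

```sql
CREATE VIEW viewname AS
SELECT name, email
FROM customers;
```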
where viewname is the name of the virtual table that is used to access the records in the view, and the query is a standard SELECT statement.
Then we can query the view:
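(A sketch.)

```sql
SELECT * FROM viewname;
```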
A view acts as a virtual table. This means that a view can be filtered even more or that a projection can be applied to a view:
To delete a view:
If you wish to modify an existing view, use the syntax above to delete it, and then create a new view with the same name.
To see the definition of a view, you can use the following syntax:
To create a synonym:
To delete a synonym:
If you wish to modify an existing synonym, use the syntax above to delete it, and then create a new synonym with the same name.
Three common reasons for using a view are:
creating a projection from a table with a large number of fields
representing joins as a single table
and creating a preset filter
We will cover each scenario with an example.
Suppose we have a table called customers which contains full customer records - name, email, age, registration date, country, password, and many others - and we find ourselves repeatedly querying it for just name and email.
A view could be created that returns just the name and email as a projection.
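(A sketch of such a view; the view name is an assumption.)

```sql
CREATE VIEW customers_contact AS
SELECT name, email
FROM customers;
```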
There is no reason to specify the projection each time.
The benefit is more significant when we want to select a higher number of fields - say a topic with 50 fields, and we want to select only 15.
The statement that is used to generate the view can consist of one or more tables. One use case of views is to represent joined tables as if they were a single table. This avoids the need for writing a complex join query each time.
Then we can select from this join like this:
Finally, another use case is to define a filter that is commonly used. If a topic contains transactions, and we often found ourselves searching for transactions from the UK. We could run this query each time:
Alternatively, we can set up a view with this filter pre-applied:
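(A sketch; the view and topic names are assumptions.)

```sql
CREATE VIEW transactions_uk AS
SELECT *
FROM transactions
WHERE country = 'UK';
```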
Then use a SELECT query:
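(A sketch, using the view name assumed above.)

```sql
SELECT * FROM transactions_uk;
```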
This page describes how to manage and control queries against Kafka in Lenses SQL Studio.
Adding a LIMIT 10 to the SQL query will result in the SQL terminating early, as soon as 10 messages have been found. It is not a perfect solution, as we might never find 10 matching messages and thus still perform a full scan.
You can also set a maximum query or idle time:
or a max idle time; the idea is that there is no reason to keep polling if we have exhausted the entire topic:
or a maximum amount of data to read from Kafka. This controls how much data to read from Kafka, NOT the required memory.
Recent queries are displayed, but only for the current session; they are not currently retained.
Click on the play button to run a previous query. If a query is already running, you will be asked if you want to stop it first.
View All queries
View Running queries
You can see all running queries by Lenses users using SQL:
You can force stop a query by another user using SQL:
This page describes examples of using arrays in Lenses SQL Studio to query Kafka.
For a full list of array functions see the SQL Reference.
You can create array fields using the ..[] syntax:
Tables can store data containing arrays. Here is a SQL statement for querying an array item:
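A sketch of querying an array element, using the batched_readings table introduced in the join examples later in this guide:

```sql
SELECT readings[0]
FROM batched_readings;
```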
When working with arrays, it is good to check the array bounds. See the SIZEOF function in the list of supported functions.
Sometimes you want to find out how many items are in your array. To do so you can run:
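(A sketch using SIZEOF.)

```sql
SELECT SIZEOF(readings) AS readings_count
FROM batched_readings;
```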
This page describes joining data in Kafka with Lenses SQL Studio.
Lenses allows you to combine records from two tables. A query can contain zero, one or multiple JOIN operations.
Create an orders table and insert some data into it:
With these tables in place, join them to get more information about an order by combining it with the customer information found in the customer table:
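(A sketch of such a join; the join column and the selected fields are assumptions.)

```sql
SELECT orders.item, orders.quantity, customer.first_name
FROM orders
JOIN customer ON orders.customer_id = customer._key;
```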
With lateral joins, Lenses allows you to combine records from a table with the elements of an array expression.
We are going to see in more detail what lateral joins are with an example.
Create a batched_readings table and insert some data into it:
You can now use a LATERAL join to inspect, extract and filter the single elements of the readings array, as if they were a normal field:
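A sketch, assuming a meter_id field alongside the readings array and the LATERAL ... AS form:

```sql
SELECT meter_id, reading
FROM batched_readings
LATERAL readings AS reading
WHERE reading > 100;
```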
Running that query we will get the values:
You can use multiple LATERAL joins, one inside the other, if you want to extract elements from a nested array:
Running the following query we will obtain the same records as in the previous example:
This page describes how to join data in Kafka with Lenses SQL Processors.
Joins allow rows from different sources to be combined.
Lenses allows two sources of data to be combined based either on the equality of their _key facet or using a user-provided expression.
A query using joins looks like a regular query apart from the definition of its source and in some cases the need to specify a window expression:
projection: the projection of a join expression differs very little from a regular projection. The only important consideration is that, since data is selected from two sources, some fields may be common to both. The syntax table.field is recommended to avoid this type of problem.
sourceA/sourceB : the two sources of data to combine.
window: only used if two streams are joined. Specifies the interval of time to search matching results.
joinExpression: a boolean expression that specifies how the combination of the two sources is calculated.
filterExpression: a filter expression specifying which records should be filtered.
When two sources of data are combined it is possible to control which records to keep when a match is not found:
Disclaimer: The following examples do not take into consideration windowing and/or table materialization concerns.
Customers
Orders
This join type will only emit records where a match has occurred.
(Notice there is no item with customer.id = 2, nor is there a customer with id = 3, so these two rows are not present in the result.)
This join type selects all the records from the left side of the join regardless of a match:
(Notice all the rows from orders are present, but since there is no customer with id = 3, no name can be set.)
A right join can be seen as a mirror of a LEFT JOIN. It selects all the records from the right side of the join regardless of a match:
An outer join can be seen as the union of left and right joins. It selects all records from the left and right side of the join regardless of a match happening:
By default, if no ON expression is provided, the join will be evaluated based on the equality of the _key facet. This means that the following queries are equivalent:
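A sketch of the equivalence (topic and field names are assumptions):

```sql
-- implicit: joins on key equality
SELECT STREAM orders.item, customers.name
FROM orders JOIN customers;

-- explicit equivalent
SELECT STREAM orders.item, customers.name
FROM orders JOIN customers ON orders._key = customers._key;
```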
When an expression is provided, however, there are limitations regarding what kind of expressions can be evaluated.
Currently, the following expression types are supported:
Equality expressions using equality (=) with one table on each side:
customers.id = order.user_id
customers.id - 1 = order.user_id - 1
substr(customers.name, 5) = order.item
Any boolean expression which references only one table:
len(customers.name) > 10
substr(customer.name,1) = "J"
len(customer.name) > 10 OR customer_key > 1
Allowed expressions mixed together using an AND operator:
customers._key = order.user_id AND len(customers.name) > 10
len(customers.name) > 10 AND substr(customer.name,1) = "J"
substr(customers.name, 5) = order.item AND len(customer.name) > 10 OR customer_key > 1
Any expressions not following the rules above will be rejected:
More than one table is referenced on each side of the equality operator
concat(customer.name, item.name) = "John"
customer._key - order.customer_id = 0
a boolean expression not separated by an AND references more than one table:
customer._key = 1 OR customer._key = order.customer_id
When two streams are joined Lenses needs to know how far away in the past and in the future to look for a matching record.
This approach is called a “Sliding Window” and works like this:
Customers
Purchases
At t=10, when both the Computer and the Keyboard records arrive, only one customer can be found within the given time window (the specified window is 5s, thus the window will be [10-5, 10+5]s). This means that the following would be the result of running the query:
Note: John will not match the Keyboard purchase since t=20s is not within the window interval [10-5,10+5]s.
When streaming data, records can be produced at different rates and even out of order. This means that often a match may not be found because a record hasn’t arrived yet.
The following shows an example of a join between a stream and a table where the purchase information is made available before the customers' information.
(Notice that the purchase of a "Keyboard" by customer_id = 2 is produced before the record with the customer details.)
Customers
Purchases
Running the following query:
would result in the following:
If, later, the record for customer_id = 2 becomes available:
a record would be emitted with the result now looking like the following:
Notice that “Keyboard” appears twice, once for the situation where the data is missing and another for when the data is made available.
This scenario will happen whenever a Stream is joined with a Table using a non-inner join.
The following table shows which combinations of table/stream joins are available:
In order to evaluate a join between two sources, the key facet for both sources has to share the same initial format.
If the formats are not the same, the join cannot be evaluated. To address this issue, an intermediate topic can be created with the correct format using a STORE AS statement. This newly created topic can then be used as the new source.
In addition to the constraint aforementioned, when joining, it’s required that the partition number of both sources be the same.
When a mismatch is found, an additional step will be added to the join evaluation in order to guarantee an equal number of partitions between the two sources. This step will write the data from the source topic with a smaller count of partitions into an intermediate one.
This newly created topic will match the partition count of the source with the highest partition count.
In the topology view, this step will show up as a Repartition Node.
Joining two topics is only possible if the two sources used in the join share the same key shape and decoder.
When an ON statement is specified, the original key facet will have to change so that it matches the expression provided in the ON statement. Lenses will do this calculation automatically. As a result, the key schema of the result will not be the same as either one of the sources. It will be a Lenses calculated object equivalent to the join expression specified in the query.
As discussed when addressing join types, some values may have null values when non-inner joins are used.
Due to this fact, fields that may have null values will be typed as the union of null and their original type.
Within the same query, joins may only be evaluated between two sources.
When a join between more than two sources is required, multiple queries can be combined using a WITH statement:
In order to group the results of a join, one just has to provide a GROUP BY expression after the join expression is specified.
Purchases
When a join between a table and a stream is processed, Lenses will, for each stream input (orders in the example above), look for a matching record in the specified table (customers).
Notice that the record with Frank's purchase information is processed at t = 10s, at which point Frank's customer information hasn't yet been processed. This means that no match will be found for this record.
At t=20s, however, the record with Frank's customer information is processed; this will only trigger the emission of a new record if an outer join is used.
There are some cases where filter expressions can help optimize a query. A filter can be broken down into multiple steps so that some can be applied before the join node is evaluated. This type of optimization will reduce the number of records going into the join node and consequentially increase its speed.
For this reason, in some cases, filters will show up before the join in the topology node.
This page describes the concepts of Lenses SQL Processors from joining, aggregating and filtering data in Kafka.
SQL Processors see data as an unbounded sequence of independent events. An event in this context is a datum of information: the smallest element of information that the underlying system uses to communicate. In Kafka's case, this is a Kafka record/message.
Two parts of the Kafka record are relevant:
Key
Value
These are referred to as facets by the engine. These two components can hold any type of data and Kafka itself is agnostic on the actual storage format for either of these two fields. SQL Processors interpret records as (key, value) pairs, and it exposes ways to manipulate these pairs in several ways.
As mentioned above, queries that are meant to be run on streaming data are treated as stand-alone applications. These applications, in the context of the Lenses platform, are referred to as SQL Processors.
A SQL Processor encapsulates a specific Lenses SQL query, its details and everything else Lenses needs to be able to run the query continuously.
To support features like:
Inference of output schemas
Creation-time validation of input query
Selections
Expressions
Lenses SQL Engine Streaming mode needs to have up-to-date schema information for all structured topics that are used as input in a given query. In this context, structured means topics that use complex storage formats like AVRO or JSON.
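A sketch of such a query, using the purchases fields described next (the target topic name is an assumption):

```sql
INSERT INTO purchase_totals
SELECT STREAM
    itemId,
    price * quantity AS total
FROM purchases;
```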
For the above query, for example, the purchases topic will need to have its value set to a structured format, and a valid schema will need to already have been configured in Lenses. In such a schema, the fields itemId, price and quantity must be defined, the latter two being of a numerical type.
These requirements ensure the Engine will always be in a position to know what kind of data it will be working with, guaranteeing at the same time that all obvious errors are caught before a query is submitted.
The UI allows us to visualise any SQL Processor out of the box. For the example:
This visualisation helps to highlight that the Lenses SQL fully supports M-N topologies.
What this means is that multiple input topics can be read at the same time, their data manipulated in different ways and then the corresponding results stored in several output topics, all as part of the same Processor’s topology.
This means that all processing can be done in one go, without having to split parts of a topology into different Processors (which could result in more data being stored and shuffled by Kafka).
An expression is any part of a Lenses SQL query that can be evaluated to a concrete value (not to be confused with a record value).
In a query like the following:
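(A sketch matching the expressions discussed below; the topic names are assumptions.)

```sql
INSERT INTO `output-topic`
SELECT STREAM
    CONCAT('a', 'b'),
    (1 + field1),
    field2
FROM `input-topic`
WHERE LENGTH(field2) > 5;
```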
CONCAT('a', 'b'), (1 + field1) and field2 are all expressions whose values will be projected onto the output topic, whereas LENGTH(field2) > 5 is an expression whose values will be used to filter out input records.
SQL Processors are built on top of Kafka Streams, enriching it with an implementation of Lenses SQL that fits well with the architecture and design of Kafka Streams. When executed, they run a Kafka Streams instance.
Each SQL Processor has an application ID which uniquely identifies it within Lenses. The application ID is used as the Kafka Streams application ID which in turn becomes the underlying Kafka Consumer(s) group identifier.
Scaling up or down the number of runners automatically adapts and rebalances the underlying Kafka Streams application in line with the Kafka group semantics.
The advantages of using Kafka Streams as the underlying technology for SQL Processors are several:
Kafka Streams is an enterprise-ready, widely adopted and understood technology that integrates natively with Kafka
Using consumer group semantics allows leveraging Kafka’s distribution of workload, fault tolerance and replication out of the box
A stream is probably the most fundamental abstraction that SQL Processors provide, and it represents an unbounded sequence of independent events over a continuously changing dataset.
Let’s clarify the key terms in the above definition:
event: an event, as explained earlier, is a datum, that is a (key, value) pair. In Kafka, it is a record.
continuously changing dataset: the dataset is the totality of all data described by every event received so far. As such, it is changed every time a new event is received.
unbounded: this means that the number of events changing the dataset is unknown and it could even be infinite
independent: events don’t relate to each other and, in a stream, they are to be considered in isolation
The main implication of this is that stream transformations (e.g. operations that preserve the stream semantics) are stateless because the only thing they need to take into account is the single event being transformed. Most Projections fall within this category.
To illustrate the meaning of the above definition, imagine that the following two events are received by a stream:
Now, if the desired operation on this stream was to sum the values of all events with the same key (this is called an Aggregation), the result for "key1" would be 30, because each event is taken in isolation.
Finally, compare this behaviour with that of tables, as explained below, to get an intuition of how these two abstractions are related but different.
Lenses SQL Streaming supports reading a data source (e.g. a Kafka topic) into a stream by using SELECT STREAM.
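(A sketch; the topic name is an assumption.)

```sql
SELECT STREAM *
FROM `input-topic`;
```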
The above example will create a stream that will emit an event for each record, including future ones.
While a stream is useful to have visibility to every change in a dataset, sometimes it is necessary to hold a snapshot of the most current state of the dataset at any given time.
This is a familiar use-case for a database and the Streaming abstraction for this is aptly called table.
For each key, a table holds the latest version received of its value, which means that upon receiving events for keys that already have an associated value, such values will be overridden.
A table is sometimes referred to as a changelog stream, to highlight the fact that each event in the stream is interpreted as an update.
Given its nature, a table is intrinsically a stateful construct, because it needs to keep track of what it has already been seen. The main implication of this is that table transformations will consequently also be stateful, which in this context means that they will require local storage and data being copied.
Additionally, tables support delete semantics. An input event with a given key and a null value will be interpreted as a signal to delete the (key, value) pair from the table.
Finally, a table needs the key of all input events to not be null. To avoid issues, tables will ignore and discard input events that have a null key.
To illustrate the above definition, imagine that the following two events are received by a table:
Now, if the desired operation on this table was to sum the values of all events with the same key (this is called an Aggregation), the result for key1 would be 20, because (key1, 20) is interpreted as an update.
Finally, compare this behaviour with that of streams, as explained above, to get an intuition of how these two abstractions are related but different.
Lenses SQL Streaming supports reading a data source (e.g. a Kafka topic) into a table by using SELECT TABLE.
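(A sketch; the topic name is an assumption.)

```sql
SELECT TABLE *
FROM `input-topic`;
```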
The above example will create a table that will treat each event on input-topic, including future ones, as an update.
Given the semantics of tables, and the mechanics of how Kafka stores data, Lenses SQL Streaming will set the cleanup.policy setting of every new topic that is created from a table to compact, unless explicitly specified otherwise.
What this means is that the data on the topic will be stored with semantics more closely aligned to those of a table (in fact, tables in Kafka Streams use compacted topics internally). For further information regarding the implications of this, it is advisable to read the official Kafka documentation about cleanup.policy.
Streams and tables have significantly different semantics and use cases, but one interesting observation is that they are strongly related nonetheless.
This relationship is known as stream-table duality. It is described by the fact that every stream can be interpreted as a table, and similarly, a table can be interpreted as a stream.
Stream as Table: A stream can be seen as the changelog of a table. Each event in the stream represents a state change in the table. As such, a table can always be reconstructed by replaying all events of a stream, in order.
Table as Stream: A table can be seen as a snapshot, at a point in time, of the latest value received for each key in a stream. As such, a stream can always be reconstructed by iterating over each (Key, Value) pair and emitting it as an event.
To clarify the above duality, let’s use a chess game as an example.
On the left side of the above image, a chessboard at a specific point in time during a game is shown. This can be seen as a table where the key is a given piece and the value is its position. Also, on the right-hand side, there is the list of moves that culminated in the positioning described on the left; it should be obvious that this can be seen as a stream of events.
The idea formalised by the stream-table duality is that, as it should be clear from the above picture, we can always build a table from a stream (by applying all moves in order).
It is also always possible to build a stream from a table. In the case of the chess example, a stream could be made where each element represents the current state of a single piece (e.g. w: Q h3).
This duality is very important because it is actively used by Kafka (as well as several other storage technologies), for example, to replicate data and data stores and to guarantee fault tolerance. It is also used to translate table and stream nodes within different parts of a query.
One of the main goals of SQL Processors is to ensure that it uses all the information available to it when a SQL Processor is created to catch problems, suggest improvements and prevent errors. It’s more efficient and less frustrating to have an issue coming up during registration rather than at some unpredictable moment in the future, at runtime, possibly generating corrupted data.
The SQL engine will actively check the following during the registration of a processor:
Validation of all user inputs
Query lexical correctness
Query semantics correctness
Existence of the input topics used within the query
User permissions to all input and output topics
Schema alignment between fields and topics used within the query
Format alignment between data written and output topics, if the latter already exist
When all the above checks pass, the Engine will:
Generate a SQL Processor able to execute the user’s query
Generate and save valid schemas for all output topics to be created
Monitor the processor and make such metrics available to Lenses
The Engine takes a principled and opinionated approach to schemas and typing information. What this means is that, for example, where there is no schema information for a given topic, that topic's fields will not be available to the Engine, even if they are present in the data; also, if a field in a topic is a string, it will not be possible to use it as a number without explicitly CASTing it.
The Engine's approach allows it to support naming and reusing parts of a query multiple times. This can be achieved using the dedicated WITH statement.
WITH statements allow whole sections of the query to be reused and manipulated independently by successive statements, all while maintaining schema and format alignment and correctness. This is useful because it allows specifying queries that split their processing flow without having to redefine parts of the topology. This, in turn, means that less data needs to be read and written to Kafka, improving performance.
This is just an example of what SQL Processors can offer because of the design choices taken and the strict rules implemented at query registration.
This page describes the time and windowing of data in Kafka with Lenses SQL Processors.
A data stream is a sequence of events ordered by time. Each entry contains a timestamp component, which aligns it on the time axis.
Kafka provides the source for the data streams, and the Kafka message comes with a timestamp built in. This is used by the Push Engines by default. One thing to consider is that, from a time perspective, the stream records can be out of order: two Kafka records R1 and R2 do not necessarily respect the rule that R1's timestamp is smaller than R2's.
Timestamps are required to perform time-dependent operations for streams - like aggregations and joins.
A record timestamp value can have three distinct meanings. Kafka allows configuring a topic's timestamp meaning via the log.message.timestamp.type setting. The two supported values are CreateTime and LogAppendTime.
When a record is created at the source, the producer is responsible for setting its timestamp. The Kafka producer provides this automatically, and this is aligned with the CreateTime configuration mentioned earlier.
At times, the data source timestamp is not available. When setting the topic timestamp type to LogAppendTime, the Kafka broker will attach the timestamp at the moment it writes the record to the topic.
The timestamp will be set to the time the record was read by the engine, ignoring any previously set timestamps.
Sometimes, when the data source is not under direct control, it might be that the record’s timestamp is actually embedded in the payload, either in the key or the value.
Lenses SQL Streaming allows to specify where to extract the timestamp from the record by using EVENTTIME BY
.
where <selection>
is a valid selection.
Here are a few examples of using this syntax to take the timestamp from the record value facet:
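A hedged sketch; the placement of the EVENTTIME BY clause, the topic name and the field name are all assumptions:

```sql
SELECT STREAM *
FROM `input-topic`
EVENTTIME BY created_at;
```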
For those scenarios when the timestamp value lives within the record key, the syntax is similar:
All records produced by the Lenses SQL Streaming will have a timestamp set and its value will be one of the following:
For direct transformations, where the output record is a straightforward transformation of the input, the input record timestamp will be used.
For aggregations, the timestamp of the latest input record being aggregated will be used.
In all other scenarios, the timestamp at which the output record is generated will be used.
Some stream processing operations, like joins or aggregations, require distinct time boundaries called windows. Each time window has a start, an end, and therefore a duration. Performing aggregations over a time window means that only the records which fall within the time window boundaries are aggregated together. Records may arrive out of order, after the window end has passed, but they will still be associated with the correct window.
There are three time windows available at the moment: hopping, tumbling and session.
When defining a time window size, the following types are available:
These are fixed-size and overlapping windows. They are characterised by duration and the hop interval. The hop interval specifies how far a window moves forward in time relative to the previous window.
Since the windows can overlap, a record can be associated with more than one window.
Use this syntax to define a hopping window:
Tumbling windows are a particularisation of hopping windows, where the duration and the hop interval are equal. This means that two windows can never overlap, and therefore a record can only be associated with one window.
Duration time takes the same unit types as described earlier for hopping windows.
Unlike the other two window types, this window size is dynamic and driven by the data. Similar to tumbling windows, these are non-overlapping windows.
A session window is defined as a period of activity separated by a specified gap of inactivity. Any records with timestamps that occur within the boundaries of the inactivity interval are considered part of the existing sessions. When a record arrives and its timestamp is outside of the session gap, a new session window is created and the record will belong to that.
A new session window starts if the last record that arrived is further back in time than the specified inactivity gap. Additionally, different session windows might be merged into a single one if an event is received that falls in between two existing windows, and the resulting windows would then overlap.
To define a session window the following syntax should be used:
The inactivity interval can take the time unit type seen earlier for the hopping window.
Session windows are tracked on a per-key basis. This means windows for different keys will likely have different durations. Even for the same key, the window duration can vary.
User behaviour analysis is an example of when to use session windows. They allow metrics like counting user visits, customer conversion funnel or event flows.
It is quite common to see records belonging to one window arriving late, that is after the window end time has passed. To accept these records the notion of a grace period is supported. This means that if a record timestamp falls within a window W and it arrives within W + G (where G is the grace interval) then the record will be processed and the aggregations or joins will update. If, however, the record comes after the grace period then it is discarded.
To control the grace interval use this syntax:
The default grace period is 24 hours. Until the grace period elapses, the window is not actually closed.
This page describes how to explore and process data in real time with Lenses SQL engines, and use Kafka Connectors to source and sink data, with monitoring and alerting.
For automation, use the CLI.
Lenses has two SQL engines to allow users to explore and process streaming data:
SQL Snapshot - This is point in time SQL engine powering the Explore and SQL Studio for debugging and exploration of data.
SQL Processors (Streaming) - These are long-running applications, configured, deployed and managed by Lenses to perform joins, aggregations, data conversion and more.
This page describes how to access Kafka message metadata in Lenses SQL Studio.
When running queries against Kafka, the Snapshot Engine enables you to access the record metadata through the special _meta facet.
These are the available meta fields:
Field | Description |
---|---|
The following query will select all the meta fields listed above:
To view the value of a specific header you can run:
To read records from a specific partition, the following query can be used:
Here is the query to use when the record offset and partition are known:
This query will get the latest 100 records per partition (assuming the topic is not compacted):
This instead will get the latest 100 records for a given partition (again assuming the topic is not compacted):
This page describes using projections for data in Kafka with Lenses SQL Processors.
A projection represents the ability to project a given input value onto a target location in an output record. Projections are the main building block of SELECT
statements.
A projection is composed of several parts. Projections have a source section that allows one to select a specific value and to ensure that it will be present in the output, in the desired structure and with the desired name, as described by the target section.
In the below query:
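The query itself is not reproduced here; a sketch consistent with the projections listed below (the topic names and the CASE branches are illustrative):

```sql
INSERT INTO `target-topic`
SELECT STREAM
    CONCAT('a', 'b') AS result1,
    field4,
    (1 + field1) AS _key.a,
    _key.field2 AS result3,
    5 + 7 AS constantField,
    CASE
        WHEN field3 = 'expected' THEN 'match'
        ELSE 'no match'
    END AS who_is_it
FROM `input-topic`;
```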
These are projections:
CONCAT('a', 'b') as result1
field4
(1 + field1) as _key.a
_key.field2 as result3
5 + 7 as constantField
CASE … as who_is_it
It is worth highlighting that projections themselves are stateless. While the calculation of the source value could be stateful (depending on the type of expression being evaluated), the act of making the value available in the output is not a stateful operation.
The precise syntax of a projection is:
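A sketch of that grammar, reconstructed from the breakdown that follows:

```
<expression> [as [[<facet>.]<alias>] | [<facet>]]
```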
In the above, [] indicates optional sections and | is to be read as OR.
Source:
<expression>
: the source section must consist of a valid SQL expression, which will be evaluated to the value to be projected.
Target:
this section is optional. When missing, the output target facet will be _value
and the <alias>
will be defaulted to the string representation of the source expression.
as
: this is the Lenses SQL keyword that makes specifying the target of the projection explicit.
[[<facet>.]<alias>]|[<facet>]
: this nested section, which must be specified whenever as
is used, is composed of two mutually exclusive sub-sections.
[[<facet>.]<alias>]
: this optional section is a string that will be used as the field name in the output record. Optionally, the output _facet_
where the field will be projected (e.g. _key
or _value
) can be also specified.
[<facet>]
: this optional section specifies that the result of the projection will make up the whole of the indicated facet of the output record (e.g. the whole key or the whole value).
The above syntax highlights something important about the relationship between projections and facets: a projection may have a source facet and always has a target facet; while the two are related, they are not always the same.
In the above example query:
field4
is a projection from the value facet to the value facet
(1 + field1) as _key.a
is a projection from the value facet to the key facet
_key.field2 as result3
is a projection from the key facet to the value facet
5 + 7 as constantField
is a projection to value facet but with no source facet, as the expression does not depend on the input record
This makes SQL projections very expressive, but it also means that attention needs to be paid when facets are manipulated explicitly. Details, edge cases and implications of this will be discussed below.
_value field

This is the default type of projection when a field is selected within the source expression. Unless otherwise specified, the source facet of a projection will always be _value.
In light of the above, the following query contains only projections that read from the _value
facet of the source:
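A sketch of such a query, matching the fields discussed just below (the CONCAT projection is illustrative):

```sql
INSERT INTO `target-topic`
SELECT STREAM
    field1,
    _value.field1 AS aliased,
    CONCAT(field2, '!') AS decorated
FROM `input-topic`;
```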
Also, notice that field1
and _value.field1 as aliased
are reading the exact same input field and in the former case the _value
facet is implicit.
A projection can also access a selected field on the key facet.
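A sketch where every projection reads from the key facet:

```sql
INSERT INTO `target-topic`
SELECT STREAM
    _key.field1,
    _key.field2 AS aliased
FROM `input-topic`;
```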
The above query contains only projections that read from the _key
facet of the source.
This kind of projection behaves exactly the same as any other projection, but because of specific interactions with other mechanics in Lenses SQL Engine Streaming mode, they can’t be used in the same query with Aggregations or Joins.
All examples of projections described until now focused on selecting a field from either the key or the value of an input record. However, a projection can also read a whole facet and project it to the output.
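A sketch of the query discussed below, projecting each whole facet onto a named field:

```sql
INSERT INTO `target-topic`
SELECT STREAM
    _key AS `old-key`,
    _value AS `old-value`
FROM `input-topic`;
```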
In the above query, there are two projections:
_key as old-key
: This is projecting the whole key of input-topic
onto a field old-key
on target-topic
_value as old-value
: This is projecting the whole value of input-topic
onto a field old-value
on target-topic
For more details about the rules around using aliases (as done in the above example), see the alias rules later on this page.
This can be useful when the input source uses a primitive storage format for either one or both facets, but it is desirable to map such facets to named fields within a more complex output structure, as is the case in the above query. That said, projections from whole facets are supported for all storage formats, not only the primitive ones.
As it should be clear from all the examples on this page so far, projections can be freely mixed within a single SELECT
statement; the same query can have many projections, some of which could be reading from the key of the input record, some others from the value and yet others returning literal constants.
Lenses SQL is designed to support this mixed usage and to calculate the appropriate resulting structure given the schemas of all the projections’ inputs.
Lenses SQL assigns a special meaning to *
when used as a projection.
Unqualified Wildcard projection
When *
is used without any further qualification, it is interpreted as an instruction to project all fields from _key and _value to the output.
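A sketch of such a query:

```sql
INSERT INTO `target-topic`
SELECT STREAM *
FROM `input-topic`;
```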
The result of this query is that target-topic will have exactly the same fields, schema and data as input-topic.
When *
is explicitly qualified, the meaning becomes more precise and it will limit the fields to be selected to only the ones belonging to the qualified source (and optionally facet).
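A hedged sketch; the join condition (and any window clause a stream-stream join may require) is an illustrative assumption, the point being the qualified wildcard i1.* mixed with the normal projection i2.field1:

```sql
INSERT INTO `target-topic`
SELECT STREAM
    i1.*,
    i2.field1
FROM `input-topic-1` AS i1
    INNER JOIN `input-topic-2` AS i2 ON i1.id = i2.id;
```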
The above shows how a qualified wildcard projection can be used to target all the fields of a specific source. A qualified wildcard can also be combined with other, normal projections (e.g. i2.field1).
The target of a projection is the location within the output record where the result of the projection’s expression is going to be mapped. As previously mentioned, Lenses SQL uses the keyword as
to explicitly control this.
Using as
, it is possible to:
Assign an alias to the projected field in the result. For example, field1 as aliased-field1
is reading field1
and projecting its value to a field called aliased-field1
. Notice that, as no facet information is specified, this will be targeting the value of the output record.
Project directly to nested fields within structures
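A sketch of such a query (the source field names are illustrative):

```sql
INSERT INTO `target-topic`
SELECT STREAM
    field1 AS x.a,
    field2 AS x.b
FROM `input-topic`;
```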
The above query will result in a field x
that is a structure that contains two fields a
and b
.
Control the facet to which the field is projected. For example, field1 as _key.field1
is reading field1
(from the value facet) and projecting to a field with the same name on the key of the output. Depending on the source being projected, doing this might have important implications.
Project over the whole target value or key For example, field1 as _value
is reading field1
and projecting it over the whole value of the output. One important thing about this is that Lenses SQL allows only one projection of this kind per facet per query. What this means is that the following query would be invalid, because only one projection can target _value
within the same query:
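A sketch of that invalid query:

```sql
-- INVALID: both projections target the whole _value facet of the output.
INSERT INTO `target-topic`
SELECT STREAM
    field1 AS _value,
    field2 AS _value
FROM `input-topic`;
```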
In order to avoid potential errors, Lenses defines the following rules for defining aliases:
Aliases can't add new fields to non-struct properties.
e.g: an_int AS foo.bar, field1 AS foo.bar.field1
(since bar
will be an INT
we can’t define a property field1
under it)
Aliases cannot override previously defined properties.
e.g: field1 AS foo.bar.field1, field2 as foo.bar
(setting foo.bar
with the contents of field2
would override the value of field1
)
Fields cannot have duplicated names.
e.g: field1 AS foo.bar, field2 as foo.bar
(setting foo.bar
with the contents of field2
would override its previous content)
Projecting on Key is a feature that can be useful in situations where it is desirable to quickly change the key of a Table or a Stream, maybe in preparation for further operations (e.g. joins etc…). This feature is sometimes referred to as re-keying within the industry.
However, one important implication of using this feature is that Kafka uses the key to determine the partition in which a record is stored; by changing the key of the record, the resulting partitioning of the output topic might differ from that of the input topic. While there is nothing wrong with this, it must be clearly understood when using this feature.
Sometimes it is desirable to limit the input records to be projected based on some predicate.
For example, we might want to project field1
and field2
of input-topic
onto output-topic
, but only if field3
contains a specific value.
This is what the WHERE clause is used for: to filter the input dataset by some predicate, applying the rest of the query only to records that match it.
The syntax for this clause is simply WHERE <expression>
, where <expression>
is a valid arbitrarily nested Lenses SQL boolean expression.
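A sketch of the scenario described above, keeping only records whose field3 matches an illustrative value:

```sql
INSERT INTO `output-topic`
SELECT STREAM
    field1,
    field2
FROM `input-topic`
WHERE field3 = 'expected-value';
```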
Projections have a close relationship with the storage format of their target.
By default, if a query contains more than one projection for the same facet, then that facet’s storage format will be a structure (which type of structure exactly depends on other factors that are not relevant here).
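A sketch of such a query; the CONCAT call is illustrative, the point being the two value-facet projections named result1 and field2:

```sql
INSERT INTO `target-topic`
SELECT STREAM
    CONCAT(field1, '!') AS result1,
    field2
FROM `input-topic`;
```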
The above query will make target-topic
’s value storage format a structure (e.g. AVRO
or JSON
) with two fields named: result1
and field2
. The storage format for target-topic
’s key will be the same as the input-topic
’s, as there are no projections targeting that facet.
The storage format of the output can be explicitly changed by a projection, however. This will often be the case when a projection on a whole facet is used. Consider the following query:
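A sketch consistent with the description that follows:

```sql
INSERT INTO `target-topic`
SELECT STREAM
    field1 AS result1,
    field2 AS _key
FROM `input-topic`;
```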
In this case, target-topic
’s value storage format will still be a structure, but its key will depend on field2
’s schema. For example, if field2
is a string, then target-topic
’s key will be changed to STRING
(assuming it was not STRING
already). The same behaviour applies to the _value facet.
One example where this can be relevant is when a projection is used to map the result of a single field in the target topic. Consider the following query:
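A sketch of such a query:

```sql
INSERT INTO `target-topic`
SELECT STREAM field1 AS _value
FROM `input-topic`;
```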
This query will project field1
on the whole value facet, and this will result in a change of storage format as well. This behaviour is quite common when a single projection is used, because more often than not the desired output in such a scenario will be the content of the field rather than a structure with a single field.
This page describes how to aggregate data in Kafka with Lenses SQL Processors.
Aggregations are stateful transformations that group an unbounded set of inputs into sub-sets and then aggregate each of these sub-sets into a single output; they are stateful because they need to maintain the current state of the computation between the application of each input.
To group a given input dataset into sub-sets, a key function needs to be specified; the result of applying this key function to an input record will be used as a discriminator (sometimes called a pivot) to determine in what sub-set each input record is to be bucketed.
The specific transformation that each aggregation performs is described by the Aggregated Functions used in the input query.
Notice that the behaviour described above is precisely what a Table does. For any given key, the state is continuously updated as new events with that key are received. In the case of aggregations, new events are the input records of the original dataset that map to a given key, each therefore ending up in one bucket or another.
Whenever Aggregations are used, the result will be a Table. Each entry will have the key set to the grouping discriminator, and the value set to the current state of computation for all input records matching the key.
The complete syntax for aggregations is:
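A sketch of that syntax, reconstructed from the elements listed below:

```
SELECT (STREAM | TABLE)
    <aggregated projection1>
    [, <aggregated projection2>] ... [, <aggregated projectionN>]
    [, <projection1>] ... [, <projectionM>]
FROM <source>
[WINDOW BY <window description>]
GROUP BY <expression>
```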
The specific syntactical elements of the above are:
(STREAM | TABLE)
: specifies if the <source>
is to be interpreted as a stream or a table.
<aggregated projection1>
: a projection is aggregated when its source contains an Aggregated Function (e.g. COUNT(*) as x
, CAST(COUNT(*) as STRING) as stringed
).
[, aggregated projection2] ... [, aggregated projectionN]
: a query can contain any number of additional aggregated projections after the first mandatory one.
[, projection1] ... [, projectionM]
: a query can contain any number of common, non-aggregated, projections. Streaming only supports full GROUP BY mode. This means that fields that are not part of the GROUP BY
clause cannot be referenced by non-aggregated projections.
<source>
: a normal source, like a topic or the result of a WITH
statement.
[WINDOW BY <window description>]
: this optional section can only be specified if STREAM
is used. It allows us to describe the windowing that will be applied to the aggregation.
GROUP BY <expression>
: the result of evaluating <expression>
will be used to divide the input values into different groups. These groups will be the input for each aggregated projection specified. The <expression>
’s result will become the key for the table resulting from the query.
Most of the rules and syntax described for Projections apply to aggregated projections as well, but there are some additional syntactical rules due to the specific nature of Aggregations.
Aliasing rules mostly work the same, but it is not possible to project on the key facet; COUNT(*) as _key.a
or SUM(x) as _key
are therefore not allowed.
At least one aggregated projection must be specified in the query.
Projections using an unqualified key facet as the source are not allowed. _key.a
or COUNT(_key.b)
are forbidden because _key
is unqualified, but <source>._key.a
and COUNT(<source>._key.b)
are supported.
As previously mentioned, the GROUP BY is used to determine the key of the query's result; the above query will group all records in input-topic by the value of field1 in each record, and target-topic's key will have field1's schema.
Just like in the case of the Projections, the Streaming mode takes an opinionated approach here and will simplify the result schema and Storage Format in the case of single field structures.
In the case above, assuming for example that field1
is an integer, target-topic
’s key will not be a structure with a single integer field1
field, but rather just the value field1
; the resulting storage format is going to be INT
, and the label field1
will be just dropped.
In case the above behaviour is not desirable, specifying an explicit alias will allow us to override it.
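A hedged sketch; whether the alias can be attached directly to the GROUP BY expression like this is an assumption:

```sql
INSERT INTO `target-topic`
SELECT STREAM COUNT(*) AS total
FROM `input-topic`
GROUP BY field1 AS keep_me;
```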
This will result in target-topic's key being a structure with a single field keep_me, with the same schema as field1. The corresponding Storage Format will match the input format of input-topic, AVRO or JSON.
An example will help clarify how aggregations work, as well as how they behave depending on the semantics of the input dataset they are being applied to.
Assume that we have a Kafka topic (gaming-sessions
) containing these records:
What this data describes is a series of gaming sessions performed by players. For each gaming session we have the player (used as Key), the points achieved, and the country where the game took place.
Let’s now assume that what we want to calculate is the total points achieved by players in a given country, as well as the average points per game. One way to achieve the desired behaviour is to build a Stream from the input topic. Remember that this means that each event will be considered in isolation.
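A sketch of the processor described below:

```sql
INSERT INTO `target-topic`
SELECT STREAM
    SUM(points) AS total_points,
    AVG(points) AS average_points
FROM `gaming-sessions`
GROUP BY country;
```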
Explanations for each element of this syntax can be found below, but very briefly, this builds a Stream from gaming-sessions
, grouping all events by country
(e.g. all records with the same country
will be aggregated together) and finally calculating the total (total_points
) and the average (average_points
) of all points for a given group.
The final result in target-topic
will be (disregarding intermediate events):
The results are calculated from the totality of the input records because, in a Stream, each event is independent and unrelated to any other.
We now want to calculate something similar to what we obtained before, but we want to keep track only of the last session played by a player, as it might give us a better snapshot of both the performances and locations of players worldwide. The statistics we want to gather are the same as before: total and average of points per country.
The way to achieve the above requirement is simply by reading gaming-sessions into a Table, rather than a Stream, and aggregating it.
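A sketch: the only change from the previous query is that the source is read as a table:

```sql
INSERT INTO `target-topic`
SELECT TABLE
    SUM(points) AS total_points,
    AVG(points) AS average_points
FROM `gaming-sessions`
GROUP BY country;
```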
The final result in target-topic
will be (disregarding intermediate events):
Compare this with the behaviour from the previous scenario; the key difference is that the value for uk
includes only willy
and noel
, and that’s because the last event moved billy
to the spain
bucket, removing all data regarding him from his original group.
The previous section described the behaviour of aggregations when applied to Tables, and highlighted how aggregations not only need to be able to add the latest values received to the current state of a group, but also to subtract an obsolete value that might have just been assigned to a new group. As we saw above, this is easy to do in the case of SUM and AVG.
However, consider what would happen if we wanted to add new statistics to the ones calculated above: the maximum points achieved by a player in a given country.
In the Stream scenario, this can be achieved by simply adding MAXK(points,1) as max_points
to the query.
In the Table scenario, however, things are different. We know that the final event moves billy
from uk
to spain
, so we need to subtract from uk
all information related to billy
. In case of SUM
and AVG
that’s possible because subtracting billy
’s points to the current value of the aggregation will return the correct result.
But that’s not possible for MAXK
. MAXK(points, 1)
only keeps track of 1
value, the highest seen so far, and if that’s removed, what value should take its place? The aggregation function cannot inspect the entire topic data to search for the correct answer. The state the aggregation function has access to is that single number, which now is invalid.
This problem explains why some aggregated functions can be used on Streams and Tables (e.g. SUM
), while others can be used only on Streams (e.g. MAXK
).
The key factor is usually whether a hypothetical subtraction operation would need access to all previous inputs to calculate its new value (like MAXK
) or just the aggregated state (like SUM
).
A common scenario that arises in the context of aggregations is the idea of adding a time dimension to the grouping logic expressed in the query. For example, one might want to group all input records by a given field that were received within 1 hour of each other.
To express the above, Lenses SQL Streaming supports windowed aggregations, by adding a WINDOW BY clause to the query. Given their semantics, tables cannot be aggregated using a window: a table represents the latest state of a set of (Key, Value) pairs, not a series of events interspersed over a time continuum, so trying to window them is not a sensible operation.
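A hedged sketch of a windowed aggregation, assuming the tumbling-window syntax described in the windowing section; all names are illustrative:

```sql
INSERT INTO `hourly-counts`
SELECT STREAM COUNT(*) AS total
FROM `input-topic`
WINDOW BY TUMBLE 1h
GROUP BY field1;
```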
Filtering the input into aggregated queries is similar to filtering non-aggregated ones. When using a WHERE <expression>
statement, where <expression>
is a valid SQL boolean expression, all records that do not match the predicate will be left out.
However, aggregations add a further dimension to what it might be desirable to filter on.
We might be interested in filtering based on some condition on the groups themselves; for example, we might want to count all input records that have a given value of field1
, but only if the total is greater than 3. In this case, WHERE
would not help, because it has no access to the groups nor to the results of the aggregated projections. The below query is what is needed.
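A sketch of that query; whether HAVING references the aggregate expression or its alias may differ, so the expression is repeated here:

```sql
INSERT INTO `filtered-counts`
SELECT STREAM COUNT(*) AS total
FROM `input-topic`
GROUP BY field1
HAVING COUNT(*) > 3;
```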
The above query uses the HAVING
clause to express a filter at a grouping level. Using this feature it is possible to express a predicate on the result of aggregated projections and filter out the output records that do not satisfy it.
Only aggregated projections specified in the SELECT
clause can be used within the HAVING
clause.
This page describes how to use lateral joins for data in Kafka with Lenses SQL Processors.
With Lateral Joins you can combine a data source with any array expression. As a result, you will get a new data source, where every record of the original one will be joined with the values of the lateral array expression.
Assume you have a source
where elements
is an array field:
field1 | field2 | elements |
---|---|---|
a | 1 | [1, 2] |
b | 2 | [3, 4, 5] |
c | 3 | [6] |
Then a Lateral Join of source
with elements
is a new table, where every record of source
will be joined with all the single items of the value of elements
for that record:
field1 | field2 | elements | element |
---|---|---|---|
a | 1 | [1, 2] | 1 |
a | 1 | [1, 2] | 2 |
b | 2 | [3, 4, 5] | 3 |
b | 2 | [3, 4, 5] | 4 |
b | 2 | [3, 4, 5] | 5 |
c | 3 | [6] | 6 |
In this way, the single elements of the array become available and can be used as a normal field in the query.
A query using lateral joins looks like a regular query apart from the definition of its source:
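A sketch of that shape, reconstructed from the elements described below; the exact LATERAL ... AS placement is an assumption:

```
SELECT <projection>
FROM <source>
    LATERAL <lateralArrayExpression> AS <lateralAlias>
WHERE <filterExpression>
```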
projection: as in a single-table select, all the fields from <source>
will be available in the projection. In addition to that, the special field <lateralAlias>
will be available.
source: the source of data. Note: it is not possible to specify a normal join as a source of a lateral join. This limitation will be removed in the future.
lateralArrayExpression: any expression that evaluates to an array. Fields of <source> are available for defining this expression.
filterExpression: a filter expression specifying which records should be filtered.
Assume you have a topic batched_readings populated with the following records:
batched_readings
As you can see, readings
is a field containing arrays of integers.
We define a processor like this:
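A sketch of the processor, assuming the LATERAL shape sketched above:

```sql
INSERT INTO `high-readings`
SELECT STREAM
    meter_id,
    reading
FROM batched_readings
    LATERAL readings AS reading
WHERE reading > 90;
```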
The processor will emit the following records:
Things to notice:
We used the aliased lateral expression reading
both in the projection and in the WHERE
.
The _key for each emitted record is the one of the original record. As usual, you can change this behaviour by projecting on the key, with a projection like expression AS _key.
batched_readings
records with keys a
and b
have been split into multiple records. That’s because they contain multiple readings greater than 90
.
Record d disappeared, because it has no readings greater than 90.
It is possible to use multiple LATERAL
joins in the same FROM
clause.
Assume you have a topic batched_nested_readings populated with the following records:
batched_nested_readings
Notice how nested_readings
contains arrays of arrays of integers.
To get the same results of the previous example, we use a first lateral join to unpack the first level of nested_readings
into an array that we call readings
. We then define a second lateral join on readings
to extract the single values:
In the previous example we used a simple field as the <lateralArrayExpression>. In this section we will see how any array expression can be used instead.
Assume you have a topic day_night_readings populated with the following records:
day_night_readings
We can make use of Array Functions to lateral join day_night_readings
on the concatenation of the two readings fields:
This page describes handling nulls values in Kafka data with Lenses SQL Processors.
Null values are used as a way to express a value that isn’t yet known.
Null values can be found in the data present in existing sources, or they can be the product of joining data using non-inner joins.
The schema of nullable types is represented as a union of the field type and a null value:
Working with null values can create situations where it’s not clear what the outcome of an operation is.
One example of this would be the following:
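The original expressions are not preserved here; an illustrative reconstruction, where x is a NULL value:

```
2 * x
3 + x
2 * (3 + x)
```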
Looking at the first two expressions, one may be tempted to solve the problem above by saying “Null is 1 when multiplying and 0 when summing” meaning the following would be the evaluation result:
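Under that rule, the illustrative expressions above would evaluate to:

```
2 * x        -- 2   (NULL treated as 1 when multiplying)
3 + x        -- 3   (NULL treated as 0 when summing)
2 * (3 + x)  -- 2 * 3 = 6
```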
Rewriting the third expression by applying the distributive property of multiplication, however, shows that the rule creates inconsistencies:
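Continuing the illustrative reconstruction, distributing the multiplication gives a different answer, which is the inconsistency:

```
2 * (3 + x)  -- = 2 * 3 + 2 * x = 6 + 2 = 8, not 6
```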
To avoid scenarios like the above, where a computation may have different results depending on the evaluation approach taken, most operations in Lenses do not accept nullable types as operands.
Lenses provides the following tools to address nullability in a flexible way:
Coalesce: A function that allows specifying a list of fields to be tried until the first non-null value is found.
Note: the COALESCE function won't verify that a non-nullable value is provided, so an error may still be thrown if all the provided fields are null.
e.g.: COALESCE(nullable_fieldA, nullable_fieldB, 0)
AS_NON_NULLABLE: a function that changes the type of a property from nullable to non-nullable.
Note: This function is unsafe and will throw an error if a null value is passed. It should only be used if there’s a guarantee that the value won’t ever be null (for instance if used in a CASE branch where the null case has been previously handled or if the data has previously been filtered and the null values removed).
e.g: AS_NON_NULLABLE(nullable_field)
AS_NON_NULLABLE with CASE: A type-checked construct equivalent to using coalesce:
e.g:
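A sketch of that construct (the field name and fallback value are illustrative):

```sql
SELECT STREAM
    CASE
        WHEN nullable_field IS NULL THEN 0
        ELSE AS_NON_NULLABLE(nullable_field)
    END AS safe_field
FROM `input-topic`;
```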
The AS_NULLABLE
function is the inverse transformation of the AS_NON_NULLABLE
version. This function allows a non-nullable field type to be transformed into a nullable type. It can be used to insert data into existing topics where the schema of the target field is nullable.
This page describes using settings in Lenses SQL Processors to process data in Kafka.
The SET syntax allows customizing the behaviour for the underlying Kafka Consumer/Producer, Kafka Streams (including RocksDB parameters), topic creation and error handling.
The general syntax is:
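A sketch of the general shape; the example key shown below uses a defaults.topic. prefix, which is an assumption:

```sql
SET <setting.key> = <value>;

-- for example
SET defaults.topic.autocreate = true;
```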
SQL processors can create topics that are not present. There are two levels of settings, generic (or default) applying to all target topics and specific (or topic-related) to allow distinct setups for a given topic. Maybe one of the output topics requires a different partition count or replication factor than the defaults.
To set the defaults follow this syntax:
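A hedged sketch using the keys from the table below; the defaults.topic. prefix and the values are assumptions:

```sql
SET defaults.topic.autocreate = true;
SET defaults.topic.partitions = 3;
SET defaults.topic.replication = 2;
```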
Key | Type | Description |
---|---|---|
autocreate | BOOLEAN | Creates the topic if it does not exist already. |
partitions | INT | Controls the target topic partitions count. If the topic already exists, this will not be applied. |
replication | INT | Controls the topic replication factor. If the topic already exists, this will not be applied. |
- | - | Each Kafka topic allows a set of parameters to be set. For example |
key.avro.record | STRING | Controls the output record Key schema name. |
key.avro.namespace | STRING | Controls the output record Key schema namespace. |
value.avro.record | STRING | Controls the output record Value schema name. |
value.avro.namespace | STRING | Controls the output record Value schema namespace. |
All the keys applicable for defaults are valid for controlling the settings for a given topic. Controlling the settings for a specific topic can be done via:
The streaming engine allows users to define how errors are handled when writing to or reading from a topic.
Both sides can be set at once by doing:
or individually as described in the sections below.
Data being processed might be corrupted or not aligned with the topic format (maybe you expect an Avro payload but the raw bytes represent a JSON document). Setting what happens in these scenarios can be done like this:
While data is being written multiple errors can occur (maybe there were some network issues). Setting what happens in these scenarios can be done like this:
There are three possible values to control the behaviour.
When dlq
is used this setting is required. The value is the target topic where the problematic records will be sent to.
Using the SET syntax, the underlying Kafka Streams and Kafka Producer and Consumer settings can be adjusted.
Alongside the keys above, the Kafka consumer and producer settings can also be tweaked.
Some of the configurations for the consumer and producer share the same name. When they need to be distinguished, the keys have to be prefixed with consumer or producer.
Stateful data flow applications might require, on rare occasions, some of the parameters for the underlying RocksDB to be tweaked.
To set the properties, use:
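A sketch using two keys taken from the RocksDB table below; the values are illustrative:

```sql
SET rocksdb.table.block.cache.size = 33554432;  -- 32 MB block cache
SET rocksdb.total.threads = 4;
```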
This page describes the storage formats of data in Kafka supported by Lenses SQL Processors.
The output storage format depends on the sources. For example, if the incoming data is stored as JSON, then the output will be JSON as well. The same applies when Avro is involved.
When using a custom storage format, the output will be JSON.
At times, it is required to control the resulting Key and/or Value storage. If the input is JSON, for example, the output for the streaming computation can be set to Avro.
Another scenario involves Avro source(-s), and a result which projects the Key as a primitive type. Rather than using the Avro storage format to store the primitive, it might be required to use the actual primitive format.
Controlling the storage format can be done using the following syntax:
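A hedged sketch, assuming a STORE ... AS clause on the insert; the formats and names are illustrative:

```sql
INSERT INTO `target-topic`
STORE KEY AS AVRO VALUE AS AVRO
SELECT STREAM *
FROM `input-topic`;
```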
There is no requirement to always set both the Key and the Value. Maybe only the Key or maybe only the Value needs to be changed. For example:
Consider a scenario where the input data is stored as Avro and there is an aggregation on a field which yields an INT; to use the primitive INT storage rather than the Avro INT storage, set the Key format to INT:
Here is an example of the scenario of having Json input source(-s), but an Avro stored output:
Changing the storage format is guarded by a set of rules. The following table describes how storage formats can be converted for the output.
Time windowed formats follow rules similar to the ones described above, with the additional constraint that Session Windows (SW) cannot be converted into Time Windows (TW), nor vice versa.
Example: Changing the storage format from TWAvro to TWJson is possible since they’re both TW formats and Avro can be converted to JSON.
Example: Changing the storage format from TWString to TWJson is not possible since, even though they’re both TW formats, String formats can’t be written as JSON.
XML, as well as any custom format, is only supported as an input format. By default, Lenses will process these formats by translating them to JSON and writing the output as such (AVRO is also supported if a store is explicitly set).
This page describes the AVG function in Lenses SQL.
Returns the average of the values in a group. It ignores the null value. It can be used with numeric input only.
Available in:
Processor (stateless) | Processors (stateful) | SQL Studio |
---|
Sample code:
Output:
meter_id | reading |
---|---|
_key | id | name |
---|---|---|
_key | customer_id | item |
---|---|---|
name | item |
---|---|
name | item |
---|---|
name | item |
---|---|
name | item |
---|---|
arrival | id | name |
---|---|---|
arrival | item | customer_id |
---|---|---|
name | item |
---|---|
arrival | id | name |
---|---|---|
arrival | item | customer_id |
---|---|---|
name | item |
---|---|
arrival | id | name |
---|---|---|
name | item |
---|---|
Left | Right | Allowed types | Window | Result |
---|---|---|---|---|
emmited | processed | id | name |
---|---|---|---|
arrival | processed | item | customer_id |
---|---|---|---|
Duration | Description | Example |
---|
Offset | Key | Value |
---|
Offset | Key | Value |
---|
Key | Value |
---|
Offset | Key | Value |
---|
_key | meter_id | readings |
---|
_key | meter_id | reading |
---|
_key | meter_id | nested_readings |
---|
_key | meter_id | readings_day | readings_night |
---|
_key | meter_id | reading |
---|
Value | Description |
---|
Key | Type | Description |
---|
Key | Type | Description |
---|
From \ To | INT | LONG | STRING | JSON | AVRO | XML | Custom/Protobuf |
---|
From \ To | SW[B] | TW[B] |
---|
1
100
1
95
1
91
2
93
1
92
1
94
1
1
John
2
2
Frank
1
1
Computer
2
1
Mouse
3
3
Keyboard
John
Computer
John
Mouse
John
Computer
John
Mouse
null
Keyboard
John
Computer
John
Mouse
null
Keyboard
John
Computer
John
Mouse
null
Keyboard
Frank
null
t = 6s
1
Frank
t = 20s
2
John
t = 10s
Computer
1
t = 11s
Keyboard
2
Frank
Computer
null
Keyboard
t = 0s
1
Frank
t = 10s
1
John
t = 0s
Computer
1
t = 1s
Keyboard
2
Frank
Computer
null
Keyboard
t = 10s
2
John
Frank
Computer
null
Keyboard
John
Keyboard
Stream
Stream
All
Required
Stream
Table
Table
All
No
Table
Table
Stream
RIGHT JOIN
No
Stream
Stream
Table
INNER, LEFT JOIN
No
Stream
t = 0s
t = 20s
1
Frank
t = 0s
t = 10s
Computer
1
Exploring
Learn how to use Lenses to explore topics, their data, configurations and consumers.
SQL Studio
Learn how to use the SQL Studio to interactively query data in your Kafka topics and more.
Processing
Learn how to process data, join, filter aggregate and more with Lenses SQL Processors.
0 | billy | {points: 50, country: “uk”} |
1 | billy | {points: 90, country: “uk”} |
2 | willy | {points: 70, country: “uk”} |
3 | noel | {points: 82, country: “uk”} |
4 | john | {points: 50, country: “usa”} |
5 | dave | {points: 30, country: “usa”} |
6 | billy | {points: 90, country: “spain”} |
3 | uk | {total_points: 292, average_points: 73} |
5 | usa | {total_points: 80, average_points: 40} |
6 | spain | {total_points: 90, average_points: 90} |
uk | {total_points: 152, average_points: 76} |
usa | {total_points: 80, average_points: 40} |
spain | {total_points: 90, average_points: 90} |
3 | uk | {total_points: 292, average_points: 73, max_points: 90} |
5 | usa | {total_points: 80, average_points: 40, max_points: 50} |
6 | spain | {total_points: 90, average_points: 90, max_points: 90} |
a | 1 | [100, 80, 95, 91] |
b | 2 | [87, 93, 100] |
c | 1 | [88, 89, 92, 94] |
d | 2 | [81] |
a | 1 | 100 |
a | 1 | 95 |
a | 1 | 91 |
b | 2 | 93 |
c | 1 | 92 |
c | 1 | 94 |
a | 1 | [[100, 80], [95, 91]] |
b | 2 | [[87], [93, 100]] |
c | 1 | [[88, 89], [92, 94]] |
d | 2 | [[81]] |
a | 1 | [100, 80] | [95, 91] |
b | 2 | [87, 93] | [100] |
c | 1 | [88] | [89, 92, 94] |
d | 2 | [81] | [] |
a | 1 | 100 |
a | 1 | 95 |
a | 1 | 91 |
b | 2 | 93 |
c | 1 | 92 |
c | 1 | 94 |
ms | time in milliseconds. |
s | time in seconds. |
m | time in minutes. |
h | time in hours. |
continue | Allows the application to carry on. The problem will be logged. |
fail | Stops the application. The application will be in a failed (error) state. |
dlq | Allows the application to continue but it will send the payload to a dead-letter-topic. It requires |
processing.guarantee | STRING | The processing guarantee that should be used. Possible values are AT_LEAST_ONCE (default) and EXACTLY_ONCE. Exactly-once processing requires, by default, a cluster of at least three brokers, which is the recommended setting for production. |
commit.interval.ms | LONG | The frequency with which to save the position of the processor. If |
poll.ms | LONG | The amount of time in milliseconds to block waiting for input. |
cache.max.bytes.buffering | LONG | Maximum number of memory bytes to be used for buffering across all threads. It has to be at least 0. Default value is: 10 * 1024 * 1024. |
client.id | STRING | An ID prefix string used for the client IDs of internal consumer, producer and restore-consumer, with pattern ‘<client.d>-StreamThread--<consumer |
num.standby.replicas | INT | The number of standby replicas for each task. Default value is 0. |
num.stream.threads | INT | The number of threads to execute stream processing. Default values is 1. |
max.task.idle.ms | LONG | Maximum amount of time a stream task will stay idle when not all of its partition buffers contain records, to avoid potential out-of-order record processing across multiple input streams. |
buffered.records.per.partition | INT | Maximum number of records to buffer per partition. Default is 1000. |
connections.max.idle.ms | LONG | Close idle connections after the number of milliseconds specified by this config. |
receive.buffer.bytes | LONG | The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default will be used. |
reconnect.backoff.ms | LONG | The base amount of time to wait before attempting to reconnect to a given host. This avoids repeatedly connecting to a host in a tight loop. This backoff applies to all connection attempts by the client to a broker. |
reconnect.backoff.max.ms | LONG | The maximum amount of time in milliseconds to wait when reconnecting to a broker that has repeatedly failed to connect. If provided, the backoff per host will increase exponentially for each consecutive connection failure, up to this maximum. After calculating the backoff increase, 20% random jitter is added to avoid connection storms. Default is 1000. |
retries | INT | Setting a value greater than zero will cause the client to resend any request that fails with a potentially transient error. Default is 0 |
retry.backoff.ms | LONG | The amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios. Default is 100. |
send.buffer.bytes | LONG | The size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is -1, the OS default will be used. Default is 128 * 1024. |
state.cleanup.delay.ms | LONG | The amount of time in milliseconds to wait before deleting state when a partition has migrated. |
rocksdb.table.block.cache.size | LONG | Set the amount of cache in bytes that will be used by RocksDB. If cacheSize is non-positive, then cache will not be used. DEFAULT: 8M |
rocksdb.table.block.size | LONG | Approximate size of user data packed per lock. Default: 4K |
rocksdb.table.block.cache.compressed.num.shard.bits | INT | Controls the number of shards for the block compressed cache |
rocksdb.table.block.cache.num.shard.bits | INT | Controls the number of shards for the block cache |
rocksdb.table.block.cache.compressed.size | LONG | Size of compressed block cache. If 0,then block_cache_compressed is set to null |
rocksdb.table.block.restart.interval | INT | Set block restart interval |
rocksdb.table.block.cache.size.and.filter | BOOL | Indicating if we’d put index/filter blocks to the block cache. If not specified, each ’table reader’ object will pre-load index/filter block during table initialization |
rocksdb.table.block.checksum.type | STRING | Sets the checksum type to be used with this table. Available values: |
rocksdb.table.block.hash.allow.collision | BOOL | Influence the behavior when kHashSearch is used. If false, stores a precise prefix to block range mapping if true, does not store prefix and allows prefix hash collision(less memory consumption) |
rocksdb.table.block.index.type | STRING | Sets the index type to used with this table. Available values: |
rocksdb.table.block.no.cache | BOOL | Disable block cache. If this is set to true, then no block cache should be used. Default: false |
rocksdb.table.block.whole.key.filtering | BOOL | If true, place whole keys in the filter (not just prefixes).This must generally be true for gets to be efficient. Default: true |
rocksdb.table.block.pinl0.filter | BOOL | Indicating if we’d like to pin L0 index/filter blocks to the block cache. If not specified, defaults to false. |
rocksdb.total.threads | INT | The max threads RocksDB should use |
rocksdb.write.buffer.size | LONG | Sets the number of bytes the database will build up in memory (backed by an unsorted log on disk) before converting to a sorted on-disk file |
rocksdb.table.block.size.deviation | INT | This is used to close a block before it reaches the configured ‘block_size’. If the percentage of free space in the current block is less than this specified number and adding a new record to the block will exceed the configured block size, then this block will be closed and thenew record will be written to the next block. Default is 10. |
rocksdb.compaction.style | STRING | Available values: |
rocksdb.max.write.buffer | INT |
rocksdb.base.background.compaction | INT |
rocksdb.background.compaction.max | INT |
rocksdb.subcompaction.max | INT |
rocksdb.background.flushes.max | INT |
rocksdb.log.file.max | LONG |
rocksdb.log.fle.roll.time | LONG |
rocksdb.compaction.auto | BOOL |
rocksdb.compaction.level.max | INT |
rocksdb.files.opened.max | INT |
rocksdb.wal.ttl | LONG |
rocksdb.wal.size.limit | LONG |
rocksdb.memtable.concurrent.write | BOOL |
rocksdb.os.buffer | BOOL |
rocksdb.data.sync | BOOL |
rocksdb.fsync | BOOL |
rocksdb.log.dir | STRING |
rocksdb.wal.dir | STRING |
INT | = | yes | yes | no | yes | no | no |
LONG | no | = | yes | no | yes | no | no |
STRING | no | no | = | no | yes | no | no |
JSON | If the Json storage contains integer only | If the Json storage contains integer or long only | yes | = | yes | no | no |
AVRO | If Avro storage contains integer only | If the Avro storage contains integer or long only | yes | yes | = | no | no |
XML | no | no | no | yes | yes | no | no |
Custom (includes Protobuf) | no | no | no | yes | yes | no | no |
SW[A] | yes if format A is compatible with format B | no |
TW[A] | no | yes if format A is compatible with format B |
This page describes the expressions in Lenses SQL Processors.
Expressions are the parts of a Lenses SQL query that will be evaluated to single values.
Below is the complete list of expressions that Lenses SQL supports.
A literal is an expression that represents a concrete value of a given type. This means that there is no resolution needed for evaluating a literal and its value is simply what is specified in the query.
Integer numbers can be introduced in a Lenses SQL query using integer literals:
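A sketch using the literals mentioned just below:

```sql
SELECT STREAM 1 + 2 AS three
FROM `input-topic`;
```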
In the above query 1
, 2
are integer literals.
Decimal number literals can be used to express constant floating-point numbers:
To express strings, string literals can be used. Single quotes ('
) and double quotes ("
) are both supported as delimiters:
In the example above, "hello "
and 'world!'
are string literals.
Boolean constant values can be expressed using the false
and true
boolean literals:
Sometimes it is necessary to use the NULL literal in a query, for example to test that something is or is not null, or to put a NULL in the value facet, which is useful to delete records in a compacted topic:
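A hedged sketch of the tombstone case, assuming NULL can be projected over the whole value facet; the topic and field names are illustrative:

```sql
INSERT INTO `compacted-topic`
SELECT STREAM NULL AS _value
FROM `input-topic`
WHERE field1 = 'to-be-deleted';
```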
An array is a collection of elements of the same type.
A new array can be defined with the familiar [...]
syntax:
You can use more complex expressions inside the array:
and nested arrays as well:
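A combined sketch of the three cases above (plain literals, more complex element expressions, and nested arrays); the UPPER call and field names are illustrative:

```sql
SELECT STREAM
    [1, 2, 3] AS numbers,
    [field1, UPPER(field2)] AS derived,
    [[1, 2], [3, 4]] AS nested
FROM `input-topic`;
```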
Note: empty array literals like []
are currently not supported by Lenses SQL. That will change in future versions.
An element of an array can be extracted appending, to the array expression, a pair of square brackets containing the index of the element.
Some examples:
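A sketch (the field names are illustrative):

```sql
SELECT STREAM
    readings[0] AS first_reading,
    [1, 2, 3][myIndex] AS picked
FROM `input-topic`;
```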
Note how the expression on the left of the brackets can be of arbitrary complexity, like in complexExpression[0].inner[1]
or [1, 2, 3][myIndex]
.
A Struct is a value that is composed by fields and sub-values assigned to those fields. It is similar to what an object is in JSON.
In Lenses SQL there are two ways of building new structs.
In a SELECT
projection, it is possible to use nested aliases to denote the fields of a struct.
In the next example, we are building a struct field called user
, with two subfields, one that is a string, and another one that is a struct:
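A sketch of such a projection; the source field names are illustrative, while the shape of the user struct matches the description below:

```sql
SELECT STREAM
    customer_name AS user.name,
    contact_type AS user.contact.type,
    contact_value AS user.contact.value
FROM `input-topic`;
```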
When the projection is evaluated, a new struct user will be built.
The result will be a struct with a name
field, and a nested struct assigned to the contact
field, containing type
and value
subfields.
While nested aliases are a quick way to define new structs, they have some limitations: they can only be used in the projection section of a SELECT
, and they do not cover all the cases where a struct can potentially be used.
Struct expressions overcome these limitations.
With struct expressions one can explicitly build complex structs, specifying the name and the values of the fields, one by one, and as any other expression, they can be used inside other expressions and in any other part of the query where an expression is allowed.
The syntax is similar to the one used to define JSON objects:
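A hedged sketch, assuming the JSON-like { field: expression } form; the second projection nests structs inside an array, which nested aliases cannot express:

```sql
SELECT STREAM
    { name: customer_name,
      contact: { type: contact_type, value: contact_value } } AS user,
    { name: customer_name,
      contacts: [ { type: 'email', value: email },
                  { type: 'phone', value: phone } ] } AS userWithContacts
FROM `input-topic`;
```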
Note how the first projection
is equivalent to the three projections used in the previous paragraph:
while the second projection userWithContacts
is not representable with nested aliases, because it defines structs inside an array.
A selection is an explicit reference to a field within a struct. The syntax for a selection is:
Selections can be used to directly access a field of a facet, optionally specifying the topic and the facet:
It is also possible to select a field from more complex expressions. Here we use selections to select fields from array elements, or to directly access a nested field of a struct expression:
In general, a field selection can be used on any expression that returns a struct.
If there are special characters in the field names, backticks (`
) can be used:
A binary expression is an expression that is composed of the left-hand side and right-hand side sub-expressions and an operator that describes how the results of the sub-expressions are to be combined into a single result.
Currently, supported operators are:
Logical operators: AND
, OR
Arithmetic operators: +
, -
, *
, /
, %
(mod)
Ordering operators: >
, >=
, <
, <=
Equality operators: =
, !=
String operators: LIKE
, NOT LIKE
Inclusion operators: IN
, NOT IN
A binary expression is the main way to compose expressions into more complex ones.
For example, 1 + field1
and LENGTH(field2) > 5
are binary expressions, using the + and the > operators respectively.
CASE
expressions return conditional values, depending on the evaluation of sub-expressions present in each of the CASE
’s branches. This expression is Lenses SQL’s version of what other languages call a switch statement or an if-elseif-else construct.
A function is a predefined named operation that takes a number of input arguments and is evaluated into a result. Functions usually accept the result of other expressions as input arguments, so functions can be nested.
This page describes how to use date and time functions in Lenses SQL Processors.
Every Date Math expression starts with a base date or time followed by the addition or subtraction of one or more durations.
The base date or time (from here onward) is derived from a field in a table or a function such as now()
or yesterday()
that generates datetime values.
The shorthand syntax is a unit value followed by a unit symbol. The symbols are:
y (year)
M (month)
w (week)
d (day)
h (hour)
m (minute)
s (second)
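A sketch of date math combining a base (here the now() function mentioned above) with the shorthand durations listed in this section:

```sql
SELECT STREAM now() + 2h - 30m AS reminder_at
FROM `input-topic`;
```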
This page describes the BOTTOMK function in Lenses SQL.
Returns the K lowest ranked values. The ranking is based on how many times a value has been seen.
Available in:
Processor (stateless) | Processors (stateful) | SQL Studio |
---|---|---|
Sample code:
Output:
This page describes the FIRST function in Lenses SQL.
Returns the first item seen in a group.
Available in:
Processor (stateless) | Processors (stateful) | SQL Studio |
---|---|---|
Sample code:
Output:
This page describes the COUNT function in Lenses SQL.
Returns the number of records returned by a query or the records in a group as a result of a GROUP BY
statement.
Available in:
Processor (stateless) | Processors (stateful) | SQL Studio |
---|---|---|
Sample code:
Output:
This page describes the COLLECT function in Lenses SQL.
Returns an array in which each value in the input set is assigned to an element of the array.
Available in:
Processor (stateless) | Processors (stateful) | SQL Studio |
---|---|---|
Sample code:
Output:
This page describes the COLLECT_UNIQUE function in Lenses SQL.
Returns an array of unique values in which each value in the input set is assigned to an element of the array.
Available in:
Processor (stateless) | Processors (stateful) | SQL Studio |
---|---|---|
Sample code:
Output:
This page describes the LAST function in Lenses SQL.
Returns the last item seen in a group.
Available in:
Processor (stateless) | Processors (stateful) | SQL Studio |
---|---|---|
Sample code:
Output:
This page describes the MINK function in Lenses SQL.
Returns the N smallest values of a numExpr.
Available in:
Processor (stateless) | Processors (stateful) | SQL Studio |
---|---|---|
Sample code:
Output:
This page describes the MINK_UNIQUE function in Lenses SQL.
Returns the N smallest unique values of a numExpr.
Available in:
Processor (stateless) | Processors (stateful) | SQL Studio |
---|---|---|
Sample code:
Output:
This page describes the MAXK function in Lenses SQL.
Returns the N largest values of a numExpr.
Available in:
Processor (stateless) | Processors (stateful) | SQL Studio |
---|---|---|
Sample code:
Output:
This page describes the SUM function in Lenses SQL.
Returns the sum of all the values, in the expression. It can be used with numeric input only. Null values are ignored.
Available in:
Processor (stateless) | Processors (stateful) | SQL Studio |
---|---|---|
Sample code:
Output:
This page describes the MAXK_UNIQUE function in Lenses SQL.
Returns the N largest unique values of a numExpr.
Available in:
Processor (stateless) | Processors (stateful) | SQL Studio |
---|---|---|
Sample code:
Output:
This page describes the TOPK function in Lenses SQL.
Returns the K highest ranked values. The ranking is based on how many times a value has been seen.
Available in:
Processor (stateless) | Processors (stateful) | SQL Studio |
---|
Sample code:
Output:
This page describes the REPEAT function in Lenses SQL.
Builds an array by repeating element n times.
Available in:
Processors | SQL Studio |
---|
Sample code:
Output:
This page describes the ELEMENT_OF function in Lenses SQL.
Returns the element of array at index.
Available in:
Processors | SQL Studio |
---|
Sample code:
Output:
This page describes the ZIP function in Lenses SQL.
Zip two or more arrays into a single one.
Example: ZIP([1, 2], 'x', [3, 4, 5], 'y')
will be evaluated to [{ x: 1, y: 3 }, { x: 2, y: 4 }]
Available in:
Processors | SQL Studio |
---|
Sample code:
Output:
This page describes the FLATTEN function in Lenses SQL.
Flatten an array of arrays into an array.
Available in:
Processors | SQL Studio |
---|
Sample code:
Output:
This page describes the SIZEOF function in Lenses SQL.
Returns the number of elements in an array.
Available in:
Processors | SQL Studio |
---|
Sample code:
Output:
This page describes the CONVERT_DATETIME function in Lenses SQL.
Converts the string format of a date [and time] to another using the pattern provided.
Available in:
Processors | SQL Studio |
---|
Sample code:
Output:
This page describes the IN_ARRAY function in Lenses SQL.
Checks if element is an element of array.
Available in:
Processors | SQL Studio |
---|
Sample code:
Output:
This page describes the ZIP_ALL function in Lenses SQL.
Zip two or more arrays into a single one, returning null
s when an array is not long enough.
Example: ZIP_ALL([1, 2], 'x', [3, 4, 5], 'y')
will be evaluated to [{ x: 1, y: 3 }, { x: 2, y: 4 }, { x: null, y: 5 }]
Available in:
Processors | SQL Studio |
---|
Sample code:
Output:
This page describes the EXTRACT_DATE function in Lenses SQL.
Extracts the date portion of a timestamp-micros or timestamp-millis returning a date value.
Available in:
Processors | SQL Studio |
---|
Sample code:
Output:
This page describes the EXTRACT_TIME function in Lenses SQL.
Extracts the time portion of a timestamp-micros or timestamp-millis returning a time-millis or time-micros value depending on the timestamp precision.
Available in:
Processors | SQL Studio |
---|
Sample code:
Output:
This page describes the DATETIME function in Lenses SQL.
Provides the current ISO date and time.
Available in:
Processors | SQL Studio |
---|
Sample code:
Output:
This page describes the FORMAT_TIMESTAMP function in Lenses SQL.
Returns a string representation of a timestamp value according to a given pattern.
Available in:
Processors | SQL Studio |
---|
Sample code:
Output:
Expressions
Functions
Custom Functions
Deserializers
Supported data formats
This page describes the MONTH_TEXT function in Lenses SQL.
Returns the month name.
Available in:
Processors | SQL Studio |
---|---|
Sample code:
Output:
This page describes the MINUTE function in Lenses SQL.
Extracts the minute component of an expression that is of type timestamp.
Available in:
Processors | SQL Studio |
---|---|
Sample code:
Output:
This page describes the MONTH function in Lenses SQL.
Extracts the month component of an expression that is of type timestamp.
Available in:
Processors | SQL Studio |
---|---|
Sample code:
Output:
This page describes the FORMAT_TIME function in Lenses SQL.
Returns a string representation of a time value according to a given pattern.
Available in:
Processors | SQL Studio |
---|---|
Sample code:
Output:
This page describes the DATE function in Lenses SQL.
Builds a local date value from a long or int value. This function can also be used with no parameters to return the current ISO date.
Available in:
Processors | SQL Studio |
---|---|
Sample code:
Output:
This page describes the FORMAT_DATE function in Lenses SQL.
Returns a string representation of a date value according to a given pattern.
Available in:
Processors | SQL Studio |
---|---|
Sample code:
Output:
This page describes the PARSE_DATE function in Lenses SQL.
Builds a date value given a date string representation and a date pattern.
Available in:
Processors | SQL Studio |
---|---|
Sample code:
Output:
This page describes the PARSE_TIMESTAMP_MILLIS function in Lenses SQL.
Builds a timestamp-millis value given a datetime string representation and a date time pattern.
Available in:
Processors | SQL Studio |
---|---|
Sample code:
Output:
This page describes the PARSE_TIMESTAMP_MICROS function in Lenses SQL.
Builds a timestamp-micros value given a datetime string representation and a date time pattern.
Available in:
Processors | SQL Studio |
---|---|
Sample code:
Output:
This page describes the HOUR function in Lenses SQL.
Extracts the hour component of an expression that is of type timestamp.
Available in:
Processors | SQL Studio |
---|---|
Sample code:
Output:
This page describes the PARSE_TIME_MILLIS function in Lenses SQL.
Builds a time-millis value given a time string representation and a time pattern.
Available in:
Processors | SQL Studio |
---|---|
Sample code:
Output:
This page describes the SECOND function in Lenses SQL.
Extracts the second component of an expression that is of type timestamp.
Available in:
Processors | SQL Studio |
---|---|
Sample code:
Output:
This page describes the PARSE_TIME_MICROS function in Lenses SQL.
Builds a time-micros value given a time string representation and a time pattern.
Available in:
Processors | SQL Studio |
---|---|
Sample code:
Output:
This page describes the TO_DATE function in Lenses SQL.
Converts a string representation of a date into an epoch value using the pattern provided.
Available in:
Processors | SQL Studio |
---|---|
Sample code:
Output:
This page describes the TIMESTAMP_MICROS function in Lenses SQL.
Builds a timestamp-micros value from a long or int value.
Available in:
Processors | SQL Studio |
---|---|
Sample code:
Output:
This page describes the TO_DATETIME function in Lenses SQL.
Converts a string representation of a datetime into an epoch value using the pattern provided.
Available in:
Processors | SQL Studio |
---|---|
Sample code:
Output:
This page describes the TOMORROW function in Lenses SQL.
Returns the current date time plus 1 day.
Available in:
Processors | SQL Studio |
---|---|
Sample code:
Output:
This page describes the TIME_MILLIS function in Lenses SQL.
Builds a time-millis value from a long or int value.
Available in:
Processors | SQL Studio |
---|---|
Sample code:
Output:
This page describes the TO_TIMESTAMP function in Lenses SQL.
Converts a string representation of a date and time into an epoch value using the pattern provided.
Available in:
Processors | SQL Studio |
---|---|
Sample code:
Output:
This page describes the TIMESTAMP_MILLIS function in Lenses SQL.
Builds a timestamp-millis value from a long or int value.
Available in:
Processors | SQL Studio |
---|---|
Sample code:
Output:
This page describes the TIME_MICROS function in Lenses SQL.
Builds a time-micros value from a long or int value.
Available in:
Processors | SQL Studio |
---|---|
Sample code:
Output:
This page describes the TIMESTAMP function in Lenses SQL.
Returns a timestamp for a given date and time at a specific zone id.
Available in:
Processors | SQL Studio |
---|---|
Sample code:
Output:
This page describes the HEADERASSTRING function in Lenses SQL.
Returns the value of the record header key as a STRING value.
Available in:
Processors | SQL Studio |
---|---|
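A hedged sketch; the header key, the topic `payments`, and the single string-key argument are assumptions:

```sql
-- Read the 'correlation-id' record header as a string
SELECT HEADERASSTRING('correlation-id') AS correlation_id
FROM payments
```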
This page describes the YEAR function in Lenses SQL.
Extracts the year component of an expression that is of type timestamp.
Available in:
Processors | SQL Studio |
---|---|
Sample code:
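For illustration, assuming a hypothetical topic `purchases` with a timestamp field `created_at`:

```sql
-- Extract the year from a timestamp field
SELECT YEAR(created_at) AS created_year
FROM purchases
```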
Output:
This page describes the HEADERASINT function in Lenses SQL.
Returns the value of the record header key as an INT value.
Available in:
Processors | SQL Studio |
---|---|
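A hedged sketch under the same assumptions as the other header functions (hypothetical header key and topic):

```sql
-- Read the 'retry-count' record header as an int
SELECT HEADERASINT('retry-count') AS retry_count
FROM payments
```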
This page describes the HEADERASLONG function in Lenses SQL.
Returns the value of the record header key as a LONG value.
Available in:
Processors | SQL Studio |
---|---|
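Illustrative only; the header key and topic are hypothetical:

```sql
-- Read the 'ingest-time-ms' record header as a long
SELECT HEADERASLONG('ingest-time-ms') AS ingest_time_ms
FROM payments
```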
This page describes the HEADERASDOUBLE function in Lenses SQL.
Returns the value of the record header key as a DOUBLE value.
Available in:
Processors | SQL Studio |
---|---|
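Illustrative only; the header key and topic are hypothetical:

```sql
-- Read the 'exchange-rate' record header as a double
SELECT HEADERASDOUBLE('exchange-rate') AS exchange_rate
FROM payments
```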
This page describes the YESTERDAY function in Lenses SQL.
Returns the current datetime minus one day.
Available in:
Processors | SQL Studio |
---|---|
Sample code:
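A sketch assuming a hypothetical topic `purchases` with a timestamp field `created_at`; the empty-argument call syntax is an assumption:

```sql
-- Keep only records from roughly the last 24 hours
SELECT order_id, created_at
FROM purchases
WHERE created_at > YESTERDAY()
```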
Output:
This page describes the HEADERASFLOAT function in Lenses SQL.
Returns the value of the record header key as a FLOAT value.
Available in:
Processors | SQL Studio |
---|---|
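Illustrative only; the header key and topic are hypothetical:

```sql
-- Read the 'sampling-rate' record header as a float
SELECT HEADERASFLOAT('sampling-rate') AS sampling_rate
FROM payments
```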
This page describes the ABS function in Lenses SQL.
Returns the absolute value of an expression that evaluates to a number type.
Available in:
Processors | SQL Studio |
---|---|
Sample code:
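A minimal sketch, assuming a hypothetical topic `transactions` with numeric fields:

```sql
-- Absolute difference between two numeric fields
SELECT ABS(balance_before - balance_after) AS balance_change
FROM transactions
```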
Output:
This page describes the HEADERKEYS function in Lenses SQL.
Returns all the header keys for the current record.
Available in:
Processors | SQL Studio |
---|---|
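A hedged sketch; the empty-argument call syntax and the topic name are assumptions:

```sql
-- List all header keys present on each record
SELECT HEADERKEYS() AS header_keys
FROM payments
```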
This page describes the JSON_EXTRACT_ALL function in Lenses SQL.
Interprets 'pattern' as a JSON path pattern and applies it to 'json_string', returning all matches as an array of strings containing valid JSON. Examples for the pattern parameter: "$.a", "$['a']", "$[0]", "$.points[?(@['id']=='i4')].x", "$['points'][?(@['y'] >= 3)].id", "$.conditions[?(@ == false)]".
Available in:
Processors | SQL Studio |
---|---|
Sample code:
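A sketch assuming a hypothetical topic `telemetry` with a string field `payload_json` containing JSON, reusing one of the patterns listed above:

```sql
-- Return every element of 'conditions' equal to false, as an array of JSON strings
SELECT JSON_EXTRACT_ALL(payload_json, '$.conditions[?(@ == false)]') AS failed_conditions
FROM telemetry
```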
Output:
This page describes the JSON_EXTRACT_FIRST function in Lenses SQL.
Interprets 'pattern' as a JSON path pattern and applies it to 'json_string', returning the first match as a string containing valid JSON. Examples for the pattern parameter: "$.a", "$['a']", "$[0]", "$.points[?(@['id']=='i4')].x", "$['points'][?(@['y'] >= 3)].id", "$.conditions[?(@ == false)]".
Available in:
Processors | SQL Studio |
---|---|
Sample code:
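Under the same assumptions (hypothetical topic `telemetry` and string field `payload_json`):

```sql
-- Return the first match for field "a", as a string containing JSON
SELECT JSON_EXTRACT_FIRST(payload_json, '$.a') AS first_a
FROM telemetry
```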
Output:
This page describes the ASIN function in Lenses SQL.
Returns the trigonometric arc sine of an expression.
Available in:
Processors | SQL Studio |
---|---|
Sample code:
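Illustrative only; the topic `measurements` and the field `ratio` (expected in the range [-1, 1]) are hypothetical:

```sql
-- Arc sine, returned in radians
SELECT ASIN(ratio) AS angle_radians
FROM measurements
```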
Output:
This page describes the ACOS function in Lenses SQL.
Returns the trigonometric arc cosine of an expression.
Available in:
Processors | SQL Studio |
---|---|
Sample code:
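Illustrative only, with the same hypothetical names:

```sql
-- Arc cosine, returned in radians
SELECT ACOS(ratio) AS angle_radians
FROM measurements
```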
Output:
This page describes the CBRT function in Lenses SQL.
Returns the cube root of numExpr.
Available in:
Processors | SQL Studio |
---|---|
Sample code:
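A minimal sketch with hypothetical names:

```sql
-- Cube root of a numeric field
SELECT CBRT(volume) AS side_length
FROM boxes
```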
Output:
This page describes the CEIL function in Lenses SQL.
Returns the smallest integer value that is greater than or equal to the value of an expression.
Available in:
Processors | SQL Studio |
---|---|
Sample code:
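A minimal sketch, assuming a hypothetical topic `purchases` with a numeric field `price`:

```sql
-- Round a price up to the next whole number
SELECT CEIL(price) AS price_rounded_up
FROM purchases
```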
Output:
This page describes the ATAN function in Lenses SQL.
Returns the trigonometric arc tangent of an expression.
Available in:
Processors | SQL Studio |
---|---|
Sample code:
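Illustrative only, with hypothetical names:

```sql
-- Arc tangent, returned in radians
SELECT ATAN(slope) AS angle_radians
FROM measurements
```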
Output:
This page describes how to use numeric functions in Lenses SQL Processors.
The following arithmetic operators can be used in numeric expressions:

Operator | Description | Processors | SQL Studio |
---|---|---|---|
`%` | The remainder operator; computes the remainder after dividing its first operand by its second. | ✓ | ✓ |
`/` | Divides one number by another (an arithmetic operator). | ✓ | ✓ |
`-` | Subtracts one number from another (an arithmetic operator). | ✓ | ✓ |
`*` | Multiplies one number with another (an arithmetic operator). | ✓ | ✓ |
`+` | Adds one number to another (an arithmetic operator). | ✓ | ✓ |
`-` (unary) | Returns the negative of the value of a numeric expression (a unary operator). | ✓ | ✓ |
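A short sketch combining several of these operators; the topic and field names are hypothetical:

```sql
-- Combine arithmetic operators on numeric fields
SELECT (price * quantity) + shipping_fee AS total_due,
       quantity % 6                      AS leftover_items,
       -discount                         AS negated_discount
FROM orders
```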
This page describes the COSH function in Lenses SQL.
Returns the hyperbolic cosine of an expression.
Available in:
Processors | SQL Studio |
---|---|
Sample code:
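A minimal sketch with hypothetical names:

```sql
-- Hyperbolic cosine of a numeric field
SELECT COSH(x) AS cosh_x
FROM measurements
```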
Output:
This page describes the COS function in Lenses SQL.
Returns the trigonometric cosine of an expression.
Available in:
Processors | SQL Studio |
---|---|
Sample code:
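Illustrative only, with hypothetical names:

```sql
-- Trigonometric cosine of an angle expressed in radians
SELECT COS(angle_radians) AS cos_value
FROM measurements
```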
Output: