A declarative SQL interface for querying, transforming and manipulating data at rest and data in motion. It works with Apache Kafka topics and other data sources, and gives developers and Kafka users a familiar way to explore and work with their data.
The Lenses SQL Snapshot engine accesses the data at the point in time the query is executed. This means that, for Apache Kafka, data added just after the query was initiated will not be processed.
Typical use cases include, but are not limited to:
Identifying a specific message.
Identifying a particular payment transaction that your system has processed.
Identifying all thermostat readings for a specific customer, for example if you work for an energy provider.
Counting transactions processed within a given time window.
The Snapshot engine presents a familiar SQL interface, but remember that it queries Kafka with no indexes. Use Kafka's metadata (partition, offset, timestamp) to improve query performance.
Go to Workspace->Sql Studio, enter your query, and click run.
This page describes common filtering of data in Kafka with Lenses SQL Studio.
The WHERE clause allows you to define a set of logical predicates the data needs to match in order to be returned. Standard comparison operators are supported (`>`, `>=`, `<`, `<=`, `=`, and `!=`), as well as calling functions.
We are going to use the `groceries` table created earlier. Select all items purchased where the price is greater than or equal to 2.00:
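A minimal sketch of such a query; the field name `price` is an assumption about the groceries schema:

```sql
SELECT *
FROM groceries
WHERE price >= 2.0;
```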
Select all customers whose last name is exactly 5 characters long:
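A sketch, assuming a `last_name` field and a `LEN` string function (both assumptions, not taken from the original):

```sql
SELECT *
FROM customers
WHERE LEN(last_name) = 5;
```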
Search for all customers containing Ana in their first name:
Keep in mind that text search is case-sensitive. To use case-insensitive text search, you can write:
Sometimes data can contain explicit NULL values, or it can omit fields entirely. Using `IS [NOT] NULL`, or the `EXISTS` function, allows you to check for these situations.
`EXISTS` is a keyword in the Lenses SQL grammar, so it needs to be escaped; the escape character is the backtick (`` ` ``).
Lenses supports JSON. JSON does not enforce a schema, which allows you to insert null values or omit fields entirely.
Create the following table, named `customers_json`:
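A sketch of what such a table and its data might look like; the field names and the exact CREATE TABLE/INSERT forms shown here are assumptions for illustration (the CREATE TABLE section later in this document describes the general shape):

```sql
CREATE TABLE customers_json (
    first_name string,
    middle_name string,
    last_name string
)
FORMAT (string, json);

-- only mikejones gets a middle_name
INSERT INTO customers_json (_key, first_name, last_name)
VALUES ('anamartinez', 'Ana', 'Martinez');

INSERT INTO customers_json (_key, first_name, middle_name, last_name)
VALUES ('mikejones', 'Mike', 'Albert', 'Jones');
```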
Query this table for all its entries:
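A minimal example, using the table above:

```sql
SELECT *
FROM customers_json;
```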
The `middle_name` field is only present on the `mikejones` record.
Write a query which filters out records where `middle_name` is not present:
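A sketch using the escaped `EXISTS` function described above; the exact casing may differ:

```sql
SELECT *
FROM customers_json
WHERE `EXISTS`(middle_name);
```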
This can also be written as:
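For instance:

```sql
SELECT *
FROM customers_json
WHERE middle_name IS NOT NULL;
```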
When a field is explicitly NULL or is missing entirely, the check in the above query has the same outcome.
You can use AND/OR to specify complex conditions for filtering your data.
To filter the purchased items where more than one item has been bought for a given product, and the unit price is greater than 2:
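A sketch; the `quantity` and `price` field names are assumptions about the groceries schema:

```sql
SELECT *
FROM groceries
WHERE quantity > 1
  AND price > 2;
```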
Now try changing the AND logical operator to OR and see the differences in output.
To filter the entries returned from a grouping query, use the HAVING clause. It plays the same role for grouped queries that the WHERE statement plays for regular ones.
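A sketch of a grouped query with a HAVING filter; the table and field names are assumptions:

```sql
SELECT country, COUNT(*) AS customers
FROM customers
GROUP BY country
HAVING COUNT(*) > 1;
```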
To select data from a specific partition, access the metadata of the topic.
In the following example, a table is created with three partitions. The message key is hashed, and the remainder of `HashValue % partitions` determines the partition the record is sent to.
Next, run the following query:
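Something along these lines; the table name `customers_partitioned` is an assumption:

```sql
SELECT _key, _meta.partition, _meta.offset, _meta.timestamp
FROM customers_partitioned;
```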
As you can see from the results (your timestamps will be different) the records span over the three partitions. Now query specific partitions:
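For example, restricting the scan to a single partition:

```sql
SELECT _key, _meta.partition, _meta.offset
FROM customers_partitioned
WHERE _meta.partition = 1;
```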
Kafka reads are non-deterministic over multiple partitions. The Snapshot engine may reach its `max.size` limit before it finds your record in one run; in the next run it might find it.
If we specify in our query that we are only interested in partition 1 and, for the sake of example, the above Kafka topic has 50 partitions, then Lenses will automatically push this predicate down, meaning that we only need to scan 1 GB instead of 50 GB of data.
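A sketch of such a partition-restricted query; the topic name is illustrative:

```sql
SELECT *
FROM trips
WHERE _meta.partition = 1;
```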
If we specify the offset range and the partition, we would only need to scan that specific range of 100K messages, resulting in roughly 5 MB being scanned.
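For example (partition and offset bounds are illustrative):

```sql
SELECT *
FROM trips
WHERE _meta.partition = 1
  AND _meta.offset >= 100000
  AND _meta.offset <= 200000;
```

Timestamp predicates can be pushed down in the same way. A sketch, using an epoch-milliseconds lower bound standing in for "one hour ago" (the actual value depends on when you run it):

```sql
SELECT *
FROM trips
WHERE _meta.timestamp >= 1700000000000; -- epoch millis for one hour ago
```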
The above will query only the data added to the topic within the last hour; thus we would query just 10 MB.
Time-traveling
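A sketch of querying a specific day by bounding the record timestamp; the topic name and epoch-millisecond bounds are illustrative:

```sql
SELECT *
FROM trips
WHERE _meta.timestamp >= 1696118400000   -- start of the day (epoch millis)
  AND _meta.timestamp <  1696204800000;  -- start of the next day
```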
The above will query only the data that was added to the Kafka topic on a specific day. If we are storing 1,000 days of data, we would query just 50 MB.
This page describes the best practices when using Lenses SQL Studio to query data in Kafka.
Does Apache Kafka have indexing?
No. Apache Kafka does not have full indexing capabilities on the payload (indexes typically come at a high cost, even in an RDBMS or a system like Elasticsearch); however, Kafka does index its metadata.
The only filters Kafka supports are topic, partition and offsets or timestamps.
When querying Kafka topic data with SQL that filters on a payload field, such as a specific transaction id, a full scan will be executed: the query processes the entire data on that topic to identify all records that match.
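A sketch of such a query; the topic and field names are illustrative:

```sql
SELECT *
FROM payments
WHERE transaction_id = 'tx-1234';
```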
If the Kafka topic contains a million 50 KB messages, that requires querying 50 GB of data. Depending on your network capabilities, the brokers’ performance, any quotas on your account, and other parameters, fetching 50 GB of data could take some time! Even more so if the data is compressed: in that case, the client has to decompress it before parsing the raw bytes into a structure to which the query can be applied.
When Lenses can’t read (deserialize) your topic’s messages, it classifies them as “bad records”. This happens for one of the following reasons:
Kafka records are corrupted. On an AVRO topic, a rogue producer might have published a different format
Lenses topic settings do not match the payload data. Maybe a topic was incorrectly given AVRO format when it’s JSON or vice versa
If AVRO payload is involved, maybe the Schema Registry is down or not accessible from the machine running Lenses
By default, Lenses skips them and displays the records’ metadata in the Bad Records tab. If you want to force the query to stop in such cases, use:
Querying a table can take a long time if it contains a lot of records. The underlying Kafka topic has to be read, the filter conditions applied, and the projections made.
Additionally, the SELECT statement could end up bringing a large amount of data to the client. To constrain the resources involved, Lenses allows for context customization, which drives the execution and gives control to the user. Here is the list of context parameters to overwrite:
All the above values can be given a default via the configuration file. Using `lenses.sql.settings` as the prefix, `format.timestamp` can be set like this:
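For instance, a configuration entry of this shape (a sketch of the key/value form, not an exact excerpt):

```
lenses.sql.settings.format.timestamp=true
```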
Lenses SQL uses a Kafka consumer to read the data. This means that an advanced user with knowledge of Kafka could tweak the consumer properties to achieve better throughput, although this should only be needed on rare occasions. The query context can receive Kafka consumer settings. For example, the `max.poll.records` consumer setting can be set as:
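A sketch, assuming consumer properties are passed through the same SET mechanism as the other context parameters:

```sql
SET max.poll.records = 10000;
```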
Streaming SQL operates on unbounded streams of events: a query would normally never end. In order to bring query termination semantics into Apache Kafka, we introduced four controls:
LIMIT = 10000 - Force the query to terminate when 10,000 records are matched.
max.bytes = 20000000 - Force the query to terminate once 20 MBytes have been retrieved.
max.time = 60000 - Force the query to terminate after 60 seconds.
max.zero.polls = 8 - Force the query to terminate after 8 consecutive polls are empty, indicating we have exhausted a topic.
Thus, when retrieving data, you can cap the retrieved bytes at 1 GB and the query time at one hour like this:
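For example (the topic name is illustrative):

```sql
SET max.size = '1g';
SET max.query.time = '1h';

SELECT * FROM payments;
```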
This page describes the concepts of the Lenses SQL snapshot engine that drives the SQL Studio allowing you to query data in Kafka.
Escape topic names with backticks if they contain non-alphanumeric characters.
Snapshot queries on streaming data provide answers to a direct question, e.g. "the current balance is $10". The query is active; the data is passive.
A single entry in a Kafka topic is called a message.
The engine considers a message to have four distinct components: key, value, headers and metadata.
Currently, the Snapshot Engine supports four different facets: `_key`, `_value`, `_headers` and `_metadata`. These strings can be used to reference properties of each of the aforementioned message components when building a query.
By default, unqualified properties are assumed to belong to the `_value` facet:
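For example (topic and field names are illustrative):

```sql
-- "amount" here resolves to _value.amount
SELECT amount
FROM payments;
```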
In order to reference a different facet, a facet qualifier can be added:
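For instance (field names are illustrative):

```sql
SELECT _key.customer_id, _meta.offset, amount
FROM payments;
```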
When more than one source/topic is specified in a query (as happens when two topics are joined), a table reference can be added to the selection to resolve the ambiguity:
The same can be done for any of the other facets (`_key`, `_meta`, `_headers`).
Note: using a wildcard selection statement (SELECT *) provides only the value component of a message.
Headers are interpreted as a simple mapping of strings to strings. This means that if a header is a JSON, XML or any other structured type, the snapshot engine will still read it as a string value.
Messages can contain nested elements and embedded arrays. The `.` operator is used to refer to children, and the `[]` operator is used to refer to an element in an array.
You can use a combination of these two operators to access data of any depth.
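A sketch combining both operators; the structure and names are illustrative:

```sql
SELECT address.street.name, phone_numbers[0]
FROM customers;
```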
You can explicitly reference the key, value and metadata. For the key use `_key`, for the value use `_value`, and for metadata use `_meta`. When there is no prefix, the engine will resolve the field(s) as being part of the message value. For example, the following two queries are identical:
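A sketch (table and field names are illustrative):

```sql
SELECT first_name FROM customers;
SELECT _value.first_name FROM customers;
```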
When the key or the value content is a primitive data type, use the prefix alone to address it.
For example, if messages contain a device identifier as the key and the temperature as the value, SQL code would be:
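A sketch of what that could look like, assuming a device_readings topic with a string key and a numeric value:

```sql
SELECT _key AS device_id, _value AS temperature
FROM device_readings;
```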
Use the `_meta` keyword to address the metadata. For example:
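For instance (the topic name is illustrative):

```sql
SELECT _meta.partition, _meta.offset, _meta.timestamp
FROM device_readings;
```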
When projecting a field into a target record, Lenses allows complex structures to be built. This can be done by using a nested alias like below:
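A sketch of a nested alias; the field names are illustrative:

```sql
SELECT amount   AS transaction.amount,
       currency AS transaction.currency
FROM payments;
```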
The result would be a struct with the following shape:
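Continuing the sketch above, the resulting record value would look roughly like this:

```
{
  "transaction": {
    "amount": 12.5,
    "currency": "USD"
  }
}
```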
When two alias names clash, the snapshot engine does not “override” that field. Lenses will instead generate a new name by appending a unique integer. This means that a query like the following:
will generate a structure like the following:
Tabled (nested) queries allow you to nest one query inside another. Let us take the query in the previous section and say we are only interested in those entries where there is more than one customer per country.
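A sketch of such a nested query; the inner query mirrors a simple GROUP BY over an assumed customers table:

```sql
SELECT country, customers
FROM (
    SELECT country, COUNT(*) AS customers
    FROM customers
    GROUP BY country
)
WHERE customers > 1;
```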
Run the query, and you will only see those entries for which there is more than one person registered per country.
Functions can be used directly in a projection or a filter. For example, the ROUND function allows you to round numeric values:
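A sketch; the table and field names are illustrative:

```sql
SELECT name, ROUND(price) AS rounded_price
FROM groceries;
```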
This page describes how to limit return and sample data in Kafka with Lenses SQL Studio.
To limit the output of the query you can use two approaches:
use the LIMIT clause
set the max size of the data to be returned
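For example, either of the following (the topic name is illustrative):

```sql
SELECT * FROM payments LIMIT 100;

SET max.size = '10m';
SELECT * FROM payments;
```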
To restrict the time the query can run for, use SET max.query.time:
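For example (the topic name is illustrative):

```sql
SET max.query.time = '30s';
SELECT * FROM payments;
```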
To sample data and discard the first rows:
This statement instructs Lenses to skip the first record matched and then sample the next two.
This page describes joining data in Kafka with Lenses SQL Studio.
Lenses allows you to combine records from two tables. A query can contain zero, one or multiple JOIN operations.
Create an `orders` table and insert some data into it:
With these tables in place, join them to get more information about an order by combining it with the customer information found in the `customer` table:
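A sketch of such a join; the join keys and selected fields are assumptions about the two schemas:

```sql
SELECT o.order_id, o.amount, c.first_name, c.last_name
FROM orders AS o
JOIN customer AS c ON o.customer_id = c.id;
```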
With lateral joins, Lenses allows you to combine records from a table with the elements of an array expression.
We are going to see in more detail what lateral joins are with an example.
Create a `batched_readings` table and insert some data into it:
You can now use a LATERAL join to inspect, extract and filter the single elements of the `readings` array, as if they were a normal field:
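A sketch of a lateral join over the readings array; the exact field names are assumptions:

```sql
SELECT meter_id, reading
FROM batched_readings
LATERAL readings AS reading
WHERE reading > 90;
```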
Running that query we will get the values:
You can use multiple LATERAL joins, one inside the other, if you want to extract elements from a nested array:
Running the following query, we will obtain the same records as in the previous example:
This page describes how to create and delete topics in the Lenses SQL Studio.
Lenses supports the typical SQL commands supported by a relational database:
CREATE
DROP
TRUNCATE
DELETE
SHOW TABLES
DESCRIBE TABLE
DESCRIBE FORMATTED
The CREATE statement has the following parts:
CREATE TABLE - Instructs the construction of a table
$Table - The actual name given to the table.
Schema - Constructed as a list of (field, type) tuples, it describes the data each record in the table contains
FORMAT - Defines the storage format. Since it is an Apache Kafka topic, both the Key and the Value formats are required. Valid values are STRING, INT, LONG, JSON, AVRO.
PROPERTIES - Specifies the number of partitions the final Kafka topic should have, the replication factor required to ensure high availability (it cannot be higher than the current number of Kafka brokers), and whether the topic should be compacted.
A Kafka topic which is compacted is a special type of topic with a finer-grained retention mechanism that retains the last update record for each key.
A compacted topic (once the compaction has been completed) contains a full snapshot of the final record values for every record key and not just the recently changed keys. They are useful for in-memory services, persistent data stores, reloading caches, etc.
Example:
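A sketch of a CREATE TABLE statement; the table name, fields and property values are illustrative, and the exact PROPERTIES keys may differ:

```sql
CREATE TABLE customer (
    id string,
    name string,
    country string
)
FORMAT (string, json)
PROPERTIES (partitions=3, replication=1, compacted=true);
```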
Best practice is to use Avro as the storage format over other formats. In this case, the key can still be stored as STRING, but the value can be Avro.
To list all tables:
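Using the command listed above:

```sql
SHOW TABLES;
```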
To examine the schema and metadata for a topic:
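The general form, using the command listed above:

```sql
DESCRIBE TABLE $tableName;
```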
The `$tableName` should contain the name of the table to describe.
Given the two tables created earlier, a user can run the following SQL to get the information on each table:
The following information will be displayed:
To drop a table:
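The general form (a sketch; $Table is a placeholder for the table name):

```sql
DROP TABLE $Table;
```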
Dropping a table results in the underlying Kafka topics being removed.
Lenses provides a set of virtual tables that contain information about all the fields in all the tables.
Using the virtual table, you can quickly search for a table name but also see the table type.
The `__table` virtual table has a `table_name` column containing the table name, and a `table_type` column describing the table type (system, user, etc.).
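A sketch using the virtual table and columns named above:

```sql
SELECT table_name, table_type
FROM __table;
```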
To see all the tables' fields, select from the `_fields` virtual table.
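For example:

```sql
SELECT *
FROM _fields;
```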
Each Kafka message contains information related to partition, offset, timestamp, and topic. Additionally, the engine adds the key and value raw byte size.
Create a topic and insert a few entries.
Now we can query for specific metadata related to the records.
To query for metadata such as the underlying Kafka topic offset, partition and timestamp, prefix your desired fields with `_meta`.
Run the following query to see each tutorial name along with its metadata information:
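A sketch; the topic name `tutorials` and the `name` field are assumptions:

```sql
SELECT name, _meta.partition, _meta.offset, _meta.timestamp
FROM tutorials;
```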
For a full list of functions see the SQL Reference.
For more details on the subject, you should look at .
Name | Description | Example
---|---|---
max.size | The maximum amount of Kafka data to scan. This is to avoid a full topic scan over large topics. It can be expressed as bytes (1024), kilobytes (1024k), megabytes (10m) or gigabytes (5g). Default is 20MB. | SET max.size = '1g';
max.query.time | The maximum amount of time the query is allowed to run. It can be specified as milliseconds (2000ms), hours (2h), minutes (10m) or seconds (60s). Default is 1 hour. | SET max.query.time = '60000ms';
max.idle.time | The amount of time to wait when no more records are read from the source before the query is completed. Default is 5 seconds. | SET max.idle.time = '5s';
LIMIT N | The maximum number of records to return. Default is 10000. | SELECT * FROM payments LIMIT 100;
show.bad.records | Flag to drive the behavior of handling topic records when their payload does not correspond with the table storage format. Default is true: bad records are processed and displayed separately in the Bad Records section. Set it to false to skip them completely. | SET show.bad.records=false;
format.timestamp | Flag to control the values for Avro date time. Avro encodes date time via Long values. Set the value to true if you want the values to be returned as text in a human-readable format. | SET format.timestamp=true;
format.decimal | Flag to control the formatting of decimal types; it specifies how many decimal places are shown. | SET format.decimal=2;
format.uppercase | Flag to control the formatting of string types; it specifies whether strings should all be made uppercase. Default is false. | SET format.uppercase=true;
live.aggs | Flag to control whether aggregation queries are allowed to run. Since they accumulate data, they require more memory to retain the state. | SET live.aggs=true;
max.group.records | When an aggregation is calculated, this config defines the maximum number of records over which the engine computes the result. Default is 10,000,000. | SET max.group.records=10000000;
optimize.kafka.partition | When enabled, the primitive used for the _key filter determines the partition the same way the default Kafka partitioner logic does. Queries like SELECT * FROM trips WHERE _key='customer_id_value'; on multi-partition topics will therefore read only one partition as opposed to the entire topic. To disable it, set the flag to false. | SET optimize.kafka.partition=false;
query.parallel | When used, it will parallelize the query. The number provided is capped by the target topic partition count. | SET query.parallel=2;
query.buffer | Internal buffer used when processing messages. A higher number might yield better performance when coupled with max.poll.records. | SET query.buffer=50000;
kafka.offset.timeout | Timeout for retrieving target topic start/end offsets. | SET kafka.offset.timeout=20000;
meter_id | reading
---|---
1 | 100
1 | 95
1 | 91
2 | 93
1 | 92
1 | 94
This page describes how to aggregate Kafka data in Lenses SQL Studio.
For a full list of aggregation functions see the SQL Reference.
Using the COUNT aggregate function you can count the records in a table. Run the following SQL to see how many records we have on the `customers_partitioned` table:
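For example:

```sql
SELECT COUNT(*)
FROM customers_partitioned;
```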
Using the SUM function you can sum a numeric field across the records in a table.
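A sketch; the table and field names are illustrative:

```sql
SELECT SUM(price)
FROM groceries;
```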
To group data use the GROUP BY clause:
Let’s see how many customers there are from each country. Here is the code which computes that:
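A sketch, assuming a customers table with a country field:

```sql
SELECT country, COUNT(*) AS customers
FROM customers
GROUP BY country;
```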
This page describes how to insert and delete data into Kafka with Lenses SQL Studio.
Lenses SQL allows you to utilize the ANSI SQL INSERT command to store new records into a table.
Single or multi-record inserts are supported:
$Table - The name of the table to insert the data into
Columns - The target columns to populate with data. Adding a record does not require you to fill all the available columns. In the case of Avro stored Key, Value pairs, the user needs to make sure that a value is specified for all the required Avro fields.
VALUES - The set of values to insert. It has to match the list of columns provided, including their data types. You can use simple constants or more complex expressions as values, like `1 + 1` or `NOW()`.
Example:
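A sketch of a single-record insert; the table and column names are illustrative:

```sql
INSERT INTO customer (_key, first_name, last_name, country)
VALUES ('ana.martinez', 'Ana', 'Martinez', 'Spain');
```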
Records can be inserted from the result of a SELECT statement.
The syntax is:
For example, to copy all the records from the customer table into the customer_avro one:
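Something along these lines:

```sql
INSERT INTO customer_avro
SELECT *
FROM customer;
```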
There are scenarios where a record key is a complex type. Regardless of the storage format, JSON or Avro, the SQL engine allows the insertion of such entries:
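A sketch of inserting a record with a composite key; the key fields and values are assumptions:

```sql
INSERT INTO payments (_key.customer_id, _key.payment_ref, amount, currency)
VALUES ('cust-1', 'pay-42', 12.50, 'USD');
```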
There are two ways to delete data:
If the topic is not compacted, then DELETE expects an offset to delete records up to.
If the topic is compacted, then DELETE expects the record Key to be provided. For a compacted topic, a delete translates to inserting a record with the existing Key but a null Value. For the `customer_avro` topic (which has the compacted flag on), a delete operation for a specific customer identifier would look like this:
Deleting is an insert operation. Until the compaction takes place, there will be at least one record with the Key used earlier. The latest (or last) record will have the Value set to null.
To remove all records from a table:
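The general form, a sketch ($Table is a placeholder for the table name):

```sql
TRUNCATE TABLE $Table;
```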
where the `$Table` is the table name to delete all records from. This operation is only supported on non-compacted topics, which is a Kafka design restriction. To remove the data from a compacted topic, you have two options: either dropping and recreating the topic, or inserting null Value records for each unique Key on the topic.
After rebuilding the `customer` table to be non-compacted, perform the truncate:
Truncating a compacted Kafka topic is not supported. This is an Apache Kafka restriction. You can drop and recreate the table, or insert a record with a null Value for each unique key in the topic.
This page describes how to use views and synonyms in Lenses SQL Studio to query Kafka.
Lenses supports the typical SQL commands supported by a relational database:
CREATE
DROP
TRUNCATE
DELETE
SHOW VIEWS
A view is a virtual table, generated dynamically based on the results of a SELECT statement.
A view looks and acts just like a real table, but is always created on the fly as required, so it is always up to date.
A synonym is an alias for a table. This is useful if you have a topic with a long, unwieldy name like customer_reports_emea_april_2018_to_march_2018 and you want to access this as customer_reports.
To create a view:
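A sketch of the general form; the projection and filter are illustrative, and `viewname` is explained below:

```sql
CREATE VIEW viewname AS
SELECT first_name, last_name
FROM customers
WHERE country = 'UK';
```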
Where `viewname` is the name of the virtual table that is used to access the records in the view, and the query is a standard SELECT statement.
Then we can query the view:
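For example:

```sql
SELECT *
FROM viewname;
```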
A view acts as a virtual table. This means that a view can be filtered even more or that a projection can be applied to a view:
To delete a view:
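A sketch, assuming the DROP command listed above applies to views:

```sql
DROP VIEW viewname;
```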
If you wish to modify an existing view, use the syntax above to delete it, and then create a new view with the same name.
To see the definition of a view, you can use the following syntax:
To create a synonym:
To delete a synonym:
If you wish to modify an existing synonym, use the syntax above to delete it, and then create a new synonym with the same name.
Three common reasons for using a view are:
creating a projection from a table with a large number of fields
representing joins as a single table
and creating a preset filter
We will cover each scenario with an example.
Suppose we have a table called customers which contains full customer records - name, email, age, registration date, country, password, and many others - and we find ourselves repeatedly querying it for just name and email.
A view could be created that returns just the name and email as a projection.
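A sketch of such a view; the view name and field names are assumptions:

```sql
CREATE VIEW customer_contacts AS
SELECT name, email
FROM customers;
```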
There is no reason to specify the projection each time.
The benefit is more significant when we want to select a higher number of fields - say a topic with 50 fields, and we want to select only 15.
The statement that is used to generate the view can consist of one or more tables. One use case of views is to represent joined tables as if they were a single table. This avoids the need for writing a complex join query each time.
Then we can select from this join like this:
Finally, another use case is to define a filter that is commonly used. If a topic contains transactions and we often find ourselves searching for transactions from the UK, we could run this query each time:
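A sketch; the field name is an assumption:

```sql
SELECT *
FROM transactions
WHERE country = 'UK';
```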
Alternatively, we can set up a view with this filter pre-applied:
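For example (the view name is illustrative):

```sql
CREATE VIEW uk_transactions AS
SELECT *
FROM transactions
WHERE country = 'UK';
```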
Then use a SELECT query:
This page describes examples of using arrays in Lenses SQL Studio to query Kafka.
For a full list of array functions see the SQL Reference.
You can create array fields using the `[]` syntax:
Tables can store data containing arrays. Here is a SQL statement for querying an array item:
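A sketch, reusing the batched_readings table from the lateral-join example:

```sql
SELECT readings[0] AS first_reading
FROM batched_readings;
```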
When working with arrays, it is good to check the array bounds. See the SIZEOF function in the list of supported functions.
Sometimes you want to find out how many items are in your array. To do so you can run:
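For example, using SIZEOF (table and field names as above):

```sql
SELECT SIZEOF(readings) AS readings_count
FROM batched_readings;
```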
This page describes how to manage and control queries against Kafka in Lenses SQL Studio.
Adding a LIMIT 10 to the SQL query will result in the query terminating early, as soon as 10 messages have been found. It is not a perfect solution, since we might never find 10 matching messages and would thus still perform a full scan.
You can also set a maximum query or idle time:
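For instance, capping the query at two minutes:

```sql
SET max.query.time = '2m';
```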
or a max idle time; the idea is that there is no reason to keep polling if we have exhausted the entire topic:
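For instance:

```sql
SET max.idle.time = '10s';
```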
or a maximum amount of data to read from Kafka. This controls how much data is read from Kafka, NOT the required memory:
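For instance:

```sql
SET max.size = '1g';
```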
Recent queries are displayed, but only for the current session; they are not currently retained.
Click on the play button to run a previous query. If a query is already running, you will be asked if you want to stop it first.
View All queries
View Running queries
You can see all running queries by Lenses users using SQL:
You can force stop a query by another user using SQL:
This page describes how to access Kafka message metadata in Lenses SQL Studio.
When running queries against Kafka, the Snapshot Engine enables you to access the record metadata through the special `_meta` facet.
These are the available meta fields:
Field | Description
---|---
_meta.offset | The offset of the record in its Kafka topic partition
_meta.partition | The Kafka topic partition of the record
_meta.timestamp | The Kafka record timestamp
_meta.__keysize | The length in bytes of the raw key stored in Kafka
_meta.__valuesize | The length in bytes of the raw value stored in Kafka
The following query will select all the meta fields listed above:
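A sketch; the topic name is illustrative:

```sql
SELECT _meta.offset,
       _meta.partition,
       _meta.timestamp,
       _meta.__keysize,
       _meta.__valuesize
FROM payments;
```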
To view the value of a specific header you can run:
To read records from a specific partition, the following query can be used:
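For instance (the topic name is illustrative):

```sql
SELECT *
FROM payments
WHERE _meta.partition = 0;
```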
Here is the query to use when the record offset and partition are known:
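For instance (partition and offset values are illustrative):

```sql
SELECT *
FROM payments
WHERE _meta.partition = 0
  AND _meta.offset = 1234;
```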
This query will get the latest 100 records per partition (assuming the topic is not compacted):
This instead will get the latest 100 records for a given partition (again assuming the topic is not compacted):