This page describes common ways to filter data in Kafka with Lenses SQL Studio.
The WHERE clause allows you to define a set of logical predicates that the data needs to match in order to be returned. The standard comparison operators (>, >=, <, <=, = and !=) are supported, and predicates can also call functions.
We are going to use the groceries table created earlier. Select all items purchased where the price is greater than or equal to 2.00:
Select all customers whose last name is five characters long:
Search for all customers whose first name contains Ana:
Keep in mind that text search is case-sensitive. For a case-insensitive search, you can write:
Sometimes data can contain explicit NULL values, or it can omit fields entirely. The IS [ NOT ] NULL operator and the EXISTS function allow you to check for both situations.
EXISTS is a keyword in the Lenses SQL grammar, so it needs to be escaped; the escape character is the backtick (`).
Lenses supports JSON. JSON does not enforce a schema, so records can contain null values or drop fields entirely.
Create the following table named customers_json:
Query this table for all its entries:
The middle_name field is only present on the mikejones record.
Write a query that filters out records where middle_name is not present:
This can also be written as:
Whether a field is explicitly NULL or missing entirely, both checks above produce the same outcome.
You can use AND/OR to specify complex conditions for filtering your data.
To filter for purchases where more than one item was bought for a given product and the unit price is greater than 2:
Now try changing the AND logical operator to OR and observe the difference in the output.
To filter the entries returned by a grouping query, use the HAVING clause. It plays the same role for grouped queries that WHERE plays for row-level filtering.
To select data from a specific partition, access the metadata of the topic.
In the following example, a table is created with three partitions. The message key is hashed, and the remainder HashValue % partitions determines the partition each record is sent to.
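The key-to-partition mapping described above can be sketched as a toy model. Note that this is for illustration only: Kafka's default partitioner actually applies murmur2 to the serialized key bytes, and the simple character-sum hash below is a stand-in.

```python
# Toy model of key-based partition assignment: hash the key, then take
# the remainder modulo the partition count. Kafka's real default
# partitioner uses murmur2 over the serialized key bytes.
def assign_partition(key: str, partitions: int = 3) -> int:
    hash_value = sum(ord(c) for c in key)  # stand-in for the real hash
    return hash_value % partitions

for key in ["mikejones", "anasmith", "tomwood"]:
    print(key, "-> partition", assign_partition(key))
```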
Next, run the following query:
As you can see from the results (your timestamps will be different), the records span the three partitions. Now query specific partitions:
Kafka reads are non-deterministic across multiple partitions: the Snapshot engine may reach its max.size before it finds your record in one run, while the next run might find it.
If we specify in our query that we are only interested in partition 1 and, for the sake of example, the above Kafka topic has 50 partitions, then Lenses will automatically push this predicate down, meaning we only need to scan 1 GB of data instead of 50 GB.
If we also specify the offset range and the partition, we would only need to scan the specific range of 100K messages, resulting in roughly 5 MB scanned.
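The arithmetic behind these estimates can be sketched as a back-of-the-envelope calculation. The 1 GB-per-partition and ~50-byte average record size are assumptions chosen to be consistent with the figures quoted above, not measured values:

```python
# Back-of-the-envelope predicate-pushdown savings using the figures above.
partitions = 50
bytes_per_partition = 1 * 1000**3              # assume 1 GB per partition
full_scan = partitions * bytes_per_partition   # no pushdown: 50 GB

partition_scan = bytes_per_partition           # partition pushdown: 1 GB

messages_in_range = 100_000
avg_record_bytes = 50                          # assumed average record size
offset_scan = messages_in_range * avg_record_bytes  # offset + partition: 5 MB

print(full_scan, partition_scan, offset_scan)
```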
The above will query only the data added to the topic within the last hour; thus we would query just 10 MB.

Time-traveling
The above will query only the data that was added to the Kafka topic on a specific day. If we are storing 1,000 days of data, we would query just 50 MB.
SELECT
name
, price
FROM groceries
WHERE price >= 2.0
/* Output
Meridian Crunchy Peanut Butter 2.5
Activia fat free cherry yogurts 2
Green & Blacks Organic Chocolate Ice Cream 4.2
*/

SELECT *
FROM customers
WHERE LEN(last_name) = 5
/* Output
key value.first_name value.last_name
mikejones Mike Jones
anasmith Ana Smith
*/

SELECT *
FROM customers
WHERE first_name LIKE '%Ana%'
SELECT *
FROM customers
WHERE LOWERCASE(first_name) LIKE '%ana%';
-- And here is the negated version
SELECT *
FROM customers
WHERE LOWERCASE(first_name) NOT LIKE '%ana%';

CREATE TABLE customers_json (
_key STRING
, first_name STRING
, last_name STRING
, middle_name STRING
) FORMAT(string, json);
INSERT INTO customers_json(_key, first_name, last_name, middle_name) VALUES("mikejones", "Mike", "Jones", "Albert");
INSERT INTO customers_json(_key, first_name, last_name) VALUES("anasmith", "Ana", "Smith");
INSERT INTO customers_json(_key, first_name, last_name) VALUES("shannonelliott", "Shannon", "Elliott");
SELECT *
FROM customers_json
/* The output
key value.first_name value.middle_name value.last_name
mikejones Mike Albert Jones
anasmith Ana Smith
shannonelliott Shannon Elliott
*/
SELECT *
FROM customers_json
WHERE `EXISTS`(middle_name)
/* The output
key value.first_name value.middle_name value.last_name
mikejones Mike Albert Jones
*/

SELECT *
FROM customers_json
WHERE middle_name IS NOT NULL

SELECT *
FROM groceries
WHERE quantity > 1
AND price > 2
SELECT
COUNT(*) AS count
, country
FROM customer
GROUP BY country
HAVING count > 1

-- Run
CREATE TABLE customers_partitioned (
_key STRING
, first_name STRING
, last_name STRING
)
FORMAT(string, Avro)
properties(partitions = 3);
INSERT INTO customers_partitioned(
_key
, first_name
, last_name)
VALUES
("mikejones", "Mike", "Jones"),
("anasmith", "Ana", "Smith"),
("shannonelliott", "Shannon", "Elliott"),
("tomwood", "Tom", "Wood"),
("adelewatson", "Adele", "Watson"),
("mariasanchez", "Maria", "Sanchez");

SELECT *
FROM customers_partitioned
/* The output
offset partition timestamp key value.first_name value.last_name
0 0 1540830780401 mikejones Mike Jones
1 0 1540830780441 anasmith Ana Smith
2 0 1540830780464 shannonelliott Shannon Elliott
0 2 1540831270170 mariasanchez Maria Sanchez
0 1 1540830984698 tomwood Tom Wood
1 1 1540831183308 adelewatson Adele Watson
*/

-- selects only records from partition 0
SELECT *
FROM customers_partitioned
WHERE _meta.partition = 0;
-- selects only records from partition 0 and 2
SELECT *
FROM customers_partitioned
WHERE _meta.partition = 0
OR _meta.partition = 2;

SELECT *
FROM topicA
WHERE transaction_id=123
AND _meta.partition = 1

SELECT *
FROM topicA
WHERE transaction_id=123
AND _meta.offset > 100
AND _meta.offset < 100100
AND _meta.partition = 1

SELECT *
FROM topicA
WHERE transaction_id=123
AND _meta.timestamp > NOW() - "1H"

SELECT *
FROM position_reports
WHERE
_meta.timestamp > "2020-04-01" AND
_meta.timestamp < "2020-04-02"