Filtering

This page describes common ways of filtering data in Kafka with Lenses SQL Studio.

WHERE clause

The WHERE clause allows you to define a set of logical predicates the data needs to match in order to be returned. Standard comparison operators are supported (>, >=, <, <=, = and !=), and predicates can also call functions.

We are going to use the groceries table created earlier. Select all items purchased where the price is greater than or equal to 2.00:

SELECT
    name
    , price
FROM groceries
WHERE price >= 2.0

/* Output
Meridian Crunchy Peanut Butter              2.5
Activia fat free cherry yogurts             2
Green & Blacks Organic Chocolate Ice Cream  4.2
*/

Select all customers whose last name is exactly 5 characters long:

SELECT *
FROM customers
WHERE LEN(last_name) = 5

/* Output
key         value.first_name    value.last_name
mikejones       Mike                Jones
anasmith        Ana                 Smith
*/

Search for all customers whose first name contains Ana:

SELECT *
FROM customers
WHERE first_name LIKE '%Ana%'

Keep in mind that text search is case-sensitive. For a case-insensitive search, you can write:

SELECT *
FROM customers
WHERE LOWERCASE(first_name) LIKE '%ana%';

-- And here is the negated version
SELECT *
FROM customers
WHERE LOWERCASE(first_name) NOT LIKE '%ana%';

Missing values

Sometimes data can contain explicit NULL values, or it can omit fields entirely. Using IS [ NOT ] NULL or the EXISTS function allows you to check for these situations.

EXISTS is a keyword in the Lenses SQL grammar, so it needs to be escaped; the escape character is the backtick (`).

Lenses supports JSON. JSON does not enforce a schema, allowing you to insert null values or omit fields entirely.

Create the following table named customers_json:

CREATE TABLE customers_json (
    _key STRING
    , first_name STRING
    , last_name STRING
    , middle_name STRING
) FORMAT(string, json);

INSERT INTO customers_json(_key, first_name, last_name, middle_name) VALUES("mikejones", "Mike", "Jones", "Albert");
INSERT INTO customers_json(_key, first_name, last_name) VALUES("anasmith", "Ana", "Smith");
INSERT INTO customers_json(_key, first_name, last_name) VALUES("shannonelliott", "Shannon","Elliott");

Query this table for all its entries:

SELECT * 
FROM customers_json

/* The output
key             value.first_name   value.middle_name   value.last_name
mikejones           Mike                Albert          Jones
anasmith            Ana                                 Smith
shannonelliott      Shannon                             Elliott
*/

The middle_name field is present only on the mikejones record.

Write a query that filters out records where middle_name is missing:

SELECT *
FROM customers_json
WHERE `EXISTS`(middle_name)

/* The output
 key             value.first_name   value.middle_name   value.last_name
mikejones            Mike            Albert                Jones
*/

This can also be written as:

SELECT *
FROM customers_json
WHERE middle_name IS NOT NULL

Whether a field is explicitly NULL or missing entirely, both of the checks above have the same outcome.
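
To select the complementary set, the records where middle_name is missing or explicitly null, invert the check:

SELECT *
FROM customers_json
WHERE middle_name IS NULL

/* The output
key             value.first_name   value.middle_name   value.last_name
anasmith            Ana                                 Smith
shannonelliott      Shannon                             Elliott
*/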

Multiple WHERE conditions

You can use AND/OR to specify complex conditions for filtering your data.

To filter the purchased items where more than one unit was bought for a given product and the unit price is greater than 2:

SELECT *
FROM groceries
WHERE quantity > 1 
    AND price > 2

Now try changing the AND logical operator to OR and see the difference in the output.
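
The OR version returns rows where either condition holds, so it matches at least as many records as the AND version:

SELECT *
FROM groceries
WHERE quantity > 1
    OR price > 2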

HAVING clause

Use the HAVING clause to filter the entries returned from a grouping query. It plays the same role as WHERE, but applies to grouped results.

SELECT
    COUNT(*) AS count
    , country
FROM customer
GROUP BY country
HAVING count > 1

Read a table partition only

To select data from a specific partition, access the topic's metadata.

In the following example, a table is created with three partitions. The message key is hashed, and the remainder HashValue % partitions determines the partition the record is sent to; with 3 partitions, for instance, a key hashing to 7 lands on partition 1 (7 % 3 = 1).

-- Run
CREATE TABLE customers_partitioned (
    _key STRING
    , first_name STRING
    , last_name STRING
) 
FORMAT(string, Avro)
properties(partitions = 3);

INSERT INTO customers_partitioned(
    _key
    , first_name
    , last_name)
VALUES
("mikejones", "Mike", "Jones"),
("anasmith", "Ana", "Smith"),
("shannonelliott", "Shannon","Elliott"),
("tomwood", "Tom","Wood"),
("adelewatson", "Adele","Watson"),
("mariasanchez", "Maria", "Sanchez");

Next, run the following query:

SELECT *
FROM customers_partitioned

/* The output
offset  partition   timestamp       key         value.first_name    value.last_name
0       0           1540830780401   mikejones       Mike                Jones
1       0           1540830780441   anasmith        Ana                 Smith
2       0           1540830780464   shannonelliott  Shannon             Elliott
0       2           1540831270170   mariasanchez    Maria               Sanchez
0       1           1540830984698   tomwood         Tom                 Wood
1       1           1540831183308   adelewatson     Adele               Watson
*/

As you can see from the results (your timestamps will be different), the records span all three partitions. Now query specific partitions:

-- selects only records from partition = 0
SELECT *
FROM customers_partitioned
WHERE _meta.partition = 0;

-- selects only records from partitions 0 and 2
SELECT *
FROM customers_partitioned
WHERE _meta.partition = 0
   OR _meta.partition = 2;
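
Based on the partition assignments shown above, the first query returns only the three records on partition 0:

/* The output
offset  partition   timestamp       key             value.first_name    value.last_name
0       0           1540830780401   mikejones           Mike                Jones
1       0           1540830780441   anasmith            Ana                 Smith
2       0           1540830780464   shannonelliott      Shannon             Elliott
*/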

Query by partitions

Kafka reads are non-deterministic across multiple partitions. The Snapshot engine may reach its max.size limit before it finds your record in one run; the next run it might.

SELECT *
FROM topicA
WHERE transaction_id=123
   AND _meta.partition = 1

Suppose, for the sake of example, that the Kafka topic above has 50 partitions holding 1 GB each. If we specify in our query that we are only interested in partition 1, Lenses will automatically push this predicate down, meaning that we only need to scan 1 GB of data instead of 50 GB.

Query by offsets

SELECT *
FROM topicA
WHERE transaction_id=123
  AND _meta.offset > 100
  AND _meta.offset < 100100
  AND _meta.partition = 1

If we specify the offset range and the partition, we only need to scan the specific range of 100K messages, amounting to roughly 5 MB in this example.

Query by timestamp

SELECT *
FROM topicA
WHERE transaction_id=123
  AND _meta.timestamp > NOW() - "1H"

The above will query only the data added to the topic within the last hour. Thus we would query just 10 MB.

Time-traveling

SELECT *
FROM position_reports
WHERE
   _meta.timestamp > "2020-04-01" AND
   _meta.timestamp < "2020-04-02"

The above will query only the data that was added to the Kafka topic on a specific day. If we are storing 1,000 days of data, we would query just 50 MB.
