All pages
Powered by GitBook
1 of 1

Loading...

Filtering

This page describes common filtering of data in Kafka with Lenses SQL Studio.

WHERE clause

WHERE clause allows you to define a set of logical predicates the data needs to match in order to be returned. Standard comparison operators are supported (>, >=, <, <=, =, and !=) as well as calling functions.

We are going to use the groceries table created earlier. Select all items purchased where the prices are greater or equal to 2.00:

Select all customers whose last name length equals to 5:

Search all customers containing Ana in their first name:

Keep in mind that text search is case-sensitive. To use case insensitive text search, you can write:

Missing values

Sometimes data can contain explicit NULL values, or it can omit fields entirely. Using IS [ NOT ] NULL, or EXISTS functions allows you to check for these situations.

Exists is a keyword in Lenses SQL grammar so it needs to be escaped, the escape character is `````.

Lenses supports JSON. JSON does not enforce a schema allowing you to insert null values.

Create the following table named customers_json:

Query this table for all its entries:

The middle_name is only present on the mikejones record.

Write a query which filters out records where middle_name is not present:

This can also be written as:

When a field is actually NULL or is missing, checking like in the above query has the same outcome.

Multiple WHERE conditions

You can use AND/OR to specify complex conditions for filtering your data.

To filter the purchased items where more than one item has been bought for a given product, and the unit price is greater than 2:

Now try changing the AND logical operand to OR and see the differences in output.

HAVING clause

To filter the entries returned from a grouping query. As with the WHERE statement, you can use HAVING syntax to achieve the same result when it comes to grouped queries.

Read a table partition only

To select data from a specific partition access the metadata of the topic.

In the following example, a table is created with three partitions and the message key is hashed and then the remainder HashValue % partitions will be the table partition the record is sent to.

Next, run the following query:

As you can see from the results (your timestamps will be different) the records span over the three partitions. Now query specific partitions:

Query by partitions

Kafka reads are non-deterministic over multiple partitions. The Snapshot engine may reach its max.size before it finds your record in one run, next time it might.

If we specify in our query that we are only interested in partition 1, and for the sake of example the above Kafka topic has 50 x partitions. Then Lenses will automatically push this predicate down, meaning that we will only need to scan 1GB instead of 50GB of data.

Query by offsets

If we specify the offset range and the partition, we would only need to scan the specific range of 100K messages resulting in scanning 5MB.

Query by timestamp

The above will query only the data added to the topic up to 1 hour ago. Thus we would query just 10MB.Time-traveling

The above will query only the data that have been added to the Kafka topic on a specific day. If we are storing 1,000 days of data, we would query just 50MB.

SELECT
     name
     , price
FROM groceries
WHERE price >= 2.0

/* Output
Meridian Crunchy Peanut Butter              2.5
Activia fat free cherry yogurts             2
Green & Blacks Organic Chocolate Ice Cream  4.2
*/
SELECT *
FROM customers
WHERE LEN(last_name) = 5

/* Output
key         value.first_name    value.last_name
mikejones       Mike                Jones
anasmith        Ana                 Smith
*/
SELECT *
FROM customers
WHERE first_name LIKE '%Ana%'
SELECT *
FROM customers
WHERE LOWERCASE(first_name) LIKE '%ana%';

-- And here is the negated version
SELECT *
FROM customers
WHERE LOWERCASE(first_name) NOT LIKE '%ana%';
CREATE TABLE customers_json (
    _key STRING
    , first_name STRING
    , last_name STRING
    , middle_name STRING
) FORMAT(string, json);


INSERT INTO customers_json(_key, first_name, last_name, middle_name) VALUES("mikejones", "Mike", "Jones", "Albert");
INSERT INTO customers_json(_key, first_name, last_name) VALUES("anasmith", "Ana", "Smith");
INSERT INTO customers_json(_key, first_name, last_name) VALUES("shannonelliott", "Shannon","Elliott");
SELECT * 
FROM customers_json

/* The output
key             value.first_name   value.middle_name   value.last_name
mikejones           Mike                Albert          Jones
anasmith            Ana                                 Smith
shannonelliott      Shannon                             Elliott
*/

SELECT *
FROM customers_json
WHERE `EXISTS`(middle_name)

/* The output
 key             value.first_name   value.middle_name   value.last_name
mikejones            Mike            Albert                Jones
*/
SELECT *
FROM customers_json
WHERE middle_name IS NULL
SELECT *
FROM groceries
WHERE quantity > 1 
    AND price > 2
SELECT
    COUNT(*) AS count
    , country
FROM customer
GROUP BY country
HAVING count > 1
-- Run
CREATE TABLE customers_partitioned (
    _key STRING
    , first_name STRING
    , last_name STRING
) 
FORMAT(string, Avro)
properties(partitions = 3);

INSERT INTO customers_partitioned(
    _key
    , first_name
    , last_name)
VALUES
("mikejones", "Mike", "Jones"),
("anasmith", "Ana", "Smith"),
("shannonelliott", "Shannon","Elliott"),
("tomwood", "Tom","Wood"),
("adelewatson", "Adele","Watson"),
("mariasanchez", "Maria", "Sanchez");
SELECT *
FROM customers_partitioned

/* The output
offset  partition   timestamp       key         value.first_name    value.last_name
0       0           1540830780401   mikejones       Mike                Jones
1       0           1540830780441   anasmith        Ana                 Smith
2       0           1540830780464   shannonelliott  Shannon             Elliott
0       2           1540831270170   mariasanchez    Maria               Sanchez
0       1           1540830984698   tomwood         Tom                 Wood
1       1           1540831183308   adelewatson     Adele               Watson
*/
-- selects only records from partition = 0
SELECT *
FROM customers_partitioned
WHERE _meta.partition = 0;

-- selects only records from partition  0 and 2
SELECT *
FROM customers_partitioned
WHERE _meta.partition = 0
   OR _meta.partition = 2;
SELECT *
FROM topicA
WHERE transaction_id=123
   AND _meta.partition = 1
SELECT *
FROM topicA
WHERE transaction_id=123
  AND _meta.offset > 100
  AND _meta.offset < 100100
  AND _meta.partition = 1
SELECT *
FROM topicA
WHERE transaction_id=123
  AND _meta.timestamp > NOW() - "1H"
SELECT *
FROM position_reports
WHERE
   _meta.timestamp > "2020-04-01" AND
   _meta.timestamp < "2020-04-02"