This page describes a tutorial to filter records in a Kafka topic with Lenses SQL Processors.
Filtering messages and copying them to a topic can be achieved using the WHERE clause.
In our example, we have a topic where our application registers bank transactions.
We have a topic called payments where records have this shape:
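The exact schema does not matter much for this tutorial; a record can be pictured roughly as follows (the field names amount, currency and time are our assumption, inferred from the examples later on):

```json
{
  "key": "txn-0001",
  "value": {
    "amount": 7120.50,
    "currency": "USD",
    "time": "2021-03-12T02:31:00Z"
  }
}
```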
We can replicate such a structure running the following query in SQL Studio:
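A minimal sketch of such a statement is shown below; the field names and types are our assumptions, and the FORMAT clause should be adapted to the serialisation you actually use:

```sql
-- Sketch only: hypothetical fields, string key, Avro value
CREATE TABLE payments(
    amount   double,
    currency string,
    time     string
)
FORMAT (string, avro);
```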
Each event has a unique string key generated by the upstream system.
We can again use SQL Studio to insert some data to play with:
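For example, a few illustrative rows (the values are made up, and we assume the record key can be set through the _key column):

```sql
INSERT INTO payments(_key, amount, currency, time) VALUES ('txn1', 2.99,    'USD', '2021-03-12T14:00:01Z');
INSERT INTO payments(_key, amount, currency, time) VALUES ('txn2', 7120.50, 'USD', '2021-03-12T02:31:00Z');
INSERT INTO payments(_key, amount, currency, time) VALUES ('txn3', 6300.00, 'EUR', '2021-03-12T10:12:44Z');
INSERT INTO payments(_key, amount, currency, time) VALUES ('txn4', 4800.00, 'GBP', '2021-03-12T23:15:09Z');
```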
Let’s assume we need to detect significant transactions that will then be fed into our anti-fraud system.
We want to copy those transactions into a new topic, maintaining the content of the records as it is.
For our first example, we will use a simple predicate to filter transactions with an amount larger than 5000, regardless of the currency.
Lenses SQL supports all the common comparison operators, so for our goal it is enough to use a WHERE clause with a >= condition:
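A minimal sketch of such a processor is shown below; the output topic name big_payments is our choice, not something prescribed by the tutorial:

```sql
SET defaults.topic.autocreate=true;

INSERT INTO big_payments
SELECT STREAM *
FROM payments
WHERE amount >= 5000;
```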
Checking the records emitted by the processor, we see that we got the transactions we were looking for.
Because of the * projection, the content of the records has not changed.
Not all currencies are the same, so we would like to add a specific threshold for each currency. As a first cut, we combine multiple conditions with ANDs and ORs:
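For instance, a sketch with made-up per-currency thresholds might look like this:

```sql
INSERT INTO big_payments
SELECT STREAM *
FROM payments
WHERE (currency = 'USD' AND amount >= 5000)
   OR (currency = 'EUR' AND amount >= 4500)
   OR (currency = 'GBP' AND amount >= 4000);
```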
As an improvement, we want to capture the threshold for each currency in a single expression. We will use a CASE statement for that:
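One way to express it (again with made-up thresholds) is to compare the amount against a CASE expression over the currency:

```sql
INSERT INTO big_payments
SELECT STREAM *
FROM payments
WHERE amount >= (
    CASE
        WHEN currency = 'USD' THEN 5000
        WHEN currency = 'EUR' THEN 4500
        WHEN currency = 'GBP' THEN 4000
        ELSE 5000
    END
);
```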
getting the results:
In this section, we will find all the transactions that happened during the (UTC) night. To do that, we can use one of our many date and time functions. You will also see how to use a CAST expression to convert from one type to another.
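The sketch below is only indicative: it assumes the time field can be cast to a datetime value and that an HOUR function is among the available date and time functions; check the Lenses SQL reference for the exact function names and cast targets before relying on it:

```sql
SET defaults.topic.autocreate=true;

INSERT INTO night_payments
SELECT STREAM *
FROM payments
-- HOUR and the cast to datetime are assumptions to verify against the
-- date/time function reference; "night" here means 00:00-05:59 UTC
WHERE HOUR(CAST(time AS datetime)) >= 0
  AND HOUR(CAST(time AS datetime)) < 6;
```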
Checking the output, we can see that only one transaction satisfied our predicate:
Let’s imagine that we have to build some intelligence around all the payments we process, but we have neither the capacity nor the need to process all of them.
We therefore decide to build a reduced copy of the payments topic, with only 10% of the original records.
To do that, we are going to use our RANDINT function:
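A sketch of the sampling processor (the output topic name is ours, and an ABS function is assumed to be available alongside RANDINT):

```sql
SET defaults.topic.autocreate=true;

INSERT INTO payments_sample
SELECT STREAM *
FROM payments
-- 2147483647 is the largest 32-bit signed integer
WHERE CAST(ABS(RANDINT()) AS double) / 2147483647 < 0.1;
```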
RANDINT generates a random integer; we take its absolute value to make sure it is positive, and we then normalise the result by dividing it by the maximum possible integer, obtaining an (almost) uniform sample of numbers between 0 and 1. We have to CAST to double along the way; otherwise, the division would be between integers, and the result would always be 0.
This page describes a tutorial to enrich a Kafka topic using Lenses SQL Processors.
In this article, we will be enriching customer call events with their customer details.
Enriching data streams with extra information by performing an efficient lookup is a common scenario for streaming SQL on Apache Kafka.
Topics involved:

- customer_details: messages contain information about the customer
- customer_call_details: messages contain information about calls
To simplify our testing process and manage to run the above example in less than 60 seconds, we will be using SQL to create and populate the Apache Kafka topics involved:
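The sketch below shows one way to do this; all field names and sample values are our assumptions, and the exact DDL (the FORMAT clause and the INSERT syntax for keys in particular) should be checked against your Lenses version:

```sql
CREATE TABLE customer_details(
    name    string,
    country string
)
FORMAT (string, avro);

CREATE TABLE customer_call_details(
    duration int,
    callType string
)
FORMAT (string, avro);

INSERT INTO customer_details(_key, name, country) VALUES ('c1', 'Ada Lovelace', 'UK');
INSERT INTO customer_details(_key, name, country) VALUES ('c2', 'Alan Turing', 'UK');

INSERT INTO customer_call_details(_key, duration, callType) VALUES ('c1', 120, 'support');
INSERT INTO customer_call_details(_key, duration, callType) VALUES ('c2', 45, 'sales');
```

With the topics in place, the enrichment itself can be expressed as a join in a SQL Processor. The shape below is only a sketch: it assumes both topics are keyed by the customer id, that the customer_details side can be treated as a table (a lookup of the latest value per key) via a SELECT TABLE sub-query, and the output topic name and projected fields are ours as well:

```sql
SET defaults.topic.autocreate=true;

INSERT INTO customers_call_info
SELECT STREAM
    call.duration,
    call.callType,
    cust.name,
    cust.country
FROM customer_call_details AS call
    INNER JOIN (SELECT TABLE * FROM customer_details) AS cust
        ON call._key = cust._key;
```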
This page describes a tutorial to join Kafka topics with Lenses SQL Processors.
Imagine you are the next Amazon, and you want to track the orders and shipment events to work out which orders have been shipped and how long it took. In this case, there will be two data streams, one for each event type, and the resulting stream will answer the questions above.
Enriching two streams of data requires a sliding window join. The events are said to be “close” to each other if the difference between their timestamps is within the specified time window.
Topics involved:

- orders: messages contain information about a customer order
- shipments: messages contain information about the shipment
The query combines the data from orders and shipments if the orders are processed within 24 hours. Resulting records contain the order and shipment identifiers, and the time elapsed between when the order was registered and when it was shipped.
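A sketch of what such a processor could look like is shown below. It assumes shipments carry the order id in an orderId field and that a WITHIN clause expresses the 24-hour join window; the output topic name and the projected fields are ours, and the elapsed-time calculation is left out of the sketch:

```sql
SET defaults.topic.autocreate=true;

INSERT INTO orders_and_shipments
SELECT STREAM
    o._key AS orderId,
    s._key AS shipmentId
FROM orders AS o
    INNER JOIN shipments AS s
        ON o._key = s.orderId
WITHIN 24h;
```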
To simplify our testing process and manage to run the above example in less than 60 seconds, we will be using SQL to create and populate the Apache Kafka topics involved:
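As a sketch (field names and sample values are ours), the topics could be created and populated like this. Note that the streaming join window is evaluated against the record timestamps, so reproducing the exact results discussed below also depends on when, or with which timestamps, the records are produced:

```sql
CREATE TABLE orders(
    customer string
)
FORMAT (string, avro);

CREATE TABLE shipments(
    orderId string
)
FORMAT (string, avro);

INSERT INTO orders(_key, customer) VALUES ('o1', 'alice');
INSERT INTO orders(_key, customer) VALUES ('o2', 'bob');
INSERT INTO orders(_key, customer) VALUES ('o3', 'carol');
INSERT INTO orders(_key, customer) VALUES ('o4', 'dave');

INSERT INTO shipments(_key, orderId) VALUES ('s1', 'o1');
INSERT INTO shipments(_key, orderId) VALUES ('s2', 'o3');
INSERT INTO shipments(_key, orderId) VALUES ('s3', 'o4');
```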
The output seen in the next screenshot shows two records. For the order with identifier o2, there is no shipments entry because it has not been processed. For the order with identifier o3, the shipment happened after one day.
Let’s switch to the Snapshot engine by navigating to the SQL Studio menu item. With the entries in both topics, we can write the following query to see which data is joinable without the window interval:
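A sketch of such a Snapshot query, under the same assumption that shipments reference the order key through an orderId field:

```sql
SELECT
    o._key AS orderId,
    s._key AS shipmentId
FROM orders AS o
    INNER JOIN shipments AS s
        ON o._key = s.orderId;
```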
These are the results for the non-streaming query (i.e., Snapshot):
Running the query returned three records, but you can see that the order o3 was processed two days after it was placed. Let’s apply the sliding window restriction to the Snapshot query by adding a filter that only matches records whose timestamp difference is within a day.
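One way to express this filter, assuming the record timestamps are exposed to Snapshot queries as _meta.timestamp (an assumption worth verifying against the Lenses SQL reference):

```sql
SELECT
    o._key AS orderId,
    s._key AS shipmentId
FROM orders AS o
    INNER JOIN shipments AS s
        ON o._key = s.orderId
-- 86400000 ms = 1 day; _meta.timestamp is assumed to hold the record timestamp
WHERE s._meta.timestamp - o._meta.timestamp <= 86400000;
```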
Now the result matches the one from the Streaming query.
In this tutorial, you learned how to join two streams together using a sliding window. You achieved all the above using the Lenses SQL engine.
Good luck and happy streaming!
This page describes a tutorial to use multiple Kafka topics in a Lenses SQL Processor.
In this tutorial, we will see how we can read data from multiple topics, process it as needed, and write the results to as many output topics as we need, all by using a single SQL Processor.
Let’s assume that we have a topic (game-sessions) that contains data regarding remote gaming sessions by users.
Each gaming session will contain:

- the points the user achieved throughout the session
- metadata information regarding the session:
  - the country where the game took place
  - the language the user played the game in
The above structure represents the value of each record in our game-sessions topic.
Additionally, each record will be keyed by user details, containing:

- a pid, or player id, representing this user uniquely
- some additional denormalised user details:
  - a name
  - a surname
  - an age
In light of the above, a record might look like the following (in JSON for simplicity):
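For instance, a record could look like this (the exact field names, such as sessionMetadata, are our assumption):

```json
{
  "key": {
    "pid": 1,
    "name": "Billy",
    "surname": "Lagrange",
    "age": 30
  },
  "value": {
    "points": 5,
    "sessionMetadata": {
      "country": "Italy",
      "language": "IT"
    }
  }
}
```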
Finally, let’s assume we also have another, normalised, compacted topic, user-details, keyed by an int matching the pid from topic game-sessions and containing user information like address and period of membership to the platform.
In light of the above, a record might look like the following (in JSON for simplicity):
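For instance (again, the field names are our assumption):

```json
{
  "key": 1,
  "value": {
    "address": "1 Garden Road, London",
    "memberYears": 3
  }
}
```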
We can replicate such structures using SQL Studio and the following query:
We can then use SQL Studio again to insert the data we will use in the rest of the tutorial:
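As a partial sketch, the flat user-details topic could be created and populated as below. The nested key and value of game-sessions are not sketched here, since the exact DDL for nested structures, and the quoting of hyphenated topic names (shown with back-ticks), are best taken from the Lenses SQL reference:

```sql
CREATE TABLE `user-details`(
    address     string,
    memberYears int
)
FORMAT (int, avro);

INSERT INTO `user-details`(_key, address, memberYears) VALUES (1, '1 Garden Road, London', 3);
INSERT INTO `user-details`(_key, address, memberYears) VALUES (2, '5 Ocean Drive, Miami', 1);
```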
Let’s imagine that, with the above data available, we are given the following requirements:

1. For each country in game-sessions, create a record with the count of games played from that country. Write the results to the games-per-country topic.
2. For each record in game-sessions, reshape the records to remove everything from the key besides pid. Additionally, add the user’s memberYears to the value. Write the results to the games-sessions-normalised topic.
We can obtain the above with the following query:
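A sketch of such a processor is below. It relies on several assumptions: the field names (sessionMetadata.country, memberYears), back-tick quoting for the hyphenated topic names, and that joining on _key.pid re-keys the output by pid; all of these should be checked against the actual topics and the Lenses SQL reference:

```sql
SET defaults.topic.autocreate=true;

-- Requirement 1: number of games played per country
-- (the grouping field is assumed to become the key of the aggregate records)
INSERT INTO `games-per-country`
SELECT STREAM COUNT(*) AS gamesPlayed
FROM `game-sessions`
GROUP BY sessionMetadata.country;

-- Requirement 2: keep only pid in the key and add the user's memberYears
INSERT INTO `games-sessions-normalised`
SELECT STREAM
    g.points,
    g.sessionMetadata,
    u.memberYears
FROM `game-sessions` AS g
    INNER JOIN `user-details` AS u
        ON g._key.pid = u._key;
```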
The result of this processor in the UI will be a processor graph similar to the following:
Finally, the content of the output topics games-per-country and games-sessions-normalised can now be inspected in the Lenses Explore screen:
In this tutorial, we learned how to read data from multiple topics, combine it, process it in different ways, and save it to as many output topics as needed.
Good luck and happy streaming!