This page describes a tutorial to enrich a Kafka topic using Lenses SQL Processors.
In this article, we will be enriching customer call events with their customer details.
Enriching data streams with extra information by performing an efficient lookup is a common scenario for streaming SQL on Apache Kafka.
Topics involved:
customer_details messages contain information about the customer
customer_call_details messages contain information about calls
To simplify our testing process and run the above example in less than 60 seconds, we will use SQL to create and populate the two Apache Kafka topics involved. The SQL Processor below performs the enrichment; the statements that create and populate the test topics follow it.
SET defaults.topic.autocreate=true;
INSERT INTO customers_callInfo
SELECT STREAM
calls._value AS call
, customer._value AS customer
FROM customer_call_details AS calls
INNER JOIN (SELECT TABLE * FROM customer_details) AS customer
ON customer._key.customer.id = calls._key.customer.id

CREATE TABLE customer_details(
_key.customer.typeID string
, _key.customer.id string
, customer.name string
, customer.middleName string null
, customer.surname string
, customer.nationality string
, customer.passportNumber string
, customer.phoneNumber string
, customer.email string null
, customer.address string
, customer.country string
, customer.driverLicense string null
, package.typeID string
, package.description string
, active boolean
)
FORMAT(avro, avro)
PROPERTIES(partitions=5, replication=1, compacted=true);

INSERT INTO customer_details(
_key.customer.typeID
, _key.customer.id
, customer.name
, customer.middleName
, customer.surname
, customer.nationality
, customer.passportNumber
, customer.phoneNumber
, customer.email
, customer.address
, customer.country
, customer.driverLicense
, package.typeID
, package.description
, active
) VALUES
("userType1","5162258362252394","April","-","Paschall","GBR","APGBR...","1999153354","[email protected]","-","GBR","-","TypeA","Desc.",true),
("internal","5290441401157247","Charisse","-","Daggett","USA","CDUSA...","6418577217","[email protected]","-","USA","-","TypeC","Desc.",true),
("internal","5397076989446422","Gibson","-","Chunn","USA","GCUSA...","8978860472","[email protected]","-","USA","-","TypeC","Desc.",true),
("partner","5248189647994492","Hector","-","Swinson","NOR","HSNOR...","8207437436","[email protected]","-","NOR","-","TypeA","Desc.",true),
("userType1","5196864976665762","Booth","-","Spiess","CAN","BSCAN...","6220504387","[email protected]","-","CAN","-","TypeA","Desc.",true),
("userType2","5423023313257503","Hitendra","-","Sibert","SWZ","HSSWZ...","6731834082","[email protected]","-","SWZ","-","TypeA","Desc.",true),
("userType2","5337899393425317","Larson","-","Asbell","SWE","LASWE...","2844252229","[email protected]","-","SWE","-","TypeA","Desc.",true),
("partner","5140590381876333","Zechariah","-","Schwarz","GER","ZSGER...","4936431929","[email protected]","-","GER","-","TypeB","Desc.",true),
("internal","5524874546065610","Shulamith","-","Earles","FRA","SEFRA...","2119087327","[email protected]","-","FRA","-","TypeC","Desc.",true),
("userType1","5204216758311612","Tangwyn","-","Gorden","GBR","TGGBR...","9172511192","[email protected]","-","GBR","-","TypeA","Desc.",true),
("userType1","5336077954566768","Miguel","-","Gonzales","ESP","MGESP...","5664871802","[email protected]","-","ESP","-","TypeA","Desc.",true),
("userType3","5125835811760048","Randie","-","Ritz","NOR","RRNOR...","3245795477","[email protected]","-","NOR","-","TypeA","Desc.",true),
("userType1","5317812241111538","Michelle","-","Fleur","FRA","MFFRA...","7708177986","[email protected]","-","FRA","-","TypeA","Desc.",true),
("userType1","5373595752176476","Thurborn","-","Asbell","GBR","TAGBR...","5927996719","[email protected]","-","GBR","-","TypeA","Desc.",true),
("userType3","5589753170506689","Noni","-","Gorden","AUT","NGAUT...","7288041910","[email protected]","-","AUT","-","TypeA","Desc.",true),
("userType2","5588152341005179","Vivian","-","Glowacki","POL","VGPOL...","9001088901","[email protected]","-","POL","-","TypeA","Desc.",true),
("partner","5390713494347532","Elward","-","Frady","USA","EFUSA...","2407143487","[email protected]","-","USA","-","TypeB","Desc.",true),
("userType1","5322449980897580","Severina","-","Bracken","AUT","SBAUT...","7552231346","[email protected]","-","AUT","-","TypeA","Desc.",true);CREATE TABLE customer_call_details(
_key.customer.typeID string
, _key.customer.id string
, callInfoCustomerID string
, callInfoType string
, callInfoDuration int
, callInfoInit int)
FORMAT(avro, avro)
PROPERTIES(partitions=1, replication=1, compacted=false);

INSERT INTO customer_call_details(
_key.customer.typeID
, _key.customer.id
, callInfoCustomerID
, callInfoType
, callInfoDuration
, callInfoInit
) VALUES
("userType1", "5322449980897580","5322449980897580", "CallTypeA", 470, 0),
("internal", "5290441401157247","5290441401157247", "CallTypeC", 67, 0),
("partner", "5140590381876333","5140590381876333", "CallTypeB", 377, 0),
("internal", "5397076989446422","5397076989446422", "CallTypeC", 209, 0),
("userType2", "5337899393425317","5337899393425317", "CallTypeA", 209, 0),
("partner", "5140590381876333","5140590381876333", "CallTypeB", 887, 0),
("userType1", "5322449980897580","5322449980897580", "CallTypeA", 203, 0),
("partner", "5140590381876333","5140590381876333", "CallTypeB", 1698, 0),
("userType3", "5589753170506689","5589753170506689", "CallTypeA", 320, 1),
("internal", "5290441401157247","5290441401157247", "CallTypeC", 89, 0),
("partner", "5140590381876333","5140590381876333", "CallTypeB", 355, 0),
("internal", "5290441401157247","5290441401157247", "CallTypeC", 65, 0),
("userType2", "5337899393425317","5337899393425317", "CallTypeA", 43, 1),
("partner", "5390713494347532","5390713494347532", "CallTypeB", 530, 0),
("internal", "5397076989446422","5397076989446422", "CallTypeC", 270, 0),
("userType3", "5589753170506689","5589753170506689", "CallTypeA", 1633, 0),
("internal", "5290441401157247","5290441401157247", "CallTypeC", 110, 0),
("userType1", "5322449980897580","5322449980897580", "CallTypeA", 540, 0),
("internal", "5290441401157247","5290441401157247", "CallTypeC", 168, 0),
("userType3", "5589753170506689","5589753170506689", "CallTypeA", 1200, 0),
("internal", "5290441401157247","5290441401157247", "CallTypeC", 1200, 0),
("partner", "5390713494347532","5390713494347532", "CallTypeB", 22, 0),
("userType3", "5589753170506689","5589753170506689", "CallTypeA", 333, 1),
("internal", "5397076989446422","5397076989446422", "CallTypeC", 87, 0),
("partner", "5390713494347532","5390713494347532", "CallTypeB", 123, 0),
("userType2", "5337899393425317","5337899393425317", "CallTypeA", 182, 1),
("partner", "5140590381876333","5140590381876333", "CallTypeB", 844, 0),
("partner", "5390713494347532","5390713494347532", "CallTypeB", 56, 1),
("internal", "5397076989446422","5397076989446422", "CallTypeC", 36, 0),
("partner", "5140590381876333","5140590381876333", "CallTypeB", 794, 0),
("userType3", "5589753170506689","5589753170506689", "CallTypeA", 440, 0),
("internal", "5397076989446422","5397076989446422", "CallTypeC", 52, 0),
("userType1", "5322449980897580","5322449980897580", "CallTypeA", 770, 0),
("internal", "5397076989446422","5397076989446422", "CallTypeC", 627, 0),
("partner", "5140590381876333","5140590381876333", "CallTypeB", 555, 0),
("userType2", "5337899393425317","5337899393425317", "CallTypeA", 55, 1);SELECT
p.callInfoCustomerID AS customerID
, p.callInfoType
, p.callInfoInit
FROM customer_call_details AS p
INNER JOIN customer_details AS c
ON p._key.customer.id = c._key.customer.id

This page describes a tutorial to use multiple Kafka topics in a Lenses SQL Processor.
In this tutorial, we will see how we can read data from multiple topics, process it as needed, and write the results to as many output topics as we need, all using a single SQL Processor.
Let’s assume that we have a topic (game-sessions) that contains data regarding remote gaming sessions by users.
Each gaming session will contain:
the points the user achieved throughout the session
Metadata information regarding the session:
The country where the game took place
The language the user played the game in
The above structure represents the value of each record in our game-sessions topic.
Additionally, each record is keyed by user details:
A pid, or player id, representing this user uniquely
Some additional denormalised user details:
a name
a surname
an age
In light of the above, a record might look like the following (in JSON for simplicity):
Finally, let’s assume we also have another, normalised, compacted topic, user-details, keyed by an int matching the pid from the game-sessions topic and containing user information such as address and length of membership on the platform.
In light of the above, a record might look like the following (in JSON for simplicity):
We can replicate such structures using SQL Studio and the following query:
We can then use SQL Studio again to insert the data we will use in the rest of the tutorial:
Let’s imagine that, based on the above data, we are given the following requirements:
For each country in game-sessions, create a record with the count of games played from that country. Write the results to the games-per-country topic.
For each record in game-sessions, reshape the record to remove everything from the key besides pid. Additionally, add the user’s memberYears to the value. Write the results to the games-sessions-normalised topic.
We can obtain the above with the following query:
The result of this processor in the UI will be a processor graph similar to the following:
Finally, the content of the output topics games-per-country and games-sessions-normalised can now be inspected in the Lenses Explore screen:
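The same topics can also be checked with a quick Snapshot query in SQL Studio; a minimal sketch (run each statement on its own, LIMIT simply keeps the output small):

SELECT * FROM games-per-country LIMIT 10;
SELECT * FROM games-sessions-normalised LIMIT 10;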
In this tutorial, we learned how to read data from multiple topics, combine it, process it in different ways, and save the results to as many output topics as needed.
Good luck and happy streaming!
This page describes a tutorial joining Kafka topics with Lenses SQL Processors.
Imagine you are the next Amazon, and you want to track order and shipment events to work out which orders have been shipped and how long it took. In this case, there will be two data streams, one for each event type, and the resulting stream will answer the questions above.
Enriching two streams of data requires a sliding window join. Two events are said to be “close” to each other if the difference between their timestamps is within the specified time window.
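In Lenses SQL the window is expressed with a WITHIN clause on the join. The sketch below shows the general shape only, using placeholder names (topic_a, topic_b, joined_output, left_id); the complete processor for this tutorial appears further down the page:

SET defaults.topic.autocreate=true;
WITH left_stream AS (
  SELECT STREAM * FROM topic_a
);
WITH right_stream AS (
  SELECT STREAM * FROM topic_b
);
INSERT INTO joined_output
SELECT STREAM
    left_stream._key AS leftId
    , right_stream._key AS rightId
FROM left_stream INNER JOIN right_stream
    ON left_stream._key = right_stream.left_id
WITHIN 24h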
Topics involved:
orders messages contain information about a customer order
shipments messages contain information about the shipment
The query combines the data from orders and shipments if the order is shipped within 24 hours of being placed. Resulting records contain the order and shipment identifiers, and the time elapsed between when the order was registered and when it was shipped.
To simplify our testing process and run the above example in less than 60 seconds, we will use SQL to create and populate the two Apache Kafka topics:
The output seen in the next screenshot shows two records. For the order with identifier o2, there is no shipment entry because it has not been shipped yet. For the order with identifier o3, the shipment happened more than one day after the order was placed, so it falls outside the 24-hour window.
Let’s switch to the Snapshot engine by navigating to the SQL Studio menu item. With the entries in both topics, we can write the following query to see which data is joinable without the window interval:
These are the results for the non-streaming (i.e., Snapshot) query:
Running the query returns three records, but you can see that order o3 was shipped two days after it was placed. Let’s apply the sliding window restriction to the Snapshot query by adding a filter that only matches records whose timestamps differ by at most one day.
Now the result matches the one from the Streaming query.
In this tutorial, you learned how to join two streams together using a sliding window. You achieved all of the above using the Lenses SQL engine.
Good luck and happy streaming!
{
"key":{
"pid": 1,
"name": "Billy",
"surname": "Lagrange",
"age": 30
},
"value":{
"points": 5,
"sessionMetadata": {
"country": "Italy",
"language": "IT"
}
}
}

{
"key": 1,
"value":{
"fullName": "Billy Lagrange",
"memberYears": 3,
"address": {
"country": "Italy",
"street": "Viale Monza 5",
"city": "Milan"
}
}
}

CREATE TABLE game-sessions(
_key.pid int
, _key.name string
, _key.surname string
, _key.age int
, points double
, sessionMetadata.country string
, sessionMetadata.language string
)
FORMAT (avro, avro);
CREATE TABLE user-details(
fullName string
, memberYears int
, address.country string
, address.street string
, address.city string
) FORMAT (int, avro);

INSERT INTO game-sessions(
_key.pid
, _key.name
, _key.surname
, _key.age
, points
, sessionMetadata.country
, sessionMetadata.language
) VALUES
(1, 'Billy', 'Lagrange', 35, 5, 'Italy', 'IT'),
(1, 'Billy', 'Lagrange', 35, 30, 'Italy', 'IT'),
(1, 'Billy', 'Lagrange', 35, 0, 'Italy', 'IT'),
(2, 'Maria', 'Rossi', 27, 50, 'Italy', 'IT'),
(2, 'Maria', 'Rossi', 27, 10, 'Italy', 'IT'),
(3, 'Jorge', 'Escudero', 27, 10, 'Spain', 'ES'),
(4, 'Juan', 'Suarez', 22, 80, 'Mexico', 'ES'),
(5, 'John', 'Bolden', 40, 10, 'USA', 'EN'),
(6, 'Dave', 'Holden', 31, 30, 'UK', 'EN'),
(7, 'Nigel', 'Calling', 50, 5, 'UK', 'EN'),
(2, 'Maria', 'Rossi', 27, 10, 'UK', 'EN'),
(1, 'Billy', 'Lagrange', 35, 50, 'Italy', 'IT'),
(3, 'Jorge', 'Escudero', 27, 16, 'Spain', 'ES'),
(4, 'Juan', 'Suarez', 22, 70, 'Mexico', 'ES'),
(5, 'John', 'Bolden', 40, 10, 'USA', 'EN'),
(6, 'Dave', 'Holden', 31, 50, 'Italy', 'IT'),
(6, 'Dave', 'Holden', 31, 70, 'Spain', 'ES'),
(2, 'Maria', 'Rossi', 27, 70, 'Italy', 'IT'),
(1, 'Billy', 'Lagrange', 35, 50, 'Italy', 'IT')
;
INSERT INTO user-details(
_key
, fullName
, memberYears
, address.country
, address.street
, address.city
) VALUES
(1, 'Billy Lagrange', 3, 'Italy', 'Viale Monza 5', 'Milan'),
(2, 'Maria Rossi', 1, 'Italy', 'Stazione Termini', 'Rome'),
(3, 'Jorge Escudero', 5, 'Spain', '50 Passeig de Gracia', 'Barcelona'),
(4, 'Juan Suarez', 0, 'Mexico', 'Plaza Real', 'Tijuana'),
(5, 'John Bolden', 2, 'USA', '100 Wall Street', 'New York'),
(6, 'Dave Holden', 1, 'UK', '25 Bishopsgate', 'London'),
(7, 'Nigel Calling', 6, 'UK', '10 Queen Anne Street', 'Brighton')
;

SET defaults.topic.autocreate=true;
SET commit.interval.ms='1000';
WITH userDetailsTable AS(
SELECT TABLE *
FROM user-details
);
WITH joinedAndNormalised AS(
SELECT STREAM
gs.*
, ud.memberYears
FROM game-sessions AS gs JOIN userDetailsTable AS ud
ON (gs._key.pid = ud._key)
);
INSERT INTO games-per-country
SELECT STREAM
COUNT(*) AS gamesPlayed
FROM game-sessions
GROUP BY sessionMetadata.country;
INSERT INTO games-sessions-normalised
SELECT STREAM *
FROM joinedAndNormalised;

SET defaults.topic.autocreate=true;
SET auto.offset.reset = 'earliest';
WITH o as (
SELECT STREAM *
FROM orders
EVENTTIME BY orderTimestamp
);
WITH s as (
SELECT STREAM *
FROM shipments
EVENTTIME BY timestamp
);
INSERT INTO orders_and_shipment
SELECT STREAM
o._key AS orderId
, s._key AS shipmentId
, DATE_TO_STR(TO_TIMESTAMP(s.timestamp - o.orderTimestamp), 'HH:mm:ss') AS time_difference
FROM o INNER JOIN s
ON o._key = s.orderId
WITHIN 24h

CREATE TABLE orders(
_key string
, customerId string
, orderTimestamp long
, amount decimal(18,2)
)
FORMAT(string, avro);

INSERT INTO orders(
_key
, customerId
, orderTimestamp
, amount
) VALUES
("o1","[email protected]",1596374427000,99.9), --Sunday, 2 August 2020 13:20:27
("o2","[email protected]",1596453627000,240.01), --Monday, 3 August 2020 11:20:27
("o3","[email protected]",1596280827000,81.81), --Saturday, 1 August 2020 11:20:27
("o4","[email protected]",1596453627000,300.00); --Monday, 3 August 2020 11:20:27CREATE TABLE shipments(
_key string
, orderId string
, timestamp long)
FORMAT(string, avro);

INSERT INTO shipments(
_key
, orderId
, `timestamp`
) VALUES
("s1", "o1", 1596445927000), --Monday, 3 August 2020 09:12:07
("s2", "o3", 1596456112000), --Monday, 3 August 2020 12:01:52
("s3", "o4", 1596460271000); --Monday, 3 August 2020 13:11:11SELECT
o._key AS order
, DATE_TO_STR(o.orderTimestamp, 'yyyy-MM-dd HH:mm:ss') AS order_time
, s._key AS shipment
, DATE_TO_STR(s.timestamp, 'yyyy-MM-dd HH:mm:ss') AS shipment_time
FROM orders o INNER JOIN shipments s
ON o._key = s.orderId

SELECT
o._key AS order
, DATE_TO_STR(o.orderTimestamp, 'yyyy-MM-dd HH:mm:ss') AS order_time
, s._key AS shipment
, DATE_TO_STR(s.timestamp, 'yyyy-MM-dd HH:mm:ss') AS shipment_time
FROM orders o INNER JOIN shipments s
ON o._key = s.orderId
WHERE to_timestamp(s.timestamp) - '1d' <= to_timestamp(o.orderTimestamp)




This page describes a tutorial to filter records in a Kafka topic with Lenses SQL Processors.
Filtering messages and copying them to a topic can be achieved using the WHERE clause.
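In its simplest form, such a processor reads from one topic, keeps only the records matching a predicate, and copies them to another topic. A minimal sketch, where the topic names and the condition are placeholders:

SET defaults.topic.autocreate=true;
INSERT INTO filtered_topic
SELECT STREAM *
FROM source_topic
WHERE currency = 'EUR'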
In our example, we have a topic called payments where our application registers bank transactions. Its records have this shape:
We can replicate such a structure running the following query in SQL Studio:
Each event has a unique string key generated by the upstream system.
We can again use SQL Studio to insert some data to play with:
Let’s assume we need to detect significant transactions that will be then fed into our anti-fraud system.
We want to copy those transactions into a new topic, maintaining the content of the records as it is.
For our first example, we will use a simple predicate to filter transactions with an amount of 5000 or more, regardless of the currency.
Lenses SQL supports all the common comparison operators to compare values, so for our goal it is enough to use a WHERE statement with a >= condition:
Checking the records emitted by the processor, we see that we got the transactions we were looking for.
Because of the * projection, the records’ content has not changed.
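If the downstream system only needed a few fields, the * could be replaced by an explicit projection. A minimal sketch, assuming the same payments schema and a hypothetical big_payments_slim output topic:

SET defaults.topic.autocreate=true;
INSERT INTO big_payments_slim
SELECT STREAM
    amount
    , currency
    , to_account
FROM payments
WHERE amount >= 5000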
Not all currencies are the same, so we would like to add a specific threshold for each currency. As a first cut, we combine multiple conditions with ANDs and ORs:
As an improvement, we want to capture the threshold for each currency in a single expression. We will use a CASE statement for that:
getting the results:
In this section, we will find all the transactions that happened during the (UTC) night. To do that we can use one of our many date and time functions.
You will also see how to use a CAST expression to convert from one type to another.
Checking the output, we can see that only one transaction satisfied our predicate:
Let’s imagine that we have to build some intelligence around the payments we process, but we have neither the capacity nor the need to process all of them.
We then decide to build a reduced copy of the payments topic, containing only about 10% of the original records.
To do that we are going to use our RANDINT function:
RANDINT generates a random integer; we take its absolute value to make sure it is positive, and then normalise the result by dividing it by the maximum possible integer value (2147483647), which yields an (almost) uniform sample of numbers between 0 and 1. Comparing that value against a threshold then keeps roughly that fraction of the records (a threshold of 0.1 keeps about 10%).
We have to CAST to double along the way; otherwise, the division would be between integers, and the result would always be 0.
KEY: "6A461C60-02F3-4C01-94FB-092ECBDE0837"
VALUE: {
"amount": "12.10",
"currency": "EUR",
"from_account": "xxx",
"to_account": "yyy",
"time": 1591970345000
}

CREATE TABLE payments(
amount double
, currency string
, from_account string
, to_account string
, time datetime
)
FORMAT (string, avro);

INSERT INTO payments(
_key
, amount
, currency
, from_account
, to_account
, time
)
VALUES
("6A461C60-02F3-4C01-94FB-092ECBDE0837", 10, "EUR", "account-1", "account-2", 1590970345000),
("E5DA60E8-F622-43B2-8A93-B958E01E8AB3", 100000, "EUR", "account-1", "account-3", 1591070346000),
("0516A309-FB2B-4F6D-A11F-3C06A5D64B68", 5300, "USD", "account-2", "account-3", 1591170347000),
("0871491A-C915-4163-9C4B-35DEA0373B41", 6500, "EUR", "account-3", "account-1", 1591270348000),
("2B557134-9314-4F96-A640-1BF90887D846", 300, "EUR", "account-1", "account-4", 1591370348000),
("F4EDAE35-45B4-4841-BAB7-6644E2BBC844", 3400, "EUR", "account-2", "account-1", 1591470349000),
("F51A912A-96E9-42B1-9AC4-42E923A0A6F8", 7500, "USD", "account-2", "account-3", 1591570352000),
("EC8A08F1-75F0-49C8-AA08-A5E57997D27A", 2500000, "USD", "account-1", "account-3", 1591670356000),
("9DDBACFF-D42B-4042-95AC-DCDD84F0AC32", 1000, "GBP", "account-2", "account-3", 1591870401000)
;

SET defaults.topic.autocreate=true;
INSERT INTO big_payments
SELECT STREAM *
FROM payments
WHERE amount >= 5000

KEY:"E5DA60E8-F622-43B2-8A93-B958E01E8AB3"
VALUE: { amount:100000, ... }
------------------------------------
KEY:"0516A309-FB2B-4F6D-A11F-3C06A5D64B68"
VALUE: { amount:5300, ... }
------------------------------------
KEY:"0871491A-C915-4163-9C4B-35DEA0373B41"
VALUE: { amount:6500, ... }
------------------------------------
KEY:"F51A912A-96E9-42B1-9AC4-42E923A0A6F8"
VALUE: { amount:7500, ... }
------------------------------------
KEY:"EC8A08F1-75F0-49C8-AA08-A5E57997D27A"
VALUE: { amount:2500000, ... }

SET defaults.topic.autocreate=true;
INSERT INTO big_eur_usd_payments
SELECT STREAM *
FROM payments
WHERE
(amount >= 5000 AND currency = 'EUR') OR
(amount >= 5500 AND currency = 'USD')

SET defaults.topic.autocreate=true;
INSERT INTO big_payments_case
SELECT STREAM *
FROM payments
WHERE
amount >= (CASE
WHEN currency = 'EUR' THEN 5000
WHEN currency = 'USD' THEN 5500
WHEN currency = 'GBP' THEN 4500
ELSE 5000
END)

KEY:"E5DA60E8-F622-43B2-8A93-B958E01E8AB3"
VALUE: { amount:100000, currency:"EUR", ... }
------------------------------------
KEY:"0871491A-C915-4163-9C4B-35DEA0373B41"
VALUE: { amount:6500, currency:"EUR", ... }
------------------------------------
KEY:"F51A912A-96E9-42B1-9AC4-42E923A0A6F8"
VALUE: { amount:7500, currency:"USD", ... }
------------------------------------
KEY:"EC8A08F1-75F0-49C8-AA08-A5E57997D27A"
VALUE: { amount:2500000, currency:"USD", ... }SET defaults.topic.autocreate=true;
INSERT INTO night_payments
SELECT STREAM *
FROM payments
WHERE
CAST(HOUR(time) AS INT) >= 0 AND
CAST(HOUR(time) AS INT) <= 5

KEY:"6A461C60-02F3-4C01-94FB-092ECBDE0837"
VALUE: { amount:10, currency:"EUR", time:1590970345000, ... }
------------------------------------
KEY:"E5DA60E8-F622-43B2-8A93-B958E01E8AB3"
VALUE: { amount:100000, currency:"EUR", time:1591070346000, ... }
------------------------------------
KEY:"EC8A08F1-75F0-49C8-AA08-A5E57997D27A"
VALUE: { amount:2500000, currency:"USD", time:1591670356000, ... }SET defaults.topic.autocreate=true;
INSERT INTO payments_sample
SELECT STREAM *
FROM payments
WHERE CAST(ABS(RANDINT()) AS DOUBLE) / 2147483647 < 0.1
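To sanity-check the sampling, a Snapshot query in SQL Studio can compare the number of records in the source and sampled topics; a sketch (run each statement on its own, and expect the ratio to drift between runs because the sampling is random):

SELECT COUNT(*) FROM payments;
SELECT COUNT(*) FROM payments_sample;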