This page describes a tutorial to enrich a Kafka topic using Lenses SQL Processors.
In this article, we will be enriching customer call events with their customer details.
Enriching data streams with extra information by performing an efficient lookup is a common scenario for streaming SQL on Apache Kafka.
Topics involved:
customer_details
messages contain information about the customer
customer_call_details
messages contain information about calls
SET defaults.topic.autocreate=true;
INSERT INTO customers_callInfo
SELECT STREAM
calls._value AS call
, customer._value AS customer
FROM customer_call_details AS calls
INNER JOIN (SELECT TABLE * FROM customer_details) AS customer
ON customer._key.customer.id = calls._key.customer.id
To simplify our testing process and manage to run the above example in less than 60 seconds, we will be using SQL to create and populate the three Apache Kafka topics:
customer_details
CREATE TABLE customer_details(
_key.customer.typeID string
, _key.customer.id string
, customer.name string
, customer.middleName string null
, customer.surname string
, customer.nationality string
, customer.passportNumber string
, customer.phoneNumber string
, customer.email string null
, customer.address string
, customer.country string
, customer.driverLicense string null
, package.typeID string
, package.description string
, active boolean
)
FORMAT(avro, avro)
PROPERTIES(partitions=5, replication=1, compacted=true);
customer_details
INSERT INTO customer_details(
_key.customer.typeID
, _key.customer.id
, customer.name
, customer.middleName
, customer.surname
, customer.nationality
, customer.passportNumber
, customer.phoneNumber
, customer.email
, customer.address
, customer.country
, customer.driverLicense
, package.typeID
, package.description
, active
) VALUES
("userType1","5162258362252394","April","-","Paschall","GBR","APGBR...","1999153354","aprilP@mydomain.com","-","GBR","-","TypeA","Desc.",true),
("internal","5290441401157247","Charisse","-","Daggett","USA","CDUSA...","6418577217","charisseD@mydomain.com","-","USA","-","TypeC","Desc.",true),
("internal","5397076989446422","Gibson","-","Chunn","USA","GCUSA...","8978860472","gibsonC@mydomain.com","-","USA","-","TypeC","Desc.",true),
("partner","5248189647994492","Hector","-","Swinson","NOR","HSNOR...","8207437436","hectorS@mydomain.com","-","NOR","-","TypeA","Desc.",true),
("userType1","5196864976665762","Booth","-","Spiess","CAN","BSCAN...","6220504387","hectorS@mydomain.com","-","CAN","-","TypeA","Desc.",true),
("userType2","5423023313257503","Hitendra","-","Sibert","SWZ","HSSWZ...","6731834082","hitendraS@mydomain.com","-","SWZ","-","TypeA","Desc.",true),
("userType2","5337899393425317","Larson","-","Asbell","SWE","LASWE...","2844252229","larsonA@mydomain.com","-","SWE","-","TypeA","Desc.",true),
("partner","5140590381876333","Zechariah","-","Schwarz","GER","ZSGER...","4936431929","ZechariahS@mydomain.com","-","GER","-","TypeB","Desc.",true),
("internal","5524874546065610","Shulamith","-","Earles","FRA","SEFRA...","2119087327","ShulamithE@mydomain.com","-","FRA","-","TypeC","Desc.",true),
("userType1","5204216758311612","Tangwyn","-","Gorden","GBR","TGGBR...","9172511192","TangwynG@mydomain.com","-","GBR","-","TypeA","Desc.",true),
("userType1","5336077954566768","Miguel","-","Gonzales","ESP","MGESP...","5664871802","MiguelG@mydomain.com","-","ESP","-","TypeA","Desc.",true),
("userType3","5125835811760048","Randie","-","Ritz","NOR","RRNOR...","3245795477","RandieR@mydomain.com","-","NOR","-","TypeA","Desc.",true),
("userType1","5317812241111538","Michelle","-","Fleur","FRA","MFFRA...","7708177986","MichelleF@mydomain.com","-","FRA","-","TypeA","Desc.",true),
("userType1","5373595752176476","Thurborn","-","Asbell","GBR","TAGBR...","5927996719","ThurbornA@mydomain.com","-","GBR","-","TypeA","Desc.",true),
("userType3","5589753170506689","Noni","-","Gorden","AUT","NGAUT...","7288041910","NoniG@mydomain.com","-","AUT","-","TypeA","Desc.",true),
("userType2","5588152341005179","Vivian","-","Glowacki","POL","VGPOL...","9001088901","VivianG@mydomain.com","-","POL","-","TypeA","Desc.",true),
("partner","5390713494347532","Elward","-","Frady","USA","EFUSA...","2407143487","ElwardF@mydomain.com","-","USA","-","TypeB","Desc.",true),
("userType1","5322449980897580","Severina","-","Bracken","AUT","SBAUT...","7552231346","SeverinaB@mydomain.com","-","AUT","-","TypeA","Desc.",true);
customer_call_details
CREATE TABLE customer_call_details(
_key.customer.typeID string
, _key.customer.id string
, callInfoCustomerID string
, callInfoType string
, callInfoDuration int
, callInfoInit int)
FORMAT(avro, avro)
PROPERTIES(partitions=1, replication=1, compacted=false)
customer_call_details
INSERT INTO customer_call_details(
_key.customer.typeID
, _key.customer.id
, callInfoCustomerID
, callInfoType
, callInfoDuration
, callInfoInit
) VALUES
("userType1", "5322449980897580","5322449980897580", "CallTypeA", 470, 0),
("internal", "5290441401157247","5290441401157247", "CallTypeC", 67, 0),
("partner", "5140590381876333","5140590381876333", "CallTypeB", 377, 0),
("internal", "5397076989446422","5397076989446422", "CallTypeC", 209, 0),
("userType2", "5337899393425317","5337899393425317", "CallTypeA", 209, 0),
("partner", "5140590381876333","5140590381876333", "CallTypeB", 887, 0),
("userType1", "5322449980897580","5322449980897580", "CallTypeA", 203, 0),
("partner", "5140590381876333","5140590381876333", "CallTypeB", 1698, 0),
("userType3", "5589753170506689","5589753170506689", "CallTypeA", 320, 1),
("internal", "5290441401157247","5290441401157247", "CallTypeC", 89, 0),
("partner", "5140590381876333","5140590381876333", "CallTypeB", 355, 0),
("internal", "5290441401157247","5290441401157247", "CallTypeC", 65, 0),
("userType2", "5337899393425317","5337899393425317", "CallTypeA", 43, 1),
("partner", "5390713494347532","5390713494347532", "CallTypeB", 530, 0),
("internal", "5397076989446422","5397076989446422", "CallTypeC", 270, 0),
("userType3", "5589753170506689","5589753170506689", "CallTypeA", 1633, 0),
("internal", "5290441401157247","5290441401157247", "CallTypeC", 110, 0),
("userType1", "5322449980897580","5322449980897580", "CallTypeA", 540, 0),
("internal", "5290441401157247","5290441401157247", "CallTypeC", 168, 0),
("userType3", "5589753170506689","5589753170506689", "CallTypeA", 1200, 0),
("internal", "5290441401157247","5290441401157247", "CallTypeC", 1200, 0),
("partner", "5390713494347532","5390713494347532", "CallTypeB", 22, 0),
("userType3", "5589753170506689","5589753170506689", "CallTypeA", 333, 1),
("internal", "5397076989446422","5397076989446422", "CallTypeC", 87, 0),
("partner", "5390713494347532","5390713494347532", "CallTypeB", 123, 0),
("userType2", "5337899393425317","5337899393425317", "CallTypeA", 182, 1),
("partner", "5140590381876333","5140590381876333", "CallTypeB", 844, 0),
("partner", "5390713494347532","5390713494347532", "CallTypeB", 56, 1),
("internal", "5397076989446422","5397076989446422", "CallTypeC", 36, 0),
("partner", "5140590381876333","5140590381876333", "CallTypeB", 794, 0),
("userType3", "5589753170506689","5589753170506689", "CallTypeA", 440, 0),
("internal", "5397076989446422","5397076989446422", "CallTypeC", 52, 0),
("userType1", "5322449980897580","5322449980897580", "CallTypeA", 770, 0),
("internal", "5397076989446422","5397076989446422", "CallTypeC", 627, 0),
("partner", "5140590381876333","5140590381876333", "CallTypeB", 555, 0),
("userType2", "5337899393425317","5337899393425317", "CallTypeA", 55, 1);
SELECT
p.callInfoCustomerID AS customerID
, p.callInfoType
, p.callInfoInit
FROM customer_call_details AS p
INNER JOIN customer_details AS c
ON p._key.customer.id = c._key.customer.id