Enrich Streams


In this article, we will be enriching customer call events with their customer details.

Enriching data streams with extra information by performing an efficient lookup is a common scenario for streaming SQL on Apache Kafka.

Topics involved:

  • customer_details messages contain information about the customer
  • customer_call_details messages contain information about calls

Streaming SQL enrichment on Apache Kafka 

SET defaults.topic.autocreate=true;

INSERT INTO customers_callInfo
SELECT STREAM
    calls._value AS call 
    , customer._value AS customer
FROM  customer_call_details AS calls
        INNER JOIN  (SELECT TABLE * FROM customer_details) AS customer
            ON customer._key.customer.id = calls._key.customer.id

Testing data 

To simplify our testing process and manage to run the above example in less than 60 seconds, we will be using SQL to create and populate the three Apache Kafka topics:

CREATE TOPIC customer_details

CREATE TABLE customer_details(
        _key.customer.typeID string
        , _key.customer.id string
        , customer.name string
        , customer.middleName string null
        , customer.surname string
        , customer.nationality string
        , customer.passportNumber string
        , customer.phoneNumber string
        , customer.email string null
        , customer.address string
        , customer.country string
        , customer.driverLicense string null
        , package.typeID string
        , package.description string
        , active boolean
)
FORMAT(avro, avro)
PROPERTIES(partitions=5, replication=1, compacted=true);

POPULATE TOPIC customer_details

INSERT INTO customer_details(
        _key.customer.typeID
        , _key.customer.id
        , customer.name
        , customer.middleName
        , customer.surname
        , customer.nationality
        , customer.passportNumber
        , customer.phoneNumber
        , customer.email
        , customer.address
        , customer.country
        , customer.driverLicense
        , package.typeID
        , package.description
        , active
) VALUES
("userType1","5162258362252394","April","-","Paschall","GBR","APGBR...","1999153354","aprilP@mydomain.com","-","GBR","-","TypeA","Desc.",true),
("internal","5290441401157247","Charisse","-","Daggett","USA","CDUSA...","6418577217","charisseD@mydomain.com","-","USA","-","TypeC","Desc.",true),
("internal","5397076989446422","Gibson","-","Chunn","USA","GCUSA...","8978860472","gibsonC@mydomain.com","-","USA","-","TypeC","Desc.",true),
("partner","5248189647994492","Hector","-","Swinson","NOR","HSNOR...","8207437436","hectorS@mydomain.com","-","NOR","-","TypeA","Desc.",true),
("userType1","5196864976665762","Booth","-","Spiess","CAN","BSCAN...","6220504387","hectorS@mydomain.com","-","CAN","-","TypeA","Desc.",true),
("userType2","5423023313257503","Hitendra","-","Sibert","SWZ","HSSWZ...","6731834082","hitendraS@mydomain.com","-","SWZ","-","TypeA","Desc.",true),
("userType2","5337899393425317","Larson","-","Asbell","SWE","LASWE...","2844252229","larsonA@mydomain.com","-","SWE","-","TypeA","Desc.",true),
("partner","5140590381876333","Zechariah","-","Schwarz","GER","ZSGER...","4936431929","ZechariahS@mydomain.com","-","GER","-","TypeB","Desc.",true),
("internal","5524874546065610","Shulamith","-","Earles","FRA","SEFRA...","2119087327","ShulamithE@mydomain.com","-","FRA","-","TypeC","Desc.",true),
("userType1","5204216758311612","Tangwyn","-","Gorden","GBR","TGGBR...","9172511192","TangwynG@mydomain.com","-","GBR","-","TypeA","Desc.",true),
("userType1","5336077954566768","Miguel","-","Gonzales","ESP","MGESP...","5664871802","MiguelG@mydomain.com","-","ESP","-","TypeA","Desc.",true),
("userType3","5125835811760048","Randie","-","Ritz","NOR","RRNOR...","3245795477","RandieR@mydomain.com","-","NOR","-","TypeA","Desc.",true),
("userType1","5317812241111538","Michelle","-","Fleur","FRA","MFFRA...","7708177986","MichelleF@mydomain.com","-","FRA","-","TypeA","Desc.",true),
("userType1","5373595752176476","Thurborn","-","Asbell","GBR","TAGBR...","5927996719","ThurbornA@mydomain.com","-","GBR","-","TypeA","Desc.",true),
("userType3","5589753170506689","Noni","-","Gorden","AUT","NGAUT...","7288041910","NoniG@mydomain.com","-","AUT","-","TypeA","Desc.",true),
("userType2","5588152341005179","Vivian","-","Glowacki","POL","VGPOL...","9001088901","VivianG@mydomain.com","-","POL","-","TypeA","Desc.",true),
("partner","5390713494347532","Elward","-","Frady","USA","EFUSA...","2407143487","ElwardF@mydomain.com","-","USA","-","TypeB","Desc.",true),
("userType1","5322449980897580","Severina","-","Bracken","AUT","SBAUT...","7552231346","SeverinaB@mydomain.com","-","AUT","-","TypeA","Desc.",true);

CREATE TOPIC customer_call_details

CREATE TABLE customer_call_details(
    _key.customer.typeID string
    , _key.customer.id string
    , callInfoCustomerID string
    , callInfoType string
    , callInfoDuration int
    , callInfoInit int)
FORMAT(avro, avro)
PROPERTIES(partitions=1, replication=1, compacted=false)

POPULATE TOPIC customer_call_details

INSERT INTO customer_call_details(
    _key.customer.typeID
    , _key.customer.id
    , callInfoCustomerID
    , callInfoType
    , callInfoDuration
    , callInfoInit
) VALUES
("userType1", "5322449980897580","5322449980897580", "CallTypeA", 470, 0),
("internal", "5290441401157247","5290441401157247", "CallTypeC", 67, 0),
("partner", "5140590381876333","5140590381876333", "CallTypeB", 377, 0),
("internal", "5397076989446422","5397076989446422", "CallTypeC", 209, 0),
("userType2", "5337899393425317","5337899393425317", "CallTypeA", 209, 0),
("partner", "5140590381876333","5140590381876333", "CallTypeB", 887, 0),
("userType1", "5322449980897580","5322449980897580", "CallTypeA", 203, 0),
("partner", "5140590381876333","5140590381876333", "CallTypeB", 1698, 0),
("userType3", "5589753170506689","5589753170506689", "CallTypeA", 320, 1),
("internal", "5290441401157247","5290441401157247", "CallTypeC", 89, 0),
("partner", "5140590381876333","5140590381876333", "CallTypeB", 355, 0),
("internal", "5290441401157247","5290441401157247", "CallTypeC", 65, 0),
("userType2", "5337899393425317","5337899393425317", "CallTypeA", 43, 1),
("partner", "5390713494347532","5390713494347532", "CallTypeB", 530, 0),
("internal", "5397076989446422","5397076989446422", "CallTypeC", 270, 0),
("userType3", "5589753170506689","5589753170506689", "CallTypeA", 1633, 0),
("internal", "5290441401157247","5290441401157247", "CallTypeC", 110, 0),
("userType1", "5322449980897580","5322449980897580", "CallTypeA", 540, 0),
("internal", "5290441401157247","5290441401157247", "CallTypeC", 168, 0),
("userType3", "5589753170506689","5589753170506689", "CallTypeA", 1200, 0),
("internal", "5290441401157247","5290441401157247", "CallTypeC", 1200, 0),
("partner", "5390713494347532","5390713494347532", "CallTypeB", 22, 0),
("userType3", "5589753170506689","5589753170506689", "CallTypeA", 333, 1),
("internal", "5397076989446422","5397076989446422", "CallTypeC", 87, 0),
("partner", "5390713494347532","5390713494347532", "CallTypeB", 123, 0),
("userType2", "5337899393425317","5337899393425317", "CallTypeA", 182, 1),
("partner", "5140590381876333","5140590381876333", "CallTypeB", 844, 0),
("partner", "5390713494347532","5390713494347532", "CallTypeB", 56, 1),
("internal", "5397076989446422","5397076989446422", "CallTypeC", 36, 0),
("partner", "5140590381876333","5140590381876333", "CallTypeB", 794, 0),
("userType3", "5589753170506689","5589753170506689", "CallTypeA", 440, 0),
("internal", "5397076989446422","5397076989446422", "CallTypeC", 52, 0),
("userType1", "5322449980897580","5322449980897580", "CallTypeA", 770, 0),
("internal", "5397076989446422","5397076989446422", "CallTypeC", 627, 0),
("partner", "5140590381876333","5140590381876333", "CallTypeB", 555, 0),
("userType2", "5337899393425317","5337899393425317", "CallTypeA", 55, 1);

Validate results 

SELECT
    p.callInfoCustomerID AS customerID
    , p.callInfoType
    , p.callInfoInit
FROM customer_call_details AS p
        INNER JOIN customer_details AS c
            ON p._key.customer.id = c._key.customer.id
--
Last modified: January 22, 2024