This page describes a tutorial to enrich a Kafka topic using Lenses SQL Processors.
In this article, we will be enriching customer call events with their customer details.
Enriching data streams with extra information by performing an efficient lookup is a common scenario for streaming SQL on Apache Kafka.
Topics involved:
customer_details messages contain information about the customer
customer_call_details messages contain information about calls
Streaming SQL enrichment on Apache Kafka
SETdefaults.topic.autocreate=true;INSERT INTO customers_callInfoSELECT STREAMcalls._valueAScall , customer._valueAS customerFROM customer_call_details AS callsINNER JOIN (SELECTTABLE*FROM customer_details) AS customerONcustomer._key.customer.id=calls._key.customer.id
Testing data
To simplify our testing process and manage to run the above example in less than 60 seconds, we will be using SQL to create and populate the three Apache Kafka topics:
SELECT
p.callInfoCustomerID AS customerID
, p.callInfoType
, p.callInfoInit
FROM customer_call_details AS p
INNER JOIN customer_details AS c
ON p._key.customer.id = c._key.customer.id