View the latest documentation 5.5
In this example, we will show how to aggregate three Kafka topics by using Streaming SQL processors.
We will be aggregating:
employee_dictionary
contact_info
address
The events are streamed into Kafka from an external database, and the goal is to aggregate all employee information into a single topic.
With the following SQL function, we can wrangle events that are sent to the three Kafka topics, (as they are sourced upstream from a Database using a CDC Change Data Capture Process Connector).
SET defaults.topic.autocreate=true; WITH AdressInfoTable AS ( SELECT TABLE metadata.masterKey as masterKey , employeeAddress FROM employee_address_events ); WITH ContactInfoTable AS ( SELECT TABLE metadata.masterKey as masterKey , employeeContactInfo FROM employee_contact_info_events ); WITH productTable AS ( SELECT TABLE p.masterKey as masterKey , c.employeeContactInfo , p.employeeAddress FROM AdressInfoTable AS p INNER JOIN ContactInfoTable AS c ON p.masterKey = c.masterKey ); INSERT INTO employee_enriched SELECT TABLE productTable.masterKey , employee_dictionary.employee.firstName as firstName , employee_dictionary.employee.lastName as lastName , employee_dictionary.employee.employeeId as employeeId , productTable.employeeContactInfo , productTable.employeeAddress FROM employee_dictionary INNER JOIN productTable ON employee_dictionary._key = productTable.masterKey;
The streaming topology that we want to achieve is effectively the bellow:
To simplify our testing process and run the above example in less than 60 seconds, we will be using SQL to create and populate the three Apache Kafka topics:
CREATE TOPIC employee_dictionary
CREATE TABLE employee_dictionary ( _key string , metadata.eventType string , metadata.entityType string , employee.firstName string , employee.lastName string , employee.employeeId string ) FORMAT(string, json) PROPERTIES(partitions=1, replication=1, compacted=true);
Note: The employee_dictionary topics is compacted since it hosts the name and employee id.
POPULATE TOPIC employee_dictionary
INSERT INTO employee_dictionary( _key , metadata.eventType , metadata.entityType , employee.firstName , employee.lastName , employee.employeeId ) VALUES ("196ea70d-b7f2-43ac-a15a-fa1774f7488a","employee_created","employee","Ollie","Johnson","30-6107836"), ("9d7f69e9-9ea0-4785-903a-d09d0f8342b7","employee_created","employee","Harry","Williamson","25-7784327");
CREATE TOPIC employee_contact_info_events
employee_contact_info_events
CREATE TABLE employee_contact_info_events ( _key string , metadata.masterKey string , metadata.eventType string , metadata.entityType string , employeeContactInfo.type string , employeeContactInfo.preferencePriority int , employeeContactInfo.value string ) FORMAT(string, json) PROPERTIES(partitions=1, replication=1, compacted=false);
POPULATE TOPIC employee_contact_info_events
INSERT INTO employee_contact_info_events( _key , metadata.masterKey , metadata.eventType , metadata.entityType , employeeContactInfo.type , employeeContactInfo.preferencePriority , employeeContactInfo.value ) VALUES ("55f3730f-731b-45ae-8f06-1a333ab83210","196ea70d-b7f2-43ac-a15a-fa1774f7488a","employee_contact_info_added","employee_contact_info","email",1,"ollie@yahoo.com"), ("57432ae9-fa30-478e-aa87-12d026cd2bad","9d7f69e9-9ea0-4785-903a-d09d0f8342b7","employee_contact_info_added","employee_contact_info","email","1","harry@olson-reed.org");
CREATE TOPIC employee_address_events
employee_address_events
CREATE TABLE employee_address_events( _key string , metadata.masterKey string , metadata.eventType string , metadata.entityType string , employeeAddress.type string , employeeAddress.address string , employeeAddress.city string , employeeAddress.state string , employeeAddress.zipCode int ) FORMAT(string, json) PROPERTIES(partitions=1, replication=1, compacted=false);
POPULATE TOPIC employee_address_events
INSERT INTO employee_address_events( _key , metadata.masterKey , metadata.eventType , metadata.entityType , employeeAddress.type , employeeAddress.address , employeeAddress.city , employeeAddress.state , employeeAddress.zipCode ) VALUES ("479b6e61-1e3a-457c-a26f-2c7ef7d35f5b","196ea70d-b7f2-43ac-a15a-fa1774f7488a","employee_address_added","employee_address","physical","3415 Brookdale Drive","Santa Clara","CA","95051"), ("23cfb3a6-cebb-4746-a7ee-6cd64b527e1c","9d7f69e9-9ea0-4785-903a-d09d0f8342b7","employee_address_added","employee_address","physical","835 Shiloh Court","Redding","CA","96003");
Now that we have created the topics and populated them, we are ready to execute our streaming SQL function to create enriched information and write it into a new topic.
A Kafka Streams application executing the above SQL and deployed automatically (in ~ 2 seconds) onto Kubernetes will result into the following performance/monitoring information:
In just a few seconds, we have already enriched hundreds of thousand of events:
On this page