Join 3 Kafka topics


In this example, we will show how to aggregate three Kafka topics by using Streaming SQL processors.

We will be aggregating:

  • employee_dictionary: messages contain the name, surname and employee id
  • contact_info: messages contain the email and other contact information
  • address: message contain address details

The events are streamed into Kafka from an external database, and the goal is to aggregate all employee information into a single topic.

Streaming SQL enrichment on Apache Kafka 

With the following SQL function, we can wrangle events that are sent to the three Kafka topics, (as they are sourced upstream from a Database using a CDC Change Data Capture Process Connector).


SET defaults.topic.autocreate=true;

WITH AdressInfoTable AS (
    SELECT TABLE
        metadata.masterKey as masterKey
        , employeeAddress
    FROM employee_address_events
);

WITH ContactInfoTable AS (
    SELECT TABLE
        metadata.masterKey as masterKey
        , employeeContactInfo
    FROM employee_contact_info_events
);

WITH productTable AS (
    SELECT TABLE
        p.masterKey as masterKey
        , c.employeeContactInfo
        , p.employeeAddress
    FROM AdressInfoTable AS p
    INNER JOIN ContactInfoTable AS c 
            ON p.masterKey = c.masterKey
);



INSERT INTO employee_enriched
SELECT TABLE
        productTable.masterKey
        , employee_dictionary.employee.firstName as firstName
        , employee_dictionary.employee.lastName as lastName
        , employee_dictionary.employee.employeeId as employeeId
        , productTable.employeeContactInfo
        , productTable.employeeAddress
FROM employee_dictionary INNER JOIN productTable 
        ON employee_dictionary._key = productTable.masterKey;

The streaming topology that we want to achieve is effectively the bellow:

Topology of Streaming SQL and joining 3 Kafka topics

Testing data 

To simplify our testing process and run the above example in less than 60 seconds, we will be using SQL to create and populate the three Apache Kafka topics:

CREATE TOPIC employee_dictionary

CREATE TABLE employee_dictionary (
    _key string
    , metadata.eventType string
    , metadata.entityType string
    , employee.firstName string
    , employee.lastName string 
    , employee.employeeId string
)
FORMAT(string, json)
PROPERTIES(partitions=1, replication=1, compacted=true);

Note: The employee_dictionary topics is compacted since it hosts the name and employee id.

POPULATE TOPIC employee_dictionary

INSERT INTO employee_dictionary(
    _key
    , metadata.eventType
    , metadata.entityType
    , employee.firstName
    , employee.lastName
    , employee.employeeId
) VALUES
("196ea70d-b7f2-43ac-a15a-fa1774f7488a","employee_created","employee","Ollie","Johnson","30-6107836"),
("9d7f69e9-9ea0-4785-903a-d09d0f8342b7","employee_created","employee","Harry","Williamson","25-7784327");

CREATE TOPIC employee_contact_info_events

CREATE TABLE employee_contact_info_events (
    _key string
    , metadata.masterKey string
    , metadata.eventType string
    , metadata.entityType string
    , employeeContactInfo.type string
    , employeeContactInfo.preferencePriority int
    , employeeContactInfo.value string
)
FORMAT(string, json)
PROPERTIES(partitions=1, replication=1, compacted=false);

POPULATE TOPIC employee_contact_info_events

INSERT INTO employee_contact_info_events(
    _key
    , metadata.masterKey
    , metadata.eventType
    , metadata.entityType
    , employeeContactInfo.type
    , employeeContactInfo.preferencePriority
    , employeeContactInfo.value
) VALUES
("55f3730f-731b-45ae-8f06-1a333ab83210","196ea70d-b7f2-43ac-a15a-fa1774f7488a","employee_contact_info_added","employee_contact_info","email",1,"ollie@yahoo.com"),
("57432ae9-fa30-478e-aa87-12d026cd2bad","9d7f69e9-9ea0-4785-903a-d09d0f8342b7","employee_contact_info_added","employee_contact_info","email","1","harry@olson-reed.org");

CREATE TOPIC employee_address_events

CREATE TABLE employee_address_events(
    _key string
    , metadata.masterKey string 
    , metadata.eventType string
    , metadata.entityType string
    , employeeAddress.type string
    , employeeAddress.address string
    , employeeAddress.city string
    , employeeAddress.state string
    , employeeAddress.zipCode int
)
FORMAT(string, json)
PROPERTIES(partitions=1, replication=1, compacted=false);

POPULATE TOPIC employee_address_events

INSERT INTO employee_address_events(
    _key
    , metadata.masterKey
    , metadata.eventType
    , metadata.entityType
    , employeeAddress.type
    , employeeAddress.address
    , employeeAddress.city
    , employeeAddress.state
    , employeeAddress.zipCode
) VALUES
("479b6e61-1e3a-457c-a26f-2c7ef7d35f5b","196ea70d-b7f2-43ac-a15a-fa1774f7488a","employee_address_added","employee_address","physical","3415 Brookdale Drive","Santa Clara","CA","95051"),
("23cfb3a6-cebb-4746-a7ee-6cd64b527e1c","9d7f69e9-9ea0-4785-903a-d09d0f8342b7","employee_address_added","employee_address","physical","835 Shiloh Court","Redding","CA","96003");

Data enrichment 

Now that we have created the topics and populated them, we are ready to execute our streaming SQL function to create enriched information and write it into a new topic.

A Kafka Streams application executing the above SQL and deployed automatically (in ~ 2 seconds) onto Kubernetes will result into the following performance/monitoring information:

Apache Kafka SQL Query performance - test environment

In just a few seconds, we have already enriched hundreds of thousand of events:

Enriched Apache Kafka message
--
Last modified: September 15, 2024