Joining streams of data

This page describes a tutorial joining Kafka topics with Lenses SQL Processors.

magine you are the next Amazon, and you want to track the orders and shipment events to work out which orders have been shipped and how long it took. In this case, there will be two data streams, one for each event type, and the resulting stream will answer the questions above.

Enriching two streams of data requires a sliding window join. The events are said to be “close” to each other, if the difference between their timestamp is up to the time window specified.

Topics involved:

  • orders messages contain information about a customer

  • shipments messages contain information about the shipment

Streaming SQL enrichment on Apache Kafka

The query combines the data from orders and shipments if the orders are processed within 24 hours. Resulting records contain the order and shipment identifier, and the time between the order was registered to the time it was shipped.

SET defaults.topic.autocreate=true;
SET auto.offset.reset = 'earliest';

WITH o as (
 SELECT STREAM *
 FROM orders
 EVENTTIME BY orderTimestamp
);

WITH s as (
 SELECT STREAM *
 FROM shipments
 EVENTTIME BY timestamp
);

INSERT INTO orders_and_shipment
SELECT STREAM
    o._key AS orderId 
    , s._key AS shipmentId
    , DATE_TO_STR(TO_TIMESTAMP(s.timestamp - o.orderTimestamp), 'HH:mm:ss') AS time_difference
FROM  o INNER JOIN s
    ON o._key = s.orderId
WITHIN 24h

Testing data

To simplify our testing process and manage to run the above example in less than 60 seconds, we will be using SQL to create and populate the three Apache Kafka topics:

CREATE TOPIC orders

CREATE TABLE orders(
        _key string
        , customerId string
        , orderTimestamp long
        , amount decimal(18,2)
)
FORMAT(string, avro)

POPULATE TOPIC orders

INSERT INTO orders(
        _key
        , customerId
        , orderTimestamp
        , amount
) VALUES
("o1","johanne@johanne.io",1596374427000,99.9),   --Sunday, 2 August 2020 13:20:27
("o2","johanne@johanne.io",1596453627000,240.01), --Monday, 3 August 2020 11:20:27
("o3","jack@jack.io",1596280827000,81.81),        --Saturday, 1 August 2020 11:20:27
("o4","anna@anna.io",1596453627000,300.00);       --Monday, 3 August 2020 11:20:27

CREATE TOPIC shipments

CREATE TABLE shipments(
    _key string
    , orderId string
    , timestamp long)
FORMAT(string, avro)

POPULATE TOPIC shipments

INSERT INTO shipments(
    _key
    , orderId
    , `timestamp`
) VALUES
("s1", "o1", 1596445927000),   --Monday, 3 August 2020 09:12:07
("s2", "o3", 1596456112000),   --Monday, 3 August 2020 12:01:52
("s3", "o4", 1596460271000);   --Monday, 3 August 2020 13:11:11

The output seen in the next screenshot shows two records. For the order with o2 identifier, there is no shipments entry because it has not been processed. For the order with identifier o3, the shipment happened after one day.

Content of result topic

Validate results

Let’s switch to the Snapshot engine by navigating to SQL Studio menu item. With the entries in both topics, we can write the following query to see which data is joinable without the window interval:

SELECT 
    o._key AS order 
   , DATE_TO_STR(o.orderTimestamp, 'yyyy-MM-dd HH:mm:ss') AS order_time
    , s._key AS shipment 
   , DATE_TO_STR(s.timestamp, 'yyyy-MM-dd HH:mm:ss') AS shipment_time
FROM orders o INNER JOIN  shipments s
    ON o._key = s.orderId

These are the results for the non-streaming query (i.e., Snapshot)

Content of SQL snapshot result topic

Running the query returned three records. But you can see the order o3 was processed two days after it was placed. Let’s apply the sliding window restriction for the Snapshot query by adding a filter to only match those records having their timestamp difference within a day.

SELECT 
    o._key AS order 
    , DATE_TO_STR(o.orderTimestamp, 'yyyy-MM-dd HH:mm:ss') AS order_time
    , s._key AS shipment 
    , DATE_TO_STR(s.timestamp, 'yyyy-MM-dd HH:mm:ss') AS shipment_time
FROM orders o INNER JOIN  shipments s
    ON o._key = s.orderId
WHERE to_timestamp(s.timestamp) - '1d' <= to_timestamp(o.orderTimestamp)

Now the result matches the one from Streaming query.

Conclusion

In this tutorial you learned how to join to Streams together using a sliding window. You achieved all the above using Lenses SQL engine.

Good luck and happy streaming!

Last updated

Logo

2024 © Lenses.io Ltd. Apache, Apache Kafka, Kafka and associated open source project names are trademarks of the Apache Software Foundation.