Joining streams of data
This page is a tutorial on joining Kafka topics with Lenses SQL Processors.
Streaming SQL enrichment on Apache Kafka
-- Processor settings.
-- Let the processor create topics that do not exist yet
-- (presumably the INSERT target below; confirm against the Lenses settings reference).
SET defaults.topic.autocreate = true;
-- Start reading from the earliest available offset when there is no committed position.
SET auto.offset.reset = 'earliest';
-- Stream `o`: every record of the `orders` topic, with event time
-- taken from the `orderTimestamp` field of each record.
WITH o AS (
    SELECT STREAM *
    FROM orders
    EVENTTIME BY orderTimestamp
);
-- Stream `s`: every record of the `shipments` topic, with event time
-- taken from the `timestamp` field of each record.
WITH s AS (
    SELECT STREAM *
    FROM shipments
    EVENTTIME BY timestamp
);
-- Join each order with its shipment and write the pair to `orders_and_shipment`.
--   * An order matches a shipment when the order's record key equals the
--     shipment's `orderId`.
--   * `WITHIN 24h` limits the join to a 24-hour window.
--   * `time_difference` is the shipment timestamp minus the order timestamp,
--     rendered as HH:mm:ss (presumably both fields are epoch milliseconds; confirm).
INSERT INTO orders_and_shipment
SELECT STREAM
    o._key AS orderId
    , s._key AS shipmentId
    , DATE_TO_STR(TO_TIMESTAMP(s.timestamp - o.orderTimestamp), 'HH:mm:ss') AS time_difference
FROM o
INNER JOIN s
    ON o._key = s.orderId
WITHIN 24h
Testing data
CREATE TOPIC orders
orders
POPULATE TOPIC orders
orders
CREATE TOPIC shipments
shipments
POPULATE TOPIC shipments
shipments
Validate results

Conclusion
Last updated
Was this helpful?

