Examples

Joins

-- LEFT JOIN of two topics over a 4-second tumbling window.
-- Every OrdersDetailsTopic record is emitted to `toTopic`; productName is built
-- as "<productCode>-<productName>" from the matching ProductTopic record.
INSERT INTO `toTopic`
SELECT STREAM
    od.orderNumber,
    od.productCode,
    od.quantityOrdered,
    od.priceEach,
    od.orderLineNumber,
    CONCAT(od.productCode, '-', p.productName) AS productName
FROM OrdersDetailsTopic AS od
LEFT JOIN `ProductTopic` AS p
    ON p.productCode = od.productCode
GROUP BY TUMBLE(4,s)
-- RIGHT JOIN of two topics over a 10-minute tumbling window.
-- The `product` CTE projects only productName from ProductTopic; the join matches
-- its _key against the order's productCode.
-- NOTE(review): relies on _key staying available on the CTE although it is not
-- listed in its projection - confirm against the engine's documentation.
INSERT INTO `toTopic`
WITH product AS (
    SELECT productName
    FROM `ProductTopic`
)
SELECT STREAM
    od.orderNumber,
    od.productCode,
    od.quantityOrdered,
    od.priceEach,
    od.orderLineNumber,
    p.productName
FROM product AS p
RIGHT JOIN `OrdersDetailsTopic` AS od
    ON p._key = od.productCode
GROUP BY TUMBLE(10,m)
-- LEFT JOIN of two topics holding JSON data, over a 4-second tumbling window.
-- Joined rows are written to `toTopic`; productName is emitted as
-- "<productCode>-<productName>".
INSERT INTO `toTopic`
SELECT STREAM
    od.orderNumber,
    od.productCode,
    od.quantityOrdered,
    od.priceEach,
    od.orderLineNumber,
    CONCAT(od.productCode, '-', p.productName) AS productName
FROM OrdersDetailsTopic AS od
LEFT JOIN `ProductTopic` AS p
    ON p.productCode = od.productCode
GROUP BY TUMBLE(4,s)
-- LEFT JOIN of a stream with a table.
-- `activity` is the streaming side; `telecom_data` is exposed through the
-- tableTelecom CTE and is the table side. Records are matched on _key, and
-- auto.offset.reset is set to 'latest' for this statement.
SET `auto.offset.reset`='latest';
INSERT INTO `toTopic`
WITH tableTelecom AS (
    SELECT *
    FROM `telecom_data`
)
SELECT STREAM
    data.squareId,
    grid.polygon
FROM `activity` AS data
LEFT JOIN tableTelecom AS grid
    ON data._key = grid._key

Joins on optional field

There are scenarios where, although the incoming records on the topic(s) do not always contain a given field (or set of fields), you still want to join on it (them). To do so, first build two streams (left and right, as shown below) that filter out the records lacking header.correlationId and header.recordId. Then join the two resulting streams:

-- Join two topics on an optional field (header.correlationId).
-- Each side is first narrowed to the records that actually carry
-- header.correlationId and header.recordId; the two streams are then
-- INNER JOINed on correlationId over a 1-minute sliding window.
-- Both CTEs project correlationId because the join condition reads it from
-- each side.
INSERT INTO new_records
WITH left AS
(
 SELECT STREAM header.correlationId, header.recordId
 FROM `topicA`
 WHERE exists(header.correlationId) is true AND exists(header.recordId) is true
),
right AS
(
 SELECT STREAM header.correlationId, header.recordId AS tdfRecId
 FROM `topicB`
 WHERE exists(header.correlationId) is true AND exists(header.recordId) is true
)

SELECT STREAM left.correlationId, left.recordId AS tdsRecId, right.tdfRecId
FROM left INNER JOIN right
      ON left.correlationId = right.correlationId
GROUP BY SLIDING(1,m)