Examples

Data Browsing

Additional examples of filtering JSON or AVRO Kafka messages based on conditions on either the message payload or the message metadata (such as the key `_key` or the timestamp `_ts`):

-- Select every field from a topic whose messages are AVRO, keeping only the
-- messages whose value payload satisfies ALL of the conditions below
SELECT *
FROM topicA
WHERE f1 IS NULL
    AND f2 = 3
    AND f3 < 100
    AND f4 >= 2
    AND f5 <= 1
    AND f6 > 12.4
    AND f7 LIKE '%wow'
-- Select every field from a topic with AVRO values, filtering on fields of the
-- nested record p1.p2: keep a message if f1 is NULL, or if f2/f3 match and
-- either f4 starts with 'wow' or f5 equals 'wow'
SELECT *
FROM topicA
WHERE p1.p2.f1 IS NULL
   OR (
        p1.p2.f2 = 3
        AND p1.p2.f3 < 100
        AND (p1.p2.f4 LIKE 'wow%' OR p1.p2.f5 = 'wow')
      )
-- Select all fields from a topic with JSON data, while filtering on fields of
-- the nested JSON object p1.p2
SELECT *
FROM topicA
WHERE p1.p2.f1 IS NOT NULL
   OR (p1.p2.f2 <> 3 AND p1.p2.f3 < 100 AND (p1.p2.f4 LIKE 'wow%' OR p1.p2.f5 = 'wow'))
-- Select some fields (cherry-pick) from a topic with JSON data, keeping only
-- messages whose timestamp (_ts) is on or after 2018-01-01 00:00:00.
-- Fields prefixed with _key are read from the message key, the rest from the value.
SELECT field1.fieldA
    , field2
    , field3
    , _key.fieldM.fieldN AS N
FROM topicA
WHERE _ts >= '2018-01-01 00:00:00'
-- Select fields from both the Key and the Value for messages in the first 15 minutes of 2018.
-- The range is half-open: 00:00:00 is included, 00:15:00 is excluded, so
-- adjacent 15-minute ranges never double-count a boundary message.
SELECT field1.fieldA
    , field2
    , field3
    , _key.fieldK AS keyField
FROM topicA
WHERE _ts >= '2018-01-01 00:00:00'
  AND _ts <  '2018-01-01 00:15:00'

Joins

-- Stream the result of a LEFT JOIN between two topics into `toTopic`,
-- grouping over a 4-second tumbling window. Each order line is emitted with a
-- productName built as '<productCode>-<productName>'.
INSERT INTO `toTopic`
SELECT STREAM
    od.orderNumber,
    od.productCode,
    od.quantityOrdered,
    od.priceEach,
    od.orderLineNumber,
    CONCAT(od.productCode, '-', p.productName) AS productName
FROM `OrdersDetailsTopic` AS od
LEFT JOIN `ProductTopic` AS p
    ON p.productCode = od.productCode
GROUP BY TUMBLE(4, s)
-- RIGHT JOIN messages from two topics over a 10-minute tumbling window.
-- The order-details side is the preserved side of the join; the product name
-- is looked up by matching the ProductTopic message key against productCode.
-- NOTE(review): the `product` CTE projects only productName, so this relies on
-- the _key metadata field still being addressable on the CTE result — confirm.
INSERT INTO `toTopic`
WITH product AS (
    SELECT productName
    FROM `ProductTopic`
)
SELECT STREAM
    od.orderNumber,
    od.productCode,
    od.quantityOrdered,
    od.priceEach,
    od.orderLineNumber,
    p.productName
FROM product AS p
RIGHT JOIN `OrdersDetailsTopic` AS od
    ON p._key = od.productCode
GROUP BY TUMBLE(10, m)
-- LEFT JOIN two topics with JSON data over a 4-second tumbling window,
-- emitting each order line with productName built as '<productCode>-<productName>'
INSERT INTO `toTopic`
SELECT STREAM
    od.orderNumber
    , od.productCode
    , od.quantityOrdered
    , od.priceEach
    , od.orderLineNumber
    , CONCAT(od.productCode,'-',p.productName) AS productName
FROM  `OrdersDetailsTopic` AS od
    LEFT JOIN `ProductTopic` AS p
        ON p.productCode = od.productCode
GROUP BY TUMBLE(4,s)
-- LEFT JOIN of a stream with a table: every record from the `activity` stream
-- is emitted, enriched with the `telecom_data` row that has the same message key
-- (grid.polygon is NULL when no table row matches).
-- auto.offset.reset = 'latest' makes consumers with no committed offset start
-- from the end of the input topics rather than from the beginning.
SET `auto.offset.reset`='latest';
INSERT INTO `toTopic`
WITH
tableTelecom AS (
  SELECT *
  FROM  `telecom_data`
)

SELECT STREAM
    data.squareId
    , grid.polygon
FROM `activity` AS data
    LEFT JOIN tableTelecom AS grid
        ON data._key = grid._key

Joins on optional field

Sometimes the incoming records on the topic(s) do not always contain a given field (or set of fields), but you still want to join on it. To do so, first build two streams, one for each side of the join (shown below), that filter out records lacking header.correlationId and/or header.recordId. Then join the two streams:

INSERT INTO new_records
WITH leftRecords AS
(
 -- Keep only topicA records that carry both optional header fields.
 SELECT STREAM header.correlationId, header.recordId
 FROM `topicA`
 WHERE exists(header.correlationId) IS TRUE AND exists(header.recordId) IS TRUE
),
rightRecords AS
(
 -- correlationId must be projected (and required) here because it is the join key;
 -- recordId is renamed so both record ids can be emitted side by side.
 SELECT STREAM header.correlationId, header.recordId AS tdfRecId
 FROM `topicB`
 WHERE exists(header.correlationId) IS TRUE AND exists(header.recordId) IS TRUE
)

-- The CTEs are not named left/right: those are reserved SQL keywords.
SELECT STREAM leftRecords.correlationId, leftRecords.recordId AS tdsRecId, rightRecords.tdfRecId
FROM leftRecords INNER JOIN rightRecords
      ON leftRecords.correlationId = rightRecords.correlationId
GROUP BY TUMBLE(1,m)