Stream Processing

The Apache Kafka ecosystem is enhanced by the Kafka Streams API, which allows for data stream processing. The framework is available for the Java runtime, so until now you needed to know Java, Kotlin, Scala, etc. to make use of it. Not anymore: a Kafka Streams flow can be described by anyone with basic SQL skills.

Streaming Syntax

The second use of LSQL is defining Kafka Streams flows through a SQL-like syntax. Everything learned so far applies fully to the SELECT statements written for stream processing. The syntax, which might look complex at first, is as follows:

[ SET autocreate = true;]
[ SET partitions = 1;]
[ SET replication = 2;]
[ SET `decimal.scale`= 18;]
[ SET `decimal.precision`= 38;]
[ SET `ANY KAFKA STREAMS CONFIG. See Kafka documentation StreamsConfig, ConsumerConfig and ProducerConfig` = '';]
[ SET `topic.[ANY KAFKA Log CONFIG. See LogConfig]` = '';]
[ SET `rocksdb.[RocksDB specific configurations. See the section below on RocksDB]`= '';]

INSERT INTO _TARGET_TOPIC_
[WITH
   _ID_NAME_ AS (SELECT [STREAM] ...FROM _TOPIC_ ...),
   _ID_NAME_ AS (SELECT [STREAM] ...FROM _TOPIC_ ...)
]
SELECT select_expr [, select_expr ...]
FROM _ID_NAME_ INNER JOIN _OTHER_ID_NAME_ ON join_expr
[WHERE condition_expr]
[GROUP BY group_by_expr]
[HAVING having_expr]

If you are not familiar with the Apache Kafka stream processing API, please read its documentation first.

Important

Streaming SQL is not your typical RDBMS SQL. Core concepts around stream processing with Apache Kafka, the duality of Table/Stream, the implication of creating a Table versus a Stream instance, etc. need to be understood first.

Using LSQL for streaming allows you to do:

  • Transformations
  • Aggregation
  • Joins

We will go through each one in detail but before we do so we need to expand on the syntax you have seen earlier.

Important

When using AVRO payloads, the schema needs to be present. The LSQL engine does static validation against the existing schema. If the schema is missing, an error is returned.

Windowing

Windowing allows you to control how to group records which share the same key for stateful operations such as aggregations or join windows. Windows are tracked per record key. LSQL has support for the full spectrum of windowing functionality available in the Kafka Streams API.

Note

A record is discarded and will not be processed by the window if it arrives after the retention period has passed.

You can use the following types of windows in LSQL:

  • Hopping time windows. These are windows based on time intervals. They model fixed-size, (possibly) overlapping windows. A hopping window is defined by two properties: the window’s size and its advance interval (aka “hop”). The advance interval specifies how much a window moves forward relative to the previous one. For example, you can configure a hopping window with a size of 5 minutes and an advance interval of 1 minute. Since hopping windows can overlap, a data record may belong to more than one such window.
...
GROUP BY HOP(5,m,1,m)
...
  • Tumbling time windows. These are a special case of hopping time windows and, like the latter, are windows based on time intervals. They model fixed-size, non-overlapping, gap-less windows. A tumbling window is defined by a single property: the window’s size. A tumbling window is a hopping window whose window size is equal to its advance interval. Since tumbling windows never overlap, a data record will belong to one and only one window.
...
GROUP BY tumble(1,m)
...
  • Sliding windows. These express a fixed-size window that slides continuously over the time axis. Here, two data records are said to be included in the same window if the difference between their timestamps is within the window size. Thus, sliding windows are not aligned to the epoch, but to the data record timestamps.
...
GROUP BY SLIDING(1,m)
...
  • Session windows. These are used to aggregate key-based events into sessions. Sessions represent a period of activity separated by a defined gap of inactivity. Any events processed that fall within the inactivity gap of any existing sessions are merged into the existing sessions. If the event falls outside of the session gap, then a new session will be created. Session windows are tracked independently across keys (e.g. windows of different keys typically have different start and end times) and their sizes vary (even windows for the same key typically have different sizes). As such session windows can’t be pre-computed and are instead derived from analyzing the timestamps of the data records.
...
GROUP BY SESSION(10,m, 5, m)
...

All the window functions allow the user to specify the time unit. The supported time units are:

Keyword   Unit
MS        milliseconds
S         seconds
M         minutes
H         hours
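
As an illustration of the hopping and tumbling definitions above, the following Python sketch computes which epoch-aligned windows a record timestamp falls into. It is not how LSQL or Kafka Streams is implemented; the helper names (to_ms, hopping_windows, tumbling_window) are hypothetical, and only the unit keywords come from the table above.

```python
# Hypothetical helpers: a sketch of window assignment, not LSQL internals.
UNIT_MS = {"MS": 1, "S": 1_000, "M": 60_000, "H": 3_600_000}


def to_ms(amount, unit):
    """Convert an (amount, unit keyword) pair to milliseconds."""
    return amount * UNIT_MS[unit.upper()]


def hopping_windows(ts_ms, size_ms, advance_ms):
    """Return the epoch-aligned [start, end) windows that contain ts_ms."""
    # Start of the latest window that begins at or before the timestamp.
    start = ts_ms - (ts_ms % advance_ms)
    windows = []
    # Walk backwards one hop at a time while the window still covers ts_ms.
    while start > ts_ms - size_ms and start >= 0:
        windows.append((start, start + size_ms))
        start -= advance_ms
    return sorted(windows)


def tumbling_window(ts_ms, size_ms):
    """A tumbling window is a hopping window whose advance equals its size."""
    return hopping_windows(ts_ms, size_ms, size_ms)[0]
```

With a 5-minute size and a 1-minute advance, a record stamped 7 minutes 30 seconds after the epoch lands in five overlapping windows, while a 1-minute tumbling window assigns it to exactly one.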

Table or Stream

When using Apache Kafka Streams API you can build a KStream or KTable from a topic. To distinguish between them, LSQL uses the keyword STREAM. When the keyword is missing an instance of KTable is created. When the keyword is present an instance of KStream is created.

Important

Use SELECT STREAM to create a KStream instance. Use SELECT to create a KTable instance.

Important

When creating a table the Kafka Messages must have a non-null key. Otherwise, the record is ignored.

KStream Settings

LSQL allows you to define a Kafka Stream flow with SQL-like syntax. From the target topic settings to the producer/consumer settings LSQL allows the user to override the defaults. LSQL supports setting these configurations via a standard SQL pattern of setting variables via SET. For example:

SET `auto.offset.reset` = 'earliest';
SET `processing.guarantee`= 'exactly_once';  //this is for Kafka 0.11+ enabling exactly once semantics
SET `commit.interval.ms` = 1000;     //The frequency with which to save the position of the processor.

Any of the target topic specific configurations can also be specified here. See the Apache Kafka documentation for the full list of topic-specific configuration options. To set a configuration for the flow's result topic, prefix the key with "topic.". For example, to set the cleanup policy to compact and flush.messages to 5 (flush every 5 messages), configure LSQL as follows:

SET `topic.cleanup.policy`='compact';
SET `topic.flush.messages`= 5;
...

Apart from the topic, producer/consumer or Kafka stream configs, LSQL allows you to set the following:

Setting | Description | Type | Example
autocreate | If the target topic does not exist, it is created. If the Kafka setup does not allow for auto topic creation, the flow will fail! | BOOLEAN | SET autocreate=true
partitions | The number of partitions to create for the target topic. Applies only when autocreate is set to true (autocreate is false by default). | INTEGER | SET partitions=2
replication | How many replicas to create for the target topic. Applies only when autocreate is set to true (autocreate is false by default). | INTEGER | SET replication=3
decimal.scale | When working with AVRO records that involve the decimal type, specifies the decimal scale. | INTEGER | SET `decimal.scale`=18
decimal.precision | When working with AVRO records that involve the decimal type, specifies the decimal precision. | INTEGER | SET `decimal.precision`=38

Important

Each SET instruction must be terminated with a semicolon (;).

Here is an example of setting the commit interval to 5 seconds and enabling exactly-once semantics (Apache Kafka 0.11+):

SET `commit.interval.ms` = 5000;
SET `processing.guarantee`= 'exactly_once';

INSERT INTO `hot_sensors`
SELECT
    ip
    , lat
    , `long`
    , (temp * 1.8 + 32) as metric
FROM  `sensors`
WHERE _ktype = 'LONG'
AND _vtype = AVRO
AND temp > 30

Note

Configuring a stream flow via code requires the following configuration keys to be set: default.key.serde and default.value.serde. LSQL takes care of this based on the values specified in the SQL so you don’t have to set them.

RocksDB Settings

Whenever a Kafka Streams application requires state, it relies on RocksDB. Configuring this key-value store breaks the pattern found in the Kafka Streams API: to customize the settings, one has to provide an implementation of org.apache.kafka.streams.state.RocksDBConfigSetter. LSQL covers most of the available settings; here is the entire list:

Key | Type | Description
rocksdb.table.block.cache.size | LONG | Amount of cache, in bytes, that RocksDB will use. If cacheSize is non-positive, the cache is not used. Default: 8M
rocksdb.table.block.size | LONG | Approximate size of user data packed per block. Default: 4K
rocksdb.table.block.cache.compressed.num.shard.bits | INT | TableFormatConfig.setBlockCacheCompressedNumShardBits. Controls the number of shards for the compressed block cache.
rocksdb.table.block.cache.num.shard.bits | INT | Controls the number of shards for the block cache.
rocksdb.table.block.cache.compressed.size | LONG | Size of the compressed block cache. If 0, block_cache_compressed is set to null.
rocksdb.table.block.restart.interval | INT | Sets the block restart interval.
rocksdb.table.block.cache.size.and.filter | BOOL | Indicates whether index/filter blocks are put in the block cache. If not specified, each 'table reader' object pre-loads the index/filter block during table initialization.
rocksdb.table.block.checksum.type | STRING | Sets the checksum type to be used with this table. Available values: kNoChecksum, kCRC32c, kxxHash.
rocksdb.table.block.hash.allow.collision | BOOL | Influences the behavior when kHashSearch is used. If false, stores a precise prefix-to-block-range mapping. If true, does not store the prefix and allows prefix hash collisions (less memory consumption).
rocksdb.table.block.index.type | STRING | Sets the index type to be used with this table. Available values: kBinarySearch, kHashSearch.
rocksdb.table.block.no.cache | BOOL | Disables the block cache. If set to true, no block cache is used. Default: false
rocksdb.table.block.whole.key.filtering | BOOL | If true, places whole keys in the filter (not just prefixes). This must generally be true for gets to be efficient. Default: true
rocksdb.table.block.pinl0.filter | BOOL | Indicates whether L0 index/filter blocks are pinned to the block cache. If not specified, defaults to false.
rocksdb.total.threads | INT | The maximum number of threads RocksDB should use.
rocksdb.write.buffer.size | LONG | Sets the number of bytes the database builds up in memory (backed by an unsorted log on disk) before converting to a sorted on-disk file.
rocksdb.table.block.size.deviation | INT | Used to close a block before it reaches the configured 'block_size'. If the percentage of free space in the current block is less than this number and adding a new record to the block would exceed the configured block size, the block is closed and the new record is written to the next block. Default: 10
rocksdb.table.block.format.version | INT | There are currently three versions. 0: written by all RocksDB versions by default; can be read by very old RocksDB versions; does not support changing the checksum (default is CRC32). 1: can be read by RocksDB versions since 3.0; supports non-default checksums such as xxHash; written by RocksDB when BlockBasedTableOptions::checksum is something other than kCRC32c (version 0 is silently upconverted). 2: can be read by RocksDB versions since 3.10; changes the way compressed blocks are encoded with LZ4, BZip2 and Zlib compression; if you do not plan to run RocksDB before version 3.10, you should probably use this. This option only affects newly written tables; when reading existing tables, the version information is read from the footer.
rocksdb.compaction.style | STRING | Available values: LEVEL, UNIVERSAL, FIFO
rocksdb.max.write.buffer | INT |
rocksdb.base.background.compaction | INT |
rocksdb.background.compaction.max | INT |
rocksdb.subcompaction.max | INT |
rocksdb.background.flushes.max | INT |
rocksdb.log.file.max | LONG |
rocksdb.log.fle.roll.time | LONG |
rocksdb.compaction.auto | BOOL |
rocksdb.compaction.level.max | INT |
rocksdb.files.opened.max | INT |
rocksdb.wal.ttl | LONG |
rocksdb.wal.size.limit | LONG |
rocksdb.memtable.concurrent.write | BOOL |
rocksdb.os.buffer | BOOL |
rocksdb.data.sync | BOOL |
rocksdb.fsync | BOOL |
rocksdb.log.dir | STRING |
rocksdb.wal.dir | STRING |

All the table configurations will initialize a BlockBasedTableConfig on which the call to set the values is made. This will be followed by a call to Options.setTableFormatConfig(tableConfig).

Transformations

This is the most basic and most common use case: transforming the messages of an incoming topic using any of the capabilities provided by the SELECT statement. That includes:

  • Selecting specific fields
  • Applying supported functions to derive a new field
  • Filtering the records based on your criteria

Let’s imagine we have a topic containing sensor specific data:

{
  "device_id": 1,
  "ip": "191.35.83.75",
  "timestamp": 1447886791,
  "lat": 22,
  "long": 82,
  "scale": "Celsius",
  "temp": 22.0,
  "device_name": "sensor-AbC-193X",
  "humidity": 15,
  "zipcode": 95498
}

Suppose we want to select only the records where the temperature is over 30 degrees Celsius. Furthermore, we want the temperature value to be expressed in Fahrenheit, and we only need the ip, lat and long fields from the initial data. To do so, we can write this LSQL statement:

INSERT INTO `hot_sensors`
SELECT
    ip
    , lat
    , `long`
    , (temp * 1.8 + 32) AS metric
FROM  `sensors`
WHERE _ktype = 'LONG'
AND _vtype = AVRO
AND temp > 30

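This is the simplest flow you could write. The query produces records shaped like the one below. Note that the values shown correspond to the sample record above (22.0 degrees Celsius converts to 71.6 degrees Fahrenheit), which would itself be dropped by the temp > 30 filter: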

{
  "ip": "191.35.83.75",
  "lat": 22,
  "long": 82,
  "metric": 71.6
}
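
To make the filter, projection and conversion explicit, here is a minimal Python sketch of what the statement above does to a single record. It only mirrors the logic of the query; the function name to_hot_sensor is hypothetical and nothing here reflects how LSQL executes the flow.

```python
def to_hot_sensor(record):
    """Apply the WHERE filter and the SELECT projection to one sensor record.

    Returns the projected record, or None when the filter rejects it.
    """
    # WHERE temp > 30
    if record["temp"] <= 30:
        return None
    # SELECT ip, lat, long, (temp * 1.8 + 32) AS metric
    return {
        "ip": record["ip"],
        "lat": record["lat"],
        "long": record["long"],
        "metric": record["temp"] * 1.8 + 32,
    }
```

A record with a temp of 40 passes the filter and gets a metric of about 104 (Fahrenheit); a record with a temp of 22.0 is rejected and produces no output.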

The SQL syntax allows you to access nested fields or an entire complex field. To illustrate, we change the structure above slightly. The new data looks like this:

{
  "description":"Sensor embedded in exhaust pipes in the ceilings",
  "ip":"204.116.105.67",
  "id":5,
  "temp":40,
  "c02_level":1574,
  "geo":{
    "lat":35.93,
    "long":-85.46
    }
}

First, we write the SQL to address the nested fields:

INSERT INTO `new_sensors`
SELECT
    ip
    , geo.lat
    , geo.long
    , temp
FROM  `sensors`
WHERE _ktype = 'LONG'
AND _vtype = AVRO

The result of applying this query will be AVRO records with the following format:

{
   "ip":"204.116.105.67",
   "lat":35.93,
   "long":-85.46,
   "temp":40
}

If the user selects a complex field, the entire substructure is copied over. For example:

INSERT INTO `new_sensors`
SELECT
    ip
    , geo
    , temp
FROM  `sensors`
WHERE _ktype = 'LONG'
AND _vtype = AVRO

The new records will have this format:

{
  "ip":"204.116.105.67",
  "geo":{
    "lat":35.93,
    "long":-85.46
    },
  "temp":40
}
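
The two behaviours above (picking a nested leaf versus copying a whole substructure) can be sketched in Python as follows. This is an illustration only; select_fields is a hypothetical helper, and naming the output field after the last path segment is inferred from the sample outputs above.

```python
def select_fields(record, paths):
    """Project dotted field paths out of a nested record.

    A path such as "geo.lat" yields the leaf value under the name "lat";
    a path such as "geo" copies the entire substructure.
    """
    out = {}
    for path in paths:
        value = record
        for part in path.split("."):
            value = value[part]  # descend one level per path segment
        out[path.split(".")[-1]] = value
    return out
```

Calling select_fields(record, ["ip", "geo.lat", "geo.long", "temp"]) flattens the coordinates, while select_fields(record, ["ip", "geo", "temp"]) keeps geo as a nested structure.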

These examples are for records of type AVRO, but similar support is provided for JSON payloads.

Aggregation

Typical streaming aggregation involves scenarios similar to these:

  • Counting the number of visitors on your website per region
  • Totalling the amount of Foreign Exchange transactions for GBP-USD on a 15-minute interval
  • Totalling sales made in each of the company stores every day
  • Retaining the minimum and maximum stock value on a 30-minute interval

These are just a few examples - the list goes on. LSQL gives you a way to quickly express such aggregations over Kafka streams with either JSON or AVRO payloads.

Imagine a trading system needs to display the number of transactions made for each currency pair (GBPUSD is a currency exchange ticker). Such functionality can be easily achieved with a query like this:

INSERT INTO `total_transactions`
SELECT count(*) AS transaction_count
FROM `fx`
WHERE _ktype = BYTES
AND _vtype = AVRO
GROUP BY ticker

Remember that this is a stateful stream, so you could see the value for a ticker more than once, depending on how many transactions flow through the Kafka topic. The result of this query could be the following:

Key Value
GBPUSD 1
CHFYEN 1
USDEUR 1
GBPUSD 3
USDEUR 5
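
The reason a ticker shows up several times is that a streaming count emits an updated value as records arrive, rather than one final row per group. A minimal Python sketch of that behaviour, with a hypothetical running_counts helper, could look like this. A real flow may emit fewer intermediate updates than the sketch, for example because of record caching and the commit interval.

```python
from collections import defaultdict


def running_counts(tickers):
    """Emit a (key, count) update for every incoming record."""
    counts = defaultdict(int)
    updates = []
    for ticker in tickers:
        counts[ticker] += 1
        # Each update supersedes the previous value for the same key.
        updates.append((ticker, counts[ticker]))
    return updates
```

Consumers of the target topic should therefore treat the latest value per key as the current count.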

Suppose the user needs to look only at specific tickers. There are two approaches: apply the filter in the WHERE clause (best for performance) or rely on the HAVING clause. Both are covered by the queries below:

INSERT INTO `total_transactions`
SELECT count(*) AS transaction_count
FROM `fx`
WHERE _ktype = BYTES
AND _vtype = AVRO
AND ticker LIKE '%GBP%'
GROUP BY ticker

--OR

INSERT INTO `total_transactions`
SELECT count(*) as transaction_count
FROM `fx`
WHERE _ktype = BYTES
AND _vtype = AVRO
GROUP BY ticker
HAVING ticker in ('GBPUSD', 'EURDKK', 'SEKCHF')

The HAVING clause allows the use of any of the LSQL supported functions to express your filter requirements. To illustrate this, we filter for all the tickers containing USD plus the list GBPUSD, EURDKK, SEKCHF:

INSERT INTO `total_transactions`
SELECT count(*) as transaction_count
FROM `fx`
WHERE _ktype = BYTES
AND _vtype = AVRO
GROUP BY ticker
HAVING ticker IN ('GBPUSD', 'EURDKK', 'SEKCHF') OR ticker LIKE '%USD%'

There are scenarios where you need to group by the record key. Assume the fx topic contains the ticker in the key part. In that case, the queries become:

INSERT INTO `total_transactions`
SELECT count(*) AS transaction_count
FROM `fx`
WHERE _ktype = 'STRING'
AND _vtype = AVRO
GROUP BY _key

-- OR adding a filter

INSERT INTO `total_transactions`
SELECT count(*) AS transaction_count
FROM `fx`
WHERE _ktype = 'STRING'
AND _vtype = AVRO
AND _key.* LIKE '%GBP%'
GROUP BY _key

Important

Whenever GROUP BY involves a field, the resulting key on the target topic is a STRING! This is to allow joins on multiple fields.

In version 1, LSQL does not support arithmetic on aggregation functions; for example, you cannot write `SUM(fieldA)/count(*)`. We are looking at solutions to address this in upcoming versions. Meanwhile, here is how you can do it for now:

SET `auto.offset.reset`='latest';
SET autocreate = true;
SET `commit.interval.ms` = 3000;

INSERT INTO sensor_data_avg
WITH
avgStream as
(
SELECT STREAM
        COUNT(*) as total,
        SUM(temperature) AS temperatureTotal,
        SUM(humidity) AS humidityTotal,
        MIN(temperature) AS minTemperature,
        MAX(temperature) AS maxTemperature,
        MIN(humidity) AS minHumidity,
        MAX(humidity) AS maxHumidity
FROM  `sensor_data`
WHERE _ktype='STRING' AND _vtype='JSON'
GROUP BY TUMBLE(2,s),_key
)

SELECT STREAM
    temperatureTotal/total AS avgTemperature,
    humidityTotal/total AS avgHumidity,
    minTemperature,
    maxTemperature,
    minHumidity,
    maxHumidity
FROM avgStream

Notice the last SELECT statement uses the output data from the first one in order to achieve the average calculation.
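
The two-stage pattern can be sketched in Python as follows: the first function computes only plain aggregates, and the second performs the arithmetic on the already aggregated fields. The function names are hypothetical; the field names follow the query above.

```python
def aggregate(readings):
    """First stage: plain aggregates only (COUNT, SUM, MIN, MAX)."""
    temps = [r["temperature"] for r in readings]
    hums = [r["humidity"] for r in readings]
    return {
        "total": len(readings),
        "temperatureTotal": sum(temps),
        "humidityTotal": sum(hums),
        "minTemperature": min(temps),
        "maxTemperature": max(temps),
        "minHumidity": min(hums),
        "maxHumidity": max(hums),
    }


def averages(agg):
    """Second stage: arithmetic on the aggregated fields of one window."""
    return {
        "avgTemperature": agg["temperatureTotal"] / agg["total"],
        "avgHumidity": agg["humidityTotal"] / agg["total"],
        "minTemperature": agg["minTemperature"],
        "maxTemperature": agg["maxTemperature"],
        "minHumidity": agg["minHumidity"],
        "maxHumidity": agg["maxHumidity"],
    }
```

Splitting the work this way keeps each stage within what the engine supports: aggregation in the first SELECT, ordinary field arithmetic in the second.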

Important

Arithmetic on aggregate functions (SUM/COUNT/MIN/MAX) is not supported!

Using Window

We have shown so far simple aggregation without involving windows. That might solve some of the requirements a user has. Aggregating over a window is a very common scenario for streaming. Window support was introduced earlier; please revisit the windowing section.

Keeping the trend of IoT scenarios, imagine there is a stream of metrics information from devices across the globe. The data structure looks like this:

{
  "device_id": 2,
  "device_type": "sensor-gauge",
  "ip": "193.156.90.200",
  "cca3": "NOR",
  "cn": "Norway",
  "temp": 18,
  "signal": 26,
  "battery_level": 8,
  "timestamp": 1475600522
}

The following query allows you to count all the records received from each country on a tumbling window of 30 seconds. Such functionality can be described like this:

INSERT INTO norway_sensors_count
SELECT count(*) AS total
FROM sensors
WHERE _ktype = BYTES
AND _vtype = AVRO
GROUP BY tumble(30,s), cca3

The result would be records emitted on a 30-second interval, and they would look similar to this:

Key Value
NOR 10
ROM 2
FRA 126
UK 312
US 289
NOR 2
FRA 16
UK 352
US 219

Note

Remember the key value will be of type String.
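
As a rough Python sketch of a tumbling-window count per country: each record is assigned to the window that contains its timestamp, and counts are kept per (window, country) pair. The helper name is hypothetical, and the sketch assumes the record's timestamp field (in seconds) is used as the event time, which the source does not state.

```python
from collections import Counter


def count_per_window(records, size_s=30):
    """Count records per (window start, country code) for a tumbling window."""
    counts = Counter()
    for r in records:
        # Align the timestamp down to the start of its window.
        window_start = r["timestamp"] - r["timestamp"] % size_s
        counts[(window_start, r["cca3"])] += 1
    return dict(counts)
```

Because the window start is part of the grouping, the same country code reappears with a fresh count in every new 30-second window.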

So far we have only counted records, but LSQL supports SUM, MIN and MAX as well. Suppose your system processes customer orders and you want to compute, every hour, the total amount of orders over the last 24 hours:

SELECT
    product
    , SUM(amount) AS amount
FROM Orders
WHERE _ktype = BYTES
AND _vtype = AVRO
GROUP BY HOP(24,H,1,H), product

Joins

A join operation merges two streams based on the keys of their data records. The result is a new stream.

Note

LSQL supports joins on key, but also allows the user to join on fields from the value or key part. In that case both sides have the record key remapped. The new key is the string concatenation of the fields involved.

Kafka Streams supports the following join operations:

  • KStream-to-KStream Joins are always windowed joins, since otherwise the memory and state required to compute the join would grow infinitely in size. Here, a newly received record from one of the streams is joined with the other stream’s records within the specified window interval to produce one result for each matching pair. A new KStream instance representing the resulting stream of the join is returned from this operator.
  • KTable-to-KTable Joins are join operations designed to be consistent with the ones in relational databases. Here, both changelog streams are materialized into local state stores first. When a new record is received from one of the streams, it is joined with the other stream’s materialized state stores to produce one result for each matching pair. A new KTable instance is produced representing the result stream of the join, which is also a changelog stream of the represented table.
  • KStream-to-KTable Joins allow you to perform table lookups against a changelog stream (KTable) upon receiving a new record from another record stream (KStream). An example use case would be to enrich a stream of orders (KStream) with the order details(KTable). Only records received from the record stream will trigger the join and produce results, not vice versa. This results in a brand new KStream instance representing the result stream of the join.

Here is a table of joins supported by Apache Kafka Streams:

Left Operand   Right Operand   Inner Join   Left Join   Outer Join
KStream        KStream         Yes          Yes         Yes
KTable         KTable          Yes          Yes         Yes
KStream        KTable          Yes          Yes         No
LSQL supports these join operators:

  • INNER
  • LEFT
  • OUTER
  • RIGHT

Note

A RIGHT JOIN is expressed in terms of a LEFT JOIN (the two operands are swapped).

Given the table above, here is a list of joins NOT possible by default in Kafka Streams API:

  • KTable RIGHT JOIN KStream
  • KTable OUTER JOIN KStream
  • KStream RIGHT JOIN KTable

LSQL allows the user to perform these operations; however, there are some costs associated with doing so. Before going into more detail, we need to give an overview of the context. As already stated, a RIGHT JOIN is expressed as a LEFT JOIN, so the above list becomes the following:

  • KStream LEFT JOIN KTable
  • KTable OUTER JOIN KStream
  • KTable LEFT JOIN KStream

The challenge here is that a KTable can only be joined with another KTable. Furthermore, at the moment there is no straightforward way to go from a KStream instance to a KTable one. The only solution is to use an intermediary topic and then build the required KTable from that topic. Of course, this hurts performance, since the data has to be written to a topic and read again for the join to happen. The topology description for the flow will reflect such a scenario. Given this information, the above joins become:

  • KTable LEFT JOIN KTable
  • KTable OUTER JOIN KTable
  • KTable LEFT JOIN KTable

A KStream OUTER JOIN KTable, despite not having support in the Kafka Streams API, is translated to a KTable OUTER JOIN KTable.

Important

LSQL transforms the flow as required to make the join type possible. Make sure you fully understand the implications of joins which require going through an intermediary topic.

Repartition

The Apache Kafka Streams API does not allow joining two streams with different partition counts. This can easily be the case in real systems. For example, with an order and an order-detail topic, the partition count on the latter will be smaller since traffic is lower. To allow such a join, LSQL brings the two in line. As a result, it has to create an order-repartition topic (the name is just an illustration) whose partition count matches that of the right operand.

Such an operation has a direct impact on performance, since the entire topic is copied over just to make the join possible. The topology viewer allows the user to see when such a change to the flow occurs.
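
The need for matching partition counts can be seen with a small Python sketch: when records are assigned to partitions by hashing the key, the same key generally lands on different partition numbers if the two topics have different partition counts, so matching records would not meet in the same task. The helper names are hypothetical, and zlib.crc32 is only a stand-in for the hash used by Kafka's default partitioner.

```python
import zlib


def partition_for(key, partitions):
    """Pick a partition by hashing the key (crc32 is an illustrative stand-in)."""
    return zlib.crc32(key.encode("utf-8")) % partitions


def co_partitioned(keys, left_partitions, right_partitions):
    """True when every key maps to the same partition number on both topics."""
    return all(
        partition_for(k, left_partitions) == partition_for(k, right_partitions)
        for k in keys
    )
```

With equal partition counts every key lines up; with, say, 4 and 6 partitions most keys do not, which is the situation the repartition topic resolves.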

Using WITH

As you have seen earlier, the full syntax for LSQL contains the following:

[WITH
   _ID_NAME_ AS (SELECT [STREAM] ...FROM _TOPIC_ ...),
   _ID_NAME_ AS (SELECT [STREAM] ...FROM _TOPIC_ ...)
]

This allows you to break down your query complexity. For example, let’s consider the scenario where you have a topic for static product details which you will want to join against the topic containing the orders. From the product details, you only need to store the product name. This is the SQL to use to achieve such behavior:

...
INSERT INTO ...
WITH
productTable AS
 (
   SELECT productName
   FROM `product`
   WHERE _ktype = 'STRING'
   AND _vtype = AVRO
 )

SELECT ..
FROM  ...  JOIN productTable ON ...
...

Any name registered via WITH (productTable in the example above) can be referenced after its definition. If you need to define multiple entries, separate them with commas. For example:

WITH
productTable as
 (
   SELECT productName
   FROM `product`
   WHERE _ktype = 'STRING'
   AND _vtype = AVRO
 ),
userTable as
 (
    SELECT firstName, secondName
    FROM `user`
    WHERE _ktype = 'LONG'
    AND _vtype = AVRO
 )

The examples define tables (which translate into instances of a KTable) but you can specify a stream (which translates to an instance of KStream) by simply adding STREAM after the SELECT:

WITH
productStream AS
 (
   SELECT STREAM productName
   FROM `product`
   WHERE _ktype = 'STRING'
   AND _vtype = AVRO
 )

Important

If the right operand is not found in the list of entities defined by WITH, a Stream instance will be created. SELECT STREAM or SELECT within a join targets the left operand only.

Join on Key

When joins were introduced at the beginning of the chapter, it was stated that two records are matched when their keys are equal. Here is how you would join orders and order details for AVRO records:

INSERT INTO orders_enhanced
SELECT STREAM
    o.orderNumber
    , o.status
    , SUM(od.quantityOrdered * od.priceEach) total
FROM  `order_details` AS od
    INNER JOIN `orders` AS o
        ON o._key = od._key
WHERE o._ktype = 'LONG'
AND o._vtype = AVRO
AND od._ktype = 'LONG'
AND od._vtype = AVRO
GROUP BY TUMBLE(2,s),o.orderNumber

Important

You cannot join two topics when the left operand's value decoder differs from the right operand's value decoder. Joining values from different decoder types is not supported.

When joining streams, the join needs to happen over a JoinWindow. The GROUP BY tumble(2,s) is used for the aggregation, but LSQL also derives from it the JoinWindow instance to use when joining the streams, before the grouping is applied. The translation between Window and JoinWindow happens as described in the table below:

Window Type                     Join Window
tumble(duration)                JoinWindows.of(duration)
hop(duration,advance)           JoinWindows.of(duration).until(advance)
session(inactivity,duration)    JoinWindows.of(inactivity).until(advance)
slide(duration)                 JoinWindows.of(duration)

If your topics have JSON payloads the above query should be:

INSERT INTO orders_enhanced
SELECT STREAM
    o.orderNumber
    , o.status
    , SUM(od.quantityOrdered * od.priceEach) total
FROM  `order_details` AS od
    INNER JOIN `orders` AS o
        ON o._key = od._key
WHERE o._ktype = 'LONG'
AND o._vtype = JSON
AND od._ktype = 'LONG'
AND od._vtype = JSON
GROUP BY TUMBLE(2,s),o.orderNumber

You can still join two streams without aggregating by simply doing the following:

INSERT INTO `orders_enhanced`
SELECT STREAM
    od.orderNumber
    , od.productCode
    , od.quantityOrdered
    , od.priceEach
    , od.orderLineNumber
    , p.productName
FROM  `product` as p
    INNER JOIN `order_details` AS od
        ON p._key = od._key
WHERE p._ktype = BYTES
AND p._vtype = AVRO
AND od._ktype = BYTES
AND od._vtype = AVRO
GROUP BY TUMBLE(2,s)

Although GROUP BY is still used, no grouping is actually applied since no grouping fields were defined. If your product topic key is not AVRO, you can specify _ktype = BYTES, as in the example above. This gives some performance benefit, because the key does not have to be deserialized (as the LONG in the earlier example was) and then serialized back to the output topic.

Important

Do not use BYTES when the payload is AVRO. The AVRO content is retained but the Schema Registry entry on the target will not be created.

All the functions supported by LSQL can be used in the select list. However, unless grouping is involved, the aggregate functions (SUM, MIN, MAX, COUNT) are not allowed.

INSERT INTO `orders_enhanced`
SELECT STREAM
    od.orderNumber
    , od.productCode
    , od.quantityOrdered
    , od.priceEach
    , od.orderLineNumber
    , concat(od.productCode,'-',p.productName) AS productName
FROM  `order_details` AS od
    LEFT JOIN `product` as p
        ON p.productCode = od.productCode
WHERE p._ktype = BYTES
AND p._vtype = AVRO
AND od._ktype = BYTES
AND od._vtype = AVRO
GROUP BY TUMBLE(4,s)

Join on Fields

It is not always the case that the topic key is actually the value to join on. Maybe it was an oversight in initial development, but LSQL has this covered as well. It allows you to choose a field from the value part of the Kafka message to use during the join.

INSERT INTO `orders_enhanced`
SELECT STREAM
    o.orderNumber
    , o.status
    , o.flags
    , od.productCode
FROM  `order_details` AS od
    INNER JOIN `orders` AS o
        ON o.orderNumber = od.orderNumber
WHERE o._ktype = BYTES
AND o._vtype = AVRO
AND od._ktype = BYTES
AND od._vtype = AVRO
GROUP BY TUMBLE(2,s)

There is a trade-off here. Joining on a field like above means the stream needs to be remapped to allow for the new key. All groupings will result in a String key. The reason is LSQL allows you to join on more than one field! The key is a string concatenation of all the values involved.
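
The re-keying step can be sketched in Python: both sides get a new string key built by concatenating the join fields, and records are then matched on that key. The helper names and the "-" separator are hypothetical (the source does not specify how the values are concatenated), and the sketch ignores the join window for brevity.

```python
def rekey(record, fields, separator="-"):
    """Build the new String key from the join fields (separator is assumed)."""
    return separator.join(str(record[f]) for f in fields)


def inner_join_on_fields(left, right, fields):
    """Inner-join two lists of records on the concatenated field key."""
    index = {}
    for r in right:
        index.setdefault(rekey(r, fields), []).append(r)
    # Merge each left record with every right record sharing the new key.
    return [
        {**l, **r}
        for l in left
        for r in index.get(rekey(l, fields), [])
    ]
```

Because the new key is always a string, numeric fields such as orderNumber are converted before matching, which is also why the resulting key type on the output is String.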

Important

Joining on one or more value fields re-maps the stream/table, and the new key type will be String. Re-mapping a table has its cost, since the data has to be moved from the KTable to a new topic and a new instance of the table has to be built.

The standard way to handle joins with a table is to define the table via WITH. An optimal solution for joining orders with products to get the product name attached to the order looks like this:

INSERT INTO `orders_enhanced`
WITH
productTable AS
 (
   SELECT productName
   FROM `product`
   WHERE _ktype = 'STRING'
   AND _vtype = AVRO
 )
SELECT STREAM
    od.orderNumber
    , od.productCode
    , od.quantityOrdered
    , od.priceEach
    , od.orderLineNumber
    , p.productName
FROM  `order_details` AS od
    LEFT JOIN  productTable AS p
        ON p._key = od.productCode
WHERE od._ktype = BYTES
AND od._vtype = AVRO

First, a productTable is defined, and it becomes the right operand of the LEFT JOIN. od.productCode must be of type String, since the key of the table is a String. Also, notice that _ktype is still required for order_details (od._ktype = BYTES). The resulting schema has the productName field as an optional string, since the right side might not be present.