This page describes how to use lateral joins for data in Kafka with Lenses SQL Processors.
With Lateral Joins you can combine a data source with any array expression. As a result, you will get a new data source, where every record of the original one will be joined with the values of the lateral array expression.
Assume you have a source where elements is an array field:
    a    1    [1, 2]
    b    2    [3, 4, 5]
    c    3    [6]
Then a Lateral Join of source with elements is a new table, where every record of source will be joined with all the single items of the value of elements for that record:
    a    1    [1, 2]       1
    a    1    [1, 2]       2
    b    2    [3, 4, 5]    3
    b    2    [3, 4, 5]    4
    b    2    [3, 4, 5]    5
    c    3    [6]          6
In this way, the single elements of the array become available and can be used as a normal field in the query.
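The behavior described above can be modeled in a few lines of plain Python. This is only an illustrative sketch of the semantics, not how Lenses implements lateral joins: the helper `lateral_join`, the alias `element`, and the field names `name` and `num` are hypothetical, and only `elements` comes from the example.

```python
from typing import Any, Callable, Dict, Iterator, List

Record = Dict[str, Any]


def lateral_join(
    source: List[Record],
    array_expr: Callable[[Record], List[Any]],
    alias: str,
) -> Iterator[Record]:
    """Yield one output record per item of the array computed for each input record."""
    for record in source:
        for item in array_expr(record):
            # Keep every original field and expose the single item under the alias.
            yield {**record, alias: item}


# "name" and "num" are hypothetical field names; "elements" is the array field.
source = [
    {"name": "a", "num": 1, "elements": [1, 2]},
    {"name": "b", "num": 2, "elements": [3, 4, 5]},
    {"name": "c", "num": 3, "elements": [6]},
]

joined = list(lateral_join(source, lambda r: r["elements"], "element"))
for row in joined:
    print(row)
```

A record whose array is empty contributes no rows in this model, because the inner loop never runs for it.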
A query using lateral joins looks like a regular query apart from the definition of its source:
    SELECT (STREAM|TABLE)
      <projection>
    FROM
      <source> LATERAL
      <lateralArrayExpression> AS <lateralAlias>
    WHERE
      <filterExpression>;

projection: as in a single-table select, all the fields from <source> will be available in the projection. In addition, the special field <lateralAlias> will be available.
source: the source of data. Note: it is not possible to specify a normal join as a source of a lateral join. This limitation will be removed in the future.
lateralArrayExpression: any expression that evaluates to an array. The fields of <source> are available for defining this expression.
filterExpression: a filter expression specifying which records should be kept. Records that do not satisfy it are dropped.
Assume you have a topic batched_readings populated with the following records:
    _key    meter_id    readings
    a       1           [100, 80, 95, 91]
    b       2           [87, 93, 100]
    c       1           [88, 89, 92, 94]
    d       2           [81]
As you can see, readings is a field containing arrays of integers.
We define a processor like this:
    INSERT INTO readings
    SELECT STREAM
      meter_id,
      reading
    FROM
      batched_readings
      LATERAL readings AS reading
    WHERE
      reading > 90

The processor will emit the following records:
    _key    meter_id    reading
    a       1           100
    a       1           95
    a       1           91
    b       2           93
    b       2           100
    c       1           92
    c       1           94
Things to notice:
We used the aliased lateral expression reading both in the projection and in the WHERE.
The _key of each emitted record is the key of the original record. As usual, you can change this behavior by projecting on the key, with a projection like expression AS _key.
batched_readings records with keys a, b and c have been split into multiple records. That's because each of them contains more than one reading greater than 90.
Record d disappeared, because it has no readings greater than 90.
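To make these points concrete, the query can be mimicked with a plain Python comprehension. This is a sketch of the semantics only, not Lenses code; it assumes the first column of the input is the record key (named `_key` here) and that the other two columns are `meter_id` and `readings`, as the query suggests.

```python
# Input records, assuming the columns are _key, meter_id and readings.
batched_readings = [
    {"_key": "a", "meter_id": 1, "readings": [100, 80, 95, 91]},
    {"_key": "b", "meter_id": 2, "readings": [87, 93, 100]},
    {"_key": "c", "meter_id": 1, "readings": [88, 89, 92, 94]},
    {"_key": "d", "meter_id": 2, "readings": [81]},
]

emitted = [
    # Projection: the original key is kept, plus meter_id and the aliased item.
    {"_key": rec["_key"], "meter_id": rec["meter_id"], "reading": reading}
    for rec in batched_readings      # FROM batched_readings
    for reading in rec["readings"]   # LATERAL readings AS reading
    if reading > 90                  # WHERE reading > 90
]

for row in emitted:
    print(row)
```

The filter runs once per array item rather than once per input record, which is why a single input record can yield several output records, or none at all as with record d.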
It is possible to use multiple LATERAL joins in the same FROM clause.
Assume you have a topic batched_nested_readings populated with the following records:
    _key    meter_id    nested_readings
    a       1           [[100, 80], [95, 91]]
    b       2           [[87], [93, 100]]
    c       1           [[88, 89], [92, 94]]
    d       2           [[81]]
Notice how nested_readings contains arrays of arrays of integers.
To get the same results as in the previous example, we use a first lateral join to unpack the first level of nested_readings into an array that we call readings. We then define a second lateral join on readings to extract the single values:
    INSERT INTO readings
    SELECT STREAM
      meter_id,
      reading
    FROM
      batched_nested_readings
      LATERAL nested_readings AS readings
      LATERAL readings AS reading
    WHERE
      reading > 90

In the previous example we used a simple field as the <lateralArrayExpression>. In this section we will see how any array expression can be used for it.
Assume you have a topic day_night_readings populated with the following records:
day_night_readings
    _key    meter_id    readings_day    readings_night
    a       1           [100, 80]       [95, 91]
    b       2           [87, 93]        [100]
    c       1           [88]            [89, 92, 94]
    d       2           [81]            []
We can make use of Array Functions to lateral join day_night_readings on the concatenation of the two readings fields:
    INSERT INTO readings
    SELECT STREAM
      meter_id,
      reading
    FROM
      day_night_readings
      LATERAL flatten([readings_day, readings_night]) AS reading
    WHERE
      reading > 90

The processor defined this way will emit the following records:

    _key    meter_id    reading
    a       1           100
    a       1           95
    a       1           91
    b       2           93
    b       2           100
    c       1           92
    c       1           94
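The same idea can be checked with a short Python model in which the lateral array is computed by an expression instead of read from a field. This is a hedged sketch, not Lenses code: the Python `flatten` below is a stand-in for the function used in the query, the `_key` column name is assumed, and so is the reading of the third and fourth input columns as `readings_day` and `readings_night`.

```python
from itertools import chain
from typing import Any, List


def flatten(arrays: List[List[Any]]) -> List[Any]:
    """Concatenate a list of lists into one list (stand-in for the query's flatten)."""
    return list(chain.from_iterable(arrays))


# Input records, assuming the columns are _key, meter_id, readings_day, readings_night.
day_night_readings = [
    {"_key": "a", "meter_id": 1, "readings_day": [100, 80], "readings_night": [95, 91]},
    {"_key": "b", "meter_id": 2, "readings_day": [87, 93], "readings_night": [100]},
    {"_key": "c", "meter_id": 1, "readings_day": [88], "readings_night": [89, 92, 94]},
    {"_key": "d", "meter_id": 2, "readings_day": [81], "readings_night": []},
]

emitted = [
    {"_key": rec["_key"], "meter_id": rec["meter_id"], "reading": reading}
    for rec in day_night_readings
    # The lateral array is the result of an expression over the record's fields.
    for reading in flatten([rec["readings_day"], rec["readings_night"]])
    if reading > 90
]

for row in emitted:
    print(row)
```

Because the expression is evaluated per record, the empty night array of record d simply adds nothing to the flattened array for that record.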