Lateral Joins

This page describes how to use lateral joins for data in Kafka with Lenses SQL Processors.

With Lateral Joins you can combine a data source with any array expression. The result is a new data source in which every record of the original source is joined with each value of the lateral array expression.

Assume you have a source where elements is an array field:

field1  field2  elements
------  ------  ---------
a       1       [1, 2]
b       2       [3, 4, 5]
c       3       [6]

Then a Lateral Join of source with elements is a new table in which every record of source is joined with each individual item of its elements array:

field1  field2  elements   element
------  ------  ---------  -------
a       1       [1, 2]     1
a       1       [1, 2]     2
b       2       [3, 4, 5]  3
b       2       [3, 4, 5]  4
b       2       [3, 4, 5]  5
c       3       [6]        6

In this way, the individual elements of the array become available and can be used like any other field in the query.
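As a rough mental model (not how Lenses SQL Processors are implemented), a lateral join behaves like a nested loop: for each source record, emit one copy of it per item of its array. The Python sketch below models records as plain dicts and reproduces the table above. The item is exposed under the name element, matching that table's column.

```python
# Sample source from the first table, with each record modelled as a plain dict.
source = [
    {"field1": "a", "field2": 1, "elements": [1, 2]},
    {"field1": "b", "field2": 2, "elements": [3, 4, 5]},
    {"field1": "c", "field2": 3, "elements": [6]},
]

# Lateral join on `elements`: one output record per array item. All original
# fields are kept, and the current item is added under the name `element`.
joined = [
    {**record, "element": item}
    for record in source
    for item in record["elements"]
]

for row in joined:
    print(row)
```

Record b has three elements, so it yields three rows. Record c has one element, so it yields a single row.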

Syntax

A query using lateral joins looks like a regular query apart from the definition of its source:

SELECT (STREAM|TABLE)
  <projection>
FROM
  <source> LATERAL
  <lateralArrayExpression> AS <lateralAlias>
WHERE
  <filterExpression>;

  • projection: as in a single-table select, all the fields from <source> are available in the projection. In addition, the special field <lateralAlias> is available.

  • source: the source of data. Note: a normal join cannot be used as the source of a lateral join. This limitation will be removed in the future.

  • lateralArrayExpression: any expression that evaluates to an array. The fields of <source> can be used to define this expression.

  • filterExpression: a filter expression specifying which records are kept. It can use both the fields of <source> and <lateralAlias>.
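The order in which the parts of the query apply can be sketched in Python as follows. This is a simplified model, not the Lenses engine. For each source record, the lateral array expression is evaluated and each item is bound to the alias. The filter is then checked on the joined row, and finally the projection is applied. The function name select_lateral and its parameters are hypothetical and only mirror the placeholders in the syntax above.

```python
from typing import Any, Callable, Iterable, Iterator

Record = dict[str, Any]


def select_lateral(
    source: Iterable[Record],
    lateral_array_expression: Callable[[Record], list[Any]],
    lateral_alias: str,
    projection: list[str],
    filter_expression: Callable[[Record], bool] = lambda _row: True,
) -> Iterator[Record]:
    """Hypothetical model of SELECT <projection> FROM <source> LATERAL ... WHERE ..."""
    for record in source:
        # The array expression is evaluated per source record and may use its fields.
        for item in lateral_array_expression(record):
            # Each joined row sees every source field plus the lateral alias.
            row = {**record, lateral_alias: item}
            # The filter runs on each joined row, so it can test the alias itself.
            if filter_expression(row):
                yield {name: row[name] for name in projection}


source = [
    {"field1": "a", "field2": 1, "elements": [1, 2]},
    {"field1": "b", "field2": 2, "elements": [3, 4, 5]},
    {"field1": "c", "field2": 3, "elements": [6]},
]

# Keep only even elements and project field1 plus the alias.
rows = list(
    select_lateral(
        source,
        lateral_array_expression=lambda r: r["elements"],
        lateral_alias="element",
        projection=["field1", "element"],
        filter_expression=lambda row: row["element"] % 2 == 0,
    )
)
print(rows)
```

Because the filter sees each joined row individually, a single source record can contribute some of its array items to the output and have others dropped.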

Single Lateral Joins

Assume you have a topic batched_readings populated with the following records:

batched_readings

_key  meter_id  readings
----  --------  -----------------
a     1         [100, 80, 95, 91]
b     2         [87, 93, 100]
c     1         [88, 89, 92, 94]
d     2         [81]

As you can see, readings is a field containing arrays of integers.

We define a processor like this:

INSERT INTO readings
SELECT STREAM
    meter_id,
    reading
FROM
    batched_readings
    LATERAL readings AS reading
WHERE
    reading > 90

The processor will emit the following records:

_key  meter_id  reading
----  --------  -------
a     1         100
a     1         95
a     1         91
b     2         93
b     2         100
c     1         92
c     1         94

Things to notice:

  • We used the aliased lateral expression reading both in the projection and in the WHERE clause.

  • The _key of each emitted record is the _key of the original record. As usual, you can change this behavior by projecting on the key, with a projection like expression AS _key.

  • The batched_readings records with keys a, b and c have been split into multiple records, because each of them contains more than one reading greater than 90.

  • Record d disappeared, because it has no readings greater than 90.
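The same processing can be simulated in Python to check which records the processor should emit for the sample topic. This sketch assumes the topic is an in-memory list of (key, value) pairs, which is a hypothetical stand-in and not a Kafka client. It applies LATERAL readings AS reading, keeps readings greater than 90, and carries each original key over unchanged.

```python
from typing import Any

# Hypothetical in-memory stand-in for the batched_readings topic: (key, value) pairs.
batched_readings: list[tuple[str, dict[str, Any]]] = [
    ("a", {"meter_id": 1, "readings": [100, 80, 95, 91]}),
    ("b", {"meter_id": 2, "readings": [87, 93, 100]}),
    ("c", {"meter_id": 1, "readings": [88, 89, 92, 94]}),
    ("d", {"meter_id": 2, "readings": [81]}),
]

# One candidate record per reading; the WHERE clause keeps readings above 90.
# The key of the source record is reused for every emitted record.
emitted = [
    (key, {"meter_id": value["meter_id"], "reading": reading})
    for key, value in batched_readings
    for reading in value["readings"]
    if reading > 90
]

for key, value in emitted:
    print(key, value["meter_id"], value["reading"])
```

Key d yields no output because none of its readings pass the filter.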

Multiple Lateral Joins

It is possible to use multiple LATERAL joins in the same FROM clause.

Assume you have a topic batched_nested_readings populated with the following records:

batched_nested_readings

_key  meter_id  nested_readings
----  --------  ---------------------
a     1         [[100, 80], [95, 91]]
b     2         [[87], [93, 100]]
c     1         [[88, 89], [92, 94]]
d     2         [[81]]

Notice how nested_readings contains arrays of arrays of integers.

To get the same results as the previous example, we use a first lateral join to unpack the outer level of nested_readings, so that each inner array is available as readings. We then define a second lateral join on readings to extract the individual values:

INSERT INTO readings
SELECT STREAM
    meter_id,
    reading
FROM
    batched_nested_readings
    LATERAL nested_readings AS readings
    LATERAL readings AS reading
WHERE
    reading > 90
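Chaining two lateral joins corresponds to two nested loops. In the Python model below, the topic is again a hypothetical in-memory list of (key, value) pairs. The first loop plays the role of LATERAL nested_readings AS readings and the second plays LATERAL readings AS reading. The result matches what the single-level example computes for batched_readings.

```python
from typing import Any

# Hypothetical in-memory stand-in for the batched_nested_readings topic.
batched_nested_readings: list[tuple[str, dict[str, Any]]] = [
    ("a", {"meter_id": 1, "nested_readings": [[100, 80], [95, 91]]}),
    ("b", {"meter_id": 2, "nested_readings": [[87], [93, 100]]}),
    ("c", {"meter_id": 1, "nested_readings": [[88, 89], [92, 94]]}),
    ("d", {"meter_id": 2, "nested_readings": [[81]]}),
]

emitted = [
    (key, {"meter_id": value["meter_id"], "reading": reading})
    for key, value in batched_nested_readings
    # First lateral join: one row per inner array, aliased as readings.
    for readings in value["nested_readings"]
    # Second lateral join: one row per value of that inner array, aliased as reading.
    for reading in readings
    if reading > 90
]

for key, value in emitted:
    print(key, value["meter_id"], value["reading"])
```

The second loop can only see readings because the first loop has already bound it. Likewise, the second LATERAL clause in the query refers to the alias introduced by the first.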

Complex Lateral Expressions

In the previous examples we used a simple field as the <lateralArrayExpression>. In this section we will see that any array expression can be used instead.

Assume you have a topic day_night_readings populated with the following records:

day_night_readings

_key  meter_id  readings_day  readings_night
----  --------  ------------  --------------
a     1         [100, 80]     [95, 91]
b     2         [87, 93]      [100]
c     1         [88]          [89, 92, 94]
d     2         [81]          []

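We can use Array Functions to lateral join day_night_readings on the concatenation of the two readings fields. The expression [readings_day, readings_night] builds an array containing both arrays, and flatten merges them into a single array: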

INSERT INTO readings
SELECT STREAM
    meter_id,
    reading
FROM
    day_night_readings
    LATERAL flatten([readings_day, readings_night]) AS reading
WHERE
    reading > 90

The processor defined this way will emit the following records:

_key  meter_id  reading
----  --------  -------
a     1         100
a     1         95
a     1         91
b     2         93
b     2         100
c     1         92
c     1         94
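To see why concatenating the two arrays gives the same readings as before, here is a Python model of this processor. The Python flatten below is a hypothetical stand-in that assumes the Lenses function turns an array of arrays into one array while preserving order. It is not the Lenses implementation. The topic is again modelled as a hypothetical in-memory list of (key, value) pairs.

```python
from itertools import chain
from typing import Any


def flatten(arrays: list[list[Any]]) -> list[Any]:
    # Assumed behaviour: concatenate the inner arrays in order into a single array.
    return list(chain.from_iterable(arrays))


# Hypothetical in-memory stand-in for the day_night_readings topic.
day_night_readings: list[tuple[str, dict[str, Any]]] = [
    ("a", {"meter_id": 1, "readings_day": [100, 80], "readings_night": [95, 91]}),
    ("b", {"meter_id": 2, "readings_day": [87, 93], "readings_night": [100]}),
    ("c", {"meter_id": 1, "readings_day": [88], "readings_night": [89, 92, 94]}),
    ("d", {"meter_id": 2, "readings_day": [81], "readings_night": []}),
]

emitted = [
    (key, {"meter_id": value["meter_id"], "reading": reading})
    for key, value in day_night_readings
    # Lateral join on the computed array, not on a stored field.
    for reading in flatten([value["readings_day"], value["readings_night"]])
    if reading > 90
]

for key, value in emitted:
    print(key, value["meter_id"], value["reading"])
```

An empty readings_night array, as for key d, contributes nothing to the concatenated array.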


2024 © Lenses.io Ltd. Apache, Apache Kafka, Kafka and associated open source project names are trademarks of the Apache Software Foundation.