Working with Arrays

This tutorial shows how to work with array data in your Kafka topics using Lenses SQL Processors.

We will see how to use Lenses SQL to extract, manipulate, and inspect the individual elements of an array.

In Lenses SQL you can use a LATERAL JOIN to treat the elements of an array as if they were normal fields.

You will learn to:

  • Extract the individual elements of an array with a single LATERAL join.

  • Extract elements of a multi-level array with nested LATERAL joins.

  • Use arbitrarily complex array expressions as the right-hand side of a LATERAL join.

Lateral Join on a simple array

In this example, we are getting data from a sensor.

Unfortunately, the upstream system registers the readings in batches, while what we need is a single record for every reading.

An example of such a record is the following:

KEY: 1
VALUE: {
  "meter_id": 1,
  "readings": [100, 101, 102]
}

Notice how each record contains multiple readings, inside the readings array field.

We can replicate such a structure by running the following query in SQL Studio:

CREATE TABLE batched_readings(
  meter_id int,
  readings int[]
) FORMAT(int, AVRO);

We can again use SQL Studio to insert some data to play with:

INSERT INTO batched_readings(_key, meter_id, readings) VALUES
(1, 1, [100, 92, 93, 101]),
(2, 2, [81, 82, 81]),
(3, 1, [95, 94, 93, 96]),
(4, 2, [80, 82])

What we want to obtain is a new topic, readings, where each record contains a single reading, together with its meter_id. Considering the first record, we expect it to be exploded into four different records, one for each reading:

KEY: 1
VALUE: { "meter_id": 1, "reading": 100 }
-----------------------------
KEY: 1
VALUE: { "meter_id": 1, "reading": 92 }
-----------------------------
KEY: 1
VALUE: { "meter_id": 1, "reading": 93 }
-----------------------------
KEY: 1
VALUE: { "meter_id": 1, "reading": 101 }

In Lenses SQL you can easily achieve that with the special LATERAL syntax.

You can create a processor defined as:

SET defaults.topic.autocreate=true;

INSERT INTO
  readings
SELECT STREAM
  meter_id,
  reading
FROM
  batched_readings
  LATERAL readings as reading

The magic happens in batched_readings LATERAL readings as reading. With that clause we are saying:

  • for each record in batched_readings

  • for each element inside the readings array of that record

  • build a new record with all the fields of the original batched_readings record, plus an extra reading field that will contain the value of the current element of readings.

We can then use both the original fields and the new reading field in the SELECT.

If you save and run the processor, you will see that it emits the records we expected.
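
The three steps above amount to a flat-map over the array field. Here is a minimal Python sketch of the same semantics (the record and field names mirror the example; the code is illustrative, not Lenses internals):

```python
# Illustrative only: simulate a LATERAL join as a nested loop.
batched_readings = [
    {"key": 1, "meter_id": 1, "readings": [100, 92, 93, 101]},
    {"key": 2, "meter_id": 2, "readings": [81, 82, 81]},
]

def lateral(records, array_field, alias):
    """For each record, emit one copy per element of `array_field`,
    with the current element bound to `alias`."""
    for record in records:
        for element in record[array_field]:
            yield {**record, alias: element}

# The SELECT then projects the fields it needs, old and new alike.
exploded = [
    {"key": r["key"], "meter_id": r["meter_id"], "reading": r["reading"]}
    for r in lateral(batched_readings, "readings", "reading")
]
```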

Filtering

One of the powerful features of a LATERAL join is that the expression you put in the LATERAL can be used as a normal field. This means you can also use it, for example, in a WHERE or in a GROUP BY.

In this section we will see how to filter records generated by a LATERAL using a normal WHERE.

We want to modify our previous processor in order to emit only the readings greater than 95.

To do that, it is enough to use reading as if it were a normal field in the WHERE section:

SET defaults.topic.autocreate=true;

INSERT INTO
  readings
SELECT STREAM
  meter_id,
  reading
FROM
  batched_readings
  LATERAL readings as reading
WHERE
  reading > 95

Running the processor, we get the records:

KEY: 1
VALUE: { "meter_id": 1, "reading": 100 }
-----------------------------
KEY: 1
VALUE: { "meter_id": 1, "reading": 101 }
-----------------------------
KEY: 3
VALUE: { "meter_id": 1, "reading": 96 }
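
In the nested-loop picture, the WHERE is simply a filter applied after the array element has been bound. Sketched in Python (illustrative, not Lenses internals):

```python
# Illustrative: the WHERE filters records after the LATERAL binds `reading`.
records = [{"meter_id": 1, "reading": r} for r in [100, 92, 93, 101]]

# WHERE reading > 95
filtered = [rec for rec in records if rec["reading"] > 95]
# Only the readings 100 and 101 survive the filter.
```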

Lateral Join on a multi-level array

This example is similar to the previous one. The only difference is that the readings are now stored in batches of batches:

CREATE TABLE batched_readings_nested(
  meter_id int,
  nested_readings int[][]
)
FORMAT(int, AVRO);

As you can see, nested_readings is an array whose elements are arrays of integers.

We can again use SQL Studio to insert some data:

INSERT INTO batched_readings_nested(_key, meter_id, nested_readings) VALUES
(1, 1, [[100, 92], [93, 101]]),
(2, 2, [[81], [82, 81]]),
(3, 1, [[95, 94, 93], [96]]),
(4, 2, [[80, 82]])

We would like to define a processor that emits the same records as the previous one.

In this case, though, we are dealing with nested_readings, which is a multi-level array, so a single LATERAL join is not enough. But nesting a LATERAL inside another will do the job:

SET defaults.topic.autocreate=true;

INSERT INTO
  readings
SELECT STREAM
  meter_id,
  reading
FROM
  batched_readings_nested
  LATERAL nested_readings as readings
  LATERAL readings as reading

This is roughly what happens in the FROM clause:

  • We first unwrap the first level of the array, doing a batched_readings_nested LATERAL nested_readings as readings.

  • At that point, readings will be an array of integers.

  • We can then use it in the outer ... LATERAL readings as reading join, and the single integers will finally be extracted and made available as reading.
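The two-level unwrapping above behaves like two nested loops, one per LATERAL. A Python sketch, illustrative only:

```python
# Illustrative: nested LATERAL joins are two nested loops.
record = {"meter_id": 1, "nested_readings": [[100, 92], [93, 101]]}

exploded = [
    {"meter_id": record["meter_id"], "reading": reading}
    for readings in record["nested_readings"]  # LATERAL nested_readings as readings
    for reading in readings                    # LATERAL readings as reading
]
```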

Complex array expressions

In this section we will see how it is possible to use any expression as the right-hand side of a LATERAL join, as long as it evaluates to an array.

We have a table where the meter readings are split into two columns, readings_day and readings_night.

CREATE TABLE day_night_readings(
  meter_id int,
  readings_day int[],
  readings_night int[]
)
FORMAT(int, AVRO);

Let’s insert data similar to the first example, with the readings split across the two columns.

INSERT INTO day_night_readings(_key, meter_id, readings_day, readings_night) VALUES
(1, 1, [100, 92], [93, 101]),
(2, 2, [81], [82, 81]),
(3, 1, [95, 94, 93], [96]),
(4, 2, [80], [81])

To extract the readings one by one, we first need to concatenate the two arrays readings_day and readings_night. We can achieve that using flatten, and then use the concatenated array in a LATERAL join:

SET defaults.topic.autocreate=true;

INSERT INTO
  readings
SELECT STREAM
  meter_id,
  reading
FROM
  day_night_readings
  LATERAL flatten([readings_day, readings_night]) as reading

The processor defined above will emit the records

KEY: 1
VALUE: { "meter_id": 1, "reading": 100 }
-----------------------------
KEY: 1
VALUE: { "meter_id": 1, "reading": 92 }
-----------------------------
KEY: 1
VALUE: { "meter_id": 1, "reading": 93 }
-----------------------------
KEY: 1
VALUE: { "meter_id": 1, "reading": 101 }
-----------------------------
KEY: 2
VALUE: { "meter_id": 2, "reading": 81 }
...

as we expected.
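
The flatten([readings_day, readings_night]) expression builds a single array before the LATERAL explodes it. In Python terms (illustrative only):

```python
# Illustrative: concatenate the two arrays, then explode the result.
record = {"meter_id": 1, "readings_day": [100, 92], "readings_night": [93, 101]}

def flatten(arrays):
    """Concatenate a list of arrays into a single array."""
    return [x for array in arrays for x in array]

combined = flatten([record["readings_day"], record["readings_night"]])
exploded = [{"meter_id": record["meter_id"], "reading": r} for r in combined]
```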


2024 © Lenses.io Ltd. Apache, Apache Kafka, Kafka and associated open source project names are trademarks of the Apache Software Foundation.