SQL Processors

Data formats

Changing data formats

Tutorial on how to change the format of data in a Kafka topic from JSON to AVRO with Lenses SQL Processors.

In this example, we will show how to create an AVRO topic from an existing JSON topic.

Requirements

For this to work, Lenses has to know what the source topic schema is. Lenses can do this in one of three ways:

  • through direct user action where the schema is manually set

  • through inference; Lenses will try to infer the schema of a topic by looking at the topic data

  • and lastly, if the topic is created through Lenses, the schema will be automatically set

Creating the JSON data

With the following SQL we can create our initial JSON topic:

to which we can add data using:

We can quickly verify that the format of our newly created topic is JSON for both key and value by searching for car_speed_events_json in the Explore view:

Creating the AVRO topic

To create a new topic with format AVRO, we can create a processor that will copy the data from our original topic to a new topic changing the format in the process.

To do this, we start by going to “SQL Processors”, clicking “New SQL Processor”, and defining our processor with the following code:

Notice the addition of STORE KEY AS AVRO VALUE AS AVRO. This statement tells our processor which format we want each facet (key or value) to be stored as.

Hitting “Create New Processor” will start the new processor.

We can see the events were added, and from now on Lenses will keep pushing any new events from car_speed_events_json into car_speed_events_avro.

We can also verify that the topic format of our new topic is also AVRO for both the key and value facets:

CREATE TABLE car_speed_events_json(
    _key.plate string
    , speedMph int
    , sensor.id string
    , sensor.lat double
    , sensor.long double
    , event_time long
)
FORMAT (json, json);
INSERT INTO car_speed_events_json (
    _key.plate
    , speedMph
    , sensor.id 
    , sensor.lat
    , sensor.long 
    , event_time 
) VALUES
("20-GH-38", 50, "sensor_1", 45.1241, 1.177, 1591970345),
("20-VL-12", 30, "sensor_1", 45.1241, 1.177, 1591970345),
("20-JH-98", 90, "sensor_1", 45.1241, 1.177, 1591970345);
SET defaults.topic.autocreate=true;

INSERT INTO car_speed_events_avro
STORE 
    KEY AS AVRO 
    VALUE AS AVRO
SELECT STREAM *
FROM car_speed_events_json;

Filtering & Joins

Complex types

Aggregations

Rekeying data

This page describes a tutorial to rekey data in a Kafka topic with Lenses SQL Processors.

Sometimes you have a topic that is almost exactly what you need, except that the key of the record requires a bit of massaging.

In Lenses SQL you can use the special SELECT ... as _key syntax to quickly re-key your data.

In our example, we have a topic containing events coming from temperature sensors.

Each record contains the sensor’s measured temperature, the time of the measurement, and the id of the sensor. The key is a unique string (for example, a UUID) that the upstream system assigns to the event.

You can replicate the example, by creating a topic in SQL Studio:

CREATE TABLE temperature_events(
    sensor_id string
    , temperature int
    , event_time long
)
FORMAT (string, avro);

We can also insert some example data to do our experiments:

You can explore the topic in Lenses to check the content and the shape of what you just inserted.

Let’s say that what you need is that same stream of events, but the record key should be the sensor_id instead of the UUID.

With the special SELECT ... as _key syntax, a few lines are enough to define our new re-keying processor:
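A minimal sketch of such a processor (assuming the output topic events_by_sensor is auto-created) could look like this:

SET defaults.topic.autocreate=true;

INSERT INTO events_by_sensor
SELECT STREAM sensor_id AS _key
FROM temperature_events;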

The query above will take the sensor_id from the value of the record and put it as the new key. All the value fields will remain untouched:

Maybe the sensor_id is not enough, and for some reason, you also need the hour of the measurement in the key. In this case, the key will become a composite object with two fields: the sensor_id and the event_hour:
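A sketch of such a processor, building a composite key from the sensor_id and the hour of the event, might look like the following:

SET defaults.topic.autocreate=true;

INSERT INTO events_by_sensor_and_hour
SELECT STREAM
    temperature_events.sensor_id AS _key.sensor_id
    , temperature_events.event_time / 3600 AS _key.event_hour
FROM temperature_events;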

As you can see, you can build composite objects in Lenses with ease, just by listing all the structure’s fields one after the other.

In the last example, the _key output storage format will be inferred automatically by the system as JSON. If you need more control, you can use the STORE AS clause before the SELECT.

The following example will create a topic like the previous one, but where the keys are stored as AVRO:
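A sketch of that processor, with the key stored as AVRO, could be:

SET defaults.topic.autocreate=true;

INSERT INTO events_by_sensor_and_hour_avro_key
STORE KEY AS AVRO
SELECT STREAM
    temperature_events.sensor_id AS _key.sensor_id
    , FLOOR(temperature_events.event_time / 3600) AS _key.event_hour
FROM temperature_events;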

Happy re-keying!

Controlling event time

This page describes how to control event time for data in your Kafka topics with Lenses SQL Processors.

Every message in Kafka comes with a timestamp, and Lenses Engine Streaming mode uses that by default when doing time-dependent operations, like aggregations and joins.

Sometimes though that timestamp is not exactly what you need, and you would like to use a field in the record value or key as the new timestamp.

In Lenses SQL you can use the special EVENTTIME BY ... syntax to control record timestamps.

Setting up our example

In our toy example, we have a simple topic where electricity meter readings events are collected:

We can also insert some example data to do our experiments:

If you query the events, you can see that Kafka sets a timestamp for each record. That timestamp is, in our case, the time when the record was inserted. As you can see, it is totally unrelated to the event_time field we have in the payload.

Computing moving averages

We would like to transform our original stream of events, aggregating events with a hopping window of 10s width and an increment of 5s, computing the average for each window.

You can create a new processor that streams those averages, using the special WINDOW BY ... syntax:
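A minimal sketch of such a processor might look like this:

SET defaults.topic.autocreate=true;

INSERT INTO electricity_events_avg_wrong
SELECT STREAM 
    customer_id
    , AVG(KW) AS KW
FROM electricity_events
WINDOW BY HOP 10s,5s
GROUP BY customer_id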

For customer 1, we have three events in input, with a 5s delay between them, so we expect four output events for that customer, since 4 is the number of hopping windows involved.

But checking the emitted records, we see that only two are produced.

This is because, by default, windowing operations work on the record timestamp, and in our case all the timestamps are pretty much the same and coincide with the time the records were inserted.

Fortunately we can change this behavior using the special EVENTTIME BY ... syntax, specifying an expression to be used as the timestamp:
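A sketch of the corrected processor, using event_time as the record timestamp, might be:

SET defaults.topic.autocreate=true;

INSERT INTO electricity_events_avg
SELECT STREAM 
    customer_id
    , AVG(KW) AS KW
FROM electricity_events
EVENTTIME BY event_time
WINDOW BY HOP 10s,5s
GROUP BY customer_id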

As you can see, the results have been windowed using event_time as the timestamp:

Unwrapping complex types

This page describes a tutorial to unwrap a complex data type in a Kafka topic using Lenses SQL Processors.

In this example, we will show how Lenses can be used to transform complex data types into simple primitive ones.

Setting up

We start this tutorial by creating a topic which will hold information regarding visits to our website:

CREATE TABLE lenses_monitoring(
   _key.landing_page string
  , _key.user string
  , time_spent_s int
)
FORMAT(avro, avro);

Firstly we’ll add some data to our newly created topic:

INSERT INTO lenses_monitoring(
    _key.landing_page
    , _key.user
    , time_spent_s
) VALUES
("homepage", "anon_21", 30),
("why-lenses", "anon_32", 45),
("use-cases", "anon_56", 12),
("customers", "anon_36", 12),
("use-cases", "anon_126", 12);          

Unwrapping the data

For example, let’s say we’re interested in sending this data to a service that analyses the time spent on a page and how it changes over time.

This system has a caveat though: it only accepts data where keys are strings and values are integers.

Rather than having to reimplement our analysis system, we can create a SQL Processor that will continuously send data to a new topic in a format the target system can work with:
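A minimal sketch of such a processor, writing to an analysis_topic output, could be:

SET defaults.topic.autocreate=true;

INSERT INTO analysis_topic 
SELECT STREAM 
    _key.landing_page AS _key
    , time_spent_s AS _value
FROM lenses_monitoring;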

Notice the addition of the as _key and as _value aliases; these tell Lenses to “unwrap” the values, effectively making Lenses write them as primitive types (string and integer respectively) instead of (in this particular case) Avro objects.

Lenses will also automatically infer the format of each topic facet; in this case it sets them to STRING and INT respectively.

--

INSERT INTO temperature_events(
    _key
    , sensor_id
    , temperature
    , event_time
) VALUES
("9E6F72C8-6C9F-4EC2-A7B2-C8AC5A95A26C", "sensor-1", 12, 1591970345),
("0958C1E2-804F-4110-B463-7BEA25DA680C", "sensor-2", 13, 1591970346),
("552E9A77-D23D-4EC7-96AB-7F106637ADBC", "sensor-3", 28, 1591970347),
("00F44EDD-1BF1-4AE8-A7FE-622F720CA3BC", "sensor-1", 13, 1591970348),
("A7F19576-6EFA-40A0-91C3-D8999A41F453", "sensor-1", 12, 1591970348),
("5273803E-7F34-44F8-9A3B-B6CB9D24C1F9", "sensor-2", 31, 1591970349),
("8401CD82-ABFF-4D1B-A564-986DC971F913", "sensor-2", 30, 1591970349),
("F1CF77A0-6CFB-472F-B368-28F011F46C64", "sensor-1", 13, 1591970349);
CREATE TABLE electricity_events(
    KW double
    , customer_id int
    , event_time long
)
FORMAT (string, avro);
SET defaults.topic.autocreate=true;
INSERT INTO analysis_topic 
SELECT STREAM 
    _key.landing_page as _key
    , time_spent_s as _value
FROM lenses_monitoring;
SET defaults.topic.autocreate=true;

INSERT INTO events_by_sensor
SELECT STREAM sensor_id as _key
FROM temperature_events;
SET defaults.topic.autocreate=true;

INSERT INTO events_by_sensor_and_hour
SELECT STREAM
    temperature_events.sensor_id as _key.sensor_id
    , temperature_events.event_time / 3600 as _key.event_hour
FROM temperature_events;
SET defaults.topic.autocreate=true;

INSERT INTO events_by_sensor_and_hour_avro_key
STORE KEY AS AVRO
SELECT STREAM
    temperature_events.sensor_id as _key.sensor_id
    , FLOOR(temperature_events.event_time / 3600) as _key.event_hour
FROM temperature_events;
INSERT INTO electricity_events(
    KW
    , customer_id
    , event_time
) VALUES
(1.0, 1, 1592848400000),
(2.0, 2, 1592848400000),
(1.5, 3, 1592848400000),
(2.0, 1, 1592848405000),
(1.0, 2, 1592848405000),
(2.5, 3, 1592848405000),
(3.0, 1, 1592848410000),
(0.5, 2, 1592848410000),
(4.0, 3, 1592848405000)
;
[
{"value":{"KW":1,"customer_id":1,"event_time":1592848400000},"metadata":{...,"timestamp":1594041840812,...}},
{"value":{"KW":2,"customer_id":2,"event_time":1592848400000},"metadata":{...,"timestamp":1594041840821,...}},
{"value":{"KW":1.5,"customer_id":3,"event_time":1592848400000},"metadata":{...,"timestamp":1594041840828,...}},
{"value":{"KW":2,"customer_id":1,"event_time":1592848405000},"metadata":{...,"timestamp":1594041840834,...}},
{"value":{"KW":1,"customer_id":2,"event_time":1592848405000},"metadata":{...,"timestamp":1594041840842,...}},
{"value":{"KW":2.5,"customer_id":3,"event_time":1592848405000},"metadata":{...,"timestamp":1594041840848,...}},
{"value":{"KW":3,"customer_id":1,"event_time":1592848410000},"metadata":{...,"timestamp":1594041840855,...}},
{"value":{"KW":0.5,"customer_id":2,"event_time":1592848410000},"metadata":{...,"timestamp":1594041840862,...}},
{"value":{"KW":4,"customer_id":3,"event_time":1592848405000},"metadata":{...,"timestamp":1594041840868,...}}
]
SET defaults.topic.autocreate=true;

INSERT INTO electricity_events_avg_wrong
SELECT STREAM 
    customer_id
    , AVG(KW) as KW
FROM electricity_events
WINDOW BY HOP 10s,5s
GROUP BY customer_id
SET defaults.topic.autocreate=true;

INSERT INTO electricity_events_avg
SELECT STREAM 
    customer_id
    , AVG(KW) as KW
FROM electricity_events
EVENTTIME BY event_time
WINDOW BY HOP 10s,5s
GROUP BY customer_id
[
{"key":{"value":1,"window":{"start":1592848395000,"end":null}},"value":{"customer_id":1,"KW":1}, ...},
{"key":{"value":1,"window":{"start":1592848400000,"end":null}},"value":{"customer_id":1,"KW":1.5}, ...},
{"key":{"value":1,"window":{"start":1592848405000,"end":null}},"value":{"customer_id":1,"KW":2.5}, ...},
{"key":{"value":1,"window":{"start":1592848410000,"end":null}},"value":{"customer_id":1,"KW":3}, ...}
]

Controlling AVRO record names and namespaces

This page describes a tutorial to control AVRO record names and namespaces with Lenses SQL Processors.

When writing output as AVRO, Lenses creates schemas for you, automatically generating AVRO record names.

In this tutorial we will learn how to override the default record naming strategy.

In Lenses SQL you can use a SET statement to control the record and namespace name generated for the AVRO schema.

Setting up our input topic

We are going to create and populate a topic that we will later use in a couple of SQL Processors.

In SQL Studio, create the topic running the following query:

For the purposes of our tutorial, it is enough to insert a single record:

Create a simple SQL Processor

We are now going to create a processor that will show the default behavior of AVRO record naming in Lenses.

The processor does not do much, it just reshapes the fields of the original topic, putting some of them in a nested field:

We then start the processor. Lenses will create the new topic mytopic_2, and a new schema will be created in the Schema Registry as soon as the first (and only) record is processed.

If we inspect the value schema of mytopic_2, we see that this is the one generated:

As we can see, each record type has a name (it is mandatory in AVRO), and Lenses has generated those names automatically for us (record, record0, record1 etc.).

Set the record name and the namespace of the value schema

We are now going to see how to override that default behavior.

Let’s create and start the new processor with the following SQL:

Notice how we added the new SET statements to the query:
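The two SET statements in question are the following:

SET defaults.topic.value.avro.record="myRecordName";
SET defaults.topic.value.avro.namespace="myNamespace";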

These settings are telling Lenses to set the root record name and namespace to the values specified.

If we now check the value schema for mytopic_3 we get:

As we can see, the root record element now has the name myRecordName and the namespace myNamespace.

Notice how the settings did not affect nested records.

Set the record name and the namespace of the key schema

If the key of the generated topic has AVRO format as well, you can use the following analogous settings to control the key record name and namespace:
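For example:

SET defaults.topic.key.avro.record="myRecordName";
SET defaults.topic.key.avro.namespace="myNamespace";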

More control on the topic affected by the setting

A setting like the one we used before for the value schema:

will affect all the topics used by the processor.

If you want instead to target a single topic, you can use the topic-specific version:
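For instance, reusing mytopic_3 from this tutorial:

SET topic.mytopic_3.value.avro.record="myRecordName";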

The setting above will override the record name only for topic mytopic_3. Other topics will not be affected and will keep using the default naming strategy.

CREATE TABLE mytopic(a string, b string, c string) FORMAT (STRING, AVRO);
INSERT INTO mytopic(a, b, c) VALUES ("a", "b", "c");
SET defaults.topic.autocreate=true;

INSERT INTO mytopic_2
SELECT STREAM a as x.a, b as x.y.b, c as x.y.c
FROM mytopic
{
  "type": "record",
  "name": "record1",
  "fields": [
    {
      "name": "x",
      "type": {
        "type": "record",
        "name": "record0",
        "fields": [
          {
            "name": "a",
            "type": "string"
          },
          {
            "name": "y",
            "type": {
              "type": "record",
              "name": "record",
              "fields": [
                {
                  "name": "b",
                  "type": "string"
                },
                {
                  "name": "c",
                  "type": "string"
                }
              ]
            }
          }
        ]
      }
    }
  ]
}
SET defaults.topic.autocreate=true;
SET defaults.topic.value.avro.record="myRecordName";
SET defaults.topic.value.avro.namespace="myNamespace";

INSERT INTO mytopic_3
SELECT STREAM a as x.a, b as x.y.b, c as x.y.c
FROM mytopic
SET defaults.topic.value.avro.record="myRecordName";
SET defaults.topic.value.avro.namespace="myNamespace";
{
  "type": "record",
  "name": "myRecordName",
  "namespace": "myNamespace",
  "fields": [
    {
      "name": "x",
      "type": {
        "type": "record",
        "name": "record0",
        "fields": [
          {
            "name": "a",
            "type": "string"
          },
          {
            "name": "y",
            "type": {
              "type": "record",
              "name": "record",
              "fields": [
                {
                  "name": "b",
                  "type": "string"
                },
                {
                  "name": "c",
                  "type": "string"
                }
              ]
            }
          }
        ]
      }
    }
  ]
}
SET defaults.topic.key.avro.record="myRecordName";
SET defaults.topic.key.avro.namespace="myNamespace";
SET defaults.topic.value.avro.record="myRecordName"
SET topic.mytopic_3.value.avro.record="myRecordName"

Enriching data streams

This page describes a tutorial to enrich a Kafka topic using Lenses SQL Processors.

In this article, we will be enriching customer call events with their customer details.

Enriching data streams with extra information by performing an efficient lookup is a common scenario for streaming SQL on Apache Kafka.

Topics involved:

  • customer_details messages contain information about the customer

  • customer_call_details messages contain information about calls

Streaming SQL enrichment on Apache Kafka

Testing data

To simplify our testing process and manage to run the above example in less than 60 seconds, we will be using SQL to create and populate the two Apache Kafka topics:

CREATE TOPIC customer_details

POPULATE TOPIC customer_details

CREATE TOPIC customer_call_details

POPULATE TOPIC customer_call_details

Validate results

Changing the shape of data

This page describes a tutorial to change the shape (fields) of data in a Kafka topic using Lenses SQL Processors.

In this tutorial, we will see how to use Lenses SQL to alter the shape of your records.

In Lenses SQL you can quickly reshape your data with a simple SELECT and some built-in functions.

We will learn how to:

  • put value fields into the key

  • lift key fields into the value

  • call functions to transform your data

  • build nested structures for your keys/values

  • unwrap singleton objects into primitive types

Setting up our example

In our example, we are getting data from speed sensors from a speed car circuit.

The upstream system registers speed measurement events as records in a Kafka topic.

An example of such an event is the following:

We can replicate such a structure running the following query in SQL Studio:

Each event is keyed by a unique string generated by the upstream system.

We can again use SQL Studio to insert some data to play with:

Simple projections

In this section, we are only interested in the speed of single cars, and we do not care about all the other fields.

We want to use the car id, which now is part of the record Value, to become the new key (using the special as _key syntax). We also want the car speed as the new record Value.

To achieve that we can create a new SQL Processor using some simple projections.
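A sketch of such a processor, writing to an only_car_speeds topic, might look like this:

SET defaults.topic.autocreate=true;

INSERT INTO only_car_speeds
SELECT STREAM 
    car_id AS _key
    , speedMph
FROM car_speed_events;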

Checking the records emitted by the processor, we see that the shape of the records is:

We want to avoid that intermediate wrapping of speedMph inside an object. To do that we can tell Lenses to unwrap the value with the special as _value syntax, saving some bytes and CPU cycles:
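A sketch of the unwrapped version could be:

SET defaults.topic.autocreate=true;

INSERT INTO only_car_speeds_unwrapped
SELECT STREAM 
    car_id AS _key
    , speedMph AS _value
FROM car_speed_events;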

Now the shape of the records is what we had in mind:

Using built-in functions

This time we want to do some more complex manipulation. We want to convert the speed from Mph to Kmph, and we would also want to build a nice string describing the event.

An example of an output record would be:

In this case, we are using CONCATENATE to concatenate multiple strings, CAST to convert an expression to another type, and *, the usual multiplication operator.
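A sketch of a processor doing this transformation might look like the following:

SET defaults.topic.autocreate=true;

INSERT INTO car_speeds_kmph
SELECT STREAM
    speedMph * 1.60934 AS speedKmph
    , CONCATENATE(
        'Car ',
        car_id,
        ' was running at ',
        CAST(speedMph AS STRING),
        'Mph') AS description
FROM car_speed_events;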

If we check the resulting records, we can see that we obtained the shape we were looking for. Please note that the keys have been left untouched:

Composite objects

In this last example, we will show how to create composite keys and values in our projections.

We want both the sensor id and the event_time as the record Key. For the record Value, we want the car_id and the speed, expressed both as Mph and Kmph.

Lenses SQL allows us to use nested aliases to build nested structures: you just have to put some dots in your aliases.
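A sketch of a processor building these composite structures might be:

SET defaults.topic.autocreate=true;

INSERT INTO car_speeds_by_sensor_and_time
SELECT STREAM
    sensor.id AS _key.sensor_id
    , event_time AS _key.event_time
    , car_id
    , speedMph AS speed.mph
    , speedMph * 1.60934 AS speed.kmph
FROM car_speed_events;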

The resulting shape of the record is what we were aiming for:

Happy re-shaping!

Joining streams of data

This page describes a tutorial joining Kafka topics with Lenses SQL Processors.

Imagine you are the next Amazon, and you want to track the orders and shipment events to work out which orders have been shipped and how long it took. In this case, there will be two data streams, one for each event type, and the resulting stream will answer the questions above.

Enriching two streams of data requires a sliding window join. The events are said to be “close” to each other if the difference between their timestamps is within the time window specified.

Topics involved:

  • orders messages contain information about an order placed by a customer

  • shipments messages contain information about the shipment

Streaming SQL enrichment on Apache Kafka

The query combines the data from orders and shipments if the orders are processed within 24 hours. The resulting records contain the order and shipment identifiers, and the time elapsed from when the order was registered to when it was shipped.
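A sketch of such a streaming join might look like this:

SET defaults.topic.autocreate=true;
SET auto.offset.reset = 'earliest';

WITH o as (
 SELECT STREAM *
 FROM orders
 EVENTTIME BY orderTimestamp
);

WITH s as (
 SELECT STREAM *
 FROM shipments
 EVENTTIME BY timestamp
);

INSERT INTO orders_and_shipment
SELECT STREAM
    o._key AS orderId 
    , s._key AS shipmentId
    , DATE_TO_STR(TO_TIMESTAMP(s.timestamp - o.orderTimestamp), 'HH:mm:ss') AS time_difference
FROM  o INNER JOIN s
    ON o._key = s.orderId
WITHIN 24h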

Testing data

To simplify our testing process and manage to run the above example in less than 60 seconds, we will be using SQL to create and populate the two Apache Kafka topics:

CREATE TOPIC orders

POPULATE TOPIC orders

CREATE TOPIC shipments

POPULATE TOPIC shipments

The output seen in the next screenshot shows two records. For the order with identifier o2, there is no shipment entry because it has not been processed. For the order with identifier o3, the shipment happened after one day.

Validate results

Let’s switch to the Snapshot engine by navigating to the SQL Studio menu item. With the entries in both topics, we can write the following query to see which data is joinable without the window interval:
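A Snapshot query along these lines could be:

SELECT 
    o._key AS order 
    , DATE_TO_STR(o.orderTimestamp, 'yyyy-MM-dd HH:mm:ss') AS order_time
    , s._key AS shipment 
    , DATE_TO_STR(s.timestamp, 'yyyy-MM-dd HH:mm:ss') AS shipment_time
FROM orders o INNER JOIN shipments s
    ON o._key = s.orderId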

These are the results for the non-streaming (i.e., Snapshot) query:

Running the query returned three records. But you can see the order o3 was processed two days after it was placed. Let’s apply the sliding window restriction for the Snapshot query by adding a filter to only match those records having their timestamp difference within a day.
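A sketch of the filtered Snapshot query might be:

SELECT 
    o._key AS order 
    , DATE_TO_STR(o.orderTimestamp, 'yyyy-MM-dd HH:mm:ss') AS order_time
    , s._key AS shipment 
    , DATE_TO_STR(s.timestamp, 'yyyy-MM-dd HH:mm:ss') AS shipment_time
FROM orders o INNER JOIN shipments s
    ON o._key = s.orderId
WHERE to_timestamp(s.timestamp) - '1d' <= to_timestamp(o.orderTimestamp)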

Now the result matches the one from the Streaming query.

Conclusion

In this tutorial you learned how to join two Streams together using a sliding window. You achieved all the above using the Lenses SQL engine.

Good luck and happy streaming!

Filtering data

This page describes a tutorial to filter records in a Kafka topic with Lenses SQL Processors.

Filtering messages and copying them to a topic can be achieved using the WHERE clause.

Setting up our example

In our example, we have a topic where our application registers bank transactions.

We have a topic called payments where records have this shape:

We can replicate such a structure running the following query in SQL Studio:

Time window aggregations

This page describes a tutorial to perform time windowed aggregations on Kafka topic data with Lenses SQL Processors.

In this tutorial we will see how data in a Stream can be aggregated continuously using GROUP BY over a time window, and how the results are emitted downstream.

In Lenses SQL you can read your data as a STREAM and quickly aggregate over it using the GROUP BY clause and SELECT STREAM

Content of result topic
Content of SQL snapshot result topic
Each event has a unique string key generated by the upstream system.

We can again use SQL Studio to insert some data to play with:

Find big transactions

Let’s assume we need to detect significant transactions that will be then fed into our anti-fraud system.

We want to copy those transactions into a new topic, maintaining the content of the records as it is.

For our first example, we will use a simple predicate to filter transactions with an amount larger than 5000, regardless of the currency.

Lenses SQL supports all the common comparison operators to compare values, so for our goal it is enough to use a WHERE statement with a >= condition:
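A minimal sketch of such a processor could be:

SET defaults.topic.autocreate=true;

INSERT INTO big_payments
SELECT STREAM *
FROM payments
WHERE amount >= 5000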

Checking the records emitted by the processor, we see that we got the transactions we were looking for.

Because of the * projection, the content of the records has not changed.

Not all currencies are the same, so we would like to add a specific threshold for each currency. As a first cut, we combine multiple conditions with ANDs and ORs:
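For example:

SET defaults.topic.autocreate=true;

INSERT INTO big_eur_usd_payments
SELECT STREAM *
FROM payments
WHERE
    (amount >= 5000 AND currency = 'EUR') OR
    (amount >= 5500 AND currency = 'USD')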

As an improvement, we want to capture the threshold for each currency in a single expression. We will use a CASE statement for that:
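A sketch using CASE might look like this:

SET defaults.topic.autocreate=true;

INSERT INTO big_payments_case
SELECT STREAM *
FROM payments
WHERE
    amount >= (CASE 
        WHEN currency = 'EUR' THEN 5000
        WHEN currency = 'USD' THEN 5500
        WHEN currency = 'GBP' THEN 4500
        ELSE 5000
    END)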

getting the results:

Filtering by date

In this section, we will find all the transactions that happened during the (UTC) night. To do that we can use one of our many date and time functions.

You will also see how to use a CAST expression to convert from one type to another.
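A sketch of such a processor, keeping only payments made between midnight and 5am UTC, could be:

SET defaults.topic.autocreate=true;

INSERT INTO night_payments
SELECT STREAM *
FROM payments
WHERE
    CAST(HOUR(time) AS INT) >= 0 AND
    CAST(HOUR(time) AS INT) <= 5 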

Checking the output, we can see that only one transaction satisfied our predicate:

Sampling with random functions

Let’s imagine that we have to build some intelligence around all the payments we process, but we do not have the capacity and the need to process all of them.

We decided then to build a reduced copy of the payments topic, with only about 1% of the original records.

To do that we are going to use our RANDINT function:
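A sketch of the sampling processor might be:

SET defaults.topic.autocreate=true;

INSERT INTO payments_sample
SELECT STREAM *
FROM payments
WHERE CAST(ABS(RANDINT()) AS DOUBLE) / 2147483647 < 0.01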

RANDINT generates a random integer; we take its absolute value to make sure it is positive, and we then normalise the result by dividing it by the maximum possible integer, getting an (almost) uniform sample of numbers between 0 and 1.

We have to CAST to double on the way; otherwise, the division would be between integers, and the result would always be 0.

SET defaults.topic.autocreate=true;

INSERT INTO customers_callInfo
SELECT STREAM
    calls._value AS call 
    , customer._value AS customer
FROM  customer_call_details AS calls
        INNER JOIN  (SELECT TABLE * FROM customer_details) AS customer
            ON customer._key.customer.id = calls._key.customer.id
CREATE TABLE customer_details(
        _key.customer.typeID string
        , _key.customer.id string
        , customer.name string
        , customer.middleName string null
        , customer.surname string
        , customer.nationality string
        , customer.passportNumber string
        , customer.phoneNumber string
        , customer.email string null
        , customer.address string
        , customer.country string
        , customer.driverLicense string null
        , package.typeID string
        , package.description string
        , active boolean
)
FORMAT(avro, avro)
PROPERTIES(partitions=5, replication=1, compacted=true);
INSERT INTO customer_details(
        _key.customer.typeID
        , _key.customer.id
        , customer.name
        , customer.middleName
        , customer.surname
        , customer.nationality
        , customer.passportNumber
        , customer.phoneNumber
        , customer.email
        , customer.address
        , customer.country
        , customer.driverLicense
        , package.typeID
        , package.description
        , active
) VALUES
("userType1","5162258362252394","April","-","Paschall","GBR","APGBR...","1999153354","[email protected]","-","GBR","-","TypeA","Desc.",true),
("internal","5290441401157247","Charisse","-","Daggett","USA","CDUSA...","6418577217","[email protected]","-","USA","-","TypeC","Desc.",true),
("internal","5397076989446422","Gibson","-","Chunn","USA","GCUSA...","8978860472","[email protected]","-","USA","-","TypeC","Desc.",true),
("partner","5248189647994492","Hector","-","Swinson","NOR","HSNOR...","8207437436","[email protected]","-","NOR","-","TypeA","Desc.",true),
("userType1","5196864976665762","Booth","-","Spiess","CAN","BSCAN...","6220504387","[email protected]","-","CAN","-","TypeA","Desc.",true),
("userType2","5423023313257503","Hitendra","-","Sibert","SWZ","HSSWZ...","6731834082","[email protected]","-","SWZ","-","TypeA","Desc.",true),
("userType2","5337899393425317","Larson","-","Asbell","SWE","LASWE...","2844252229","[email protected]","-","SWE","-","TypeA","Desc.",true),
("partner","5140590381876333","Zechariah","-","Schwarz","GER","ZSGER...","4936431929","[email protected]","-","GER","-","TypeB","Desc.",true),
("internal","5524874546065610","Shulamith","-","Earles","FRA","SEFRA...","2119087327","[email protected]","-","FRA","-","TypeC","Desc.",true),
("userType1","5204216758311612","Tangwyn","-","Gorden","GBR","TGGBR...","9172511192","[email protected]","-","GBR","-","TypeA","Desc.",true),
("userType1","5336077954566768","Miguel","-","Gonzales","ESP","MGESP...","5664871802","[email protected]","-","ESP","-","TypeA","Desc.",true),
("userType3","5125835811760048","Randie","-","Ritz","NOR","RRNOR...","3245795477","[email protected]","-","NOR","-","TypeA","Desc.",true),
("userType1","5317812241111538","Michelle","-","Fleur","FRA","MFFRA...","7708177986","[email protected]","-","FRA","-","TypeA","Desc.",true),
("userType1","5373595752176476","Thurborn","-","Asbell","GBR","TAGBR...","5927996719","[email protected]","-","GBR","-","TypeA","Desc.",true),
("userType3","5589753170506689","Noni","-","Gorden","AUT","NGAUT...","7288041910","[email protected]","-","AUT","-","TypeA","Desc.",true),
("userType2","5588152341005179","Vivian","-","Glowacki","POL","VGPOL...","9001088901","[email protected]","-","POL","-","TypeA","Desc.",true),
("partner","5390713494347532","Elward","-","Frady","USA","EFUSA...","2407143487","[email protected]","-","USA","-","TypeB","Desc.",true),
("userType1","5322449980897580","Severina","-","Bracken","AUT","SBAUT...","7552231346","[email protected]","-","AUT","-","TypeA","Desc.",true);
CREATE TABLE customer_call_details(
    _key.customer.typeID string
    , _key.customer.id string
    , callInfoCustomerID string
    , callInfoType string
    , callInfoDuration int
    , callInfoInit int)
FORMAT(avro, avro)
PROPERTIES(partitions=1, replication=1, compacted=false)
INSERT INTO customer_call_details(
    _key.customer.typeID
    , _key.customer.id
    , callInfoCustomerID
    , callInfoType
    , callInfoDuration
    , callInfoInit
) VALUES
("userType1", "5322449980897580","5322449980897580", "CallTypeA", 470, 0),
("internal", "5290441401157247","5290441401157247", "CallTypeC", 67, 0),
("partner", "5140590381876333","5140590381876333", "CallTypeB", 377, 0),
("internal", "5397076989446422","5397076989446422", "CallTypeC", 209, 0),
("userType2", "5337899393425317","5337899393425317", "CallTypeA", 209, 0),
("partner", "5140590381876333","5140590381876333", "CallTypeB", 887, 0),
("userType1", "5322449980897580","5322449980897580", "CallTypeA", 203, 0),
("partner", "5140590381876333","5140590381876333", "CallTypeB", 1698, 0),
("userType3", "5589753170506689","5589753170506689", "CallTypeA", 320, 1),
("internal", "5290441401157247","5290441401157247", "CallTypeC", 89, 0),
("partner", "5140590381876333","5140590381876333", "CallTypeB", 355, 0),
("internal", "5290441401157247","5290441401157247", "CallTypeC", 65, 0),
("userType2", "5337899393425317","5337899393425317", "CallTypeA", 43, 1),
("partner", "5390713494347532","5390713494347532", "CallTypeB", 530, 0),
("internal", "5397076989446422","5397076989446422", "CallTypeC", 270, 0),
("userType3", "5589753170506689","5589753170506689", "CallTypeA", 1633, 0),
("internal", "5290441401157247","5290441401157247", "CallTypeC", 110, 0),
("userType1", "5322449980897580","5322449980897580", "CallTypeA", 540, 0),
("internal", "5290441401157247","5290441401157247", "CallTypeC", 168, 0),
("userType3", "5589753170506689","5589753170506689", "CallTypeA", 1200, 0),
("internal", "5290441401157247","5290441401157247", "CallTypeC", 1200, 0),
("partner", "5390713494347532","5390713494347532", "CallTypeB", 22, 0),
("userType3", "5589753170506689","5589753170506689", "CallTypeA", 333, 1),
("internal", "5397076989446422","5397076989446422", "CallTypeC", 87, 0),
("partner", "5390713494347532","5390713494347532", "CallTypeB", 123, 0),
("userType2", "5337899393425317","5337899393425317", "CallTypeA", 182, 1),
("partner", "5140590381876333","5140590381876333", "CallTypeB", 844, 0),
("partner", "5390713494347532","5390713494347532", "CallTypeB", 56, 1),
("internal", "5397076989446422","5397076989446422", "CallTypeC", 36, 0),
("partner", "5140590381876333","5140590381876333", "CallTypeB", 794, 0),
("userType3", "5589753170506689","5589753170506689", "CallTypeA", 440, 0),
("internal", "5397076989446422","5397076989446422", "CallTypeC", 52, 0),
("userType1", "5322449980897580","5322449980897580", "CallTypeA", 770, 0),
("internal", "5397076989446422","5397076989446422", "CallTypeC", 627, 0),
("partner", "5140590381876333","5140590381876333", "CallTypeB", 555, 0),
("userType2", "5337899393425317","5337899393425317", "CallTypeA", 55, 1);
SELECT
    p.callInfoCustomerID AS customerID
    , p.callInfoType
    , p.callInfoInit
FROM customer_call_details AS p
        INNER JOIN customer_details AS c
            ON p._key.customer.id = c._key.customer.id
KEY: "9E6F72C8-6C9F-4EC2-A7B2-C8AC5A95A26C"
VALUE: {
  "car_id": "car_2",
  "speedMph": 84,
  "sensor": {
    "id": "sensor_1",
    "lat": 45,
    "long": 0
  },
  "event_time": 159230614
}
CREATE TABLE car_speed_events(
    car_id string
    , speedMph int
    , sensor.id string
    , sensor.lat double
    , sensor.long double
    , event_time long
)
FORMAT (string, avro);
INSERT INTO car_speed_events(
    _key
    , car_id
    , speedMph
    , sensor.id
    , sensor.lat
    , sensor.long
    , event_time
) VALUES
("9E6F72C8-6C9F-4EC2-A7B2-C8AC5A95A26C", "car-1", 50, "sensor_1", 45.1241, 1.177, 1591970345),
("0958C1E2-804F-4110-B463-7BEA25DA680C", "car-2", 54, "sensor_2", 45.122, 1.75, 1591970346),
("552E9A77-D23D-4EC7-96AB-7F106637ADBC", "car-3", 60, "sensor_1", 45.1241, 1.177, 1591970347),
("00F44EDD-1BF1-4AE8-A7FE-622F720CA3BC", "car-1", 55, "sensor_2", 45.122, 1.75, 1591970348),
("A7F19576-6EFA-40A0-91C3-D8999A41F453", "car-1", 56, "sensor_1", 45.1241, 1.177, 1591970348),
("5273803E-7F34-44F8-9A3B-B6CB9D24C1F9", "car-2", 58, "sensor_3", 45.123, 1.176, 1591970349),
("8401CD82-ABFF-4D1B-A564-986DC971F913", "car-2", 60, "sensor_1", 45.1241, 1.177, 1591970349),
("F1CF77A0-6CFB-472F-B368-28F011F46C64", "car-1", 53, "sensor_3", 45.123, 1.176, 1591970349);
SET defaults.topic.autocreate=true;

INSERT INTO only_car_speeds
SELECT STREAM 
    car_id AS _key
    , speedMph
FROM car_speed_events;
KEY: "car-1"
VALUE: { "speedMph": 50 }
SET defaults.topic.autocreate=true;

INSERT INTO only_car_speeds_unwrapped
SELECT STREAM 
    car_id AS _key
    , speedMph AS _value
FROM car_speed_events;
KEY: "car-1"
VALUE: 50
{
    "speedKmph": 160.93,
    "description": "Car car_1 was running at 100Mph "
}
SET defaults.topic.autocreate=true;

INSERT INTO car_speeds_kmph
SELECT STREAM
    speedMph * 1.60934 AS speedKmph
    , CONCATENATE(
        'Car ',
        car_id,
        ' was running at ',
        CAST(speedMph AS STRING),
        'Mph') AS description
FROM car_speed_events;
KEY: "9E6F72C8-6C9F-4EC2-A7B2-C8AC5A95A26C"
VALUE: {
    speedKmph:"80.467",
    description:"Car car-1 was running at 50Mph"
}
KEY: {
    "sensor_id": "sensor_1",
    "event_time": 1591970345
}
VALUE: {
    "car_id": "car_1",
    "speed": {
        "mph": 100,
        "kmph": 161
    } 
}
SET defaults.topic.autocreate=true;

INSERT INTO car_speeds_by_sensor_and_time
SELECT STREAM
    sensor.id AS _key.sensor_id
    , event_time AS _key.event_time
    , car_id
    , speedMph AS speed.mph
    , speedMph * 1.60934 AS speed.kmph
FROM car_speed_events;
KEY: {
    "sensor_id": "sensor_1",
    "event_time": 1591970345
}
VALUE: {
    "car_id": "car_1",
    "speed": {
        "mph": 100,
        "kmph": 160
    } 
}
SET defaults.topic.autocreate=true;
SET auto.offset.reset = 'earliest';

WITH o as (
 SELECT STREAM *
 FROM orders
 EVENTTIME BY orderTimestamp
);

WITH s as (
 SELECT STREAM *
 FROM shipments
 EVENTTIME BY timestamp
);

INSERT INTO orders_and_shipment
SELECT STREAM
    o._key AS orderId 
    , s._key AS shipmentId
    , DATE_TO_STR(TO_TIMESTAMP(s.timestamp - o.orderTimestamp), 'HH:mm:ss') AS time_difference
FROM  o INNER JOIN s
    ON o._key = s.orderId
WITHIN 24h
CREATE TABLE orders(
        _key string
        , customerId string
        , orderTimestamp long
        , amount decimal(18,2)
)
FORMAT(string, avro)
INSERT INTO orders(
        _key
        , customerId
        , orderTimestamp
        , amount
) VALUES
("o1","[email protected]",1596374427000,99.9),   --Sunday, 2 August 2020 13:20:27
("o2","[email protected]",1596453627000,240.01), --Monday, 3 August 2020 11:20:27
("o3","[email protected]",1596280827000,81.81),        --Saturday, 1 August 2020 11:20:27
("o4","[email protected]",1596453627000,300.00);       --Monday, 3 August 2020 11:20:27
CREATE TABLE shipments(
    _key string
    , orderId string
    , timestamp long)
FORMAT(string, avro)
INSERT INTO shipments(
    _key
    , orderId
    , `timestamp`
) VALUES
("s1", "o1", 1596445927000),   --Monday, 3 August 2020 09:12:07
("s2", "o3", 1596456112000),   --Monday, 3 August 2020 12:01:52
("s3", "o4", 1596460271000);   --Monday, 3 August 2020 13:11:11
SELECT 
    o._key AS order 
   , DATE_TO_STR(o.orderTimestamp, 'yyyy-MM-dd HH:mm:ss') AS order_time
    , s._key AS shipment 
   , DATE_TO_STR(s.timestamp, 'yyyy-MM-dd HH:mm:ss') AS shipment_time
FROM orders o INNER JOIN  shipments s
    ON o._key = s.orderId
SELECT 
    o._key AS order 
    , DATE_TO_STR(o.orderTimestamp, 'yyyy-MM-dd HH:mm:ss') AS order_time
    , s._key AS shipment 
    , DATE_TO_STR(s.timestamp, 'yyyy-MM-dd HH:mm:ss') AS shipment_time
FROM orders o INNER JOIN  shipments s
    ON o._key = s.orderId
WHERE to_timestamp(s.timestamp) - '1d' <= to_timestamp(o.orderTimestamp)

KEY: "6A461C60-02F3-4C01-94FB-092ECBDE0837"
VALUE: {
  "amount": "12.10",
  "currency": "EUR",
  "from_account": "xxx",
  "to_account": "yyy", 
  "time": 1591970345000
}
CREATE TABLE payments(
    amount double
    , currency string
    , from_account string
    , to_account string
    , time datetime
)
FORMAT (string, avro);
INSERT INTO payments(
    _key
    , amount
    , currency 
    , from_account
    , to_account 
    , time
)
VALUES
("6A461C60-02F3-4C01-94FB-092ECBDE0837", 10, "EUR", "account-1", "account-2", 1590970345000),
("E5DA60E8-F622-43B2-8A93-B958E01E8AB3", 100000, "EUR", "account-1", "account-3", 1591070346000),
("0516A309-FB2B-4F6D-A11F-3C06A5D64B68", 5300, "USD", "account-2", "account-3", 1591170347000),
("0871491A-C915-4163-9C4B-35DEA0373B41", 6500, "EUR", "account-3", "account-1", 1591270348000),
("2B557134-9314-4F96-A640-1BF90887D846", 300, "EUR", "account-1", "account-4", 1591370348000),
("F4EDAE35-45B4-4841-BAB7-6644E2BBC844", 3400, "EUR", "account-2", "account-1", 1591470349000),
("F51A912A-96E9-42B1-9AC4-42E923A0A6F8", 7500, "USD", "account-2", "account-3", 1591570352000),
("EC8A08F1-75F0-49C8-AA08-A5E57997D27A", 2500000, "USD", "account-1", "account-3", 1591670356000),
("9DDBACFF-D42B-4042-95AC-DCDD84F0AC32", 1000, "GBP", "account-2", "account-3", 1591870401000)
;
SET defaults.topic.autocreate=true;

INSERT INTO big_payments
SELECT STREAM *
FROM payments
WHERE amount >= 5000
KEY:"E5DA60E8-F622-43B2-8A93-B958E01E8AB3"
VALUE: { amount:100000, ... }
------------------------------------
KEY:"0516A309-FB2B-4F6D-A11F-3C06A5D64B68"
VALUE: { amount:5300, ... }
------------------------------------
KEY:"0871491A-C915-4163-9C4B-35DEA0373B41"
VALUE: { amount:6500, ... }
------------------------------------
KEY:"F51A912A-96E9-42B1-9AC4-42E923A0A6F8"
VALUE: { amount:7500, ... }
------------------------------------
KEY:"EC8A08F1-75F0-49C8-AA08-A5E57997D27A"
VALUE: { amount:2500000, ... }
SET defaults.topic.autocreate=true;

INSERT INTO big_eur_usd_payments
SELECT STREAM *
FROM payments
WHERE
    (amount >= 5000 AND currency = 'EUR') OR
    (amount >= 5500 AND currency = 'USD')
SET defaults.topic.autocreate=true;

INSERT INTO big_payments_case
SELECT STREAM *
FROM payments
WHERE
    amount >= (CASE 
        WHEN currency = 'EUR' THEN 5000
        WHEN currency = 'USD' THEN 5500
        WHEN currency = 'GBP' THEN 4500
        ELSE 5000
    END)
KEY:"E5DA60E8-F622-43B2-8A93-B958E01E8AB3"
VALUE: { amount:100000, currency:"EUR", ... }
------------------------------------
KEY:"0871491A-C915-4163-9C4B-35DEA0373B41"
VALUE: { amount:6500, currency:"EUR", ... }
------------------------------------
KEY:"F51A912A-96E9-42B1-9AC4-42E923A0A6F8"
VALUE: { amount:7500, currency:"USD", ... }
------------------------------------
KEY:"EC8A08F1-75F0-49C8-AA08-A5E57997D27A"
VALUE: { amount:2500000, currency:"USD", ... }
SET defaults.topic.autocreate=true;

INSERT INTO night_payments
SELECT STREAM *
FROM payments
WHERE
    CAST(HOUR(time) AS INT) >= 0 AND
    CAST(HOUR(time) AS INT) <= 5 
KEY:"6A461C60-02F3-4C01-94FB-092ECBDE0837"
VALUE: { amount:10, currency:"EUR", time:1590970345000, ... }
------------------------------------
KEY:"E5DA60E8-F622-43B2-8A93-B958E01E8AB3"
VALUE: { amount:100000, currency:"EUR", time:1591070346000, ... }
------------------------------------
KEY:"EC8A08F1-75F0-49C8-AA08-A5E57997D27A"
VALUE: { amount:2500000, currency:"USD", time:1591670356000, ... }
SET defaults.topic.autocreate=true;

INSERT INTO payments_sample
SELECT STREAM *
FROM payments
WHERE CAST(ABS(RANDINT()) AS DOUBLE) / 2147483647 < 0.01
Setting up our example

Let’s assume that we have a topic (game-sessions) that contains data regarding remote gaming sessions by users.

Each gaming session will contain:

  • the points the user achieved throughout the session

  • Metadata information regarding the session:

    • The country where the game took place

    • The startedAt, the date and time the game commenced

    • The endedAt, the date and time the game finished

The above structure represents the value of each record in our game-sessions topic.

Additionally, each record will be keyed by user information, including the following:

  • A pid, or player id, representing this user uniquely

  • Some additional denormalised user details:

    • a name

    • a surname

    • an age

Keep in mind this is just an example in the context of this tutorial. Putting denormalised data in keys is not something that should be done in a production environment.

In light of the above, a record might look like the following (in json for simplicity):

We can replicate such structure using SQL Studio and the following query:

We can then use SQL Studio again to insert the data we will use in the rest of the tutorial:

The time a game started and completed is expressed in epoch time. To see the human readable values, run this query:

Count how many games were played per user every 10 seconds

Now we can start processing the data we have inserted above.

One requirement could be to count how many games each user has played every 10 seconds.

We can achieve the above with the following query:
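A possible sketch of such a processor is shown below; the tumbling window clause (WINDOW BY TUMBLE 10s) is an assumption, and backticks are used to escape the hyphenated topic name:

SET defaults.topic.autocreate=true;

INSERT INTO games_per_user_every_10_seconds
SELECT STREAM COUNT(*) AS gamesPlayed
FROM `game-sessions`
WINDOW BY TUMBLE 10s
GROUP BY _key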

The content of the output topic, games_per_user_every_10_seconds, can now be inspected and eventually it will look similar to this:

As you can see, the keys of the records did not change, but their value is the result of the specified aggregation. The gamer Billy Lagrange has two entries because he played 3 games: the first two fall within the window between 2020-07-23 17:08:00 and 2020-07-23 17:08:10 (exclusive), and the third within the window between 2020-07-23 17:08:10 (inclusive) and 2020-07-23 17:08:20 (exclusive).

You might have noticed that games_per_user_every_10_seconds has been created as a compacted topic, and that is by design.

All aggregations result in a Table because they maintain a running, fault-tolerant, state of the aggregation and when the result of an aggregation is written to a topic, then the topic will need to reflect these semantics (which is what a compacted topic does).

Count how many games were played per country every 10 seconds

We can expand on the example from the previous section. We now want to know, for each country, on a 10 second interval, the following:

  • count how many games were played

  • what are the top 3 results

All the above can be achieved with the following query:

The content of the output topic, games_per_country_every_10_seconds, can now be inspected in the SQL Studio screen by running:

There are 2 entries for Italy, since there is one game played at 2020-07-23 18:08:11. Also, notice that the other entry for Italy has 4 occurrences and 3 max points. The reason for the 4 occurrences is that there were 4 games, two each from Billy Lagrange and Maria Rossi, within the 10 second time window between 2020-07-23 18:08:00 and 2020-07-23 18:08:10 (exclusive).

Conclusion

In this tutorial you learned how to use aggregation over Streams to:

  • group by the current key of a record

  • group by a field in the input record

  • use a time window to define the aggregation over.

Good luck and happy streaming!

Aggregating streams

This page describes a tutorial to aggregate Kafka topic data into a stream using Lenses SQL Processors.

In this tutorial we will see how data in a stream can be aggregated continuously using GROUP BY and how the aggregated results are emitted downstream.

In Lenses SQL you can read your data as a STREAM and quickly aggregate over it using the GROUP BY clause and SELECT STREAM

Setting up our example

Let’s assume that we have a topic (game-sessions) that contains data regarding remote gaming sessions by users.

Each gaming session will contain:

  • the points the user achieved throughout the session

  • Metadata information regarding the session:

    • The country where the game took place

    • The language the user played the game in

The above structure represents the value of each record in our game-sessions topic.

Additionally, each record will be keyed by user information, including the following:

  • A pid, or player id, representing this user uniquely

  • Some additional denormalised user details:

    • a name

    • a surname

Keep in mind this is just an example in the context of this tutorial. Putting denormalised data in keys is not something that should be done in a production environment.

In light of the above, a record might look like the following (in json for simplicity):

We can replicate such structure using SQL Studio and the following query:

We can then use SQL Studio again to insert the data we will use in the rest of the tutorial:

Count how many games each user played

Now we can start processing the data we have inserted above.

One requirement could be to count how many games each user has played. Additionally, we want to ensure that, should new data come in, it will update the calculations and return the up to date numbers.

We can achieve the above with the following query:
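A minimal sketch of such a processor, grouping by the record key (backticks escape the hyphenated topic names), could be:

SET defaults.topic.autocreate=true;

INSERT INTO `groupby-key`
SELECT STREAM COUNT(*) AS gamesPlayed
FROM `game-sessions`
GROUP BY _key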

The content of the output topic, groupby-key, can now be inspected in the Lenses Explore screen and it will look similar to this:

As you can see, the keys of the records did not change, but their value is the result of the specified aggregation.

You might have noticed that groupby-key has been created as a compacted topic, and that is by design.

All aggregations result in a Table because they maintain a running, fault-tolerant, state of the aggregation and when the result of an aggregation is written to a topic, then the topic will need to reflect these semantics (which is what a compacted topic does).

Add each user’s best results, and the average over all games

We can expand on the example from the previous section. We now want to know, for each user, the following:

  • count how many games the user has played

  • what are the user’s best 3 results

  • what is the user’s average of points

All the above can be achieved with the following query:
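A possible sketch, assuming a top-k aggregate such as MAXK is available for the best-3 results:

SET defaults.topic.autocreate=true;

INSERT INTO `groupby-key-multi-aggs`
SELECT STREAM
    COUNT(*) AS gamesPlayed
    , MAXK(points, 3) AS best3Results
    , AVG(points) AS averagePoints
FROM `game-sessions`
GROUP BY _key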

The content of the output topic, groupby-key-multi-aggs, can now be inspected in the Lenses Explore screen, and it will look similar to this:

Gather statistics about users playing from the same country and using the same language

Our analytics skills are so good that we are now asked for more. We now want to calculate the same statistics as before, but grouping together players that played from the same country and used the same language.

Here is the query for that:
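A sketch of such a query might be the following; the sessionMetadata.country and sessionMetadata.language field names are assumed from the record shape described above:

SET defaults.topic.autocreate=true;

INSERT INTO `groupby-country-and-language`
SELECT STREAM
    COUNT(*) AS gamesPlayed
    , MAXK(points, 3) AS best3Results
    , AVG(points) AS averagePoints
    , sessionMetadata.language AS sessionLanguage
FROM `game-sessions`
GROUP BY sessionMetadata.country, sessionMetadata.language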

The content of the output topic, groupby-country-and-language, can now be inspected in the Lenses Explore screen and it will look similar to this:

Notice how we projected sessionMetadata.language as sessionLanguage in the query. We could do that because sessionMetadata.language is part of the GROUP BY clause. Lenses SQL only supports Full Group By mode, so if a projected field is not part of the GROUP BY clause, the query will be invalid.

Filtering aggregation data

One final scenario we will cover in this tutorial is when we want to filter some data within our aggregation.

There are two possible types of filtering we might want to do, when it comes to aggregations:

  • Pre-aggregation: we want some rows to be ignored by the grouping, so they will not be part of the calculation done by aggregation functions. In these scenarios we will use the WHERE clause.

  • Post-aggregation: we want to filter the aggregation results themselves, so that those aggregated records which meet some specified condition are not emitted at all. In these scenarios we will use the HAVING clause.

Let’s see an example.

We want to calculate the usual statistics from the previous scenarios, but grouping by the session language only. However, we are interested only in languages that are used a small number of times (we might want to focus our marketing team’s effort there); additionally, we are aware that some users have been using VPNs to access our platform, so we want to exclude some records from our calculations if a given user appeared to have played from a given country.

For the sake of this example, we will:

  • Show statistics for languages that are used less than 9 times

  • Ignore sessions that Dave made from Spain (because we know he was not there)

The query for all the above is:
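A possible sketch, combining a WHERE clause (pre-aggregation) with a HAVING clause (post-aggregation); the field names and the MAXK aggregate are assumptions based on the examples above:

SET defaults.topic.autocreate=true;

INSERT INTO `groupby-language-filtered`
SELECT STREAM
    COUNT(*) AS gamesPlayed
    , MAXK(points, 3) AS best3Results
    , AVG(points) AS averagePoints
FROM `game-sessions`
WHERE _key.name != 'Dave' OR sessionMetadata.country != 'Spain'
GROUP BY sessionMetadata.language
HAVING COUNT(*) < 9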

The content of the output topic, groupby-language-filtered, can now be inspected in the Lenses Explore screen and it will look similar to this:

Notice that IT (which is the only language that has 9 sessions in total) appears in the output but without any data in the value section.

This is because aggregations are Tables, and the key IT used to be present (while it was lower than 9), but then it was removed. Deletion is expressed, in Tables, by setting the value section of a record to null, which is what we are seeing here.

Conclusion

In this tutorial you learned how to use aggregation over Streams to:

  • group by the current key of a record

  • calculate multiple results in a single processor

  • group by a combination of different fields of the input record

  • filter both the data that is to be aggregated and the data that will be emitted by the aggregation itself

You achieved all the above using Lenses SQL engine.

You can now proceed to learn about more complex scenarios like aggregation over Tables and windowed aggregations.

Good luck and happy streaming!

Aggregating data in a table

This page describes a tutorial to aggregate Kafka topic data into a table using Lenses SQL Processors.

In this tutorial, we will see how data in a table can be aggregated continuously using GROUP BY and how the aggregated results are emitted downstream.

In Lenses SQL you can read your data as a TABLE and quickly aggregate over it using the GROUP BY clause and SELECT TABLE.

Setting up our example

Let’s assume that we have a topic (game-sessions) containing data regarding remote gaming sessions by users.

Each gaming session will contain:

  • the points the user achieved throughout the session

  • Metadata information regarding the session:

    • The country where the game took place

    • The language the user played the game in

The above structure represents the value of each record in our game-sessions topic.

Additionally, each record will be keyed by user information, including the following:

  • A pid, or player id, representing this user uniquely

  • Some additional denormalised user details:

    • a name

    • a surname

Putting denormalised data in keys is not something that should be done in a production environment.

In light of the above, a record might look like the following (in JSON for simplicity):

We can replicate such structure using SQL Studio and the following query:

We can then use SQL Studio again to insert the data we will use in the rest of the tutorial:

Count the users that are in a given country

Now we can start processing the data we have inserted above.

Let’s imagine that we want to keep a running count of how many users are in a given country. To do this, we can assume that a user is currently in the same country where their last game took place.

We can achieve the above with the following query:
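A minimal sketch of such a processor, reading the topic as a table (the sessionMetadata.country field name is assumed from the record shape above), could be:

SET defaults.topic.autocreate=true;

INSERT INTO `groupby-table-country`
SELECT TABLE COUNT(*) AS usersCount
FROM `game-sessions`
GROUP BY sessionMetadata.country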

The content of the output topic, groupby-table-country, can now be inspected in the Lenses Explore screen and it will look similar to this:

The key results to notice here are the ones for Spain and the UK:

  • Spain is 2 because Jorge and Dave had their last game played there.

  • UK is 1 because, while Dave initially played from the UK, only Nigel’s last game took place there.

The last point from above is the main difference (and power) of Tables vs. Streams: they represent the latest state of the world for each of their keys, so any aggregation will apply only to that latest data.

Given what a Table is, it will have by definition only a single value for any given key, so doing GROUP BY _key on a Table is a pointless operation because it will always only generate 1-element groups.

Calculate the total and average points of games played in a given language

We can expand on the example from the previous section, imagining that our requirement was extended.

Just as before, we want to calculate statistics based on the current country of a user, as defined in Example 1, but now we want to know all the following:

  • count how many users are in a given country

  • what is the total amount of points these users achieved

  • what is the average amount of points these users achieved

All of the above can be achieved with the following query:
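A sketch of the extended query might look like this:

SET defaults.topic.autocreate=true;

INSERT INTO `groupby-table-country-multi`
SELECT TABLE
    COUNT(*) AS usersCount
    , SUM(points) AS totalPoints
    , AVG(points) AS averagePoints
FROM `game-sessions`
GROUP BY sessionMetadata.country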

The content of the output topic, groupby-table-country-multi, can now be inspected in the Lenses Explore screen and it will look similar to this:

One thing to highlight here is that the functions we are using in this query (COUNT, SUM, and AVG) all support aggregating over Tables. However, that is not true of all functions: some aggregation functions only support Streams.

Filtering aggregation data

We will cover one final scenario where we want to filter some data within our aggregation.

There are two possible types of filtering we might want to do when it comes to aggregations:

  • Pre-aggregation: we want some rows to be ignored by the grouping, so they will not be part of the calculation done by aggregation functions. In these scenarios, we will use the WHERE clause.

  • Post-aggregation: we want to filter the aggregation results themselves so that those aggregated records that meet some specified condition are not emitted at all. In these scenarios, we will use the HAVING clause.

Let’s see an example.

We want to calculate the statistics from Example 2, but grouping by the session language. Here we will again make the assumption that a user’s language is represented only by their latest recorded game session.

Additionally, we are only interested in languages used by players who don’t achieve a high total of points (we might want to focus our marketing team’s effort there, to keep them entertained). Finally, we are aware that some users have been using VPNs to access our platform, so we want to exclude some records from our calculations if a given user appeared to have played from a given country.

For the sake of this example, we will:

  • Show statistics for languages with total points lower than 100

  • Ignore sessions that Dave made from Spain (because we know he was not there)

The query for all of the above is:
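A possible sketch, again with the field names assumed from the record shape above:

SET defaults.topic.autocreate=true;

INSERT INTO `groupby-table-language-filtered`
SELECT TABLE
    COUNT(*) AS usersCount
    , SUM(points) AS totalPoints
    , AVG(points) AS averagePoints
FROM `game-sessions`
WHERE _key.name != 'Dave' OR sessionMetadata.country != 'Spain'
GROUP BY sessionMetadata.language
HAVING SUM(points) < 100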

The content of the output topic, groupby-table-language-filtered, can now be inspected in the Lenses Explore screen and it will look similar to this:

Notice that IT (which is the only language that has 120 points in total) appears in the output but without any data in the value section.

This is because aggregations are Tables, and the key IT used to be present (while it was lower than 100), but then it was removed. Deletion is expressed, in Tables, by setting the value section of a record to null, which is what we are seeing here.

Conclusion

In this tutorial, you learned how to use aggregation over Tables to:

  • group by arbitrary fields, based on the latest state of the world

  • calculate multiple results in a single processor

  • filter both the data that is to be aggregated and the data that is emitted as a result of the aggregation itself

Good luck and happy streaming!

Using multiple topics

This page describes a tutorial to use multiple Kafka topics in a Lenses SQL Processor.

In this tutorial, we will see how we can read data from multiple topics, process it as needed, and write the results to as many output topics as we need, all by using a single SQL Processor.

Setting up our example

Let’s assume that we have a topic (game-sessions) that contains data regarding remote gaming sessions by users.

Each gaming session will contain:

  • the points the user achieved throughout the session

  • Metadata information regarding the session:

    • The country where the game took place

    • The language the user played the game in

The above structure represents the value of each record in our game-sessions topic.

Additionally, each record is keyed by user details:

  • A pid, or player id, representing this user uniquely

  • Some additional denormalised user details:

    • a name

    • a surname

    • an age

In light of the above, a record might look like the following (in JSON for simplicity):

Finally, let’s assume we also have another, normalised, compacted topic user-details, keyed by an int matching the pid from topic game-sessions and containing user information like address and period of membership to the platform.

In light of the above, a record might look like the following (in JSON for simplicity):

We can replicate such structures using SQL Studio and the following query:

We can then use SQL Studio again to insert the data we will use in the rest of the tutorial:

Multiple transformations all in one go

Let’s imagine that, given the above data, we are given the following requirements:

  • For each country in game-sessions, create a record with the count of games played from that country. Write the results to the games-per-country topic.

  • For each record in game-sessions, reshape the record to remove everything from the key besides pid. Additionally, add the user’s memberYears to the value. Write the results to the games-sessions-normalised topic.

We can obtain the above with the following query:

The result of this processor in the UI will be a processor graph similar to the following:

Finally, the content of the output topics games-per-country and games-sessions-normalised can now be inspected in the Lenses Explore screen:

Conclusion

In this tutorial, we learned how to read data from multiple topics, combine it, process it in different ways, and save the results to as many output topics as needed.

Good luck and happy streaming!

Working with Arrays

This page describes a tutorial on how to work with array data in your Kafka topics using Lenses SQL Processors.

In this tutorial, we will see how to use Lenses SQL to extract, manipulate and inspect the single elements of an array.

In Lenses SQL you can use a LATERAL JOIN to treat the elements of an array as if they were a normal field.

You will learn to:

  • Extract the single elements of an array with a single LATERAL join.

{
  "key": {
    "pid": 1,
    "name": "Billy",
    "surname": "Lagrange",
    "age": 30
  },
  "value": {
    "points": 5,
    "country": "Italy",
    "language": "IT",
    "startedAt": 1595435228,
    "endedAt": 1595441828
  }
}
CREATE TABLE game-sessions(
  _key.pid int,
  _key.name string,
  _key.surname string,
  _key.age int,
  points double,
  country string,
  startedAt long,
  endedAt long)
FORMAT (avro, avro);
INSERT into game-sessions(
  _key.pid,
  _key.name,
  _key.surname,
  _key.age,
  points,
  country,
  startedAt,
  endedAt
) VALUES
(1, 'Billy', 'Lagrange', 35, 5, 'Italy', 1595524080000, 1595524085000),
(1, 'Billy', 'Lagrange', 35, 30, 'Italy', 1595524086000, 1595524089000),
(1, 'Billy', 'Lagrange', 35, 0, 'Italy', 1595524091000, 1595524098000),
(2, 'Maria', 'Rossi', 27, 50, 'Italy', 1595524080000, 1595524085000),
(2, 'Maria', 'Rossi', 27, 10, 'Italy', 1595524086000, 1595524089000),
(3, 'Jorge', 'Escudero', 27, 10, 'Spain', 1595524086000, 1595524089000),
(4, 'Juan', 'Suarez', 22, 80, 'Mexico', 1595524080000, 1595524085000),
(5, 'John', 'Bolden', 40, 10, 'USA', 1595524080000, 1595524085000);
SELECT
  startedAt
  , DATE_TO_STR(startedAt, 'yyyy-MM-dd HH:mm:ss') as started
  , endedAt
  , DATE_TO_STR(endedAt, 'yyyy-MM-dd HH:mm:ss') as ended
FROM game-sessions;
SET defaults.topic.autocreate=true;
SET commit.interval.ms='2000';  -- this is just to speed up the output generation in this tutorial

INSERT INTO games_per_user_every_10_seconds
SELECT STREAM
    COUNT(*) as occurrences
    , MAXK_UNIQUE(points,3) as maxpoints
    , AVG(points) as avgpoints
FROM game-sessions
EVENTTIME BY startedAt
WINDOW BY TUMBLE 10s
GROUP BY _key;
SET defaults.topic.autocreate=true;
SET commit.interval.ms='2000';  -- this is just to speed up the output generation in this tutorial

INSERT INTO games_per_country_every_10_seconds
SELECT STREAM
    COUNT(*) as occurrences
    , MAXK_UNIQUE(points,3) as maxpoints
    , country
FROM game-sessions
EVENTTIME BY startedAt
WINDOW BY TUMBLE 10s
GROUP BY country;
SELECT *
FROM games_per_country_every_10_seconds
Content of game-sessions displaying the human readable timestamps
Content of games_per_user_every_10_seconds topic
Content of games_per_country_every_10_seconds topic

Content of `groupby-key` topic
Content of `groupby-key-multi-aggs` topic
Content of `groupby-country-and-language` topic
Content of `groupby-language-filtered` topic

Dave played first from the UK, but then from Italy and finally from Spain. Dave’s contribution was, therefore, subtracted from the UK count value.
Content of groupby-table-country topic
Content of groupby-table-country-multi topic

Processor graph for the above query
Content of `games-per-country` topic
Content of `games-sessions-normalised` topic
  • Extract elements of a multi-level array with nested LATERAL joins.

  • Use arbitrarily complex array expressions as the right-hand side of a LATERAL join.

    Lateral Join on a simple array

    In this example, we are getting data from a sensor.

    Unfortunately, the upstream system registers the readings in batches, while what we need is a single record for every reading.

    An example of such a record is the following:

    Notice how each record contains multiple readings, inside the reading array field.

    We can replicate such a structure running the following query in SQL Studio:

    We can again use SQL Studio to insert some data to play with:

    What we want to obtain is a new topic readings, where each record contains a single reading, together with its meter_id. Considering the first record, we expect to explode it into four different records, one for each reading:

    In Lenses SQL you can easily achieve that with the special LATERAL syntax.

    You can create a processor defined as:

    The magic happens in batched_readings LATERAL readings as reading. With that we are basically saying:

    • for each record in batched_readings

    • for each element inside the readings array of that record

    • build a new record with all the fields of the original batched_readings record, plus an extra reading field, that will contain the value of the current element of readings.

    We can then use both the original fields and the new reading field in the SELECT.

    If you save the processor and run it, you will see that it will emit the records we expected.

    Filtering

    One of the powerful features of a LATERAL join is that the expression you put in the LATERAL can be used as a normal field. This means that you can then use it for example also in a WHERE or in a GROUP BY.
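    For instance, here is a minimal sketch that groups by the lateral field itself, counting how many times each reading value was observed (reading_counts is a hypothetical output topic, not part of this tutorial):

    SET defaults.topic.autocreate=true;

    INSERT INTO reading_counts      -- hypothetical output topic
    SELECT STREAM
      COUNT(*) AS occurrences       -- how many times each reading value appeared
    FROM
      batched_readings
      LATERAL readings as reading
    GROUP BY reading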

    In this section we will see how to filter records generated by a LATERAL using a normal WHERE.

    We want to modify our previous processor in order to emit only the readings greater than 95.

    To do that, it is enough to use reading as if it were a normal field in the WHERE section:

    Running the processor, we get the records:

    Lateral Join on a multi-level array

    This example is similar to the previous one. The only difference is that the readings are now stored in batches of batches:

    As you can see, nested_readings is an array whose elements are arrays of integers.

    We can again use SQL Studio to insert some data:

    We would like to define a processor that emits the same records as the previous one.

    In this case, though, we are dealing with nested_readings, which is a multi-level array, so a single LATERAL join is not enough. Nesting a LATERAL inside another will, however, do the job.

    This is roughly what happens in the FROM clause (an illustration of the intermediate records follows the list):

    • We first unwrap the first level of the array, doing a batched_readings_nested LATERAL nested_readings as readings.

    • At that point, readings will be an array of integers.

    • We can then use it in the outer ... LATERAL readings as reading join, and the single integers will finally be extracted and made available as reading.
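    To visualise the intermediate step, after the inner LATERAL the first input record is conceptually expanded into records like the following (these are never written to a topic; they are simply what the outer LATERAL sees):

    KEY: 1
    VALUE: { "meter_id": 1, "readings": [100, 92] }
    -----------------------------
    KEY: 1
    VALUE: { "meter_id": 1, "readings": [93, 101] }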

    Complex array expressions

    In this section we will see how it is possible to use any expression as the right hand side of a LATERAL join, as long as it gets evaluated to an array.

    We have a table where the meter readings are split into two columns, readings_day and readings_night.

    Let’s insert the same data as the first example, but where the readings are split across the two columns.

    To extract the readings one by one, we need first to concatenate the two arrays readings_day and readings_night. We can achieve that using flatten. We can then use the concatenated array in a LATERAL join:

    The processor defined above will emit the records

    as we expected.

    {
      "key": {
        "pid": 1,
        "name": "Billy",
        "surname": "Lagrange",
        "age": 30
      },
      "value": {
        "points": 5,
        "sessionMetadata": {
          "country": "Italy",
          "language": "IT"
        }
      }
    }
    CREATE TABLE game-sessions(
        _key.pid int
        , _key.name string
        , _key.surname string
        , _key.age int
        , points double
        , sessionMetadata.country string
        , sessionMetadata.language string)
    FORMAT (avro, avro);
    INSERT into game-sessions(
        _key.pid
        , _key.name
        , _key.surname
        , _key.age
        , points
        , sessionMetadata.country
        , sessionMetadata.language
    ) VALUES
    (1, 'Billy', 'Lagrange', 35, 5, 'Italy', 'IT'),
    (1, 'Billy', 'Lagrange', 35, 30, 'Italy', 'IT'),
    (1, 'Billy', 'Lagrange', 35, 0, 'Italy', 'IT'),
    (2, 'Maria', 'Rossi', 27, 50, 'Italy', 'IT'),
    (2, 'Maria', 'Rossi', 27, 10, 'Italy', 'IT'),
    (3, 'Jorge', 'Escudero', 27, 10, 'Spain', 'ES'),
    (4, 'Juan', 'Suarez', 22, 80, 'Mexico', 'ES'),
    (5, 'John', 'Bolden', 40, 10, 'USA', 'EN'),
    (6, 'Dave', 'Holden', 31, 30, 'UK', 'EN'),
    (7, 'Nigel', 'Calling', 50, 5, 'UK', 'EN'),
    (2, 'Maria', 'Rossi', 27, 10, 'UK', 'EN'),
    (1, 'Billy', 'Lagrange', 35, 50, 'Italy', 'IT'),
    (3, 'Jorge', 'Escudero', 27, 16, 'Spain', 'ES'),
    (4, 'Juan', 'Suarez', 22, 70, 'Mexico', 'ES'),
    (5, 'John', 'Bolden', 40, 10, 'USA', 'EN'),
    (6, 'Dave', 'Holden', 31, 50, 'Italy', 'IT'),
    (6, 'Dave', 'Holden', 31, 70, 'Spain', 'ES'),
    (2, 'Maria', 'Rossi', 27, 70, 'Italy', 'IT'),
    (1, 'Billy', 'Lagrange', 35, 50, 'Italy', 'IT')
    ;
    SET defaults.topic.autocreate=true;
    SET commit.interval.ms='1000';  -- this is just to speed up the output generation in this tutorial
    
    INSERT INTO groupby-key
    SELECT STREAM
      COUNT(*) AS gamesPlayed
    FROM game-sessions
    GROUP BY _key;
    SET defaults.topic.autocreate=true;
    SET commit.interval.ms='1000';
    
    INSERT INTO groupby-key-multi-aggs
    SELECT STREAM
        COUNT(*) AS gamesPlayed
        , MAXK(points,3) as maxpoints
        , AVG(points) as avgpoints
    FROM game-sessions
    GROUP BY _key;
    SET defaults.topic.autocreate=true;
    SET commit.interval.ms='1000';
    
    INSERT INTO groupby-country-and-language
    SELECT STREAM
        COUNT(*) AS gamesPlayed
        , MAXK(points,3) as maxpoints
        , AVG(points) as avgpoints
        , sessionMetadata.language as sessionLanguage
    FROM game-sessions
    GROUP BY
        sessionMetadata.country
        , sessionMetadata.language;
    SET defaults.topic.autocreate=true;
    SET commit.interval.ms='1000';
    
    INSERT INTO groupby-language-filtered
    SELECT STREAM
        COUNT(*) AS gamesPlayed
        , MAXK(points,3) as maxpoints
        , AVG(points) as avgpoints
    FROM game-sessions
    WHERE _key.name != 'Dave'
        OR sessionMetadata.country != 'Spain'
    GROUP BY sessionMetadata.language
    HAVING gamesPlayed < 9;
    {
      "key":{
        "pid": 1,
        "name": "Billy",
        "surname": "Lagrange",
        "age": 30
      },
      "value":{
        "points": 5,
        "sessionMetadata": {
          "country": "Italy",
          "language": "IT"
        }
      }
    }
    CREATE TABLE game-sessions(
        _key.pid int
        , _key.name string
        , _key.surname string
        , _key.age int
        , points double
        , sessionMetadata.country string
        , sessionMetadata.language string)
    FORMAT (avro, avro);
    INSERT into game-sessions(
        _key.pid
        , _key.name
        , _key.surname
        , _key.age
        , points
        , sessionMetadata.country
        , sessionMetadata.language
    ) VALUES
    (1, 'Billy', 'Lagrange', 35, 5, 'Italy', 'IT'),
    (1, 'Billy', 'Lagrange', 35, 30, 'Italy', 'IT'),
    (1, 'Billy', 'Lagrange', 35, 0, 'Italy', 'IT'),
    (2, 'Maria', 'Rossi', 27, 50, 'Italy', 'IT'),
    (2, 'Maria', 'Rossi', 27, 10, 'Italy', 'IT'),
    (3, 'Jorge', 'Escudero', 27, 10, 'Spain', 'ES'),
    (4, 'Juan', 'Suarez', 22, 80, 'Mexico', 'ES'),
    (5, 'John', 'Bolden', 40, 10, 'USA', 'EN'),
    (6, 'Dave', 'Holden', 31, 30, 'UK', 'EN'),
    (7, 'Nigel', 'Calling', 50, 5, 'UK', 'EN'),
    (2, 'Maria', 'Rossi', 27, 10, 'UK', 'EN'),
    (1, 'Billy', 'Lagrange', 35, 50, 'Italy', 'IT'),
    (3, 'Jorge', 'Escudero', 27, 16, 'Spain', 'ES'),
    (4, 'Juan', 'Suarez', 22, 70, 'Mexico', 'ES'),
    (5, 'John', 'Bolden', 40, 10, 'USA', 'EN'),
    (6, 'Dave', 'Holden', 31, 50, 'Italy', 'IT'),
    (6, 'Dave', 'Holden', 31, 70, 'Spain', 'ES'),
    (2, 'Maria', 'Rossi', 27, 70, 'Italy', 'IT'),
    (1, 'Billy', 'Lagrange', 35, 50, 'Italy', 'IT')
    ;
    SET defaults.topic.autocreate=true;
    SET commit.interval.ms='1000';  -- this is to speed up the output generation in this tutorial
    
    INSERT INTO groupby-table-country
    SELECT TABLE
      COUNT(*) AS gamesPlayed
    FROM game-sessions
    GROUP BY sessionMetadata.country;
    SET defaults.topic.autocreate=true;
    SET commit.interval.ms='1000';
    
    INSERT INTO groupby-table-country-multi
    SELECT TABLE
        COUNT(*) AS gamesPlayed
        , SUM(points) as totalpoints
        , AVG(points) as avgpoints
    FROM game-sessions
    GROUP BY sessionMetadata.country;
    SET defaults.topic.autocreate=true;
    SET commit.interval.ms='1000';
    
    INSERT INTO groupby-table-language-filtered
    SELECT TABLE
        COUNT(*) AS gamesPlayed
        , SUM(points) as totalpoints
        , AVG(points) as avgpoints
    FROM game-sessions
    WHERE _key.name != 'Dave' 
        OR sessionMetadata.country != 'Spain'
    GROUP BY sessionMetadata.language
    HAVING totalpoints < 100;
    {
      "key":{
        "pid": 1,
        "name": "Billy",
        "surname": "Lagrange",
        "age": 30
      },
      "value":{
        "points": 5,
        "sessionMetadata": {
          "country": "Italy",
          "language": "IT"
        }
      }
    }
    {
      "key": 1,
      "value":{
        "fullName": "Billy Lagrange",
        "memberYears": 3,
        "address": {
          "country": "Italy",
          "street": "Viale Monza 5",
          "city": "Milan"
        }
      }
    }
    CREATE TABLE game-sessions(
        _key.pid int
        , _key.name string
        , _key.surname string
        , _key.age int
        , points double
        , sessionMetadata.country string
        , sessionMetadata.language string
    )
    FORMAT (avro, avro);
    
    CREATE TABLE user-details(
        fullName string
        , memberYears int
        , address.country string
        , address.street string
        , address.city string
    ) FORMAT (int, avro);
    INSERT into game-sessions(
        _key.pid
        , _key.name
        , _key.surname
        , _key.age
        , points
        , sessionMetadata.country
        , sessionMetadata.language
    ) VALUES
    (1, 'Billy', 'Lagrange', 35, 5, 'Italy', 'IT'),
    (1, 'Billy', 'Lagrange', 35, 30, 'Italy', 'IT'),
    (1, 'Billy', 'Lagrange', 35, 0, 'Italy', 'IT'),
    (2, 'Maria', 'Rossi', 27, 50, 'Italy', 'IT'),
    (2, 'Maria', 'Rossi', 27, 10, 'Italy', 'IT'),
    (3, 'Jorge', 'Escudero', 27, 10, 'Spain', 'ES'),
    (4, 'Juan', 'Suarez', 22, 80, 'Mexico', 'ES'),
    (5, 'John', 'Bolden', 40, 10, 'USA', 'EN'),
    (6, 'Dave', 'Holden', 31, 30, 'UK', 'EN'),
    (7, 'Nigel', 'Calling', 50, 5, 'UK', 'EN'),
    (2, 'Maria', 'Rossi', 27, 10, 'UK', 'EN'),
    (1, 'Billy', 'Lagrange', 35, 50, 'Italy', 'IT'),
    (3, 'Jorge', 'Escudero', 27, 16, 'Spain', 'ES'),
    (4, 'Juan', 'Suarez', 22, 70, 'Mexico', 'ES'),
    (5, 'John', 'Bolden', 40, 10, 'USA', 'EN'),
    (6, 'Dave', 'Holden', 31, 50, 'Italy', 'IT'),
    (6, 'Dave', 'Holden', 31, 70, 'Spain', 'ES'),
    (2, 'Maria', 'Rossi', 27, 70, 'Italy', 'IT'),
    (1, 'Billy', 'Lagrange', 35, 50, 'Italy', 'IT')
    ;
    
    INSERT into user-details(
        _key
        , fullName
        , memberYears
        , address.country
        , address.street
        , address.city
    ) VALUES
    (1, 'Billy Lagrange', 3, 'Italy', 'Viale Monza 5', 'Milan'),
    (2, 'Maria Rossi', 1, 'Italy', 'Stazione Termini', 'Rome'),
    (3, 'Jorge Escudero', 5, 'Spain', '50 Passeig de Gracia', 'Barcelona'),
    (4, 'Juan Suarez', 0, 'Mexico', 'Plaza Real', 'Tijuana'),
    (5, 'John Bolden', 2, 'USA', '100 Wall Street', 'New York'),
    (6, 'Dave Holden', 1, 'UK', '25 Bishopsgate', 'London'),
    (7, 'Nigel Calling', 6, 'UK', '10 Queen Anne Street', 'Brighton')
    ;
    SET defaults.topic.autocreate=true;
    SET commit.interval.ms='1000';
    
    WITH userDetailsTable AS(
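      -- a Table view of user-details, so the stream below can join against it by key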
      SELECT TABLE *
      FROM user-details
    );
    
    WITH joinedAndNormalised AS(
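      -- enrich each game session with the user's memberYears, joining on pid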
      SELECT STREAM
        gs.*
        , ud.memberYears
      FROM game-sessions AS gs JOIN userDetailsTable AS ud 
            ON (gs._key.pid = ud._key)
    );
    
    INSERT INTO games-per-country
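    -- first requirement: count of games played from each country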
    SELECT STREAM
      COUNT(*) AS gamesPlayed
    FROM game-sessions
    GROUP BY sessionMetadata.country;
    
    INSERT INTO games-sessions-normalised
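    -- second requirement: the joined and reshaped sessions defined above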
    SELECT STREAM *
    FROM joinedAndNormalised;
    KEY: 1
    VALUE: {
      "meter_id": 1,
      "readings": [100, 101, 102]
    }
    CREATE TABLE batched_readings(
      meter_id int,
      readings int[]
    ) FORMAT(int, AVRO);
    INSERT INTO batched_readings(_key, meter_id, readings) VALUES
    (1, 1, [100, 92, 93, 101]),
    (2, 2, [81, 82, 81]),
    (3, 1, [95, 94, 93, 96]),
    (4, 2, [80, 82])
    KEY: 1
    VALUE: { "meter_id": 1, "reading": 100 }
    -----------------------------
    KEY: 1
    VALUE: { "meter_id": 1, "reading": 92 }
    -----------------------------
    KEY: 1
    VALUE: { "meter_id": 1, "reading": 93 }
    -----------------------------
    KEY: 1
    VALUE: { "meter_id": 1, "reading": 101 }
    SET defaults.topic.autocreate=true;
    
    INSERT INTO
      readings
    SELECT STREAM
      meter_id,
      reading
    FROM
      batched_readings
      LATERAL readings as reading
    SET defaults.topic.autocreate=true;
    
    INSERT INTO
      readings
    SELECT STREAM
      meter_id,
      reading
    FROM
      batched_readings
      LATERAL readings as reading
    WHERE
      reading > 95
    KEY: 1
    VALUE: { "meter_id": 1, "reading": 100 }
    -----------------------------
    KEY: 1
    VALUE: { "meter_id": 1, "reading": 101 }
    -----------------------------
    KEY: 3
    VALUE: { "meter_id": 1, "reading": 96 }
    CREATE TABLE batched_readings_nested(
      meter_id int,
      nested_readings int[][]
    )
    FORMAT(int, AVRO);
    INSERT INTO batched_readings_nested(_key, meter_id, nested_readings) VALUES
    (1, 1, [[100, 92], [93, 101]]),
    (2, 2, [[81], [82, 81]]),
    (3, 1, [[95, 94, 93], [96]]),
    (4, 2, [[80, 82]])
    SET defaults.topic.autocreate=true;
    
    INSERT INTO
      readings
    SELECT STREAM
      meter_id,
      reading
    FROM
      batched_readings_nested
      LATERAL nested_readings as readings
      LATERAL readings as reading
    CREATE TABLE day_night_readings(
      meter_id int,
      readings_day int[],
      readings_night int[]
    )
    FORMAT(int, AVRO);
    INSERT INTO day_night_readings(_key, meter_id, readings_day, readings_night) VALUES
    (1, 1, [100, 92], [93, 101]),
    (2, 2, [81], [82, 81]),
    (3, 1, [95, 94, 93], [96]),
    (4, 2, [80], [81])
    SET defaults.topic.autocreate=true;
    
    INSERT INTO
      readings
    SELECT STREAM
      meter_id,
      reading
    FROM
      day_night_readings
      LATERAL flatten([readings_day, readings_night]) as reading
    KEY: 1
    VALUE: { "meter_id": 1, "reading": 100 }
    -----------------------------
    KEY: 1
    VALUE: { "meter_id": 1, "reading": 92 }
    -----------------------------
    KEY: 1
    VALUE: { "meter_id": 1, "reading": 93 }
    -----------------------------
    KEY: 1
    VALUE: { "meter_id": 1, "reading": 101 }
    -----------------------------
    KEY: 2
    VALUE: { "meter_id": 2, "reading": 81 }
    ...