
Tutorials

SQL Studio

Learn how to use the SQL Studio to interactively query data in your Kafka topics and more.

Processing

Learn how to process data with Lenses SQL Processors: join, filter, aggregate, and more.

SQL Processors

Data formats

Changing data formats

Tutorial on how to change the format of data in a Kafka topic from JSON to AVRO with Lenses SQL Processors.

In this example, we will show how to create an AVRO topic from an existing JSON topic.

Requirements

For this to work, Lenses has to know what the source topic schema is. Lenses can do this in one of three ways:

  • through direct user action where the schema is manually set

  • through inference; Lenses will try to infer the schema of a topic by looking at the topic data

  • and lastly, if the topic is created through Lenses, the schema will be automatically set

Creating the JSON data

With the following SQL we can create our initial JSON topic:

CREATE TABLE car_speed_events_json(
    _key.plate string
    , speedMph int
    , sensor.id string
    , sensor.lat double
    , sensor.long double
    , event_time long
)
FORMAT (json, json);

to which we can add data using:

INSERT INTO car_speed_events_json (
    _key.plate
    , speedMph
    , sensor.id 
    , sensor.lat
    , sensor.long 
    , event_time 
) VALUES
("20-GH-38", 50, "sensor_1", 45.1241, 1.177, 1591970345),
("20-VL-12", 30, "sensor_1", 45.1241, 1.177, 1591970345),
("20-JH-98", 90, "sensor_1", 45.1241, 1.177, 1591970345);

It can be quickly verified that the format of our newly created topic is JSON for both key and value by searching our topic car_speed_events_json in our explore view:
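
If you prefer SQL Studio over the Explore view, a quick snapshot query is enough to eyeball a few of the inserted records (a minimal sketch; the LIMIT clause is assumed to be available in your Lenses version):

-- Inspect a few records of the newly created JSON topic
SELECT *
FROM car_speed_events_json
LIMIT 3;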

Creating the AVRO topic

To create a new topic with format AVRO, we can create a processor that will copy the data from our original topic to a new topic changing the format in the process.

To do this, we start by going to “SQL Processor”, clicking: “New SQL Processor” and defining our processor with the following code:

SET defaults.topic.autocreate=true;

INSERT INTO car_speed_events_avro
STORE 
    KEY AS AVRO 
    VALUE AS AVRO
SELECT STREAM *
FROM car_speed_events_json;

Notice the addition of STORE KEY AS AVRO VALUE AS AVRO. This statement tells our processor which format we want each facet (key or value) to be stored as.
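
If you only need to change one facet, you can specify just that facet. The sketch below is a variation that converts only the value to AVRO and leaves the key as JSON (assuming a single-facet STORE clause works for the value exactly as it does for the key in the re-keying tutorial later on; the output topic name is illustrative):

SET defaults.topic.autocreate=true;

INSERT INTO car_speed_events_avro_value
STORE VALUE AS AVRO
SELECT STREAM *
FROM car_speed_events_json;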

Hitting “Create New Processor” will start the new processor.

We can see the events were added and, from now on, Lenses will keep pushing any new events added to car_speed_events_json into car_speed_events_avro.
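
To see this continuous behaviour in action, you can insert one more record into the JSON topic from SQL Studio (same INSERT syntax as before; the plate and values below are made up) and watch it show up in car_speed_events_avro shortly afterwards:

INSERT INTO car_speed_events_json (
    _key.plate
    , speedMph
    , sensor.id
    , sensor.lat
    , sensor.long
    , event_time
) VALUES
("20-XY-77", 65, "sensor_2", 45.122, 1.75, 1591970400);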

We can also verify that the topic format of our new topic is also AVRO for both the key and value facets:

Rekeying data

This page describes a tutorial to rekey data in a Kafka topic with Lenses SQL Processors.

Sometimes you have a topic that is almost exactly what you need, except that the key of the record requires a bit of massaging.

In Lenses SQL you can use the special SELECT ... as _key syntax to quickly re-key your data.

In our example, we have a topic containing events coming from temperature sensors.

Each record contains the sensor’s measured temperature, the time of the measurement, and the id of the sensor. The key is a unique string (for example, a UUID) that the upstream system assigns to the event.

You can replicate the example by creating a topic in SQL Studio:

CREATE TABLE temperature_events(
    sensor_id string
    , temperature int
    , event_time long
)
FORMAT (string, avro);

We can also insert some example data to do our experiments:

INSERT INTO temperature_events(
    _key
    , sensor_id
    , temperature
    , event_time
) VALUES
("9E6F72C8-6C9F-4EC2-A7B2-C8AC5A95A26C", "sensor-1", 12, 1591970345),
("0958C1E2-804F-4110-B463-7BEA25DA680C", "sensor-2", 13, 1591970346),
("552E9A77-D23D-4EC7-96AB-7F106637ADBC", "sensor-3", 28, 1591970347),
("00F44EDD-1BF1-4AE8-A7FE-622F720CA3BC", "sensor-1", 13, 1591970348),
("A7F19576-6EFA-40A0-91C3-D8999A41F453", "sensor-1", 12, 1591970348),
("5273803E-7F34-44F8-9A3B-B6CB9D24C1F9", "sensor-2", 31, 1591970349),
("8401CD82-ABFF-4D1B-A564-986DC971F913", "sensor-2", 30, 1591970349),
("F1CF77A0-6CFB-472F-B368-28F011F46C64", "sensor-1", 13, 1591970349);

You can explore the topic in Lenses to check the content and the shape of what you just inserted.

Let’s say that what you need is that same stream of events, but the record key should be the sensor_id instead of the UUID.

With the special SELECT ... as _key syntax, a few lines are enough to define our new re-keying processor:

SET defaults.topic.autocreate=true;

INSERT INTO events_by_sensor
SELECT STREAM sensor_id as _key
FROM temperature_events;

The query above will take the sensor_id from the value of the record and use it as the new key. All the value fields will remain untouched:
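
For example, the first record inserted above should come out of the processor looking roughly like this (illustrative; field ordering may differ):

KEY: "sensor-1"
VALUE: {
  "sensor_id": "sensor-1",
  "temperature": 12,
  "event_time": 1591970345
}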

Maybe the sensor_id is not enough, and for some reason, you also need the hour of the measurement in the key. In this case, the key will become a composite object with two fields: the sensor_id and the event_hour:

SET defaults.topic.autocreate=true;

INSERT INTO events_by_sensor_and_hour
SELECT STREAM
    temperature_events.sensor_id as _key.sensor_id
    , temperature_events.event_time / 3600 as _key.event_hour
FROM temperature_events;

As you can see, you can build composite objects in Lenses with ease by simply listing all the structure’s fields, one after the other.

In the last example, the _key output storage format will be inferred automatically by the system as JSON. If you need more control, you can use the STORE AS clause before the SELECT.

The following example will create a topic as the previous one, but where the keys will be stored as AVRO:

SET defaults.topic.autocreate=true;

INSERT INTO events_by_sensor_and_hour_avro_key
STORE KEY AS AVRO
SELECT STREAM
    temperature_events.sensor_id as _key.sensor_id
    , FLOOR(temperature_events.event_time / 3600) as _key.event_hour
FROM temperature_events;

Happy re-keying!

Controlling AVRO record names and namespaces

This page describes a tutorial to control AVRO record names and namespaces with Lenses SQL Processors.

When writing output as AVRO, Lenses creates schemas for you, automatically generating AVRO record names.

In this tutorial we will learn how to override the default record naming strategy.

In Lenses SQL you can use a SET statement to control the record and namespace name generated for the AVRO schema.

Setting up our input topic

We are going to create and populate a topic that we will later use in a couple of SQL Processors.

In SQL Studio, create the topic running the following query:

CREATE TABLE mytopic(a string, b string, c string) FORMAT (STRING, AVRO);

For the purposes of our tutorial, it is enough to insert a single record:

INSERT INTO mytopic(a, b, c) VALUES ("a", "b", "c");

Create a simple SQL Processor

We are now going to create a processor that will show the default behavior of AVRO record naming in Lenses.

The processor does not do much: it just reshapes the fields of the original topic, putting some of them in a nested field:

SET defaults.topic.autocreate=true;

INSERT INTO mytopic_2
SELECT STREAM a as x.a, b as x.y.b, c as x.y.c
FROM mytopic

We then start the processor. Lenses will create the new topic mytopic_2, and a new schema will be registered in the Schema Registry as soon as the first (and only) record is processed.

If we inspect the value schema of mytopic_2, we see that this is the one generated:

{
  "type": "record",
  "name": "record1",
  "fields": [
    {
      "name": "x",
      "type": {
        "type": "record",
        "name": "record0",
        "fields": [
          {
            "name": "a",
            "type": "string"
          },
          {
            "name": "y",
            "type": {
              "type": "record",
              "name": "record",
              "fields": [
                {
                  "name": "b",
                  "type": "string"
                },
                {
                  "name": "c",
                  "type": "string"
                }
              ]
            }
          }
        ]
      }
    }
  ]
}

As we can see, each record type has a name (it is mandatory in AVRO), and Lenses has generated those names automatically for us (record, record0, record1 etc.).

Set the record name and the namespace of the value schema

We are now going to see how to override that default behavior.

Let’s create and start the new processor with the following SQL:

SET defaults.topic.autocreate=true;
SET defaults.topic.value.avro.record="myRecordName";
SET defaults.topic.value.avro.namespace="myNamespace";

INSERT INTO mytopic_3
SELECT STREAM a as x.a, b as x.y.b, c as x.y.c
FROM mytopic

Notice how we added the new SET statements to the query:

SET defaults.topic.value.avro.record="myRecordName";
SET defaults.topic.value.avro.namespace="myNamespace";

These settings are telling Lenses to set the root record name and namespace to the values specified.

If we now check the value schema for mytopic_3 we get:

{
  "type": "record",
  "name": "myRecordName",
  "namespace": "myNamespace",
  "fields": [
    {
      "name": "x",
      "type": {
        "type": "record",
        "name": "record0",
        "fields": [
          {
            "name": "a",
            "type": "string"
          },
          {
            "name": "y",
            "type": {
              "type": "record",
              "name": "record",
              "fields": [
                {
                  "name": "b",
                  "type": "string"
                },
                {
                  "name": "c",
                  "type": "string"
                }
              ]
            }
          }
        ]
      }
    }
  ]
}

As we can see, the root record element now has the name myRecordName and the namespace myNamespace.

Notice how the settings did not affect nested records.

Set the record name and the namespace of the key schema

If the key of the generated topic has AVRO format as well, you can use the following analogous settings to control the key record name and namespace:

SET defaults.topic.key.avro.record="myRecordName";
SET defaults.topic.key.avro.namespace="myNamespace";

More control over the topics affected by the setting

A setting like the one we used before for the value schema:

SET defaults.topic.value.avro.record="myRecordName"

will affect all the topics used by the processor.

If you want instead to target a single topic, you can use the topic-specific version:

SET topic.mytopic_3.value.avro.record="myRecordName"

The setting above will override the record name only for topic mytopic_3. Other topics will not be affected and will keep using the default naming strategy.
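
The two flavours can also be combined. In the sketch below (the names are illustrative), the default sets the record name for every topic written by the processor, while the topic-specific setting overrides it for mytopic_3 only:

SET defaults.topic.value.avro.record="defaultRecordName";
SET topic.mytopic_3.value.avro.record="myRecordName";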

Changing the shape of data

This page describes a tutorial to change the shape (fields) of data in a Kafka topic using Lenses SQL Processors.

In this tutorial, we will see how to use Lenses SQL to alter the shape of your records.

In Lenses SQL you can quickly reshape your data with a simple SELECT and some built-in functions.

We will learn how to

  • put value fields into the key

  • lift key fields into the value

  • call functions to transform your data

  • build nested structures for your keys/values

  • unwrap singleton objects into primitive types

Setting up our example

In our example, we are getting data from speed sensors from a speed car circuit.

The upstream system registers speed measurement events as records in a Kafka topic.

An example of such an event is the following:

KEY: "9E6F72C8-6C9F-4EC2-A7B2-C8AC5A95A26C"
VALUE: {
  "car_id": "car_2",
  "speedMph": 84,
  "sensor": {
    "id": "sensor_1",
    "lat": 45,
    "long": 0
  },
  "event_time": 159230614
}

We can replicate such a structure running the following query in SQL Studio:

CREATE TABLE car_speed_events(
    car_id string
    , speedMph int
    , sensor.id string
    , sensor.lat double
    , sensor.long double
    , event_time long
)
FORMAT (string, avro);

Each event is keyed by a unique string generated by the upstream system.

We can again use SQL Studio to insert some data to play with:

INSERT INTO car_speed_events(
    _key
    , car_id
    , speedMph
    , sensor.id
    , sensor.lat
    , sensor.long
    , event_time
) VALUES
("9E6F72C8-6C9F-4EC2-A7B2-C8AC5A95A26C", "car-1", 50, "sensor_1", 45.1241, 1.177, 1591970345),
("0958C1E2-804F-4110-B463-7BEA25DA680C", "car-2", 54, "sensor_2", 45.122, 1.75, 1591970346),
("552E9A77-D23D-4EC7-96AB-7F106637ADBC", "car-3", 60, "sensor_1", 45.1241, 1.177, 1591970347),
("00F44EDD-1BF1-4AE8-A7FE-622F720CA3BC", "car-1", 55, "sensor_2", 45.122, 1.75, 1591970348),
("A7F19576-6EFA-40A0-91C3-D8999A41F453", "car-1", 56, "sensor_1", 45.1241, 1.177, 1591970348),
("5273803E-7F34-44F8-9A3B-B6CB9D24C1F9", "car-2", 58, "sensor_3", 45.123, 1.176, 1591970349),
("8401CD82-ABFF-4D1B-A564-986DC971F913", "car-2", 60, "sensor_1", 45.1241, 1.177, 1591970349),
("F1CF77A0-6CFB-472F-B368-28F011F46C64", "car-1", 53, "sensor_3", 45.123, 1.176, 1591970349);

Simple projections

In this section, we are only interested in the speed of single cars, and we do not care about all the other fields.

We want to use the car id, which now is part of the record Value, to become the new key (using the special as _key syntax). We also want the car speed as the new record Value.

To achieve that we can create a new SQL Processor using some simple projections.

SET defaults.topic.autocreate=true;

INSERT INTO only_car_speeds
SELECT STREAM 
    car_id AS _key
    , speedMph
FROM car_speed_events;

Checking the records emitted by the processor, we see that the shape of the records is:

KEY: "car-1"
VALUE: { "speedMph": 50 }

We want to avoid that intermediate wrapping of speedMph inside an object. To do that we can tell Lenses to unwrap the value with the special as _value syntax, saving some bytes and CPU cycles:

SET defaults.topic.autocreate=true;

INSERT INTO only_car_speeds_unwrapped
SELECT STREAM 
    car_id AS _key
    , speedMph AS _value
FROM car_speed_events;

Now the shape of the records is what we had in mind:

KEY: "car-1"
VALUE: 50

Using built-in functions

This time we want to do some more complex manipulation. We want to convert the speed from Mph to Kmph, and we also want to build a nice string describing the event.

An example of an output record would be:

{
    "speedKmph": 160.93,
    "description": "Car car_1 was running at 100Mph "
}

In this case, we are using CONCATENATE to concatenate multiple strings, CAST to convert an expression to another type, and *, the usual multiplication operator.

SET defaults.topic.autocreate=true;

INSERT INTO car_speeds_kmph
SELECT STREAM
    speedMph * 1.60934 AS speedKmph
    , CONCATENATE(
        'Car ',
        car_id,
        ' was running at ',
        CAST(speedMph AS STRING),
        'Mph') AS description
FROM car_speed_events;

If we check the resulting records, we can see that we obtained the shape we were looking for. Please note that the keys have been left untouched:

KEY: "9E6F72C8-6C9F-4EC2-A7B2-C8AC5A95A26C"
VALUE: {
    speedKmph:"80.467",
    description:"Car car-1 was running at 50Mph"
}

Composite objects

In this last example, we will show how to create composite keys and values in our projections.

We want both the sensor id and the event_time as the record Key. For the record Value, we want the car_id and the speed, expressed both as Mph and Kmph.

KEY: {
    "sensor_id": "sensor_1",
    "event_time": 1591970345
}
VALUE: {
    "car_id": "car_1",
    "speed": {
        "mph": 100,
        "kmph": 161
    } 
}

Lenses SQL allows us to use nested aliases to build nested structures: you just have to put dots in your aliases.

SET defaults.topic.autocreate=true;

INSERT INTO car_speeds_by_sensor_and_time
SELECT STREAM
    sensor.id AS _key.sensor_id
    , event_time AS _key.event_time
    , car_id
    , speedMph AS speed.mph
    , speedMph * 1.60934 AS speed.kmph
FROM car_speed_events;

The resulting shape of the record is what we were aiming for:

KEY: {
    "sensor_id": "sensor_1",
    "event_time": 1591970345
}
VALUE: {
    "car_id": "car_1",
    "speed": {
        "mph": 100,
        "kmph": 160
    } 
}

Happy re-shaping!

Filtering & Joins

Filtering data

This page describes a tutorial to filter records in a Kafka topic with Lenses SQL Processors.

Filtering messages and copying them to a topic can be achieved using the WHERE clause.

Setting up our example

In our example, we have a topic where our application registers bank transactions.

We have a topic called payments where records have this shape:


KEY: "6A461C60-02F3-4C01-94FB-092ECBDE0837"
VALUE: {
  "amount": "12.10",
  "currency": "EUR",
  "from_account": "xxx",
  "to_account": "yyy", 
  "time": 1591970345000
}

We can replicate such a structure running the following query in SQL Studio:

CREATE TABLE payments(
    amount double
    , currency string
    , from_account string
    , to_account string
    , time datetime
)
FORMAT (string, avro);

Each event has a unique string key generated by the upstream system.

We can again use SQL Studio to insert some data to play with:

INSERT INTO payments(
    _key
    , amount
    , currency 
    , from_account
    , to_account 
    , time
)
VALUES
("6A461C60-02F3-4C01-94FB-092ECBDE0837", 10, "EUR", "account-1", "account-2", 1590970345000),
("E5DA60E8-F622-43B2-8A93-B958E01E8AB3", 100000, "EUR", "account-1", "account-3", 1591070346000),
("0516A309-FB2B-4F6D-A11F-3C06A5D64B68", 5300, "USD", "account-2", "account-3", 1591170347000),
("0871491A-C915-4163-9C4B-35DEA0373B41", 6500, "EUR", "account-3", "account-1", 1591270348000),
("2B557134-9314-4F96-A640-1BF90887D846", 300, "EUR", "account-1", "account-4", 1591370348000),
("F4EDAE35-45B4-4841-BAB7-6644E2BBC844", 3400, "EUR", "account-2", "account-1", 1591470349000),
("F51A912A-96E9-42B1-9AC4-42E923A0A6F8", 7500, "USD", "account-2", "account-3", 1591570352000),
("EC8A08F1-75F0-49C8-AA08-A5E57997D27A", 2500000, "USD", "account-1", "account-3", 1591670356000),
("9DDBACFF-D42B-4042-95AC-DCDD84F0AC32", 1000, "GBP", "account-2", "account-3", 1591870401000)
;

Find big transactions

Let’s assume we need to detect significant transactions that will be then fed into our anti-fraud system.

We want to copy those transactions into a new topic, maintaining the content of the records as it is.

For our first example, we will use a simple predicate to filter transactions with an amount larger than 5000, regardless of the currency.

Lenses SQL supports all the common comparison operators to compare values, so for our goal it is enough to use a WHERE statement with a >= condition:

SET defaults.topic.autocreate=true;

INSERT INTO big_payments
SELECT STREAM *
FROM payments
WHERE amount >= 5000

Checking the records emitted by the processor, we see that we got the transactions we were looking for.

Because of the * projection, the records’ content has not changed.

KEY:"E5DA60E8-F622-43B2-8A93-B958E01E8AB3"
VALUE: { amount:100000, ... }
------------------------------------
KEY:"0516A309-FB2B-4F6D-A11F-3C06A5D64B68"
VALUE: { amount:5300, ... }
------------------------------------
KEY:"0871491A-C915-4163-9C4B-35DEA0373B41"
VALUE: { amount:6500, ... }
------------------------------------
KEY:"F51A912A-96E9-42B1-9AC4-42E923A0A6F8"
VALUE: { amount:7500, ... }
------------------------------------
KEY:"EC8A08F1-75F0-49C8-AA08-A5E57997D27A"
VALUE: { amount:2500000, ... }

Not all currencies are the same, so we would like to add a specific threshold for each currency. As a first cut, we combine multiple conditions with ANDs and ORs:

SET defaults.topic.autocreate=true;

INSERT INTO big_eur_usd_payments
SELECT STREAM *
FROM payments
WHERE
    (amount >= 5000 AND currency = 'EUR') OR
    (amount >= 5500 AND currency = 'USD')

As an improvement, we want to capture the threshold for each currency in a single expression. We will use a CASE statement for that:

SET defaults.topic.autocreate=true;

INSERT INTO big_payments_case
SELECT STREAM *
FROM payments
WHERE
    amount >= (CASE 
        WHEN currency = 'EUR' THEN 5000
        WHEN currency = 'USD' THEN 5500
        WHEN currency = 'GBP' THEN 4500
        ELSE 5000
    END)

getting the results:

KEY:"E5DA60E8-F622-43B2-8A93-B958E01E8AB3"
VALUE: { amount:100000, currency:"EUR", ... }
------------------------------------
KEY:"0871491A-C915-4163-9C4B-35DEA0373B41"
VALUE: { amount:6500, currency:"EUR", ... }
------------------------------------
KEY:"F51A912A-96E9-42B1-9AC4-42E923A0A6F8"
VALUE: { amount:7500, currency:"USD", ... }
------------------------------------
KEY:"EC8A08F1-75F0-49C8-AA08-A5E57997D27A"
VALUE: { amount:2500000, currency:"USD", ... }

Filtering by date

In this section, we will find all the transactions that happened during the (UTC) night. To do that we can use one of our many date and time functions.

You will also see how to use a CAST expression to convert from one type to another.

SET defaults.topic.autocreate=true;

INSERT INTO night_payments
SELECT STREAM *
FROM payments
WHERE
    CAST(HOUR(time) AS INT) >= 0 AND
    CAST(HOUR(time) AS INT) <= 5 

Checking the output, we can see the transactions that satisfied our predicate:

KEY:"6A461C60-02F3-4C01-94FB-092ECBDE0837"
VALUE: { amount:10, currency:"EUR", time:1590970345000, ... }
------------------------------------
KEY:"E5DA60E8-F622-43B2-8A93-B958E01E8AB3"
VALUE: { amount:100000, currency:"EUR", time:1591070346000, ... }
------------------------------------
KEY:"EC8A08F1-75F0-49C8-AA08-A5E57997D27A"
VALUE: { amount:2500000, currency:"USD", time:1591670356000, ... }

Sampling with random functions

Let’s imagine that we have to build some intelligence around all the payments we process, but we do not have the capacity and the need to process all of them.

We decided then to build a reduced copy of the payments topic, with only 10% of the original records.

To do that we are going to use our RANDINT function:

SET defaults.topic.autocreate=true;

INSERT INTO payments_sample
SELECT STREAM *
FROM payments
WHERE CAST(ABS(RANDINT()) AS DOUBLE) / 2147483647 < 0.1

RANDINT generates a random integer; we take its absolute value to make sure it is positive, and we then normalise the result by dividing it by the maximum integer value, getting an (almost) uniform sample of numbers between 0 and 1.

We have to CAST to double on the way; otherwise, the division would be between integers, and the result would always be 0.
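
To get a rough feeling for the sampling ratio, you can count the records in both topics from SQL Studio once the processor has been running for a while (run each query separately; COUNT(*) is assumed to be available in the snapshot engine):

SELECT COUNT(*) AS total_payments FROM payments;

SELECT COUNT(*) AS sampled_payments FROM payments_sample;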

Enriching data streams

This page describes a tutorial to enrich a Kafka topic using Lenses SQL Processors.

In this article, we will be enriching customer call events with their customer details.

Enriching data streams with extra information by performing an efficient lookup is a common scenario for streaming SQL on Apache Kafka.

Topics involved:

  • customer_details messages contain information about the customer

  • customer_call_details messages contain information about calls

Streaming SQL enrichment on Apache Kafka

SET defaults.topic.autocreate=true;

INSERT INTO customers_callInfo
SELECT STREAM
    calls._value AS call 
    , customer._value AS customer
FROM  customer_call_details AS calls
        INNER JOIN  (SELECT TABLE * FROM customer_details) AS customer
            ON customer._key.customer.id = calls._key.customer.id
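
Because the projection aliases the whole value of each side (calls._value AS call and customer._value AS customer), every output record carries two nested structures, roughly like this (illustrative, with most fields elided and the key layout not shown):

VALUE: {
  "call": { "callInfoType": "CallTypeC", "callInfoDuration": 67, ... },
  "customer": { "customer": { "name": "Charisse", "surname": "Daggett", ... }, "package": { ... }, "active": true }
}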

Testing data

To simplify our testing process and manage to run the above example in less than 60 seconds, we will be using SQL to create and populate the two Apache Kafka topics:

CREATE TOPIC customer_details

CREATE TABLE customer_details(
        _key.customer.typeID string
        , _key.customer.id string
        , customer.name string
        , customer.middleName string null
        , customer.surname string
        , customer.nationality string
        , customer.passportNumber string
        , customer.phoneNumber string
        , customer.email string null
        , customer.address string
        , customer.country string
        , customer.driverLicense string null
        , package.typeID string
        , package.description string
        , active boolean
)
FORMAT(avro, avro)
PROPERTIES(partitions=5, replication=1, compacted=true);

POPULATE TOPIC customer_details

INSERT INTO customer_details(
        _key.customer.typeID
        , _key.customer.id
        , customer.name
        , customer.middleName
        , customer.surname
        , customer.nationality
        , customer.passportNumber
        , customer.phoneNumber
        , customer.email
        , customer.address
        , customer.country
        , customer.driverLicense
        , package.typeID
        , package.description
        , active
) VALUES
("userType1","5162258362252394","April","-","Paschall","GBR","APGBR...","1999153354","aprilP@mydomain.com","-","GBR","-","TypeA","Desc.",true),
("internal","5290441401157247","Charisse","-","Daggett","USA","CDUSA...","6418577217","charisseD@mydomain.com","-","USA","-","TypeC","Desc.",true),
("internal","5397076989446422","Gibson","-","Chunn","USA","GCUSA...","8978860472","gibsonC@mydomain.com","-","USA","-","TypeC","Desc.",true),
("partner","5248189647994492","Hector","-","Swinson","NOR","HSNOR...","8207437436","hectorS@mydomain.com","-","NOR","-","TypeA","Desc.",true),
("userType1","5196864976665762","Booth","-","Spiess","CAN","BSCAN...","6220504387","hectorS@mydomain.com","-","CAN","-","TypeA","Desc.",true),
("userType2","5423023313257503","Hitendra","-","Sibert","SWZ","HSSWZ...","6731834082","hitendraS@mydomain.com","-","SWZ","-","TypeA","Desc.",true),
("userType2","5337899393425317","Larson","-","Asbell","SWE","LASWE...","2844252229","larsonA@mydomain.com","-","SWE","-","TypeA","Desc.",true),
("partner","5140590381876333","Zechariah","-","Schwarz","GER","ZSGER...","4936431929","ZechariahS@mydomain.com","-","GER","-","TypeB","Desc.",true),
("internal","5524874546065610","Shulamith","-","Earles","FRA","SEFRA...","2119087327","ShulamithE@mydomain.com","-","FRA","-","TypeC","Desc.",true),
("userType1","5204216758311612","Tangwyn","-","Gorden","GBR","TGGBR...","9172511192","TangwynG@mydomain.com","-","GBR","-","TypeA","Desc.",true),
("userType1","5336077954566768","Miguel","-","Gonzales","ESP","MGESP...","5664871802","MiguelG@mydomain.com","-","ESP","-","TypeA","Desc.",true),
("userType3","5125835811760048","Randie","-","Ritz","NOR","RRNOR...","3245795477","RandieR@mydomain.com","-","NOR","-","TypeA","Desc.",true),
("userType1","5317812241111538","Michelle","-","Fleur","FRA","MFFRA...","7708177986","MichelleF@mydomain.com","-","FRA","-","TypeA","Desc.",true),
("userType1","5373595752176476","Thurborn","-","Asbell","GBR","TAGBR...","5927996719","ThurbornA@mydomain.com","-","GBR","-","TypeA","Desc.",true),
("userType3","5589753170506689","Noni","-","Gorden","AUT","NGAUT...","7288041910","NoniG@mydomain.com","-","AUT","-","TypeA","Desc.",true),
("userType2","5588152341005179","Vivian","-","Glowacki","POL","VGPOL...","9001088901","VivianG@mydomain.com","-","POL","-","TypeA","Desc.",true),
("partner","5390713494347532","Elward","-","Frady","USA","EFUSA...","2407143487","ElwardF@mydomain.com","-","USA","-","TypeB","Desc.",true),
("userType1","5322449980897580","Severina","-","Bracken","AUT","SBAUT...","7552231346","SeverinaB@mydomain.com","-","AUT","-","TypeA","Desc.",true);

CREATE TOPIC customer_call_details

CREATE TABLE customer_call_details(
    _key.customer.typeID string
    , _key.customer.id string
    , callInfoCustomerID string
    , callInfoType string
    , callInfoDuration int
    , callInfoInit int)
FORMAT(avro, avro)
PROPERTIES(partitions=1, replication=1, compacted=false)

POPULATE TOPIC customer_call_details

INSERT INTO customer_call_details(
    _key.customer.typeID
    , _key.customer.id
    , callInfoCustomerID
    , callInfoType
    , callInfoDuration
    , callInfoInit
) VALUES
("userType1", "5322449980897580","5322449980897580", "CallTypeA", 470, 0),
("internal", "5290441401157247","5290441401157247", "CallTypeC", 67, 0),
("partner", "5140590381876333","5140590381876333", "CallTypeB", 377, 0),
("internal", "5397076989446422","5397076989446422", "CallTypeC", 209, 0),
("userType2", "5337899393425317","5337899393425317", "CallTypeA", 209, 0),
("partner", "5140590381876333","5140590381876333", "CallTypeB", 887, 0),
("userType1", "5322449980897580","5322449980897580", "CallTypeA", 203, 0),
("partner", "5140590381876333","5140590381876333", "CallTypeB", 1698, 0),
("userType3", "5589753170506689","5589753170506689", "CallTypeA", 320, 1),
("internal", "5290441401157247","5290441401157247", "CallTypeC", 89, 0),
("partner", "5140590381876333","5140590381876333", "CallTypeB", 355, 0),
("internal", "5290441401157247","5290441401157247", "CallTypeC", 65, 0),
("userType2", "5337899393425317","5337899393425317", "CallTypeA", 43, 1),
("partner", "5390713494347532","5390713494347532", "CallTypeB", 530, 0),
("internal", "5397076989446422","5397076989446422", "CallTypeC", 270, 0),
("userType3", "5589753170506689","5589753170506689", "CallTypeA", 1633, 0),
("internal", "5290441401157247","5290441401157247", "CallTypeC", 110, 0),
("userType1", "5322449980897580","5322449980897580", "CallTypeA", 540, 0),
("internal", "5290441401157247","5290441401157247", "CallTypeC", 168, 0),
("userType3", "5589753170506689","5589753170506689", "CallTypeA", 1200, 0),
("internal", "5290441401157247","5290441401157247", "CallTypeC", 1200, 0),
("partner", "5390713494347532","5390713494347532", "CallTypeB", 22, 0),
("userType3", "5589753170506689","5589753170506689", "CallTypeA", 333, 1),
("internal", "5397076989446422","5397076989446422", "CallTypeC", 87, 0),
("partner", "5390713494347532","5390713494347532", "CallTypeB", 123, 0),
("userType2", "5337899393425317","5337899393425317", "CallTypeA", 182, 1),
("partner", "5140590381876333","5140590381876333", "CallTypeB", 844, 0),
("partner", "5390713494347532","5390713494347532", "CallTypeB", 56, 1),
("internal", "5397076989446422","5397076989446422", "CallTypeC", 36, 0),
("partner", "5140590381876333","5140590381876333", "CallTypeB", 794, 0),
("userType3", "5589753170506689","5589753170506689", "CallTypeA", 440, 0),
("internal", "5397076989446422","5397076989446422", "CallTypeC", 52, 0),
("userType1", "5322449980897580","5322449980897580", "CallTypeA", 770, 0),
("internal", "5397076989446422","5397076989446422", "CallTypeC", 627, 0),
("partner", "5140590381876333","5140590381876333", "CallTypeB", 555, 0),
("userType2", "5337899393425317","5337899393425317", "CallTypeA", 55, 1);

Validate results

SELECT
    p.callInfoCustomerID AS customerID
    , p.callInfoType
    , p.callInfoInit
FROM customer_call_details AS p
        INNER JOIN customer_details AS c
            ON p._key.customer.id = c._key.customer.id

Joining streams of data

This page describes a tutorial joining Kafka topics with Lenses SQL Processors.

Imagine you are the next Amazon, and you want to track the orders and shipment events to work out which orders have been shipped and how long it took. In this case, there will be two data streams, one for each event type, and the resulting stream will answer the questions above.

Enriching two streams of data requires a sliding window join. The events are said to be “close” to each other, if the difference between their timestamp is up to the time window specified.

Topics involved:

  • orders messages contain information about the order

  • shipments messages contain information about the shipment

Streaming SQL enrichment on Apache Kafka

The query combines the data from orders and shipments if the orders are shipped within 24 hours. Resulting records contain the order and shipment identifiers, and the time from when the order was registered to when it was shipped.

SET defaults.topic.autocreate=true;
SET auto.offset.reset = 'earliest';

WITH o as (
 SELECT STREAM *
 FROM orders
 EVENTTIME BY orderTimestamp
);

WITH s as (
 SELECT STREAM *
 FROM shipments
 EVENTTIME BY timestamp
);

INSERT INTO orders_and_shipment
SELECT STREAM
    o._key AS orderId 
    , s._key AS shipmentId
    , DATE_TO_STR(TO_TIMESTAMP(s.timestamp - o.orderTimestamp), 'HH:mm:ss') AS time_difference
FROM  o INNER JOIN s
    ON o._key = s.orderId
WITHIN 24h

Testing data

To simplify our testing process and manage to run the above example in less than 60 seconds, we will be using SQL to create and populate the two Apache Kafka topics:

CREATE TOPIC orders

CREATE TABLE orders(
        _key string
        , customerId string
        , orderTimestamp long
        , amount decimal(18,2)
)
FORMAT(string, avro)

POPULATE TOPIC orders

INSERT INTO orders(
        _key
        , customerId
        , orderTimestamp
        , amount
) VALUES
("o1","johanne@johanne.io",1596374427000,99.9),   --Sunday, 2 August 2020 13:20:27
("o2","johanne@johanne.io",1596453627000,240.01), --Monday, 3 August 2020 11:20:27
("o3","jack@jack.io",1596280827000,81.81),        --Saturday, 1 August 2020 11:20:27
("o4","anna@anna.io",1596453627000,300.00);       --Monday, 3 August 2020 11:20:27

CREATE TOPIC shipments

CREATE TABLE shipments(
    _key string
    , orderId string
    , timestamp long)
FORMAT(string, avro)

POPULATE TOPIC shipments

INSERT INTO shipments(
    _key
    , orderId
    , `timestamp`
) VALUES
("s1", "o1", 1596445927000),   --Monday, 3 August 2020 09:12:07
("s2", "o3", 1596456112000),   --Monday, 3 August 2020 12:01:52
("s3", "o4", 1596460271000);   --Monday, 3 August 2020 13:11:11

The output seen in the next screenshot shows two records. For the order with identifier o2, there is no entry because no shipment has been recorded for it. For the order with identifier o3, the shipment happened more than 24 hours later, so it falls outside the join window.

Content of result topic
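
In textual form, the two joined records should look roughly like this (the time differences are worked out from the timestamps inserted above, assuming the HH:mm:ss formatting is done in UTC; the exact key layout may differ):

VALUE: { "orderId": "o1", "shipmentId": "s1", "time_difference": "19:51:40" }
------------------------------------
VALUE: { "orderId": "o4", "shipmentId": "s3", "time_difference": "01:50:44" }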

Validate results

Let’s switch to the Snapshot engine by navigating to the SQL Studio menu item. With the entries in both topics, we can write the following query to see which data is joinable without the window interval:

SELECT
    o._key AS order
    , DATE_TO_STR(o.orderTimestamp, 'yyyy-MM-dd HH:mm:ss') AS order_time
    , s._key AS shipment
    , DATE_TO_STR(s.timestamp, 'yyyy-MM-dd HH:mm:ss') AS shipment_time
FROM orders o INNER JOIN shipments s
    ON o._key = s.orderId

These are the results for the non-streaming query (i.e., Snapshot):

Content of SQL snapshot result topic

Running the query returned three records. But you can see the order o3 was processed two days after it was placed. Let’s apply the sliding window restriction for the Snapshot query by adding a filter to only match those records having their timestamp difference within a day.

SELECT 
    o._key AS order 
    , DATE_TO_STR(o.orderTimestamp, 'yyyy-MM-dd HH:mm:ss') AS order_time
    , s._key AS shipment 
    , DATE_TO_STR(s.timestamp, 'yyyy-MM-dd HH:mm:ss') AS shipment_time
FROM orders o INNER JOIN  shipments s
    ON o._key = s.orderId
WHERE to_timestamp(s.timestamp) - '1d' <= to_timestamp(o.orderTimestamp)

Now the result matches the one from the Streaming query.

Conclusion

In this tutorial you learned how to join two Streams together using a sliding window. You achieved all the above using the Lenses SQL engine.

Good luck and happy streaming!

Using multiple topics

This page describes a tutorial to use multiple Kafka topics in a Lenses SQL Processor.

In this tutorial, we will see how we can read data from multiple topics, process it as needed, and write the results to as many output topics as we need, all by using a single SQL Processor.

Setting up our example

Let’s assume that we have a topic (game-sessions) that contains data regarding remote gaming sessions by users.

Each gaming session will contain:

  • the points the user achieved throughout the session

  • Metadata information regarding the session:

    • The country where the game took place

    • The language the user played the game in

The above structure represents the value of each record in our game-sessions topic.

Additionally, each record is keyed by user details:

  • A pid, or player id, representing this user uniquely

  • Some additional denormalised user details:

    • a name

    • a surname

    • an age

In light of the above, a record might look like the following (in JSON for simplicity):

{
  "key":{
    "pid": 1,
    "name": "Billy",
    "surname": "Lagrange",
    "age": 30
  },
  "value":{
    "points": 5,
    "sessionMetadata": {
      "country": "Italy",
      "language": "IT"
    }
  }
}

Finally, let’s assume we also have another, normalised, compacted topic user-details, keyed by an int matching the pid from topic game-sessions and containing user information like address and period of membership to the platform.

In light of the above, a record might look like the following (in JSON for simplicity):

{
  "key": 1,
  "value":{
    "fullName": "Billy Lagrange",
    "memberYears": 3,
    "address": {
      "country": "Italy",
      "street": "Viale Monza 5",
      "city": "Milan"
    }
  }
}

We can replicate such structures using SQL Studio and the following query:

CREATE TABLE game-sessions(
    _key.pid int
    , _key.name string
    , _key.surname string
    , _key.age int
    , points double
    , sessionMetadata.country string
    , sessionMetadata.language string
)
FORMAT (avro, avro);

CREATE TABLE user-details(
    fullName string
    , memberYears int
    , address.country string
    , address.street string
    , address.city string
) FORMAT (int, avro);

We can then use SQL Studio again to insert the data we will use in the rest of the tutorial:

INSERT into game-sessions(
    _key.pid
    , _key.name
    , _key.surname
    , _key.age
    , points
    , sessionMetadata.country
    , sessionMetadata.language
) VALUES
(1, 'Billy', 'Lagrange', 35, 5, 'Italy', 'IT'),
(1, 'Billy', 'Lagrange', 35, 30, 'Italy', 'IT'),
(1, 'Billy', 'Lagrange', 35, 0, 'Italy', 'IT'),
(2, 'Maria', 'Rossi', 27, 50, 'Italy', 'IT'),
(2, 'Maria', 'Rossi', 27, 10, 'Italy', 'IT'),
(3, 'Jorge', 'Escudero', 27, 10, 'Spain', 'ES'),
(4, 'Juan', 'Suarez', 22, 80, 'Mexico', 'ES'),
(5, 'John', 'Bolden', 40, 10, 'USA', 'EN'),
(6, 'Dave', 'Holden', 31, 30, 'UK', 'EN'),
(7, 'Nigel', 'Calling', 50, 5, 'UK', 'EN'),
(2, 'Maria', 'Rossi', 27, 10, 'UK', 'EN'),
(1, 'Billy', 'Lagrange', 35, 50, 'Italy', 'IT'),
(3, 'Jorge', 'Escudero', 27, 16, 'Spain', 'ES'),
(4, 'Juan', 'Suarez', 22, 70, 'Mexico', 'ES'),
(5, 'John', 'Bolden', 40, 10, 'USA', 'EN'),
(6, 'Dave', 'Holden', 31, 50, 'Italy', 'IT'),
(6, 'Dave', 'Holden', 31, 70, 'Spain', 'ES'),
(2, 'Maria', 'Rossi', 27, 70, 'Italy', 'IT'),
(1, 'Billy', 'Lagrange', 35, 50, 'Italy', 'IT')
;

INSERT into user-details(
    _key
    , fullName
    , memberYears
    , address.country
    , address.street
    , address.city
) VALUES
(1, 'Billy Lagrange', 3, 'Italy', 'Viale Monza 5', 'Milan'),
(2, 'Maria Rossi', 1, 'Italy', 'Stazione Termini', 'Rome'),
(3, 'Jorge Escudero', 5, 'Spain', '50 Passeig de Gracia', 'Barcelona'),
(4, 'Juan Suarez', 0, 'Mexico', 'Plaza Real', 'Tijuana'),
(5, 'John Bolden', 2, 'USA', '100 Wall Street', 'New York'),
(6, 'Dave Holden', 1, 'UK', '25 Bishopsgate', 'London'),
(7, 'Nigel Calling', 6, 'UK', '10 Queen Anne Street', 'Brighton')
;

Multiple transformations all in one go

Let’s imagine that, given the above data, we are given the following requirements:

  • For each country in game-sessions, create a record with the count of games played from that country. Write the results to the games-per-country topic.

  • For each record in game-sessions, reshape the records to remove everything from the key besides pid. Additionally, add the user’s memberYears to the value. Write the results to the games-sessions-normalised topic.

We can obtain the above with the following query:

SET defaults.topic.autocreate=true;
SET commit.interval.ms='1000';

WITH userDetailsTable AS(
  SELECT TABLE *
  FROM user-details
);

WITH joinedAndNormalised AS(
  SELECT STREAM
    gs.*
    , ud.memberYears
  FROM game-sessions AS gs JOIN userDetailsTable AS ud 
        ON (gs._key.pid = ud._key)
);

INSERT INTO games-per-country
SELECT STREAM
  COUNT(*) AS gamesPlayed
FROM game-sessions
GROUP BY sessionMetadata.country;

INSERT INTO games-sessions-normalised
SELECT STREAM *
FROM joinedAndNormalised;

The result of this processor in the UI will be a processor graph similar to the following:

Processor graph for the above query

Finally, the content of the output topics games-per-country and games-sessions-normalised can now be inspected in the Lenses Explore screen:

Content of `games-per-country` topic
Content of `games-sessions-normalised` topic

Conclusion

In this tutorial, we learned how to read data from multiple topics, combine it, process it in different ways, and save it to as many output topics as needed.

Good luck and happy streaming!

Aggregations

Aggregating data in a table

This page describes a tutorial to aggregate Kafka topic data into a table using Lenses SQL Processors.

In this tutorial, we will see how data in a table can be aggregated continuously using GROUP BY and how the aggregated results are emitted downstream.

In Lenses SQL you can read your data as a TABLE and quickly aggregate over it using the GROUP BY clause and SELECT TABLE.

Setting up our example

Let’s assume that we have a topic (game-sessions) containing data regarding remote gaming sessions by users.

Each gaming session will contain:

  • the points the user achieved throughout the session

  • Metadata information regarding the session:

    • The country where the game took place

    • The language the user played the game in

The above structure represents the value of each record in our game-sessions topic.

Additionally, each record will be keyed by user information, including the following:

  • A pid, or player id, representing this user uniquely

  • Some additional denormalised user details:

    • a name

    • a surname

    • an age

Putting denormalised data in keys is not something that should be done in a production environment.

In light of the above, a record might look like the following (in JSON for simplicity):

{
  "key":{
    "pid": 1,
    "name": "Billy",
    "surname": "Lagrange",
    "age": 30
  },
  "value":{
    "points": 5,
    "sessionMetadata": {
      "country": "Italy",
      "language": "IT"
    }
  }
}

We can replicate such structure using SQL Studio and the following query:

CREATE TABLE game-sessions(
    _key.pid int
    , _key.name string
    , _key.surname string
    , _key.age int
    , points double
    , sessionMetadata.country string
    , sessionMetadata.language string)
FORMAT (avro, avro);

We can then use SQL Studio again to insert the data we will use in the rest of the tutorial:

INSERT into game-sessions(
    _key.pid
    , _key.name
    , _key.surname
    , _key.age
    , points
    , sessionMetadata.country
    , sessionMetadata.language
) VALUES
(1, 'Billy', 'Lagrange', 35, 5, 'Italy', 'IT'),
(1, 'Billy', 'Lagrange', 35, 30, 'Italy', 'IT'),
(1, 'Billy', 'Lagrange', 35, 0, 'Italy', 'IT'),
(2, 'Maria', 'Rossi', 27, 50, 'Italy', 'IT'),
(2, 'Maria', 'Rossi', 27, 10, 'Italy', 'IT'),
(3, 'Jorge', 'Escudero', 27, 10, 'Spain', 'ES'),
(4, 'Juan', 'Suarez', 22, 80, 'Mexico', 'ES'),
(5, 'John', 'Bolden', 40, 10, 'USA', 'EN'),
(6, 'Dave', 'Holden', 31, 30, 'UK', 'EN'),
(7, 'Nigel', 'Calling', 50, 5, 'UK', 'EN'),
(2, 'Maria', 'Rossi', 27, 10, 'UK', 'EN'),
(1, 'Billy', 'Lagrange', 35, 50, 'Italy', 'IT'),
(3, 'Jorge', 'Escudero', 27, 16, 'Spain', 'ES'),
(4, 'Juan', 'Suarez', 22, 70, 'Mexico', 'ES'),
(5, 'John', 'Bolden', 40, 10, 'USA', 'EN'),
(6, 'Dave', 'Holden', 31, 50, 'Italy', 'IT'),
(6, 'Dave', 'Holden', 31, 70, 'Spain', 'ES'),
(2, 'Maria', 'Rossi', 27, 70, 'Italy', 'IT'),
(1, 'Billy', 'Lagrange', 35, 50, 'Italy', 'IT')
;

Count the users that are in a given country

Now we can start processing the data we have inserted above.

Let’s imagine that we want to keep a running count of how many users are in a given country. To do this, we can assume that a user is currently in the country where their last game took place.

We can achieve the above with the following query:

SET defaults.topic.autocreate=true;
SET commit.interval.ms='1000';  -- this is to speed up the output generation in this tutorial

INSERT INTO groupby-table-country
SELECT TABLE
  COUNT(*) AS gamesPlayed
FROM game-sessions
GROUP BY sessionMetadata.country;

The content of the output topic, groupby-table-country, can now be inspected in the Lenses Explore screen and it will look similar to this:

Content of groupby-table-country topic

The key results to notice here are the ones for Spain and the UK:

  • Spain is 2 because Jorge and Dave had their last game played there.

  • UK is 1 because, while Nigel had his only game played there, Dave initially played from the UK, but then from Italy and finally from Spain. Dave’s contribution was, therefore, subtracted from the UK count value.

The last point from above is the main difference (and power) of Tables vs. Streams: they represent the latest state of the world for each of their keys, so any aggregation applies only to that latest data.

Given what a Table is, it will have by definition only a single value for any given key, so doing GROUP BY _key on a Table is a pointless operation because it will always only generate 1-element groups.

Calculate the total and average points of games played in a given language

We can expand on the example from the previous section, imagining that our requirement was extended.

Just as before, we want to calculate statistics based on the current country of a user, as defined in Example 1, but now we want to know all the following:

  • count how many users are in a given country

  • what is the total amount of points these users achieved

  • what is the average amount of points these users achieved

All of the above can be achieved with the following query:

SET defaults.topic.autocreate=true;
SET commit.interval.ms='1000';

INSERT INTO groupby-table-country-multi
SELECT TABLE
    COUNT(*) AS gamesPlayed
    , SUM(points) as totalpoints
    , AVG(points) as avgpoints
FROM game-sessions
GROUP BY sessionMetadata.country;

The content of the output topic, groupby-table-country-multi, can now be inspected in the Lenses Explore screen and it will look similar to this:

Content of groupby-table-country-multi topic

One thing to highlight here is that the functions we are using in this query (COUNT, SUM, and AVG) all support aggregating over Tables. However, that is not true of all functions: some only support aggregating over Streams.

Filtering aggregation data

We will cover one final scenario where we want to filter some data within our aggregation.

There are two possible types of filtering we might want to do when it comes to aggregations:

  • Pre-aggregation: we want some rows to be ignored by the grouping, so they will not be part of the calculation done by aggregation functions. In these scenarios, we will use the WHERE clause.

  • Post-aggregation: we want to filter the aggregation results themselves so that those aggregated records that meet some specified condition are not emitted at all. In these scenarios, we will use the HAVING clause.

Let’s see an example.

We want to calculate the statistics from Example 2, but grouping by the session language. Here we will again make the assumption that a user’s language is represented by their latest recorded game session.

Additionally, we are only interested in languages used by players who don’t achieve a high total of points (we might want to focus our marketing team’s effort there, to keep them entertained). Finally, we are aware that some users have been using VPNs to access our platform, so we want to exclude some records from our calculations if a given user appeared to have played from a given country.

For the sake of this example, we will:

  • Show statistics for languages with total points lower than 100

  • Ignore sessions that Dave made from Spain (because we know he was not there)

The query for all of the above is:

SET defaults.topic.autocreate=true;
SET commit.interval.ms='1000';

INSERT INTO groupby-table-language-filtered
SELECT TABLE
    COUNT(*) AS gamesPlayed
    , SUM(points) as totalpoints
    , AVG(points) as avgpoints
FROM game-sessions
WHERE _key.name != 'Dave' 
    OR sessionMetadata.country != 'Spain'
GROUP BY sessionMetadata.language
HAVING totalpoints < 100;

The content of the output topic, groupby-table-language-filtered, can now be inspected in the Lenses Explore screen and it will look similar to this:

Notice that IT (which is the only language that has 120 points in total) appears in the output but without any data in the value section.

This is because aggregations are Tables, and the key IT used to be present (while it was lower than 100), but then it was removed. Deletion is expressed, in Tables, by setting the value section of a record to null, which is what we are seeing here.
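
In other words, what you see for IT is effectively a tombstone-style record, roughly:

KEY: "IT"
VALUE: null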

Conclusion

In this tutorial, you learned how to use aggregation over Tables to:

  • group by arbitrary fields, based on the latest state of the world

  • calculate multiple results in a single processor

  • filter both the data that is to be aggregated and the data that is emitted as a result of the aggregation itself

Good luck and happy streaming!

Aggregating streams

This page describes a tutorial to aggregate Kafka topic data in a stream using Lenses SQL Processors.

In this tutorial we will see how data in a stream can be aggregated continuously using GROUP BY and how the aggregated results are emitted downstream.

In Lenses SQL you can read your data as a STREAM and quickly aggregate over it using the GROUP BY clause and SELECT STREAM.

Setting up our example

Let’s assume that we have a topic (game-sessions) that contains data regarding remote gaming sessions by users.

Each gaming session will contain:

  • the points the user achieved throughout the session

  • Metadata information regarding the session:

    • The country where the game took place

    • The language the user played the game in

The above structure represents the value of each record in our game-sessions topic.

Additionally, each record will be keyed by user information, including the following:

  • A pid, or player id, representing this user uniquely

  • Some additional denormalised user details:

    • a name

    • a surname

    • an age

Keep in mind this is just an example in the context of this tutorial. Putting denormalised data in keys is not something that should be done in a production environment.

In light of the above, a record might look like the following (in json for simplicity):

{
  "key": {
    "pid": 1,
    "name": "Billy",
    "surname": "Lagrange",
    "age": 30
  },
  "value": {
    "points": 5,
    "sessionMetadata": {
      "country": "Italy",
      "language": "IT"
    }
  }
}

We can replicate such structure using SQL Studio and the following query:

CREATE TABLE game-sessions(
    _key.pid int
    , _key.name string
    , _key.surname string
    , _key.age int
    , points double
    , sessionMetadata.country string
    , sessionMetadata.language string)
FORMAT (avro, avro);

We can then use SQL Studio again to insert the data we will use in the rest of the tutorial:

INSERT into game-sessions(
    _key.pid
    , _key.name
    , _key.surname
    , _key.age
    , points
    , sessionMetadata.country
    , sessionMetadata.language
) VALUES
(1, 'Billy', 'Lagrange', 35, 5, 'Italy', 'IT'),
(1, 'Billy', 'Lagrange', 35, 30, 'Italy', 'IT'),
(1, 'Billy', 'Lagrange', 35, 0, 'Italy', 'IT'),
(2, 'Maria', 'Rossi', 27, 50, 'Italy', 'IT'),
(2, 'Maria', 'Rossi', 27, 10, 'Italy', 'IT'),
(3, 'Jorge', 'Escudero', 27, 10, 'Spain', 'ES'),
(4, 'Juan', 'Suarez', 22, 80, 'Mexico', 'ES'),
(5, 'John', 'Bolden', 40, 10, 'USA', 'EN'),
(6, 'Dave', 'Holden', 31, 30, 'UK', 'EN'),
(7, 'Nigel', 'Calling', 50, 5, 'UK', 'EN'),
(2, 'Maria', 'Rossi', 27, 10, 'UK', 'EN'),
(1, 'Billy', 'Lagrange', 35, 50, 'Italy', 'IT'),
(3, 'Jorge', 'Escudero', 27, 16, 'Spain', 'ES'),
(4, 'Juan', 'Suarez', 22, 70, 'Mexico', 'ES'),
(5, 'John', 'Bolden', 40, 10, 'USA', 'EN'),
(6, 'Dave', 'Holden', 31, 50, 'Italy', 'IT'),
(6, 'Dave', 'Holden', 31, 70, 'Spain', 'ES'),
(2, 'Maria', 'Rossi', 27, 70, 'Italy', 'IT'),
(1, 'Billy', 'Lagrange', 35, 50, 'Italy', 'IT')
;

Count how many games each user played

Now we can start processing the data we have inserted above.

One requirement could be to count how many games each user has played. Additionally, we want to ensure that, should new data come in, it will update the calculations and return up-to-date numbers.

We can achieve the above with the following query:

SET defaults.topic.autocreate=true;
SET commit.interval.ms='1000';  -- this is just to speed up the output generation in this tutorial

INSERT INTO groupby-key
SELECT STREAM
  COUNT(*) AS gamesPlayed
FROM game-sessions
GROUP BY _key;

The content of the output topic, groupby-key, can now be inspected in the Lenses Explore screen and it will look similar to this:

Content of `groupby-key` topic

As you can see, the keys of the records did not change, but their value is the result of the specified aggregation.

You might have noticed that groupby-key has been created as a compacted topic, and that is by design.

All aggregations result in a Table because they maintain a running, fault-tolerant state of the aggregation. When the result of an aggregation is written to a topic, the topic needs to reflect these semantics, which is exactly what a compacted topic does.

Add each user’s best results, and the average over all games

We can expand on the example from the previous section. We now want to know, for each user, the following:

  • count how many games the user has played

  • what are the user’s best 3 results

  • what is the user’s average of points

All the above can be achieved with the following query:

SET defaults.topic.autocreate=true;
SET commit.interval.ms='1000';

INSERT INTO groupby-key-multi-aggs
SELECT STREAM
    COUNT(*) AS gamesPlayed
    , MAXK(points,3) as maxpoints
    , AVG(points) as avgpoints
FROM game-sessions
GROUP BY _key;

The content of the output topic, groupby-key-multi-aggs, can now be inspected in the Lenses Explore screen, and it will look similar to this:

Content of `groupby-key-multi-aggs` topic

Gather statistics about users playing from the same country and using the same language

Our analytics skills are so good that we are now asked for more. We now want to calculate the same statistics as before, but grouping together players that played from the same country and used the same language.

Here is the query for that:

SET defaults.topic.autocreate=true;
SET commit.interval.ms='1000';

INSERT INTO groupby-country-and-language
SELECT STREAM
    COUNT(*) AS gamesPlayed
    , MAXK(points,3) as maxpoints
    , AVG(points) as avgpoints
    , sessionMetadata.language as sessionLanguage
FROM game-sessions
GROUP BY
    sessionMetadata.country
    , sessionMetadata.language;

The content of the output topic, groupby-country-and-language, can now be inspected in the Lenses Explore screen and it will look similar to this:

Content of `groupby-country-and-language` topic

Notice how we projected sessionMetadata.language as sessionLanguage in the query. We could do that because sessionMetadata.language is part of the GROUP BY clause. Lenses SQL only supports Full Group By mode, so if a projected field is not part of the GROUP BY clause, the query will be invalid.
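
For example, a sketch like the following would be rejected, because sessionMetadata.country is projected but is not part of the GROUP BY clause:

SET defaults.topic.autocreate=true;

INSERT INTO groupby-invalid
SELECT STREAM
    COUNT(*) AS gamesPlayed
    -- sessionMetadata.country is not in the GROUP BY, so this projection is invalid
    , sessionMetadata.country AS sessionCountry
FROM game-sessions
GROUP BY sessionMetadata.language;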

Filtering aggregation data

One final scenario we will cover in this tutorial is when we want to filter some data within our aggregation.

There are two possible types of filtering we might want to do, when it comes to aggregations:

  • Pre-aggregation: we want some rows to be ignored by the grouping, so they will not be part of the calculation done by aggregation functions. In these scenarios we will use the WHERE clause.

  • Post-aggregation: we want to filter the aggregation results themselves, so that those aggregated records which meet some specified condition are not emitted at all. In these scenarios we will use the HAVING clause.

Let’s see an example.

We want to calculate the usual statistics from the previous scenarios, but grouping by the session language only. However, we are interested only in languages that are used a small number of times (we might want to focus our marketing team’s effort there); additionally, we are aware that some users have been using VPNs to access our platform, so we want to exclude some records from our calculations if a given user appeared to have played from a given country.

For the sake of this example, we will:

  • Show statistics for languages that are used less than 9 times

  • Ignore sessions that Dave made from Spain (because we know he was not there)

The query for all the above is:

SET defaults.topic.autocreate=true;
SET commit.interval.ms='1000';

INSERT INTO groupby-language-filtered
SELECT STREAM
    COUNT(*) AS gamesPlayed
    , MAXK(points,3) as maxpoints
    , AVG(points) as avgpoints
FROM game-sessions
WHERE _key.name != 'Dave'
    OR sessionMetadata.country != 'Spain'
GROUP BY sessionMetadata.language
HAVING gamesPlayed < 9;

The content of the output topic, groupby-language-filtered, can now be inspected in the Lenses Explore screen and it will look similar to this:

Notice that IT (which is the only language that has 9 sessions in total) appears in the output but without any data in the value section.

This is because aggregations are Tables: the key IT used to be present (while its games count was still lower than 9), but it was then removed. Deletion is expressed, in Tables, by setting the value section of a record to null, which is what we are seeing here.
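Stripped of its metadata, the tombstone record for IT can be sketched as follows (the key is the grouped-by language, the value is null):

KEY: "IT"
VALUE: null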

Conclusion

In this tutorial you learned how to use aggregation over Streams to:

  • group by the current key of a record

  • calculate multiple results in a single processor

  • group by a combination of different fields of the input record

  • filter both the data that is to be aggregated and the data emitted by the aggregation itself

You achieved all the above using Lenses SQL engine.

You can now proceed to learn about more complex scenarios like aggregation over Tables and windowed aggregations.

Good luck and happy streaming!

Time window aggregations

This page describes a tutorial to perform time windowed aggregations on Kafka topic data with Lenses SQL Processors.

In this tutorial we will see how data in a Stream can be aggregated continuously using GROUP BY over a time window and the results are emitted downstream.

In Lenses SQL you can read your data as a STREAM and quickly aggregate over it using SELECT STREAM with the GROUP BY clause.

Setting up our example

Let’s assume that we have a topic (game-sessions) that contains data regarding remote gaming sessions by users.

Each gaming session will contain:

  • the points the user achieved throughout the session

  • Metadata information regarding the session:

    • The country where the game took place

    • The startedAt, the date and time the game commenced

    • The endedAt, the date and time the game finished

The above structure represents the value of each record in our game-sessions topic.

Additionally, each record will be keyed by user information, including the following:

  • A pid, or player id, representing this user uniquely

  • Some additional denormalised user details:

    • a name

    • a surname

    • an age

Keep in mind: this is just an example in the context of this tutorial. Putting denormalised data in keys is not something that should be done in a production environment.

In light of the above, a record might look like the following (in json for simplicity):

{
  "key": {
    "pid": 1,
    "name": "Billy",
    "surname": "Lagrange",
    "age": 30
  },
  "value": {
    "points": 5,
    "country": "Italy",
    "language": "IT",
    "startedAt": 1595435228,
    "endedAt": 1595441828
  }
}

We can replicate such structure using SQL Studio and the following query:

CREATE TABLE game-sessions(
  _key.pid int,
  _key.name string,
  _key.surname string,
  _key.age int,
  points double,
  country string,
  startedAt long,
  endedAt long)
FORMAT (avro, avro);

We can then use SQL Studio again to insert the data we will use in the rest of the tutorial:

INSERT into game-sessions(
  _key.pid,
  _key.name,
  _key.surname,
  _key.age,
  points,
  country,
  startedAt,
  endedAt
) VALUES
(1, 'Billy', 'Lagrange', 35, 5, 'Italy', 1595524080000, 1595524085000),
(1, 'Billy', 'Lagrange', 35, 30, 'Italy', 1595524086000, 1595524089000),
(1, 'Billy', 'Lagrange', 35, 0, 'Italy', 1595524091000, 1595524098000),
(2, 'Maria', 'Rossi', 27, 50, 'Italy', 1595524080000, 1595524085000),
(2, 'Maria', 'Rossi', 27, 10, 'Italy', 1595524086000, 1595524089000),
(3, 'Jorge', 'Escudero', 27, 10, 'Spain', 1595524086000, 1595524089000),
(4, 'Juan', 'Suarez', 22, 80, 'Mexico', 1595524080000, 1595524085000),
(5, 'John', 'Bolden', 40, 10, 'USA', 1595524080000, 1595524085000);

The times a game started and ended are expressed as epoch timestamps (milliseconds). To see the human-readable values, run this query:

SELECT
  startedAt
  , DATE_TO_STR(startedAt, 'yyyy-MM-dd HH:mm:ss') as started
  , endedAt
  , DATE_TO_STR(endedAt, 'yyyy-MM-dd HH:mm:ss') as ended
FROM game-sessions;
Content of game-sessions displaying the human readable timestamps

Count how many games were played per user every 10 seconds

Now we can start processing the data we have inserted above.

One requirement could be to count how many games each user has played every 10 seconds.

We can achieve the above with the following query:

SET defaults.topic.autocreate=true;
SET commit.interval.ms='2000';  -- this is just to speed up the output generation in this tutorial

INSERT INTO games_per_user_every_10_seconds
SELECT STREAM
    COUNT(*) as occurrences
    , MAXK_UNIQUE(points,3) as maxpoints
    , AVG(points) as avgpoints
FROM game-sessions
EVENTTIME BY startedAt
WINDOW BY TUMBLE 10s
GROUP BY _key

The content of the output topic, games_per_user_every_10_seconds, can now be inspected and eventually it will look similar to this:

Content of the games_per_user_every_10_seconds topic

As you can see, the keys of the records did not change, but their value is the result of the specified aggregation. The gamer Billy Lagrange has two entries because he played 3 games: the first two fall in the window between 2020-07-23 17:08:00 (inclusive) and 2020-07-23 17:08:10 (exclusive), and the third falls in the window between 2020-07-23 17:08:10 (inclusive) and 2020-07-23 17:08:20 (exclusive).

You might have noticed that games_per_user_every_10_seconds has been created as a compacted topic, and that is by design.

All aggregations result in a Table because they maintain a running, fault-tolerant state of the aggregation. When the result of an aggregation is written to a topic, the topic needs to reflect these semantics, which is what a compacted topic does.

Count how many games were played per country every 10 seconds

We can expand on the example from the previous section. We now want to know, for each country over 10-second intervals, the following:

  • how many games were played

  • the best 3 results

All the above can be achieved with the following query:

SET defaults.topic.autocreate=true;
SET commit.interval.ms='2000';  -- this is just to speed up the output generation in this tutorial

INSERT INTO games_per_country_every_10_seconds
SELECT STREAM
    COUNT(*) as occurrences
    , MAXK_UNIQUE(points,3) as maxpoints
    , country
FROM game-sessions
EVENTTIME BY startedAt
WINDOW BY TUMBLE 10s
GROUP BY country

The content of the output topic, games_per_country_every_10_seconds, can now be inspected in the SQL Studio screen by running:

SELECT *
FROM games_per_country_every_10_seconds

There are 2 entries for Italy, since one game was played at 2020-07-23 17:08:11, which falls into the next window. Also, notice that the other entry for Italy has 4 occurrences and 3 values in maxpoints. The 4 occurrences correspond to 4 games, two each from Billy Lagrange and Maria Rossi, within the 10-second window between 2020-07-23 17:08:00 (inclusive) and 2020-07-23 17:08:10 (exclusive).

Content of games_per_country_every_10_seconds topic

Conclusion

In this tutorial you learned how to use aggregation over Streams to:

  • group by the current key of a record

  • group by a field in the input record

  • define a time window over which the aggregation is computed.

Good luck and happy streaming!

Complex types

Unwrapping complex types

This page describes a tutorial to unwrap a complex data type in a Kafka topic using Lenses SQL Processors.

In this example, we will show how Lenses can be used to transform complex data types into simple primitive ones.

Setting up

We start this tutorial by creating a topic which will hold information regarding visits to our website:

CREATE TABLE lenses_monitoring(
   _key.landing_page string
  , _key.user string
  , time_spent_s int
)
FORMAT(avro, avro);

Firstly we’ll add some data to our newly created topic:

INSERT INTO lenses_monitoring(
    _key.landing_page
    , _key.user
    , time_spent_s
) VALUES
("homepage", "anon_21", 30),
("why-lenses", "anon_32", 45),
("use-cases", "anon_56", 12),
("customers", "anon_36", 12),
("use-cases", "anon_126", 12);          

Unwrapping the data

For example, let’s say we’re interested in sending this data to a service that analyses the time spent on a page and how it changes over time.

This system has a caveat, though: it only accepts data where keys are strings and values are integers.

Rather than having to reimplement our analysis system, we can create a SQL Processor that will continuously send data to a new topic in a format the target system can work with:

SET defaults.topic.autocreate=true;
INSERT INTO analysis_topic 
SELECT STREAM 
    _key.landing_page as _key
    , time_spent_s as _value
FROM lenses_monitoring;

Notice the addition of the as _key and as _value aliases; these tell Lenses to “unwrap” the values, effectively writing them as primitive types (string and integer respectively) instead of, in this particular case, Avro objects.

Lenses will also automatically infer the format of each topic facet; in this case it sets them to STRING and INT respectively.
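Based on the rows inserted earlier, the records landing in analysis_topic should look roughly like the sketch below, with a plain string key and a plain integer value:

KEY: "homepage"
VALUE: 30
-----------------------------
KEY: "why-lenses"
VALUE: 45
-----------------------------
KEY: "use-cases"
VALUE: 12
...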


Working with Arrays

This page describes a tutorial on how to work with array data in your Kafka topics using Lenses SQL Processors.

In this tutorial, we will see how to use Lenses SQL to extract, manipulate and inspect the single elements of an array.

In Lenses SQL you can use a LATERAL JOIN to treat the elements of an array as normal fields.

You will learn to:

  • Extract the single elements of an array with a single LATERAL join.

  • Extract elements of a multi-level array with nested LATERAL joins.

  • Use arbitrarily complex array expressions as the right-hand side of a LATERAL join.

Lateral Join on a simple array

In this example, we are getting data from a sensor.

Unfortunately, the upstream system registers the readings in batches, while what we need is a single record for every reading.

An example of such a record is the following:

KEY: 1
VALUE: {
  "meter_id": 1,
  "readings": [100, 101, 102]
}

Notice how each record contains multiple readings, inside the readings array field.

We can replicate such a structure running the following query in SQL Studio:

CREATE TABLE batched_readings(
  meter_id int,
  readings int[]
) FORMAT(int, AVRO);

We can again use SQL Studio to insert some data to play with:

INSERT INTO batched_readings(_key, meter_id, readings) VALUES
(1, 1, [100, 92, 93, 101]),
(2, 2, [81, 82, 81]),
(3, 1, [95, 94, 93, 96]),
(4, 2, [80, 82])

What we want to obtain is a new topic readings, where each record contains a single reading, together with its meter_id. Considering the first record, we expect it to be exploded into four different records, one for each reading:

KEY: 1
VALUE: { "meter_id": 1, "reading": 100 }
-----------------------------
KEY: 1
VALUE: { "meter_id": 1, "reading": 92 }
-----------------------------
KEY: 1
VALUE: { "meter_id": 1, "reading": 93 }
-----------------------------
KEY: 1
VALUE: { "meter_id": 1, "reading": 101 }

In Lenses SQL you can easily achieve that with the special LATERAL syntax.

You can create a processor defined as:

SET defaults.topic.autocreate=true;

INSERT INTO
  readings
SELECT STREAM
  meter_id,
  reading
FROM
  batched_readings
  LATERAL readings as reading

The magic happens in batched_readings LATERAL readings as reading. With that we are basically saying:

  • for each record in batched_readings

  • for each element inside the readings array of that record

  • build a new record with all the fields of the original batched_readings record, plus an extra reading field, that will contain the value of the current element of readings.

We can then use in the SELECT both the original fields and the new reading field.

If you save the processor and run it, you will see that it will emit the records we expected.

Filtering

One of the powerful features of a LATERAL join is that the expression you put in the LATERAL can be used as a normal field. This means you can also use it, for example, in a WHERE clause or in a GROUP BY.
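For instance, grouping directly on the extracted element should work as well; the short sketch below (the readings_counts output topic name is just illustrative) counts how often each reading value occurs:

SET defaults.topic.autocreate=true;

INSERT INTO readings_counts
SELECT STREAM
  reading
  , COUNT(*) as occurrences
FROM
  batched_readings
  LATERAL readings as reading
GROUP BY reading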

In this section we will see how to filter records generated by a LATERAL using a normal WHERE.

We want to modify our previous processor in order to emit only the readings greater than 95.

To do that, it is enough to use reading as if it were a normal field in the WHERE clause:

SET defaults.topic.autocreate=true;

INSERT INTO
  readings
SELECT STREAM
  meter_id,
  reading
FROM
  batched_readings
  LATERAL readings as reading
WHERE
  reading > 95

Running the processor we get the records

KEY: 1
VALUE: { "meter_id": 1, "reading": 100 }
-----------------------------
KEY: 1
VALUE: { "meter_id": 1, "reading": 101 }
-----------------------------
KEY: 3
VALUE: { "meter_id": 1, "reading": 96 }

Lateral Join on a multi-level array

This example is similar to the previous one. The only difference is that the readings are now stored in batches of batches:

CREATE TABLE batched_readings_nested(
  meter_id int,
  nested_readings int[][]
)
FORMAT(int, AVRO);

As you can see, nested_readings is an array whose elements are array of integers.

We can again use SQL Studio to insert some data:

INSERT INTO batched_readings_nested(_key, meter_id, nested_readings) VALUES
(1, 1, [[100, 92], [93, 101]]),
(2, 2, [[81], [82, 81]]),
(3, 1, [[95, 94, 93], [96]]),
(4, 2, [[80, 82]])

We would like to define a processor that emits the same records as the previous one.

In this case, though, we are dealing with nested_readings, which is a multi-level array, so a single LATERAL join is not enough. Nesting one LATERAL inside another will do the job:

SET defaults.topic.autocreate=true;

INSERT INTO
  readings
SELECT STREAM
  meter_id,
  reading
FROM
  batched_readings_nested
  LATERAL nested_readings as readings
  LATERAL readings as reading

This is roughly what happens in the FROM clause:

  • We first unwrap the first level of the array, doing a batched_readings_nested LATERAL nested_readings as readings.

  • At that point, readings will be an array of integers.

  • We can then use it in the outer ... LATERAL readings as reading join, and the single integers will finally be extracted and made available as reading.

Complex array expressions

In this section we will see how it is possible to use any expression as the right-hand side of a LATERAL join, as long as it evaluates to an array.

We have a table where the meter readings are split into two columns, readings_day and readings_night.

CREATE TABLE day_night_readings(
  meter_id int,
  readings_day int[],
  readings_night int[]
)
FORMAT(int, AVRO);

Let’s insert the same data as the first example, but where the readings are split across the two columns.

INSERT INTO day_night_readings(_key, meter_id, readings_day, readings_night) VALUES
(1, 1, [100, 92], [93, 101]),
(2, 2, [81], [82, 81]),
(3, 1, [95, 94, 93], [96]),
(4, 2, [80], [81])

To extract the readings one by one, we need first to concatenate the two arrays readings_day and readings_night. We can achieve that using flatten. We can then use the concatenated array in a LATERAL join:

SET defaults.topic.autocreate=true;

INSERT INTO
  readings
SELECT STREAM
  meter_id,
  reading
FROM
  day_night_readings
  LATERAL flatten([readings_day, readings_night]) as reading

The processor defined above will emit the records

KEY: 1
VALUE: { "meter_id": 1, "reading": 100 }
-----------------------------
KEY: 1
VALUE: { "meter_id": 1, "reading": 92 }
-----------------------------
KEY: 1
VALUE: { "meter_id": 1, "reading": 93 }
-----------------------------
KEY: 1
VALUE: { "meter_id": 1, "reading": 101 }
-----------------------------
KEY: 2
VALUE: { "meter_id": 2, "reading": 81 }
...

as we expected.

Controlling event time

This page describes how to control event time for data in your Kafka topics with Lenses SQL Processors.

Every message in Kafka comes with a timestamp, and Lenses Engine Streaming mode uses that by default when doing time-dependent operations, like aggregations and joins.

Sometimes though that timestamp is not exactly what you need, and you would like to use a field in the record value or key as the new timestamp.

In Lenses SQL you can use the special EVENTTIME BY ... syntax to control the records’ timestamp.

Setting up our example

In our toy example, we have a simple topic where electricity meter readings events are collected:

CREATE TABLE electricity_events(
    KW double
    , customer_id int
    , event_time long
)
FORMAT (string, avro);

We can also insert some example data to do our experiments:

INSERT INTO electricity_events(
    KW
    , customer_id
    , event_time
) VALUES
(1.0, 1, 1592848400000),
(2.0, 2, 1592848400000),
(1.5, 3, 1592848400000),
(2.0, 1, 1592848405000),
(1.0, 2, 1592848405000),
(2.5, 3, 1592848405000),
(3.0, 1, 1592848410000),
(0.5, 2, 1592848410000),
(4.0, 3, 1592848405000)
;

If you query the events, you can see that Kafka sets a timestamp for each record. That timestamp is, in our case, the time when the record was inserted. As you can see, it is totally unrelated to the event_time field we have in the payload.

[
{"value":{"KW":1,"customer_id":1,"event_time":1592848400000},"metadata":{...,"timestamp":1594041840812,...}},
{"value":{"KW":2,"customer_id":2,"event_time":1592848400000},"metadata":{...,"timestamp":1594041840821,...}},
{"value":{"KW":1.5,"customer_id":3,"event_time":1592848400000},"metadata":{...,"timestamp":1594041840828,...}},
{"value":{"KW":2,"customer_id":1,"event_time":1592848405000},"metadata":{...,"timestamp":1594041840834,...}},
{"value":{"KW":1,"customer_id":2,"event_time":1592848405000},"metadata":{...,"timestamp":1594041840842,...}},
{"value":{"KW":2.5,"customer_id":3,"event_time":1592848405000},"metadata":{...,"timestamp":1594041840848,...}},
{"value":{"KW":3,"customer_id":1,"event_time":1592848410000},"metadata":{...,"timestamp":1594041840855,...}},
{"value":{"KW":0.5,"customer_id":2,"event_time":1592848410000},"metadata":{...,"timestamp":1594041840862,...}},
{"value":{"KW":4,"customer_id":3,"event_time":1592848405000},"metadata":{...,"timestamp":1594041840868,...}}
]

Computing moving averages

We would like to transform our original stream of events, aggregating events with a hopping window of 10s width and an increment of 5s, computing the average for each window.

You can create a new processor that streams those averages, using the special WINDOW BY ... syntax:

SET defaults.topic.autocreate=true;

INSERT INTO electricity_events_avg_wrong
SELECT STREAM 
    customer_id
    , AVG(KW) as KW
FROM electricity_events
WINDOW BY HOP 10s,5s
GROUP BY customer_id

For customer 1, we have three input events, 5 seconds apart, so we expect four output events for that customer, since four hopping windows are involved.
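To see why, it can help to look at the event times in a human-readable form; a quick SQL Studio query reusing DATE_TO_STR (shown earlier in this document) does the trick:

SELECT
    customer_id
    , event_time
    , DATE_TO_STR(event_time, 'yyyy-MM-dd HH:mm:ss') as event_time_readable
FROM electricity_events
WHERE customer_id = 1

Customer 1’s readings are 5 seconds apart, so with a 10-second window advancing every 5 seconds, four distinct windows overlap them.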

But checking the emitted records, we see that only two are produced.

This is because, by default, windowing operations work on the record timestamp, and in our case all the timestamps are pretty much the same: they coincide with the time the records were inserted.

Fortunately we can change this behavior using the special EVENTTIME BY ... syntax, specifying an expression to be used as the timestamp:

SET defaults.topic.autocreate=true;

INSERT INTO electricity_events_avg
SELECT STREAM 
    customer_id
    , AVG(KW) as KW
FROM electricity_events
EVENTTIME BY event_time
WINDOW BY HOP 10s,5s
GROUP BY customer_id

As you can see, the results have been windowed using event_time as the timestamp:

[
{"key":{"value":1,"window":{"start":1592848395000,"end":null}},"value":{"customer_id":1,"KW":1}, ...},
{"key":{"value":1,"window":{"start":1592848400000,"end":null}},"value":{"customer_id":1,"KW":1.5}, ...},
{"key":{"value":1,"window":{"start":1592848405000,"end":null}},"value":{"customer_id":1,"KW":2.5}, ...},
{"key":{"value":1,"window":{"start":1592848410000,"end":null}},"value":{"customer_id":1,"KW":3}, ...}
]

SQL Studio

Querying data

This page describes a tutorial for querying data with SQL Studio.

When querying Kafka topic data with SQL such as

SELECT *
FROM topicA
WHERE transaction_id=123

a full scan will be executed, and the query processes the entire data on that topic to identify all records that match the transaction ID.

If the Kafka topic contains a billion 50-byte messages, that would require querying 50 GB of data. Depending on your network capabilities, the brokers’ performance, any quotas on your account, and other parameters, fetching 50 GB of data can take considerable time, even more so if the data is compressed: in that case the client also has to decompress it before parsing the raw bytes into a structure the query can be applied to.

Does Apache Kafka have indexing capabilities?

No. Apache Kafka does not have indexing capabilities for the payload (indexes typically come at a high cost, even in an RDBMS or a system like Elasticsearch); however, Kafka does index the metadata.

Can we push down predicates in Apache Kafka?

The only filters Kafka supports are topic, partition, and offsets or timestamps.

Push down queries using the Apache Kafka metadata

Partitions

SELECT *
FROM topicA
WHERE transaction_id=123
   AND _meta.partition = 1

If we specify in our query that we are only interested in partition 1, and (for the sake of example) the above Kafka topic has 50 partitions, then Lenses will automatically push this predicate down, meaning we only need to scan 1 GB of data instead of 50 GB.

SELECT *
FROM topicA
WHERE transaction_id=123
  AND _meta.offset > 100
  AND _meta.offset < 100100
  AND _meta.partition = 1

If we specify the offset range and the partition, we only need to scan that specific range of 100K messages, which amounts to roughly 5 MB.

Timestamp

SELECT *
FROM topicA
WHERE transaction_id=123
  AND _meta.timestamp > NOW() - "1H"

The above will query only the data added to the topic within the last hour, so we would query just 10 MB.

Time-traveling

SELECT *
FROM position_reports
WHERE
   _meta.timestamp > "2024-04-01" AND
   _meta.timestamp < "2026-04-02"

The above will query only the data that have been added to the Kafka topic on a specific day. If we are storing 1,000 days of data, we would query just 50MB.

You could also filter on time with:

SELECT * from mytopic
WHERE 
    _meta.timestamp > '2024-10-16 14:30:00' AND 
    _meta.timestamp < "2025-04-02 14:30:00"

How can I have 100s of queries without impacting my cluster?

Lenses provides a set of rules for

  • termination control

  • resource protection

  • query management

Termination Control

SELECT * FROM topicA WHERE _key.deviceId=123 LIMIT 10

Adding LIMIT 10 to the SQL query will make it terminate early, as soon as 10 matching messages have been found. It’s not a perfect solution, as we might never find 10 matching messages and thus still perform a full scan.

Add LIMIT to your query to have quick and efficient query completion

SET max.query.time = 30s;

One can control the maximum time a SQL query will run for. The admin can set up a default value, and a user can override it.

SET max.size = 1M;


One can control the maximum bytes the SQL query will fetch. The admin can set up a default value, but a more advanced user can override it.

SET max.idle.time = 5s;

The above will make sure the query terminates after 5 seconds of reaching the end of the topic. The admin can set up a default value. The idea is that there is no reason to keep polling if we have exhausted the entire topic.
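Putting these controls together, a bounded exploratory query could look like the following sketch (the values are only illustrative):

SET max.query.time = 30s;
SET max.size = 1M;
SET max.idle.time = 5s;

SELECT *
FROM topicA
WHERE transaction_id=123
LIMIT 10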

Resource Protection

All SQL queries on Apache Kafka are executed under a specific client id, lenses.sql.engine, and an admin can apply a global Kafka quota to restrict the maximum total network I/O.

By adding a Quota on your Kafka cluster under the lenses.sql.engine CLIENT name, you can also control the global network I/O that is allocated to all users querying Kafka data with SQL.

Query management

An admin can view all active queries with:

SHOW QUERIES

and control them, i.e., stop a running query with the following statement

KILL QUERY <id>

Accessing headers

This page describes a tutorial for accessing headers via SQL Studio.

Since version 0.11, Apache Kafka supports message headers. Headers are a typical concept in messaging systems like JMS and transport systems like TCP or HTTP.

They can be used for routing, filtering, and annotation. A header contains key=value pairs, where the key is always a string (text) and the value is an array of bytes.

Given that the actual value is an array of bytes, the developer who produces the messages knows whether this array of bytes represents a string, an int, a long, or a double. Lenses can visualize and use the record headers via SQL queries.

By default, Lenses fetches the headers of messages and displays them on the UI.

To view the value of a specific header you can run:

SELECT HEADERASSTRING("User") as user
FROM trips
LIMIT 100

Deleting data from compacted topics

This page describes a tutorial for deleting data in compacted topics from SQL Studio.

In this example, we will show how we can use Lenses to delete records from a compacted topic that stores users’ call information, based on another topic that stores the users’ responses to a “do you want to be contacted back” survey.

Create compacted topic

With the following code, we can create a compacted topic that holds user call information:

CREATE TABLE user_calls (
    _key.user_id string
  , user.name string
  , user.phone_number string
  , user.address.door string
  , user.address.street string
  , call.duration int
  , call.satisfaction string
)
FORMAT(AVRO,AVRO)
PROPERTIES(
    cleanup.policy=compact
    , min.cleanable.dirty.ratio=0.01
    , segment.ms=100
    , retention.ms=100
    , segment.bytes=400
);

Notice we add cleanup.policy=compact to tell Lenses we want the topic to be compacted. The remaining properties try to force compaction to happen often so that we can easily visualize the result (this should not be used in production though).

We start by adding some records to our user_calls topic:

INSERT INTO user_calls(
    _key.user_id
    , user.name 
    , user.phone_number 
    , user.address.door 
    , user.address.street
    , call.duration
    , call.satisfaction
) VALUES 
("user_1", "John Smith","202-555-0195", "002", "Pratt Avenue", 60, "10"),
("user_2", "Mark Richards","202-245-2345", "765", "East Avenue", 15,"7"),
("user_3", "Timothy Hamilton ","202-333-0195", "002", "Rock Avenue", 12,"5"),
("user_4", "Mark Hamilton ","202-333-0195", "002", "Rock Avenue", 12,"5"),
("user_5", "John Richards ","202-333-0123", "002", "Rock Avenue", 12,"5"),
("user_6", "Timothy Hamilton ","202-333-1295", "002", "Rock Avenue", 12,"5"),
("user_7", "J. Hamilton ","202-444-1195", "002", "Rock Avenue", 12,"5"),
("user_8", "Mark Richards","202-245-2345", "765", "East Avenue", 15,"7"),
("user_9", "Mark Arnold","202-245-2345", "765", "East Avenue", 15,"7")
;

Which we can see by inspecting the topic.

We additionally add a second topic which will hold information regarding users' response to the survey:

CREATE TABLE contact_survey(
    _key.request_id string
    , user_id string
    , request_date string
    , response boolean
) 
FORMAT(AVRO,AVRO)

As previously, we add some records to contact_survey:

INSERT INTO contact_survey(
    _key.request_id
    , user_id
    , request_date
    , response
) VALUES
("survey01", "user_1", "2020-06-01", false),
("survey02", "user_2", "2020-06-01", true),
("survey03", "user_3", "2020-06-01", true),
("survey04", "user_4", "2020-06-01", true),
("survey05", "user_5", "2020-06-01", false),
("survey06", "user_6", "2020-06-01", false),
("survey07", "user_7", "2020-06-01", false),
("survey08", "user_8", "2020-06-01", false),
("survey09", "user_9", "2020-06-01", false)
;

Deleting data using Processors

Using the following:

INSERT INTO user_calls
SELECT STREAM 
    user_id as _key.user_id 
    , null as _value
FROM contact_survey
WHERE response = false;

We are essentially issuing a delete command for all users who said they didn’t want to be contacted.

Looking at our user_calls topic again, we can see the newly inserted records with a null value, but our original records are still there… How so?

Due to Kafka internals, log compaction doesn’t always kick in immediately, but in this case we can force it to happen by adding an extra record:

INSERT INTO user_calls(
    _key.user_id
    , user.name 
    , user.phone_number 
    , user.address.door 
    , user.address.street
    , call.duration
    , call.satisfaction
) VALUES 
("user_10", "John Smith","202-555-0195", "002", "Pratt Avenue", 60, "10")

Looking at the data inside our topic, we can now see that the users who responded that they didn’t want to be contacted are no longer part of it; the tombstone records (the ones with a null value) will stay around for as long as our retention policy specifies and will eventually be removed, leaving us with a topic containing only the users that want to be contacted.
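Sketching the latest state per key at this point (values abbreviated), only the users who answered true to the survey, plus the newly added user_10, still carry a value:

KEY: "user_1"   VALUE: null   <- tombstone, removed once retention kicks in
KEY: "user_2"   VALUE: { "user": { "name": "Mark Richards", ... } }
KEY: "user_3"   VALUE: { "user": { "name": "Timothy Hamilton", ... } }
KEY: "user_4"   VALUE: { "user": { "name": "Mark Hamilton", ... } }
KEY: "user_10"  VALUE: { "user": { "name": "John Smith", ... } }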

Working with JSON

This page describes a tutorial on how to work with JSON in Lenses SQL.

In this tutorial, we will see how to use Lenses SQL to process JSON strings using JsonPath, a sort of XPath for JSON objects.

In Lenses SQL you can use JSON_EXTRACT_FIRST() and JSON_EXTRACT_ALL() to navigate and transform JSON strings.

Setting up our example

We have a topic http_logs that collects the details of HTTP calls to a microservice. Basic details of the request are stored, like the URL and the HTTP method used. The payload of the requests is also stored, as a string.

We can create the topic and insert some example data through SQL Studio:

CREATE TABLE http_logs(_key string, method string, url string, content_type string, body string)
FORMAT (STRING, AVRO);

INSERT INTO http_logs(_key, method, url, content_type, body) VALUES
('event_1', 'POST', '/users', 'application/json', '{ "id": 1, "username": "juno", "contacts": [ { "type": "email", "value": "juno@example.org" }, { "type": "phone", "value": "12345" }] }'),
('event_2', 'POST', '/users', 'application/json', '{ "id": 2, "username": "max", "contacts": [ { "type": "email", "value": "max@example.org" }, { "type": "twitter", "value": "@max" }] }'),
('event_3', 'GET', '/users/1', '', ''),
('event_4', 'GET', '/users/2', '', ''),
('event_5', 'POST', '/users', 'application/json', '{ "id": 3, "username": "nic", "contacts": [ { "type": "email", "value": "nic@example.org" }, { "type": "phone", "value": "78910" }] }'),
('event_6', 'PUT', '/users/1', 'application/json', '{ "username": "juno", "contacts": [ { "type": "email", "value": "juno@domain.org" }] }'),
('event_7', 'POST', '/users', 'application/json', '{ "id": 4, "username": "john", "contacts": [ { "type": "email", "value": "john@example.org" }] }');

The HTTP method and the URL used for the request are stored in the method and url fields respectively, while the optional payload, and its content-type, are stored in the body and content_type fields.

As you can imagine, the logs contained in this topic are quite generic, and different endpoints may use different content types for their body. For this reason, the best the system can do is store the payload as a simple string, whenever that is possible.

This comes with some drawbacks: since the data is a simple string, and it is not structured, it is not possible to inspect it as we would do with a normal AVRO/JSON object.

Fortunately, Lenses SQL offers a couple of handy functions that make our life easier in these kinds of scenarios.

Extracting fields from a JSON string

Our first task is to find the username of users created with a call to POST /users.

To do that we can use JSON_EXTRACT_FIRST(json_string, pattern), one of the string functions available in Lenses SQL. The first argument of the function is the string representing the JSON we want to manipulate. The second is a string representing a JsonPath.

JsonPath is a powerful way to traverse and extract elements from a JSON object. Explaining its full details goes beyond the scope of this article, but in general it can be thought of as a JSON version of XPath, the standard used to select elements from an XML document.

A nice way to try and test if your JsonPaths are doing what you intended, is using the JsonPath online evaluator.

In our case, we would like to extract the name of the user just created. The simple path $.username will do it!

Let’s try to use it in a SELECT that we can run in SQL Studio:

SELECT
    JSON_EXTRACT_FIRST(body, '$.username') as username
FROM
    http_logs

That query will produce the results

{ "username": "\"juno\"" }
{ "username": "\"max\"" }
{ "username": null }
{ "username": null }
{ "username": "\"nic\"" }
{ "username": "\"juno\"" }
{ "username": "\"john\"" }

As you can see we have two entries for juno. That’s because the user was first created, and then modified later, with a PUT call.

Also, there are some null values. This is because JSON_EXTRACT_FIRST was not able to extract the username, either because the payload was not valid JSON, or because the field was not found.

We can fix this by restricting our query to user-creation calls:

SELECT
   JSON_EXTRACT_FIRST(body, '$.username') as username
FROM
   http_logs
WHERE
   method = "POST" AND
   url = "/users"

We have now only valid results:

{ "username": "\"juno\"" }
{ "username": "\"max\"" }
{ "username": "\"nic\"" }
{ "username": "\"john\"" }

Filtering the results

All Lenses SQL functions can be used in any part of the query; thus JSON_EXTRACT_FIRST can be used in projections, WHERE clauses, and GROUP BYs.

For example, you can run the query

SELECT
   JSON_EXTRACT_FIRST(body, '$.contacts[?(@.type=="email")].value') as email
FROM
   http_logs
WHERE
   JSON_EXTRACT_FIRST(body, '$.username') = '"max"'

to retrieve max’s e-mail:

{ "email" : "max@example.org" }

Extract multiple values from a JSON string

So far we had fun using JSON_EXTRACT_FIRST, but we have not talked yet about its bigger brother, JSON_EXTRACT_ALL.

JSON_EXTRACT_ALL(json_string, pattern) works like JSON_EXTRACT_FIRST, except that it will return all the values that match the pattern. The results will be returned in an array, and when no results are found the empty array will be returned.

Let’s make use of it, extracting all the contact types used at the moment of the creation of the user:

SELECT
   JSON_EXTRACT_FIRST(body, '$.username') as username,
   JSON_EXTRACT_ALL(body, '$.contacts[*].type') as contact_types
FROM
   http_logs
WHERE
   method = "POST" AND
   url = "/users"

Running the query above we get what we desired:

{ "username": "\"juno\"", "contact_types": [ "email", "\"phone\"" ] }'),
{ "username": "\"max\"", "contact_types": [ "email", "\"twitter\"" ] }'),
{ "username": "\"nic\"", "contact_types": [ "email", "\"phone\"" ] }'),
{ "username": "\"john\"", "contact_types": [ "\"email\""] }')

SQL Processors

JSON_EXTRACT_FIRST() and JSON_EXTRACT_ALL() are available also in the Streaming Engine, like most Lenses SQL functions.

Let’s say we want another topic continuously filled with the contact types used for user creations. We also want each record containing a single username-contact type pair. To achieve that we can take the query of the last example and adapt it a bit, using a lateral join.

SET defaults.topic.autocreate=true;

INSERT INTO contact_types
SELECT STREAM
   JSON_EXTRACT_FIRST(body, '$.username') as username,
   contact_type
FROM
   http_logs LATERAL
   JSON_EXTRACT_ALL(body, '$.contacts[*].type') as contact_type
WHERE
   method = "POST" AND
   url = "/users"

Conclusion

JSON_EXTRACT_FIRST() and JSON_EXTRACT_ALL() simplify your life every time you have to deal with JSON represented as a string value of a field in your topic.

The use of JsonPath makes them very powerful, and even complex operations are easy to express with it.