Aggregating data in a table

This tutorial shows how to aggregate Kafka topic data into a table using Lenses SQL Processors.

In this tutorial, we will see how data in a table can be aggregated continuously using GROUP BY and how the aggregated results are emitted downstream.

In Lenses SQL you can read your data as a TABLE and quickly aggregate over it using the GROUP BY clause and SELECT TABLE.

Setting up our example

Let’s assume that we have a topic (game-sessions) containing data regarding remote gaming sessions by users.

Each gaming session will contain:

  • the points the user achieved throughout the session

  • Metadata information regarding the session:

    • The country where the game took place

    • The language the user played the game in

The above structure represents the value of each record in our game-sessions topic.

Additionally, each record will be keyed by user information, including the following:

  • A pid, or player id, representing this user uniquely

  • Some additional denormalised user details:

    • a name

    • a surname

    • an age

Putting denormalised data in keys is not something that should be done in a production environment.

In light of the above, a record might look like the following (in JSON for simplicity):

{
  "key":{
    "pid": 1,
    "name": "Billy",
    "surname": "Lagrange",
    "age": 35
  },
  "value":{
    "points": 5,
    "sessionMetadata": {
      "country": "Italy",
      "language": "IT"
    }
  }
}

We can replicate such a structure using SQL Studio and the following query:

CREATE TABLE game-sessions(
    _key.pid int
    , _key.name string
    , _key.surname string
    , _key.age int
    , points double
    , sessionMetadata.country string
    , sessionMetadata.language string)
FORMAT (avro, avro);

We can then use SQL Studio again to insert the data we will use in the rest of the tutorial:

INSERT into game-sessions(
    _key.pid
    , _key.name
    , _key.surname
    , _key.age
    , points
    , sessionMetadata.country
    , sessionMetadata.language
) VALUES
(1, 'Billy', 'Lagrange', 35, 5, 'Italy', 'IT'),
(1, 'Billy', 'Lagrange', 35, 30, 'Italy', 'IT'),
(1, 'Billy', 'Lagrange', 35, 0, 'Italy', 'IT'),
(2, 'Maria', 'Rossi', 27, 50, 'Italy', 'IT'),
(2, 'Maria', 'Rossi', 27, 10, 'Italy', 'IT'),
(3, 'Jorge', 'Escudero', 27, 10, 'Spain', 'ES'),
(4, 'Juan', 'Suarez', 22, 80, 'Mexico', 'ES'),
(5, 'John', 'Bolden', 40, 10, 'USA', 'EN'),
(6, 'Dave', 'Holden', 31, 30, 'UK', 'EN'),
(7, 'Nigel', 'Calling', 50, 5, 'UK', 'EN'),
(2, 'Maria', 'Rossi', 27, 10, 'UK', 'EN'),
(1, 'Billy', 'Lagrange', 35, 50, 'Italy', 'IT'),
(3, 'Jorge', 'Escudero', 27, 16, 'Spain', 'ES'),
(4, 'Juan', 'Suarez', 22, 70, 'Mexico', 'ES'),
(5, 'John', 'Bolden', 40, 10, 'USA', 'EN'),
(6, 'Dave', 'Holden', 31, 50, 'Italy', 'IT'),
(6, 'Dave', 'Holden', 31, 70, 'Spain', 'ES'),
(2, 'Maria', 'Rossi', 27, 70, 'Italy', 'IT'),
(1, 'Billy', 'Lagrange', 35, 50, 'Italy', 'IT')
;

Count the users that are in a given country

Now we can start processing the data we have inserted above.

Suppose we want to keep a running count of how many users are in a given country. To do this, we can assume that a user is currently in the country where their last game took place.

We can achieve the above with the following query:

SET defaults.topic.autocreate=true;
SET commit.interval.ms='1000';  -- this is to speed up the output generation in this tutorial

INSERT INTO groupby-table-country
SELECT TABLE
  COUNT(*) AS gamesPlayed
FROM game-sessions
GROUP BY sessionMetadata.country;

The content of the output topic, groupby-table-country, can now be inspected in the Lenses Explore screen and it will look similar to this:

The key results to notice here are the ones for Spain and the UK:

  • Spain is 2 because Jorge and Dave had their last game played there.

  • UK is 1 because, while Nigel had his only game played there, Dave initially played from the UK, then from Italy, and finally from Spain. Dave's contribution was therefore subtracted from the UK count.

The last point above is the main difference (and power) of Tables vs. Streams: a Table represents the latest state of the world for each of its keys, so any aggregation applies only to that latest data.
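To make the difference concrete, the following Python sketch simulates how a count over a Table behaves. It is only a model of the semantics described above, not how Lenses implements them; it assumes that pid alone identifies the record key, and the names SESSIONS and count_users_by_country are made up for this illustration.

```python
from collections import Counter

# (pid, country) pairs in insertion order, copied from the INSERT statement above.
# Assumption: pid stands in for the full record key.
SESSIONS = [
    (1, "Italy"), (1, "Italy"), (1, "Italy"), (2, "Italy"), (2, "Italy"),
    (3, "Spain"), (4, "Mexico"), (5, "USA"), (6, "UK"), (7, "UK"),
    (2, "UK"), (1, "Italy"), (3, "Spain"), (4, "Mexico"), (5, "USA"),
    (6, "Italy"), (6, "Spain"), (2, "Italy"), (1, "Italy"),
]


def count_users_by_country(sessions):
    """Keep a running count per country based on each key's latest record."""
    latest = {}         # table state: pid -> country of the latest session
    counts = Counter()  # running aggregate: country -> number of users
    for pid, country in sessions:
        previous = latest.get(pid)
        if previous is not None:
            counts[previous] -= 1  # retract the key's old contribution
        latest[pid] = country
        counts[country] += 1       # add the key's new contribution
    return {c: n for c, n in counts.items() if n > 0}


# Table semantics: one contribution per key (its latest record).
table_counts = count_users_by_country(SESSIONS)

# Stream semantics, for comparison: every record contributes.
stream_counts = Counter(country for _, country in SESSIONS)

print(table_counts)
print(dict(stream_counts))
```

With Table semantics the UK ends up at 1, while counting every record (as a Stream would) gives 3 for the UK.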

Given what a Table is, it will have by definition only a single value for any given key, so doing GROUP BY _key on a Table is a pointless operation because it will always only generate 1-element groups.
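As a rough illustration of why grouping a Table by its key is pointless, the Python sketch below keeps only the latest value per key and then groups by that same key. The records are the points Dave (pid 6) and Nigel (pid 7) scored in the data above; using pid alone as the key is a simplifying assumption.

```python
from collections import defaultdict

# (pid, points) in arrival order: Dave's three sessions and Nigel's one.
records = [(6, 30), (7, 5), (6, 50), (6, 70)]

# A table keeps only the latest value for each key.
latest = {}
for pid, points in records:
    latest[pid] = points

# Grouping the table by its own key: each group can only hold one row.
groups = defaultdict(list)
for pid, points in latest.items():
    groups[pid].append(points)

group_sizes = {pid: len(rows) for pid, rows in groups.items()}
print(group_sizes)
```

Every group has exactly one element, so an aggregation over it would simply return the latest value.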

Calculate the total and average points of users in a given country

We can expand on the example from the previous section, imagining that our requirement was extended.

Just as before, we want to calculate statistics based on the current country of a user, as defined in the previous section, but now we want to know all of the following:

  • count how many users are in a given country

  • what is the total amount of points these users achieved

  • what is the average amount of points these users achieved

All of the above can be achieved with the following query:

SET defaults.topic.autocreate=true;
SET commit.interval.ms='1000';

INSERT INTO groupby-table-country-multi
SELECT TABLE
    COUNT(*) AS gamesPlayed
    , SUM(points) as totalpoints
    , AVG(points) as avgpoints
FROM game-sessions
GROUP BY sessionMetadata.country;

The content of the output topic, groupby-table-country-multi, can now be inspected in the Lenses Explore screen and it will look similar to this:

One thing to highlight here is that the functions we are using in this query (COUNT, SUM, and AVG) all support aggregating over Tables. However, that is not true of all functions. To find out which functions support Tables and which ones only support Streams, check the Lenses SQL functions documentation.
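The final values this query converges to can be checked with a small Python sketch. It only models the end state (the processor itself emits updates continuously), it assumes pid alone identifies the key, and stats_by_country is a hypothetical helper written for this illustration.

```python
# (pid, points, country) in insertion order, copied from the INSERT statement above.
SESSIONS = [
    (1, 5, "Italy"), (1, 30, "Italy"), (1, 0, "Italy"), (2, 50, "Italy"),
    (2, 10, "Italy"), (3, 10, "Spain"), (4, 80, "Mexico"), (5, 10, "USA"),
    (6, 30, "UK"), (7, 5, "UK"), (2, 10, "UK"), (1, 50, "Italy"),
    (3, 16, "Spain"), (4, 70, "Mexico"), (5, 10, "USA"), (6, 50, "Italy"),
    (6, 70, "Spain"), (2, 70, "Italy"), (1, 50, "Italy"),
]


def stats_by_country(sessions):
    """Aggregate the latest record of each key, grouped by country."""
    latest = {}  # table state: pid -> (points, country) of the latest session
    for pid, points, country in sessions:
        latest[pid] = (points, country)

    groups = {}  # country -> list of points, one entry per user
    for points, country in latest.values():
        groups.setdefault(country, []).append(points)

    return {
        country: {
            "gamesPlayed": len(pts),           # COUNT(*)
            "totalpoints": float(sum(pts)),    # SUM(points)
            "avgpoints": sum(pts) / len(pts),  # AVG(points)
        }
        for country, pts in groups.items()
    }


stats = stats_by_country(SESSIONS)
for country, values in stats.items():
    print(country, values)
```

Note that, because only the latest record of each key is aggregated, gamesPlayed effectively counts users rather than sessions.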

Filtering aggregation data

We will cover one final scenario where we want to filter some data within our aggregation.

There are two possible types of filtering we might want to do when it comes to aggregations:

  • Pre-aggregation: we want some rows to be ignored by the grouping, so they will not be part of the calculation done by aggregation functions. In these scenarios, we will use the WHERE clause.

  • Post-aggregation: we want to filter the aggregation results themselves, so that aggregated records that do not meet a specified condition are not emitted at all. In these scenarios, we will use the HAVING clause.

Let’s see an example.

We want to calculate the same statistics as in the previous section, but grouping by the session language. Here we will again assume that a user’s language is represented only by their latest recorded game session.

Additionally, we are only interested in languages used by players who don’t achieve a high total of points (we might want to focus our marketing team’s effort there, to keep them entertained). Finally, we are aware that some users have been using VPNs to access our platform, so we want to exclude some records from our calculations when a given user appears to have played from a particular country.

For the sake of this example, we will:

  • Show statistics for languages with total points lower than 100

  • Ignore sessions that Dave made from Spain (because we know he was not there)

The query for all of the above is:

SET defaults.topic.autocreate=true;
SET commit.interval.ms='1000';

INSERT INTO groupby-table-language-filtered
SELECT TABLE
    COUNT(*) AS gamesPlayed
    , SUM(points) as totalpoints
    , AVG(points) as avgpoints
FROM game-sessions
WHERE _key.name != 'Dave' 
    OR sessionMetadata.country != 'Spain'
GROUP BY sessionMetadata.language
HAVING totalpoints < 100;

The content of the output topic, groupby-table-language-filtered, can now be inspected in the Lenses Explore screen and it will look similar to this:

Notice that IT (the only language whose total, 120 points, is not below the threshold of 100) appears in the output, but without any data in the value section.

This is because aggregations are Tables: the key IT used to be present (while its total was lower than 100), but it was later removed. In Tables, deletion is expressed by setting the value section of a record to null, which is what we are seeing here.
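The interplay of WHERE, HAVING and deletions can be modelled with the Python sketch below. It is a simplified simulation under stated assumptions, not the Lenses implementation: pid alone stands in for the key, a key whose latest record fails the WHERE condition is assumed to be removed from the table (which is consistent with the 120-point total for IT mentioned above), and None stands in for a null value. The helper names are hypothetical.

```python
# (pid, name, points, country, language), copied from the INSERT statement above.
SESSIONS = [
    (1, "Billy", 5, "Italy", "IT"), (1, "Billy", 30, "Italy", "IT"),
    (1, "Billy", 0, "Italy", "IT"), (2, "Maria", 50, "Italy", "IT"),
    (2, "Maria", 10, "Italy", "IT"), (3, "Jorge", 10, "Spain", "ES"),
    (4, "Juan", 80, "Mexico", "ES"), (5, "John", 10, "USA", "EN"),
    (6, "Dave", 30, "UK", "EN"), (7, "Nigel", 5, "UK", "EN"),
    (2, "Maria", 10, "UK", "EN"), (1, "Billy", 50, "Italy", "IT"),
    (3, "Jorge", 16, "Spain", "ES"), (4, "Juan", 70, "Mexico", "ES"),
    (5, "John", 10, "USA", "EN"), (6, "Dave", 50, "Italy", "IT"),
    (6, "Dave", 70, "Spain", "ES"), (2, "Maria", 70, "Italy", "IT"),
    (1, "Billy", 50, "Italy", "IT"),
]


def passes_where(name, country):
    """Pre-aggregation filter: ignore the sessions Dave made from Spain."""
    return name != "Dave" or country != "Spain"


def run(sessions, threshold=100):
    table = {}   # pid -> (points, language) for keys passing the WHERE filter
    output = {}  # language -> latest emitted value; None models a deletion
    for pid, name, points, country, language in sessions:
        if passes_where(name, country):
            table[pid] = (points, language)
        else:
            table.pop(pid, None)  # assumption: a filtered-out update removes the key

        groups = {}  # language -> points of the users currently in that group
        for pts, lang in table.values():
            groups.setdefault(lang, []).append(pts)

        for lang in set(groups) | set(output):
            pts = groups.get(lang, [])
            if pts and sum(pts) < threshold:  # HAVING totalpoints < 100
                output[lang] = {
                    "gamesPlayed": len(pts),
                    "totalpoints": float(sum(pts)),
                    "avgpoints": sum(pts) / len(pts),
                }
            elif lang in output:
                output[lang] = None  # previously emitted key is now deleted
    return output


result = run(SESSIONS)
for lang, value in result.items():
    print(lang, value)
```

In this model IT is emitted while its total stays below 100 and is later replaced by a null value, whereas ES and EN keep their latest statistics.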

Conclusion

In this tutorial, you learned how to use aggregation over Tables to:

  • group by arbitrary fields, based on the latest state of the world

  • calculate multiple results in a single processor

  • filter both the data to be aggregated and the results emitted by the aggregation itself

Good luck and happy streaming!

2024 © Lenses.io Ltd. Apache, Apache Kafka, Kafka and associated open source project names are trademarks of the Apache Software Foundation.