Time window aggregation over streaming data


In this tutorial we will see how data in a Stream can be aggregated continuously using GROUP BY over a time window and the results are emitted downstream.

More details about Aggregations and [related functions](/5.2/lenses-sql/functions/#aggregated-functions can be found in the corresponding pages in the user documentation.

Setting up our example 

Let’s assume that we have a topic (game-sessions) that contains data regarding remote gaming sessions by users.

Each gaming session will contain:

  • the points the user achieved throughout the session
  • Metadata information regarding the session:
    • The country where the game took place
    • The startAt the date and time the game commenced
    • The endedAt the date and time the game finished

The above structure represents the value of each record in our game-sessions topic.

Additionally, each record will be keyed by user information, including the following:

  • A pid, or player id, representing this user uniquely
  • Some additional denormalised user details:
    • a name
    • a surname
    • an age

In light of the above, a record might look like the following (in json for simplicity):

{
  "key": {
    "pid": 1,
    "name": "Billy",
    "surname": "Lagrange",
    "age": 30
  },
  "value": {
    "points": 5,
    "country": "Italy",
    "language": "IT",
    "startedAt": 1595435228,
    "endedAt": 1595441828
  }
}

We can replicate such structure using SQL Studio and the following query:

CREATE TABLE game-sessions(
  _key.pid int,
  _key.name string,
  _key.surname string,
  _key.age int,
  points double,
  country string,
  startedAt long,
  endedAt long)
FORMAT (avro, avro);

We can then use SQL Studio again to insert the data we will use in the rest of the tutorial:

INSERT into game-sessions(
  _key.pid,
  _key.name,
  _key.surname,
  _key.age,
  points,
  country,
  startedAt,
  endedAt
) VALUES
(1, 'Billy', 'Lagrange', 35, 5, 'Italy', 1595524080000, 1595524085000),
(1, 'Billy', 'Lagrange', 35, 30, 'Italy', 1595524086000, 1595524089000),
(1, 'Billy', 'Lagrange', 35, 0, 'Italy', 1595524091000, 1595524098000),
(2, 'Maria', 'Rossi', 27, 50, 'Italy', 1595524080000, 1595524085000),
(2, 'Maria', 'Rossi', 27, 10, 'Italy', 1595524086000, 1595524089000),
(3, 'Jorge', 'Escudero', 27, 10, 'Spain', 1595524086000, 1595524089000),
(4, 'Juan', 'Suarez', 22, 80, 'Mexico', 1595524080000, 1595524085000),
(5, 'John', 'Bolden', 40, 10, 'USA', 1595524080000, 1595524085000);

The time a game started and completed is expressed in epoch time. To see the human readable values, run this query:

SELECT
  startedAt
  , DATE_TO_STR(startedAt, 'yyyy-MM-dd HH:mm:ss') as started
  , endedAt
  , DATE_TO_STR(endedAt 'yyyy-MM-dd HH:mm:ss') as ended
FROM game-sessions;
Content of game-sessions displaying the human readable timestamps

Example 1 - Count how many games were played per user every 10 seconds 

Now we can start processing the data we have inserted above.

One requirement could be to count how many games each user has played every 10 seconds.

We can achieve the above with the following query:

SET defaults.topic.autocreate=true;
SET commit.interval.ms='2000';  -- this is just to speed up the output generation in this tutorial

INSERT INTO games_per_user_every_10_seconds
SELECT STREAM
    COUNT(*) as occurrences
    , MAXK_UNIQUE(points,3) as maxpoints
    , AVG(points) as avgpoints
FROM game-sessions
EVENTTIME BY startedAt
WINDOW BY TUMBLE 10s
GROUP BY _key

The content of the output topic, games_per_user_every_10_seconds, can now be inspected and eventually it will look similar to this:

Content of groupby-key topic

As you can see, the keys of the records did not change, but their value is the result of the specified aggregation. The gamer Billy Lagrange has two entries because he played 2 games, the first two with a start window between 2020-07-23 17:08:00 and 2020-07-23 17:08:10(exclusive), and the third entry between 2020-07-23 17:08:10 (inclusive) and 2020-07-23 17:08:20(exclusive)

Example 2 - Count how many games were played per country every 10 seconds 

We can expand on the example from the previous section. We now want to know, for each country on a 10 seconds interval, the following:

  • count how many games were played
  • what are the top best 3 results

All the above can be achieved with the following query:

SET defaults.topic.autocreate=true;
SET commit.interval.ms='2000';  -- this is just to speed up the output generation in this tutorial

INSERT INTO games_per_country_every_10_seconds
SELECT STREAM
    COUNT(*) as occurrences
    , MAXK_UNIQUE(points,3) as maxpoints
    , country
FROM game-sessions
EVENTTIME BY startedAt
WINDOW BY TUMBLE 10s
GROUP BY country

The content of the output topic, games_per_country_every_10_seconds, can now be inspected in the SQL Studio screen by running:

SELECT *
FROM games_per_country_every_10_seconds

There are 2 entries for Italy, since there is one game played at 2020-07-23 18:08:11. Also, notice for the other entry on Italy, there are 4 occurrences and 3 max points. The reason for 4 occurrence is down to 4 games, two each from Billy Lagrange and Maria Rossi within the 10 seconds time window between 2020-07-23 18:08:00 and 2020-07-23 18:08:10(exclusive).

Content of games_per_country_every_10_seconds topic

Conclusion 

In this tutorial you learned how to use aggregation over Streams to:

  • group by the current key of a record
  • group by a field in the input record
  • use a time window to define the aggregation over.

You achieved all the above using Lenses SQL engine. You can find out about the different time windows in the documentation.

Good luck and happy streaming!

--
Last modified: September 15, 2024