Time window aggregations
This tutorial shows how to perform time-windowed aggregations on Kafka topic data with Lenses SQL Processors. We will see how data in a Stream can be aggregated continuously using GROUP BY over a time window, and how the results are emitted downstream.
In Lenses SQL you can read your data as a STREAM and quickly aggregate over it using the GROUP BY clause with SELECT STREAM.
Setting up our example
Let’s assume that we have a topic (game-sessions) that contains data regarding remote gaming sessions by users.
Each gaming session will contain:
The points the user achieved throughout the session
Metadata information regarding the session:
The country where the game took place
The startedAt: the date and time the game commenced
The endedAt: the date and time the game finished
The above structure represents the value of each record in our game-sessions topic.
Additionally, each record will be keyed by user information, including the following:
A pid, or player id, representing this user uniquely
Some additional denormalised user details:
a name
a surname
an age
Keep in mind: this is just an example in the context of this tutorial. Putting denormalised data in keys is not something that should be done in a production environment.
In light of the above, a record might look like the following (in JSON for simplicity; the values here are illustrative):
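```json
{
  "key": {
    "pid": 1,
    "name": "Billy",
    "surname": "Lagrange",
    "age": 35
  },
  "value": {
    "points": 5,
    "sessionMetadata": {
      "country": "Italy",
      "startedAt": 1595527682000,
      "endedAt": 1595527982000
    }
  }
}
```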
We can replicate this structure using SQL Studio and the following query:
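A minimal sketch follows; the field types and the Avro storage format are assumptions that may need adjusting to your setup:

```sql
CREATE TABLE game-sessions(
    _key.pid INT,
    _key.name STRING,
    _key.surname STRING,
    _key.age INT,
    points INT,
    sessionMetadata.country STRING,
    sessionMetadata.startedAt LONG,
    sessionMetadata.endedAt LONG
)
FORMAT (avro, avro);
```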
We can then use SQL Studio again to insert the data we will use in the rest of the tutorial:
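The dataset below is illustrative; the epoch timestamps are chosen around 2020-07-23 18:08 UTC so that they line up with the window boundaries discussed later:

```sql
INSERT INTO game-sessions(
    _key.pid, _key.name, _key.surname, _key.age,
    points,
    sessionMetadata.country, sessionMetadata.startedAt, sessionMetadata.endedAt
) VALUES
(1, 'Billy', 'Lagrange', 35, 5,  'Italy', 1595527682000, 1595527982000),  -- 18:08:02
(2, 'Maria', 'Rossi',    28, 7,  'Italy', 1595527683000, 1595527983000),  -- 18:08:03
(3, 'Jorge', 'Escudero', 27, 6,  'Spain', 1595527684000, 1595527984000),  -- 18:08:04
(1, 'Billy', 'Lagrange', 35, 10, 'Italy', 1595527685000, 1595527985000),  -- 18:08:05
(2, 'Maria', 'Rossi',    28, 3,  'Italy', 1595527687000, 1595527987000),  -- 18:08:07
(1, 'Billy', 'Lagrange', 35, 9,  'Italy', 1595527691000, 1595527991000);  -- 18:08:11
```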
The times a game started and ended are expressed in epoch time. To see the human-readable values, run this query:
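One way to do this, assuming the DATE_TO_STR conversion function is available in your version of Lenses:

```sql
SELECT _key.name
     , _key.surname
     , DATE_TO_STR(sessionMetadata.startedAt, 'yyyy-MM-dd HH:mm:ss') AS startedAt
     , DATE_TO_STR(sessionMetadata.endedAt, 'yyyy-MM-dd HH:mm:ss') AS endedAt
FROM game-sessions;
```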
Count how many games were played per user every 10 seconds
Now we can start processing the data we have inserted above.
One requirement could be to count how many games each user has played every 10 seconds.
We can achieve the above with the following query:
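A sketch of such a processor, assuming a 10-second tumbling window (WINDOW BY TUMBLE) and grouping by the record key so the output keeps the same key:

```sql
-- Let the processor create the output topic if it does not exist yet
SET defaults.topic.autocreate=true;

INSERT INTO games_per_user_every_10_seconds
SELECT STREAM COUNT(*) AS occurrences
FROM game-sessions
WINDOW BY TUMBLE 10s
GROUP BY _key;
```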
The content of the output topic, games_per_user_every_10_seconds, can now be inspected and eventually it will look similar to this:
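With the illustrative dataset inserted earlier, for example (each record shown as a key and value pair):

```
key: {"pid": 2, "name": "Maria", "surname": "Rossi",    "age": 28}  value: {"occurrences": 2}
key: {"pid": 3, "name": "Jorge", "surname": "Escudero", "age": 27}  value: {"occurrences": 1}
key: {"pid": 1, "name": "Billy", "surname": "Lagrange", "age": 35}  value: {"occurrences": 2}
key: {"pid": 1, "name": "Billy", "surname": "Lagrange", "age": 35}  value: {"occurrences": 1}
```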
As you can see, the keys of the records did not change, but their values are the result of the specified aggregation. The gamer Billy Lagrange has two entries because he played 3 games: the first two fall in the window between 2020-07-23 18:08:00 (inclusive) and 2020-07-23 18:08:10 (exclusive), and the third in the window between 2020-07-23 18:08:10 (inclusive) and 2020-07-23 18:08:20 (exclusive).
You might have noticed that games_per_user_every_10_seconds has been created as a compacted topic, and that is by design. All aggregations result in a Table because they maintain a running, fault-tolerant state of the aggregation. When the result of an aggregation is written to a topic, the topic needs to reflect these semantics, which is exactly what a compacted topic does.
Count how many games were played per country every 10 seconds
We can expand on the example from the previous section. We now want to know, for each country in a 10-second interval, the following:
how many games were played
what the top 3 results were
All the above can be achieved with the following query:
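A sketch along the same lines; MAXK_UNIQUE is assumed here as the aggregate that returns the top 3 distinct scores (check the function reference of your Lenses version):

```sql
SET defaults.topic.autocreate=true;

INSERT INTO games_per_country_every_10_seconds
SELECT STREAM
    COUNT(*) AS occurrences
  , MAXK_UNIQUE(points, 3) AS maxpoints
FROM game-sessions
WINDOW BY TUMBLE 10s
GROUP BY sessionMetadata.country;
```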
The content of the output topic, games_per_country_every_10_seconds, can now be inspected in the SQL Studio screen by running:
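```sql
SELECT * FROM games_per_country_every_10_seconds;
```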
There are 2 entries for Italy, since one game was played at 2020-07-23 18:08:11 and therefore falls in a later window. Also, notice that the other entry for Italy has 4 occurrences and a list of the top 3 points. The 4 occurrences correspond to 4 games, two each from Billy Lagrange and Maria Rossi, within the 10-second time window between 2020-07-23 18:08:00 (inclusive) and 2020-07-23 18:08:10 (exclusive).
Conclusion
In this tutorial you learned how to use aggregation over Streams to:
group by the current key of a record
group by a field in the input record
use a time window to define the scope of the aggregation
Good luck and happy streaming!