In this tutorial, we will see how data in a Table can be aggregated continuously using GROUP BY and how the aggregated results are emitted downstream.
More details about Aggregations and related functions can be found in the corresponding pages in the user documentation.
Let’s assume that we have a topic (game-sessions) containing data regarding remote gaming sessions by users.
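Each gaming session will contain the points the user scored (points) and metadata about the session (sessionMetadata): the country where the game took place (country) and the language used (language).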
The above structure represents the value of each record in our game-sessions topic.
Additionally, each record will be keyed by user information: an ID (pid), a name (name), a surname (surname), and an age (age).
In light of the above, a record might look like the following (in JSON for simplicity):
{
  "key": {
    "pid": 1,
    "name": "Billy",
    "surname": "Lagrange",
    "age": 30
  },
  "value": {
    "points": 5,
    "sessionMetadata": {
      "country": "Italy",
      "language": "IT"
    }
  }
}
We can replicate such structure using SQL Studio and the following query:
CREATE TABLE game-sessions(
    _key.pid int
  , _key.name string
  , _key.surname string
  , _key.age int
  , points double
  , sessionMetadata.country string
  , sessionMetadata.language string
) FORMAT (avro, avro);
We can then use SQL Studio again to insert the data we will use in the rest of the tutorial:
INSERT into game-sessions(
    _key.pid
  , _key.name
  , _key.surname
  , _key.age
  , points
  , sessionMetadata.country
  , sessionMetadata.language
) VALUES
  (1, 'Billy', 'Lagrange', 35, 5, 'Italy', 'IT'),
  (1, 'Billy', 'Lagrange', 35, 30, 'Italy', 'IT'),
  (1, 'Billy', 'Lagrange', 35, 0, 'Italy', 'IT'),
  (2, 'Maria', 'Rossi', 27, 50, 'Italy', 'IT'),
  (2, 'Maria', 'Rossi', 27, 10, 'Italy', 'IT'),
  (3, 'Jorge', 'Escudero', 27, 10, 'Spain', 'ES'),
  (4, 'Juan', 'Suarez', 22, 80, 'Mexico', 'ES'),
  (5, 'John', 'Bolden', 40, 10, 'USA', 'EN'),
  (6, 'Dave', 'Holden', 31, 30, 'UK', 'EN'),
  (7, 'Nigel', 'Calling', 50, 5, 'UK', 'EN'),
  (2, 'Maria', 'Rossi', 27, 10, 'UK', 'EN'),
  (1, 'Billy', 'Lagrange', 35, 50, 'Italy', 'IT'),
  (3, 'Jorge', 'Escudero', 27, 16, 'Spain', 'ES'),
  (4, 'Juan', 'Suarez', 22, 70, 'Mexico', 'ES'),
  (5, 'John', 'Bolden', 40, 10, 'USA', 'EN'),
  (6, 'Dave', 'Holden', 31, 50, 'Italy', 'IT'),
  (6, 'Dave', 'Holden', 31, 70, 'Spain', 'ES'),
  (2, 'Maria', 'Rossi', 27, 70, 'Italy', 'IT'),
  (1, 'Billy', 'Lagrange', 35, 50, 'Italy', 'IT')
;
Now we can start processing the data we have inserted above.
Let’s imagine that we are asked to keep a running count of how many users are in each country. To do this, we can assume that a user is currently in the country where their latest game took place.
We can achieve the above with the following query:
SET defaults.topic.autocreate=true;
SET commit.interval.ms='1000'; -- this is to speed up the output generation in this tutorial

INSERT INTO groupby-table-country
SELECT TABLE
  COUNT(*) AS gamesPlayed
FROM game-sessions
GROUP BY sessionMetadata.country;
The content of the output topic, groupby-table-country, can now be inspected in the Lenses Explore screen and it will look similar to this:
The key results to notice here are the ones for Spain and the UK:
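Spain has a count of 2, because Jorge and Dave both played their latest game there.
UK has a count of 1, because Nigel played his only game there. Dave and Maria also played from the UK at some point, but their latest games took place in Spain and Italy respectively, so their contributions were subtracted from the UK count.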
The last point from above is the main difference (and power) of Tables vs. Streams: they represent the latest state of the world for each of their keys, so any aggregation will apply only on that latest data. If this is not clear enough, please refer to our documentation for a more in-depth coverage of these topics.
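To make the "latest state per key" behaviour concrete, here is a minimal Python sketch that simulates it on the tutorial data. It is not how Lenses is implemented; the function name `count_users_by_country` and the in-memory dictionaries are assumptions made purely for illustration. Each new record for a key first retracts that key's previous contribution and then adds the new one.

```python
from collections import Counter

# (pid, name, country) for each record, in the order of the INSERT statement.
SESSIONS = [
    (1, "Billy", "Italy"), (1, "Billy", "Italy"), (1, "Billy", "Italy"),
    (2, "Maria", "Italy"), (2, "Maria", "Italy"), (3, "Jorge", "Spain"),
    (4, "Juan", "Mexico"), (5, "John", "USA"), (6, "Dave", "UK"),
    (7, "Nigel", "UK"), (2, "Maria", "UK"), (1, "Billy", "Italy"),
    (3, "Jorge", "Spain"), (4, "Juan", "Mexico"), (5, "John", "USA"),
    (6, "Dave", "Italy"), (6, "Dave", "Spain"), (2, "Maria", "Italy"),
    (1, "Billy", "Italy"),
]


def count_users_by_country(sessions):
    """Hypothetical helper: running count per country over the latest record of each key."""
    latest_country = {}  # table state: key (pid) -> country of the latest record
    counts = Counter()   # aggregate state: country -> number of users
    for pid, _name, country in sessions:
        previous = latest_country.get(pid)
        if previous is not None:
            counts[previous] -= 1  # retract the key's old contribution
        latest_country[pid] = country
        counts[country] += 1       # add the key's new contribution
    # Drop countries whose count has fallen back to zero.
    return {country: n for country, n in counts.items() if n > 0}


result = count_users_by_country(SESSIONS)
for country, n in result.items():
    print(country, n)
```

With the data above, the sketch ends with Italy and Spain at 2 and Mexico, USA, and UK at 1, which matches the reasoning about Dave moving from the UK to Italy and then to Spain.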
We can expand on the example from the previous section, imagining that our requirement was extended.
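Just as before, we want to calculate statistics based on the current country of a user, as defined in Example 1, but now we want to know all of the following: how many users are in each country, the total points they scored in their latest sessions, and the average points of those sessions.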
All of the above can be achieved with the following query:
SET defaults.topic.autocreate=true;
SET commit.interval.ms='1000';

INSERT INTO groupby-table-country-multi
SELECT TABLE
    COUNT(*) AS gamesPlayed
  , SUM(points) as totalpoints
  , AVG(points) as avgpoints
FROM game-sessions
GROUP BY sessionMetadata.country;
The content of the output topic, groupby-table-country-multi, can now be inspected in the Lenses Explore screen and it will look similar to this:
One thing to highlight here is that the functions we are using in this query (COUNT, SUM, and AVG) all support aggregating over Tables. However, that is not true of all functions. To find out which functions support Tables and which ones only support Streams, please refer to the dedicated section of the documentation.
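As a rough cross-check of what the query above should converge to, the following Python sketch computes the same three aggregates over the latest record of each key. It only reproduces the final state, not the intermediate updates, and the helper name `stats_by_country` is an assumption for illustration rather than anything provided by Lenses.

```python
from collections import defaultdict

# (pid, points, country) for each record, in the order of the INSERT statement.
SESSIONS = [
    (1, 5, "Italy"), (1, 30, "Italy"), (1, 0, "Italy"),
    (2, 50, "Italy"), (2, 10, "Italy"), (3, 10, "Spain"),
    (4, 80, "Mexico"), (5, 10, "USA"), (6, 30, "UK"),
    (7, 5, "UK"), (2, 10, "UK"), (1, 50, "Italy"),
    (3, 16, "Spain"), (4, 70, "Mexico"), (5, 10, "USA"),
    (6, 50, "Italy"), (6, 70, "Spain"), (2, 70, "Italy"),
    (1, 50, "Italy"),
]


def stats_by_country(sessions):
    """Hypothetical helper: COUNT, SUM and AVG per country over the latest record of each key."""
    latest = {}  # table state: pid -> (points, country) of the latest record
    for pid, points, country in sessions:
        latest[pid] = (points, country)

    grouped = defaultdict(list)  # country -> points of the users currently there
    for points, country in latest.values():
        grouped[country].append(points)

    return {
        country: {
            "gamesPlayed": len(pts),
            "totalpoints": sum(pts),
            "avgpoints": sum(pts) / len(pts),
        }
        for country, pts in grouped.items()
    }


stats = stats_by_country(SESSIONS)
for country, row in stats.items():
    print(country, row)
```

For Italy, for example, only the latest sessions of Billy (50 points) and Maria (70 points) are counted, giving a total of 120 and an average of 60.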
We will cover one final scenario where we want to filter some data within our aggregation.
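There are two possible types of filtering we might want to do when it comes to aggregations: filtering the records before they are aggregated, which is done with WHERE, and filtering the aggregated results before they are emitted, which is done with HAVING.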
Let’s see an example.
We want to calculate the statistics from Example 2, but grouping by the session language. Again, we assume that a user’s language is the one from their latest recorded game session.
Additionally, we are only interested in languages used by players who don’t achieve a high total of points (we might want to focus our marketing team’s effort there, to keep them entertained). Finally, we are aware that some users have been using VPNs to access our platform, so we want to exclude some records from our calculations if a given user appeared to have played from a given country.
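For the sake of this example, we will consider a language’s total points to be low when it is below 100, and we will ignore the sessions that Dave appears to have played from Spain.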
The query for all of the above is:
SET defaults.topic.autocreate=true;
SET commit.interval.ms='1000';

INSERT INTO groupby-table-language-filtered
SELECT TABLE
    COUNT(*) AS gamesPlayed
  , SUM(points) as totalpoints
  , AVG(points) as avgpoints
FROM game-sessions
WHERE _key.name != 'Dave'
   OR sessionMetadata.country != 'Spain'
GROUP BY sessionMetadata.language
HAVING totalpoints < 100;
The content of the output topic, groupby-table-language-filtered, can now be inspected in the Lenses Explore screen and it will look similar to this:
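The interplay of the two filters can be sketched in Python as follows. This is an illustration under a stated assumption, not a description of Lenses internals: the sketch assumes that a record rejected by the WHERE condition is simply skipped, so the key keeps its previous latest record. The helper name `stats_by_language` and its `threshold` parameter are hypothetical.

```python
from collections import defaultdict

# (pid, name, points, country, language), in the order of the INSERT statement.
SESSIONS = [
    (1, "Billy", 5, "Italy", "IT"), (1, "Billy", 30, "Italy", "IT"),
    (1, "Billy", 0, "Italy", "IT"), (2, "Maria", 50, "Italy", "IT"),
    (2, "Maria", 10, "Italy", "IT"), (3, "Jorge", 10, "Spain", "ES"),
    (4, "Juan", 80, "Mexico", "ES"), (5, "John", 10, "USA", "EN"),
    (6, "Dave", 30, "UK", "EN"), (7, "Nigel", 5, "UK", "EN"),
    (2, "Maria", 10, "UK", "EN"), (1, "Billy", 50, "Italy", "IT"),
    (3, "Jorge", 16, "Spain", "ES"), (4, "Juan", 70, "Mexico", "ES"),
    (5, "John", 10, "USA", "EN"), (6, "Dave", 50, "Italy", "IT"),
    (6, "Dave", 70, "Spain", "ES"), (2, "Maria", 70, "Italy", "IT"),
    (1, "Billy", 50, "Italy", "IT"),
]


def stats_by_language(sessions, threshold=100):
    """Hypothetical helper: WHERE before grouping, HAVING after aggregating."""
    latest = {}  # table state: pid -> (points, language) of the latest accepted record
    for pid, name, points, country, language in sessions:
        # WHERE: keep the record unless it is Dave playing from Spain.
        # Assumption: a rejected record is skipped and leaves the key's state untouched.
        if name != "Dave" or country != "Spain":
            latest[pid] = (points, language)

    grouped = defaultdict(list)  # language -> points of the users currently using it
    for points, language in latest.values():
        grouped[language].append(points)

    # HAVING: only emit groups whose total is below the threshold.
    return {
        language: {
            "gamesPlayed": len(pts),
            "totalpoints": sum(pts),
            "avgpoints": sum(pts) / len(pts),
        }
        for language, pts in grouped.items()
        if sum(pts) < threshold
    }


filtered = stats_by_language(SESSIONS)
for language, row in filtered.items():
    print(language, row)
```

Under this assumption, EN (15 points in total) and ES (86 points) pass the HAVING condition, while IT does not. If a rejected record instead removed Dave from the Table altogether, IT would total 120 and would still be excluded, so the set of emitted languages would be the same for this data.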
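In this tutorial, you learned how to use aggregation over Tables to group records by the latest value of a field, calculate several aggregated results in a single query, and filter both the records that are aggregated (WHERE) and the results that are emitted (HAVING).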
Good luck and happy streaming!