Aggregations
This page describes how to aggregate data in Kafka with Lenses SQL Processors.
Aggregations are stateful transformations that group an unbounded set of inputs into sub-sets and then aggregate each sub-set into a single output. They are stateful because they need to maintain the current state of the computation between one input and the next.
To group a given input dataset into sub-sets, a key function needs to be specified; the result of applying this key function to an input record will be used as a discriminator (sometimes called a pivot) to determine in what sub-set each input record is to be bucketed.
The specific transformation that each aggregation performs is described by the Aggregated Functions used in the input query.
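As a rough illustration of the grouping described above, the following Python sketch models an aggregation as a fold over records that are bucketed by a key function. It is not Lenses code: the aggregate helper, its parameters (key_fn, init, step) and the sample records are hypothetical names chosen for this example.

```python
def aggregate(records, key_fn, init, step):
    """Fold every record into the running state of the group it belongs to."""
    state = {}  # group key -> current state of the computation
    for record in records:
        key = key_fn(record)  # the discriminator for this record
        state[key] = step(state.get(key, init), record)
    return state


# Hypothetical input records.
records = [
    {"field1": "a", "amount": 1},
    {"field1": "b", "amount": 5},
    {"field1": "a", "amount": 2},
]

# Count the records in each group, keyed by field1.
counts = aggregate(
    records,
    key_fn=lambda r: r["field1"],
    init=0,
    step=lambda acc, _record: acc + 1,
)
print(counts)
```

The state dictionary plays the role of the resulting table: one entry per key, updated every time a record with that key arrives.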
Aggregations match table semantics
Notice that the behaviour described above is precisely what a Table does. For any given key, the state is continuously updated as new events with that key are received. In the case of Aggregations, new events are the input records of the original dataset, each of which maps to a given key and therefore ends up in one bucket or another.
Whenever Aggregations are used, the result will be a Table. Each entry will have the key set to the grouping discriminator, and the value set to the current state of computation for all input records matching the key.
Syntax
The complete syntax for aggregations is:
The specific syntactical elements of the above are:
(STREAM | TABLE): specifies if the <source> is to be interpreted as a stream or a table.
<aggregated projection1>: a projection is aggregated when its source contains an Aggregated Function (e.g. COUNT(*) as x, CAST(COUNT(*) as STRING) as stringed).
[, aggregated projection2] ... [, aggregated projectionN]: a query can contain any number of additional aggregated projections after the first mandatory one.
[, projection1] ... [, projectionM]: a query can contain any number of common, non-aggregated, projections. Streaming only supports full GROUP BY mode. This means that fields that are not part of the GROUP BY clause cannot be referenced by non-aggregated projections.
<source>: a normal source, like a topic or the result of a WITH statement.
[WINDOW BY <window description>]: this optional section can only be specified if STREAM is used. It allows us to describe the windowing that will be applied to the aggregation.
GROUP BY <expression>: the result of evaluating <expression> will be used to divide the input values into different groups. These groups will be the input for each aggregated projection specified. The <expression>’s result will become the key for the table resulting from the query.
Specific rules for aggregated projections
Most of the rules and syntax described for Projections apply to aggregated projections as well, but there are some additional syntactical rules due to the specific nature of Aggregations.
Aliasing rules mostly work the same, but it is not possible to project on the key facet; COUNT(*) as _key.a or SUM(x) as _key are therefore not allowed.
At least one aggregated projection must be specified in the query.
Projections using an unqualified key facet as the source are not allowed. _key.a or COUNT(_key.b) are forbidden because _key is unqualified, but <source>._key.a and COUNT(<source>._key.b) are supported.
Grouping and storage format for key
As previously mentioned, the GROUP BY is used to determine the key of the query’s result; the above query will group all records in input-topic by the value of field1 in each record, and target-topic’s key will have the schema of field1.
Just like in the case of the Projections, the Streaming mode takes an opinionated approach here and will simplify the result schema and Storage Format in the case of single field structures.
In the case above, assuming for example that field1 is an integer, target-topic’s key will not be a structure with a single integer field1 field, but rather just the value of field1; the resulting storage format is going to be INT, and the label field1 will simply be dropped.
In case the above behaviour is not desirable, specifying an explicit alias will allow us to override it.
This will result in target-topic’s key being a structure with a field keep_me, with the same schema as field1. The corresponding Storage Format will match the input format for input-topic, AVRO or JSON.
Semantics
An example will help clarify how aggregations work, as well as how they behave depending on the semantics of the input dataset they are being applied to.
Example scenario
Assume that we have a Kafka topic (gaming-sessions) containing these records:
Offset | Key | Value |
---|---|---|
0 | billy | {points: 50, country: “uk”} |
1 | billy | {points: 90, country: “uk”} |
2 | willy | {points: 70, country: “uk”} |
3 | noel | {points: 82, country: “uk”} |
4 | john | {points: 50, country: “usa”} |
5 | dave | {points: 30, country: “usa”} |
6 | billy | {points: 90, country: “spain”} |
This data describes a series of gaming sessions, each performed by a player. Each record holds the player (used as the Key), the points achieved, and the country where the game took place.
Aggregating a Stream
Let’s now assume that what we want to calculate is the total points achieved by players in a given country, as well as the average points per game. One way to achieve the desired behaviour is to build a Stream from the input topic. Remember that this means that each event will be considered in isolation.
Explanations for each element of this syntax can be found in the Syntax section above, but very briefly, this builds a Stream from gaming-sessions, grouping all events by country (i.e. all records with the same country will be aggregated together) and finally calculating the total (total_points) and the average (average_points) of all points for a given group.
The final result in target-topic will be (disregarding intermediate events):
Offset | Key | Value |
---|---|---|
3 | uk | {total_points: 292, average_points: 73} |
5 | usa | {total_points: 80, average_points: 40} |
6 | spain | {total_points: 90, average_points: 90} |
The results are calculated from the totality of the input records because in a Stream, each event is independent and unrelated to any other.
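The stream result above can be reproduced with a small Python model. This is only a sketch of the semantics, not how Lenses computes it: the aggregate_stream function and the in-memory sessions list are assumptions made for the example, with the data copied from the gaming-sessions table.

```python
# (key, value) pairs in offset order, copied from the example topic.
sessions = [
    ("billy", {"points": 50, "country": "uk"}),
    ("billy", {"points": 90, "country": "uk"}),
    ("willy", {"points": 70, "country": "uk"}),
    ("noel", {"points": 82, "country": "uk"}),
    ("john", {"points": 50, "country": "usa"}),
    ("dave", {"points": 30, "country": "usa"}),
    ("billy", {"points": 90, "country": "spain"}),
]


def aggregate_stream(events):
    """Treat every event independently: each one adds to its country's group."""
    groups = {}  # country -> (sum of points, number of events)
    for _player, value in events:
        total, count = groups.get(value["country"], (0, 0))
        groups[value["country"]] = (total + value["points"], count + 1)
    return {
        country: {"total_points": total, "average_points": total / count}
        for country, (total, count) in groups.items()
    }


stream_result = aggregate_stream(sessions)
print(stream_result)
```

Both of billy's uk sessions count towards the uk group, because nothing in a stream says that a later event replaces an earlier one.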
Aggregating a Table
We now want to calculate something similar to what we obtained before, but we want to keep track only of the last session played by a player, as it might give us a better snapshot of both the performances and locations of players worldwide. The statistics we want to gather are the same as before: total and average of points per country.
The way to achieve the above requirement is simply by reading gaming-sessions into a Table, rather than a Stream, and aggregating it.
The final result in target-topic will be (disregarding intermediate events):
Key | Value |
---|---|
uk | {total_points: 152, average_points: 76} |
usa | {total_points: 80, average_points: 40} |
spain | {total_points: 90, average_points: 90} |
Compare this with the behaviour from the previous scenario; the key difference is that the value for uk includes only willy and noel, and that’s because the last event moved billy to the spain bucket, removing all data regarding him from his original group.
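The table behaviour can be modelled in Python by remembering the latest value per player and subtracting it from its old group before adding the new one. This is a sketch under the assumption that an update is handled as "retract the old value, then add the new one"; the aggregate_table function and its internal names are hypothetical.

```python
# (key, value) pairs in offset order, copied from the example topic.
sessions = [
    ("billy", {"points": 50, "country": "uk"}),
    ("billy", {"points": 90, "country": "uk"}),
    ("willy", {"points": 70, "country": "uk"}),
    ("noel", {"points": 82, "country": "uk"}),
    ("john", {"points": 50, "country": "usa"}),
    ("dave", {"points": 30, "country": "usa"}),
    ("billy", {"points": 90, "country": "spain"}),
]


def aggregate_table(events):
    """Keep only the latest value per key; an update replaces the previous one."""
    latest = {}  # player -> latest value seen
    groups = {}  # country -> [sum of points, number of players]
    for player, value in events:
        old = latest.get(player)
        if old is not None:
            # Retract the obsolete value from the group it used to belong to.
            group = groups[old["country"]]
            group[0] -= old["points"]
            group[1] -= 1
            if group[1] == 0:
                del groups[old["country"]]
        latest[player] = value
        group = groups.setdefault(value["country"], [0, 0])
        group[0] += value["points"]
        group[1] += 1
    return {
        country: {"total_points": total, "average_points": total / count}
        for country, (total, count) in groups.items()
    }


table_result = aggregate_table(sessions)
print(table_result)
```

When the last event arrives, billy's 90 points are removed from uk (leaving willy and noel) and added to spain, which matches the table above.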
Aggregation functions for Tables
The previous section described the behaviour of aggregations when applied to Tables, and highlighted how aggregations not only need to be able to add the latest values received to the current state of a group but also need to be able to subtract an obsolete value that might have just been assigned to a new group. As we saw above, it is easy to do this in the case of SUM and AVG.
However, consider what would happen if we wanted to add new statistics to the ones calculated above: the maximum points achieved by a player in a given country.
In the Stream scenario, this can be achieved by simply adding MAXK(points,1) as max_points to the query, which produces:
Offset | Key | Value |
---|---|---|
3 | uk | {total_points: 292, average_points: 73, max_points: 90} |
5 | usa | {total_points: 80, average_points: 40, max_points: 50} |
6 | spain | {total_points: 90, average_points: 90, max_points: 90} |
In the Table scenario, however, things are different. We know that the final event moves billy from uk to spain, so we need to subtract from uk all information related to billy. In the case of SUM and AVG that’s possible because subtracting billy’s points from the current value of the aggregation will return the correct result.
But that’s not possible for MAXK. MAXK(points, 1) only keeps track of 1 value, the highest seen so far, and if that’s removed, what value should take its place? The aggregation function cannot inspect the entire topic data to search for the correct answer. The state the aggregation function has access to is that single number, which is now invalid.
This problem explains why some aggregated functions can be used on Streams and Tables (e.g. SUM), while others can be used only on Streams (e.g. MAXK).
The key factor is usually whether a hypothetical subtraction operation would need access to all previous inputs to calculate its new value (like MAXK) or just the aggregated state (like SUM).
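The difference between the two kinds of function can be sketched in Python with two tiny state holders. The SumState and MaxState classes are hypothetical illustrations of the reasoning above, not Lenses internals: one can undo a value using only its own state, the other cannot once the stored maximum is retracted.

```python
class SumState:
    """Running total: the state alone is enough to undo a value."""

    def __init__(self):
        self.total = 0

    def add(self, value):
        self.total += value

    def subtract(self, value):
        self.total -= value


class MaxState:
    """Keeps a single number, the highest value seen so far."""

    def __init__(self):
        self.best = None

    def add(self, value):
        self.best = value if self.best is None else max(self.best, value)

    def subtract(self, value):
        if value == self.best:
            # The runner-up was never stored, so there is nothing to fall back to.
            raise ValueError("cannot retract the current maximum from a single-value state")
        # Removing a smaller value leaves the maximum unchanged.


total, highest = SumState(), MaxState()
for points in (90, 70, 82):  # latest uk sessions: billy, willy, noel
    total.add(points)
    highest.add(points)

total.subtract(90)  # billy moves to spain: the sum can simply be corrected
try:
    highest.subtract(90)
    max_retracted = True
except ValueError:
    max_retracted = False  # the true answer (82) is not recoverable from the state

print(total.total, max_retracted)
```

Recovering the correct maximum would require replaying willy's and noel's sessions, which is exactly the access to previous inputs that the aggregation state does not provide.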
Windowed aggregations
A common scenario that arises in the context of aggregations is the idea of adding a time dimension to the grouping logic expressed in the query. For example, one might want to group all input records by a given field that were received within 1 hour of each other.
To express the above, Lenses SQL Streaming supports windowed aggregations through a WINDOW BY clause added to the query. Tables, however, cannot be aggregated using a window: a table represents the latest state of a set of (Key, Value) pairs, not a series of events spread over time, so windowing it is not a meaningful operation.
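To make the idea of a time dimension concrete, the Python sketch below counts events per key within fixed, non-overlapping one-hour windows. This is one possible windowing strategy chosen as an assumption for the example; the windowed_count function, the millisecond timestamps and the sample events are all hypothetical and do not come from the page.

```python
WINDOW_MS = 60 * 60 * 1000  # one hour, in milliseconds


def windowed_count(events):
    """Count events per (key, window start), where windows are aligned to the hour."""
    counts = {}
    for timestamp_ms, key in events:
        window_start = timestamp_ms - timestamp_ms % WINDOW_MS
        bucket = (key, window_start)
        counts[bucket] = counts.get(bucket, 0) + 1
    return counts


# Hypothetical (timestamp, key) events.
events = [
    (0, "uk"),
    (1_800_000, "uk"),   # 30 minutes in: same window as the first event
    (3_600_000, "uk"),   # exactly one hour in: starts the next window
    (4_000_000, "usa"),
]

window_counts = windowed_count(events)
print(window_counts)
```

The grouping key effectively becomes the pair of the original key and the window, so the same country can appear in several output entries, one per window.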
Filtering aggregated queries
Filtering the input into aggregated queries is similar to filtering non-aggregated ones. When using a WHERE <expression> statement, where <expression> is a valid SQL boolean expression, all records that do not match the predicate will be left out.
However, aggregated functions add a further dimension to what it might be desirable to filter.
We might be interested in filtering based on some conditions of the groups themselves; for example, we might want to count all input records that have a given value of field1, but only if the total is greater than 3. In this case, WHERE would not help, because it has access neither to the groups nor to the results of the aggregated projections. The below query is what is needed.
The above query uses the HAVING clause to express a filter at a grouping level. Using this feature it is possible to express a predicate on the result of aggregated projections and filter out the output records that do not satisfy it.
Only aggregated projections specified in the SELECT clause can be used within the HAVING clause.
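The difference between filtering records and filtering groups can be sketched in Python. The count_having function, its threshold parameter and the sample records are hypothetical; the sketch evaluates the whole input at once for simplicity, whereas a streaming query would apply the predicate to each updated result.

```python
def count_having(records, threshold):
    """Count records per field1, then keep only groups whose count exceeds threshold."""
    counts = {}
    for record in records:
        key = record["field1"]
        counts[key] = counts.get(key, 0) + 1
    # The predicate runs on the aggregated result, not on individual records.
    return {key: count for key, count in counts.items() if count > threshold}


# Hypothetical input: "a" appears four times, "b" twice, "c" once.
records = [{"field1": value} for value in ["a", "a", "a", "a", "b", "b", "c"]]

filtered = count_having(records, threshold=3)
print(filtered)
```

A WHERE-style filter could only drop individual records before counting; the group-level predicate here needs the count itself, which exists only after aggregation.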