Aggregations


Aggregations are stateful transformations that group an unbounded set of inputs into sub-sets and then aggregate each sub-set into a single output. They are stateful because they must maintain the current state of the computation between one input and the next.

To group an input dataset into sub-sets, a key function needs to be specified. The result of applying this key function to an input record is used as a discriminator (sometimes called a pivot) that determines which sub-set the record is bucketed into.
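The grouping step can be pictured with a minimal Python sketch. It only illustrates the idea and is not how Lenses implements it; `group_by_key` and the sample records are hypothetical names chosen for this example.

```python
from collections import defaultdict

def group_by_key(records, key_fn):
    """Bucket records into sub-sets, using the result of key_fn as discriminator."""
    buckets = defaultdict(list)
    for record in records:
        buckets[key_fn(record)].append(record)
    return dict(buckets)

# Hypothetical input records.
records = [
    {"points": 50, "country": "uk"},
    {"points": 50, "country": "usa"},
    {"points": 90, "country": "uk"},
]

buckets = group_by_key(records, key_fn=lambda r: r["country"])

# Aggregate each sub-set into a single output (here, a record count).
counts = {key: len(group) for key, group in buckets.items()}
print(counts)  # {'uk': 2, 'usa': 1}
```

Here the key function plays the role of the GROUP BY expression, and the count plays the role of an aggregated projection.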

The specific transformation that each aggregation performs is described by the Aggregated Functions used in the input query. See a complete list of aggregate functions.

Aggregations match table semantics 

Notice that the behavior described above is precisely what a Table does. For any given key, there is a state that is continuously updated as new events with that key are received. In the case of Aggregations, new events are the input records of the original dataset that map to a given key, and therefore end up in one bucket or another.

Whenever Aggregations are used, the result will be a Table. Each entry will have the key set to the grouping discriminator, and the value set to the current state of computation for all input records matching the key.
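As a rough illustration of these table semantics, the sketch below keeps one state per key and emits the updated entry each time a record arrives. The names `apply_record` and `running_sum` are assumptions made for this example, and a running sum stands in for any aggregated function.

```python
def running_sum(state, value):
    """Fold one more input value into the state of a group."""
    # The state is None the first time a key is seen.
    return value if state is None else state + value

def apply_record(table, key, value, aggregate=running_sum):
    """Update the state stored under key and return the changed table entry."""
    table[key] = aggregate(table.get(key), value)
    return key, table[key]

table = {}      # key -> current state of computation
changelog = []  # every intermediate update, in arrival order
for key, value in [("uk", 50), ("uk", 90), ("usa", 50)]:
    changelog.append(apply_record(table, key, value))

print(changelog)  # [('uk', 50), ('uk', 140), ('usa', 50)]
print(table)      # {'uk': 140, 'usa': 50}
```

The changelog corresponds to the intermediate events written to the output, while the dictionary holds the latest value per key, which is what a Table represents.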

Syntax 

The complete syntax for aggregations is:

SELECT (STREAM | TABLE)
  <aggregated projection1>
    [, aggregated projection2] ... [, aggregated projectionN]
    [, projection1] ... [, projectionM]
FROM
  <source>
[WINDOW BY <window description>]
GROUP BY <expression>
;

The specific syntactical elements of the above are:

  • (STREAM | TABLE): specifies if the <source> is to be interpreted as a stream or a table.
  • <aggregated projection1>: a projection is aggregated when its source contains an aggregated function (e.g. COUNT(*) as x, CAST(COUNT(*) as STRING) as stringed).
  • [, aggregated projection2] ... [, aggregated projectionN]: a query can contain any number of additional aggregated projections after the first mandatory one.
  • [, projection1] ... [, projectionM]: a query can contain any number of common, non-aggregated, projections. Streaming only supports full GROUP BY mode. This means that fields that are not part of the GROUP BY clause cannot be referenced by non-aggregated projections.
  • <source>: a normal source, like a topic or the result of a WITH statement.
  • [WINDOW BY <window description>]: this optional section can only be specified if STREAM is used. It describes the windowing that will be applied to the aggregation. More on this below.
  • GROUP BY <expression>: the result of evaluating <expression> will be used to divide the input values into different groups. These groups will be the input for each aggregated projection specified. The <expression>’s result will become the key for the table resulting from the query.

Specific rules for aggregated projections 

Most of the rules and syntax described for Projections apply to aggregated projections as well, but there are some additional syntactical rules due to the specific nature of Aggregations.

  • Aliasing rules mostly work the same, but it is not possible to project on the key facet; COUNT(*) as _key.a or SUM(x) as _key are therefore not allowed.
  • At least one aggregated projection must be specified in the query.
  • Projections using an unqualified key facet as source are not allowed. _key.a or COUNT(_key.b) are forbidden because _key is unqualified, but <source>._key.a and COUNT(<source>._key.b) are supported.

Grouping and storage format for key 

INSERT INTO target-topic
SELECT STREAM  
    COUNT(*) AS records
FROM input-topic
GROUP BY field1;

As previously mentioned, the GROUP BY clause determines the key of the query’s result. The above query groups all records in input-topic by the value of field1 in each record, and target-topic’s key will have the schema of field1.

Just like in the case of the Projections, the Streaming mode takes an opinionated approach here and will simplify the result schema and Storage Format in case of single field structures.

In the case above, assuming for example that field1 is an integer, target-topic’s key will not be a structure with a single integer field1 field, but rather just the value field1; the resulting storage format is going to be INT, and the label field1 will be just dropped.

If the above behavior is not desirable, specifying an explicit alias overrides it.

INSERT INTO target-topic
SELECT STREAM  
    COUNT(*) AS records
FROM input-topic
GROUP BY field1 AS keep_me;

This will result in target-topic’s key being a structure with a field keep_me, with the same schema as field1. The corresponding Storage Format will match the input format for input-topic, AVRO or JSON.
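The difference between the two key shapes can be sketched in Python. This is only a model of the behavior described above, not Lenses code; `result_key` is a hypothetical helper, and the integer value 7 is made up.

```python
def result_key(record, field, alias=None):
    """Model the key of the output record for GROUP BY <field> [AS <alias>]."""
    value = record[field]
    if alias is None:
        # Single field, no alias: the key is the bare value and the label is dropped.
        return value
    # Explicit alias: the key stays a structure with one named field.
    return {alias: value}

record = {"field1": 7, "other": "x"}

flattened_key = result_key(record, "field1")
structured_key = result_key(record, "field1", alias="keep_me")

print(flattened_key)   # 7
print(structured_key)  # {'keep_me': 7}
```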

Semantics 

An example will help clarify how aggregations work, and how they behave depending on the semantics of the input dataset they are applied to.

Example scenario 

Assume that we have a Kafka topic (gaming-sessions) containing these records:

Offset  Key    Value
0       billy  {points: 50, country: "uk"}
1       billy  {points: 90, country: "uk"}
2       willy  {points: 70, country: "uk"}
3       noel   {points: 82, country: "uk"}
4       john   {points: 50, country: "usa"}
5       dave   {points: 30, country: "usa"}
6       billy  {points: 90, country: "spain"}

This data describes a series of gaming sessions, each performed by a player. Each record holds the player (used as the key), the points achieved, and the country where the game took place.

Aggregating a Stream 

Let’s now assume that what we want to calculate is the total points achieved by players in a given country, as well as the average points per game.
One way to achieve the desired behavior is to build a Stream from the input topic. Remember that this means that each event will be considered in isolation.

INSERT INTO target-topic
SELECT STREAM
    SUM(points) AS total_points
    , AVG(points) AS average_points
FROM gaming-sessions
GROUP BY country;

Each element of this syntax is explained above. Briefly, this query builds a Stream from gaming-sessions, groups all events by country (i.e. all records with the same country are aggregated together), and calculates the total (total_points) and the average (average_points) of all points in each group.

The final result in target-topic will be (disregarding intermediate events):

Offset  Key    Value
3       uk     {total_points: 292, average_points: 73}
5       usa    {total_points: 80, average_points: 40}
6       spain  {total_points: 90, average_points: 90}

The results are calculated from all the input records, because in a Stream each event is independent of and unrelated to any other.
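The numbers above can be reproduced with a small Python simulation of the Stream case. It is a sketch of the semantics only, assuming a (sum, count) pair as the per-group state; `aggregate_stream` is a name invented for this example.

```python
# The records of gaming-sessions, as (key, value) pairs in offset order.
sessions = [
    ("billy", {"points": 50, "country": "uk"}),
    ("billy", {"points": 90, "country": "uk"}),
    ("willy", {"points": 70, "country": "uk"}),
    ("noel", {"points": 82, "country": "uk"}),
    ("john", {"points": 50, "country": "usa"}),
    ("dave", {"points": 30, "country": "usa"}),
    ("billy", {"points": 90, "country": "spain"}),
]

def aggregate_stream(events):
    """Treat every event in isolation: each one is added to its group, none is retracted."""
    state = {}  # country -> (sum of points, number of records)
    for _player, value in events:
        total, count = state.get(value["country"], (0, 0))
        state[value["country"]] = (total + value["points"], count + 1)
    return {
        country: {"total_points": total, "average_points": total / count}
        for country, (total, count) in state.items()
    }

stream_result = aggregate_stream(sessions)
print(stream_result["uk"])  # {'total_points': 292, 'average_points': 73.0}
```

Note that the record key (the player) plays no role here: both of billy's uk sessions contribute to the uk group.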

Aggregating a Table 

We now want to calculate something similar to what we obtained before, but keeping track only of the last session played by each player, as this might give a better snapshot of both the performance and the location of players worldwide. The statistics we want to gather are the same as before: total and average points per country.

To achieve this, simply read gaming-sessions into a Table rather than a Stream, and aggregate it.

INSERT INTO target-topic
SELECT TABLE
    SUM(points) AS total_points
    , AVG(points) AS average_points
FROM gaming-sessions
GROUP BY country;

The final result in target-topic will be (disregarding intermediate events):

Key    Value
uk     {total_points: 152, average_points: 76}
usa    {total_points: 80, average_points: 40}
spain  {total_points: 90, average_points: 90}

Compare this with the behavior from the previous scenario; the key difference is that the value for uk includes only willy and noel, and that’s because the last event moved billy to the spain bucket, removing all data regarding him from his original group.
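The Table case can be simulated in Python as well. The sketch below assumes that each new record for a player first retracts that player's previous value from its old group and then adds the new value to its new group; `aggregate_table` is a hypothetical name and this is not the actual Lenses implementation.

```python
# The records of gaming-sessions, as (key, value) pairs in offset order.
sessions = [
    ("billy", {"points": 50, "country": "uk"}),
    ("billy", {"points": 90, "country": "uk"}),
    ("willy", {"points": 70, "country": "uk"}),
    ("noel", {"points": 82, "country": "uk"}),
    ("john", {"points": 50, "country": "usa"}),
    ("dave", {"points": 30, "country": "usa"}),
    ("billy", {"points": 90, "country": "spain"}),
]

def aggregate_table(events):
    """Aggregate only the latest value per player, retracting obsolete ones."""
    latest = {}  # player -> latest value (the Table view of the topic)
    state = {}   # country -> [sum of points, number of players]
    for player, value in events:
        old = latest.get(player)
        if old is not None:
            # Subtract the obsolete value from the group it used to belong to.
            bucket = state[old["country"]]
            bucket[0] -= old["points"]
            bucket[1] -= 1
            if bucket[1] == 0:
                del state[old["country"]]
        # Add the new value to its (possibly different) group.
        bucket = state.setdefault(value["country"], [0, 0])
        bucket[0] += value["points"]
        bucket[1] += 1
        latest[player] = value
    return {
        country: {"total_points": total, "average_points": total / count}
        for country, (total, count) in state.items()
    }

table_result = aggregate_table(sessions)
print(table_result["uk"])  # {'total_points': 152, 'average_points': 76.0}
```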

Aggregation functions for Tables 

The previous section described the behavior of aggregations when applied to Tables. It highlighted that aggregations not only need to add the latest values received to the current state of a group, but also need to subtract an obsolete value whose key has just been assigned to a new group. As we saw above, this is easy to do for SUM and AVG.

However, consider what would happen if we wanted to add a new statistic to the ones calculated above: the maximum points achieved by a player in a given country.

In the Stream scenario, this can be achieved by simply adding MAXK(points,1) as max_points to the query.

INSERT INTO target-topic
SELECT STREAM
    SUM(points) AS total_points
    , AVG(points) AS average_points
    , MAXK(points,1) AS max_points
FROM gaming-sessions
GROUP BY country;

Offset  Key    Value
3       uk     {total_points: 292, average_points: 73, max_points: 90}
5       usa    {total_points: 80, average_points: 40, max_points: 50}
6       spain  {total_points: 90, average_points: 90, max_points: 90}

In the Table scenario, however, things are different. We know that the final event moves billy from uk to spain, so we need to subtract from uk all information related to billy. For SUM and AVG that’s possible, because subtracting billy’s points from the current value of the aggregation returns the correct result.

But that’s not possible for MAXK. MAXK(points, 1) only keeps track of one value, the highest seen so far. If that value is removed, what should take its place? The aggregation function cannot inspect the entire topic data to search for the correct answer. The only state it has access to is that single number, 90, which is now invalid.

This problem explains why some aggregated functions can be used on Streams and Tables both (e.g. SUM), while others can be used only on Streams (e.g. MAXK).

The key factor is usually whether a hypothetical subtraction operation would need access to all previous inputs to calculate its new value (like MAXK) or just to the aggregated state (like SUM).
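The contrast can be made concrete with two toy state objects in Python. Both classes are assumptions made for illustration: `SumState` models a subtractable aggregation such as SUM, while `MaxState` models a top-1 maximum in the spirit of MAXK(points, 1). Neither reflects Lenses internals.

```python
class SumState:
    """The state is a single number; add and subtract both need only that number."""
    def __init__(self):
        self.total = 0

    def add(self, value):
        self.total += value

    def subtract(self, value):
        self.total -= value

class MaxState:
    """The state is only the highest value seen so far."""
    def __init__(self):
        self.maximum = None

    def add(self, value):
        if self.maximum is None or value > self.maximum:
            self.maximum = value

    def subtract(self, value):
        if value == self.maximum:
            # The runner-up was never stored, so the new maximum is unknown.
            raise ValueError("cannot retract the maximum without all previous inputs")
        # Removing a smaller value leaves the maximum unchanged.

# The uk group in the Table scenario, just before billy moves to spain.
uk_sum, uk_max = SumState(), MaxState()
for points in (90, 70, 82):  # billy, willy, noel
    uk_sum.add(points)
    uk_max.add(points)

uk_sum.subtract(90)  # billy leaves uk: the sum is still correct
try:
    uk_max.subtract(90)
    retract_failed = False
except ValueError:
    retract_failed = True

print(uk_sum.total)    # 152
print(retract_failed)  # True
```

The correct new maximum for uk would be 82, but that value is not recoverable from the state alone.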

Windowed aggregations 

A common scenario in the context of aggregations is adding a time dimension to the grouping logic expressed in the query. For example, one might want to group together all input records that share a given field and were received within 1 hour of each other.

To express this, Lenses SQL Streaming supports windowed aggregations through a WINDOW BY clause added to the query. Tables cannot be aggregated using a window: a table represents the latest state of a set of (Key, Value) pairs, not a series of events spread over time, so windowing it is not a sensible operation.

Details of the supported types of windows, their specific syntax as well as the fundamental relationship between stream processing and time can be found in the Time and Windows page of this documentation.
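To give an intuition of what a window adds to the grouping, the Python sketch below counts records per country within fixed, non-overlapping one-hour windows. This is a deliberate simplification and an assumption of this example: it models only one kind of window, the timestamps are made up, and `window_start` and `count_per_window` are hypothetical names.

```python
from collections import defaultdict

WINDOW_MS = 60 * 60 * 1000  # one hour, in milliseconds

def window_start(timestamp_ms, size_ms=WINDOW_MS):
    """Return the start of the fixed-size window that contains the timestamp."""
    return timestamp_ms - (timestamp_ms % size_ms)

def count_per_window(events):
    """Group by (window start, country) instead of by country alone."""
    counts = defaultdict(int)
    for timestamp_ms, country in events:
        counts[(window_start(timestamp_ms), country)] += 1
    return dict(counts)

MINUTE_MS = 60 * 1000
events = [
    (0 * MINUTE_MS, "uk"),    # 00:00, first window
    (30 * MINUTE_MS, "uk"),   # 00:30, first window
    (45 * MINUTE_MS, "usa"),  # 00:45, first window
    (90 * MINUTE_MS, "uk"),   # 01:30, second window
]

windowed = count_per_window(events)
print(windowed[(0, "uk")])          # 2
print(windowed[(WINDOW_MS, "uk")])  # 1
```

In this model the time window simply becomes part of the grouping key, so the same country can produce one result per window.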

Filtering aggregated queries 

Filtering the input into aggregated queries is similar to filtering non-aggregated ones. When using a WHERE <expression> statement, where <expression> is a valid SQL boolean expression, all records that do not match the predicate will be left out.

However, aggregated functions add a further dimension to what it might be desirable to filter.

We might be interested in filtering based on some condition of the groups themselves. For example, we might want to count all input records that have a given value of field1, but only if the total is greater than 3. In this case WHERE would not help, because it has no access to the groups or to the results of the aggregated projections. The query below is what is needed.

INSERT INTO target-topic
SELECT STREAM
    COUNT(*) as sessions
FROM gaming-sessions
GROUP BY country
HAVING sessions > 3;

The above query uses the HAVING clause to express a filter at a grouping level. Using this feature it is possible to express a predicate on the result of aggregated projections and filter out the output records that do not satisfy it.

Note that only aggregated projections specified in the SELECT clause can be used within the HAVING clause.
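The different points at which WHERE and HAVING act can be sketched in Python using the values of the gaming-sessions example. `count_sessions` is a hypothetical helper written for this illustration: the `where` predicate sees individual input records, while the `having` predicate sees only the aggregated count of each group.

```python
from collections import Counter

# Values of the gaming-sessions records used in the example scenario.
values = [
    {"points": 50, "country": "uk"},
    {"points": 90, "country": "uk"},
    {"points": 70, "country": "uk"},
    {"points": 82, "country": "uk"},
    {"points": 50, "country": "usa"},
    {"points": 30, "country": "usa"},
    {"points": 90, "country": "spain"},
]

def count_sessions(records, where=None, having=None):
    """COUNT(*) grouped by country, with optional input and group filters."""
    # WHERE: drop input records before they reach any group.
    kept = [r for r in records if where is None or where(r)]
    counts = Counter(r["country"] for r in kept)
    # HAVING: drop whole groups based on the aggregated result.
    return {c: n for c, n in counts.items() if having is None or having(n)}

having_only = count_sessions(values, having=lambda sessions: sessions > 3)
where_only = count_sessions(values, where=lambda r: r["points"] > 50)

print(having_only)  # {'uk': 4}
print(where_only)   # {'uk': 3, 'spain': 1}
```

Only uk has more than 3 sessions, so it is the only group that survives the group-level filter.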

--
Last modified: September 15, 2024