Aggregation

Sometimes information about groups of data, rather than individual records, is required. This is where aggregation comes into play. Typical streaming aggregation involves scenarios similar to these:

  • Counting the number of visitors on your website per region
  • Totaling the amount of Foreign Exchange transactions for GBP-USD over a 15-minute interval
  • Totaling the sales made at each of the company's stores every day
  • Retaining the minimum and maximum stock value over a 30-minute interval

These are just a few examples - the list goes on. Lenses SQL supports aggregating streaming data.
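As a quick sketch of the first scenario in the list above, assuming a hypothetical website_visits topic whose records carry a region field, the per-region visitor count could be expressed like this:

INSERT INTO visitors_per_region
SELECT count(*) AS visitor_count
FROM website_visits
GROUP BY region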

For a trading system displaying the number of transactions made for each currency pair (GBPUSD is a currency exchange ticker), this is a typical requirement. Such functionality can easily be achieved with SQL like this:

INSERT INTO total_transactions
SELECT count(*) AS transaction_count
FROM fx
GROUP BY ticker

This is a stateful stream (it creates a table), so the values for a given ticker could potentially be emitted more than once, depending on how many transactions flow through the input topic. The result of this query could be the following:

Key     Value
GBPUSD  1
CHFYEN  1
USDEUR  1
GBPUSD  3
USDEUR  5

If the aggregation needs to happen only for specific tickers, there are two approaches: the first uses the WHERE clause (the more performant option), while the second uses the HAVING clause. Both approaches are covered by the SQL below:

INSERT INTO `total_transactions`
SELECT count(*) AS transaction_count
FROM `fx`
WHERE ticker LIKE '%GBP%'
GROUP BY ticker

--OR

INSERT INTO `total_transactions`
SELECT count(*) AS transaction_count
FROM `fx`
GROUP BY ticker
HAVING ticker IN ('GBPUSD', 'EURDKK', 'SEKCHF')

--OR

INSERT INTO `total_transactions`
SELECT count(*) AS transaction_count
FROM `fx`
GROUP BY ticker
HAVING ticker IN ('GBPUSD', 'EURDKK', 'SEKCHF') OR ticker LIKE '%USD%'

Sometimes it is required to break the data up into groups. Lenses SQL allows grouping by the Key, by fields in the Key, by the Value, and by fields in the Value. Assuming the record Key is a STRING containing the foreign exchange ticker, here is how grouping by the Key can be done:

INSERT INTO `total_transactions`
SELECT count(*) AS transaction_count
FROM `fx`
GROUP BY _key

-- OR adding a filter

INSERT INTO `total_transactions`
SELECT count(*) AS transaction_count
FROM `fx`
WHERE _key.* LIKE '%GBP%'
GROUP BY _key
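
Grouping by a field inside the Key is also possible when the Key is a structured type (for example Avro) rather than a plain STRING. A minimal sketch, assuming a hypothetical key that carries a ticker field and that key fields can be addressed with the _key prefix, as in the filter example above:

INSERT INTO `total_transactions`
SELECT count(*) AS transaction_count
FROM `fx`
GROUP BY _key.ticker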

Important

When a field from the Key or the Value is used in the GROUP BY statement, the resulting record Key will be a STRING containing the field's text representation (this occurs even when the field is an integer, double, long, etc.). Grouping on multiple fields is also possible, as sketched below.
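
A sketch of grouping on multiple Value fields, assuming hypothetical country and city fields on the records; following the note above, the resulting record Key would be a STRING built from the text representations of both fields:

INSERT INTO `transactions_by_location`
SELECT count(*) AS transaction_count
FROM `fx`
GROUP BY country, city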

Arithmetic involving aggregation functions, for example SUM(fieldA)/COUNT(*), is not yet supported. The same calculation can instead be achieved by first defining an intermediary stream where COUNT(*) and SUM(temperature) are computed, and then performing the division in a final step (see temperatureTotal/total in the last SELECT statement):

SET `auto.offset.reset`='latest';
SET autocreate = true;
SET `commit.interval.ms` = 3000;

INSERT INTO sensor_data_avg
WITH
avgStream AS
(
SELECT STREAM
        COUNT(*) AS total,
        SUM(temperature) AS temperatureTotal,
        SUM(humidity) AS humidityTotal,
        MIN(temperature) AS minTemperature,
        MAX(temperature) AS maxTemperature,
        MIN(humidity) AS minHumidity,
        MAX(humidity) AS maxHumidity
FROM `sensor_data`
GROUP BY TUMBLE(2,s), _key
)

SELECT STREAM
    temperatureTotal/total AS avgTemperature,
    humidityTotal/total AS avgHumidity,
    minTemperature,
    maxTemperature,
    minHumidity,
    maxHumidity
FROM avgStream

Notice that the last SELECT statement uses the output of the first one to compute the averages.

Using Windows

So far, the aggregation examples have not involved windowing. Aggregating over a time window is a very common streaming scenario. Visit the windowing section if these concepts are not yet familiar.

Consider an IoT system capturing data from devices around the world. Counting the records received per country over a 30-second interval can be achieved with this SQL code:

INSERT INTO norway_sensors_count
SELECT STREAM count(*) AS total
FROM sensors
GROUP BY tumble(30,s), cca3

And here is the incoming device data:

{
  "device_id": 2,
  "device_type": "sensor-gauge",
  "ip": "193.156.90.200",
  "cca3": "NOR",
  "cn": "Norway",
  "temp": 18,
  "signal": 26,
  "battery_level": 8,
  "timestamp": 1475600522
}

The continuous query would emit results every 30 seconds, and they would look similar to this:

Key  Value
NOR  10
ROM  2
FRA  126
UK   312
US   289
NOR  2
FRA  16
UK   352
US   219

Aggregation is not limited to the COUNT function; SUM, MIN, and MAX are supported as well. For an order processing system, computing every hour the total amount of orders over the last 24 hours can be achieved with this SQL:

SELECT STREAM
      product
    , SUM(amount) AS amount
FROM Orders
GROUP BY HOP(1, H, 1, D), product
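
MIN and MAX follow the same pattern. As a sketch, reusing the hopping window above and assuming the same amount field on the Orders records, the smallest and largest order per product could be computed like this:

SELECT STREAM
      product
    , MIN(amount) AS minAmount
    , MAX(amount) AS maxAmount
FROM Orders
GROUP BY HOP(1, H, 1, D), product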