This page describes the time and windowing of data in Kafka with Lenses SQL Processors.
A data stream is a sequence of events ordered by time. Each entry contains a timestamp component, which aligns it on the time axis.
Kafka provides the source for the data streams, and the Kafka message comes with the timestamp built in. This is used by the Push Engines by default. One thing to consider is that from a time perspective, the stream records can be out of order. Two Kafka records, R1 and R2 do not necessarily respect the rule: R1 timestamp is smaller than R2 timestamp.
Timestamps are required to perform time-dependent operations for streams - like aggregations and joins.
A record timestamp value can have three distinct meanings. Kafka allows to configure a topic timestamp meaning via this log.message.timestamp.type
setting. The two supported values are CreateTime
and LogAppendTime
.
When a record is created at source the producer is responsible for setting the timestamp for it. Kafka producer provides this automatically and this is aligned with the CreateTime
the configuration mentioned earlier.
At times, the data source timestamp is not available. When setting the topic timestamp type to LogAppendTime
, the Kafka broker will attach the timestamp at the moment it writes it to the topic.
The timestamp will be set to the time the record was read by the engine, ignoring any previously set timestamps.
Sometimes, when the data source is not under direct control, it might be that the record’s timestamp is actually embedded in the payload, either in the key or the value.
Lenses SQL Streaming allows to specify where to extract the timestamp from the record by using EVENTTIME BY
.
where <selection>
is a valid selection.
Here are a few examples on how to use the syntax to use the timestamp from the record value facet:
For those scenarios when the timestamp value lives within the record key, the syntax is similar:
All records produced by the Lenses SQL Streaming will have a timestamp set and its value will be one of the following:
For direct transformations, where the output record is a straightforward transformation of the input, the input record timestamp will be used.
For aggregations, the timestamp of the latest input record being aggregated will be used.
In all other scenarios, the timestamp at which the output record is generated will be used.
Some stream processing operations, like joins or aggregations, require distinct time boundaries which are called windows. For each time window, there is a start and an end, and as a result a duration. Performing aggregations over a time window, means only the records which fall within the time window boundaries are aggregated together. It might happen for the records to be out-of-order and arrive after the window end has passed, but they will be associated with the correct window.
There are three-time windows to be used at the moment: hopping, tumbling and session.
When defining a time window size, the following types are available:
These are fixed-size and overlapping windows. They are characterised by duration and the hop interval. The hop interval specifies how far a window moves forward in time relative to the previous window.
Since the windows can overlap, a record can be associated with more than one window.
Use this syntax to define a hopping window:
They are a particularisation of hopping windows, where the duration and hop interval are equal. This means that two windows can never overlap, therefore a record can only be associated with one window.
Duration time takes the same unit types as described earlier for hopping windows.
Unlike the other two window types, this window size is dynamic and driven by the data. Similar to tumbling windows, these are non-overlapping windows.
A session window is defined as a period of activity separated by a specified gap of inactivity. Any records with timestamps that occur within the boundaries of the inactivity interval are considered part of the existing sessions. When a record arrives and its timestamp is outside of the session gap, a new session window is created and the record will belong to that.
A new session window starts if the last record that arrived is further back in time than the specified inactivity gap. Additionally, different session windows might be merged into a single one if an event is received that falls in between two existing windows, and the resulting windows would then overlap.
To define a session window the following syntax should be used:
The inactivity interval can take the time unit type seen earlier for the hopping window.
Session windows are tracked on a per-key basis. This means windows for different keys will likely have different durations. Even for the same key, the window duration can vary.
User behaviour analysis is an example of when to use session windows. They allow metrics like counting user visits, customer conversion funnel or event flows.
It is quite common to see records belonging to one window arriving late, that is after the window end time has passed. To accept these records the notion of a grace period is supported. This means that if a record timestamp falls within a window W and it arrives within W + G (where G is the grace interval) then the record will be processed and the aggregations or joins will update. If, however, the record comes after the grace period then it is discarded.
To control the grace interval use this syntax:
The default grace period is 24 hours. Until the grace period elapses, the window is not actually closed.
Duration | Description | Example |
---|---|---|
ms
time in milliseconds.
100ms
s
time in seconds.
10s
m
time in minutes.
10m
h
time in hours.
10h