This page describes how to control event time for the data in your Kafka topics with Lenses SQL Processors.
Every message in Kafka carries a timestamp, and the Lenses Engine Streaming mode uses it by default for time-dependent operations like aggregations and joins.
Sometimes, though, that timestamp is not what you need, and you would rather use a field of the record value or key as the timestamp.
In Lenses SQL you can use the special `EVENTTIME BY ...` syntax to control a record's timestamp.
In our toy example, we have a simple topic where electricity meter-reading events are collected:
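A minimal sketch of what that topic could look like, defined with Lenses SQL (the topic name `electricity_events` and the fields other than `event_time` are assumptions for this example):

```sql
-- Electricity meter readings; the reading time lives in the payload
CREATE TABLE electricity_events(
    meter_reading int,   -- the value reported by the meter (assumed field)
    customer_id int,     -- the customer the reading belongs to (assumed field)
    event_time long      -- when the reading was taken, as epoch millis
)
FORMAT (string, avro);
```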
We can also insert some example data to experiment with:
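For instance, something along these lines, with three readings for customer 1 spaced 5s apart (the readings and the epoch-millis timestamps are illustrative values):

```sql
INSERT INTO electricity_events(meter_reading, customer_id, event_time) VALUES
(11, 1, 1621872000000),   -- customer 1, time t
(12, 1, 1621872005000),   -- customer 1, t + 5s
(13, 1, 1621872010000),   -- customer 1, t + 10s
(21, 2, 1621872000000),
(22, 2, 1621872007000);
```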
If you query the events, you can see that Kafka sets a timestamp for each record. That timestamp is, in our case, the time when the record was inserted; it is entirely unrelated to the `event_time` field we have in the payload.
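A snapshot query along these lines should make the mismatch visible, assuming the engine exposes the record timestamp under the `_meta.timestamp` metadata field:

```sql
-- Compare the Kafka record timestamp with the event_time in the payload
SELECT _meta.timestamp, customer_id, event_time
FROM electricity_events;
```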
We would like to transform our original stream of events, aggregating them with a hopping window of 10s width and a 5s increment, computing the average for each window.
You can create a new processor that streams those averages, using the special `WINDOW BY ...` syntax:
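A sketch of such a processor, reusing the assumed topic and field names from above (the output topic name `electricity_events_avg` is also an assumption):

```sql
SET defaults.topic.autocreate=true;

-- Average reading per customer, over 10s windows advancing every 5s
INSERT INTO electricity_events_avg
SELECT STREAM
    customer_id,
    AVG(meter_reading) AS meter_reading_avg
FROM electricity_events
WINDOW BY HOP 10s, 5s
GROUP BY customer_id;
```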
For customer 1, we have three input events with a 5s delay between them, so we expect four output events for that customer, since four hopping windows are involved.
But checking the emitted records, we see that only two are produced.
This is because, by default, windowing operations work on the record timestamp, and in our case all the timestamps are nearly identical, coinciding with the time the records were inserted.
Fortunately, we can change this behavior using the special `EVENTTIME BY ...` syntax, specifying an expression to be used as the timestamp:
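The previous sketch needs only one extra clause; here `event_time` is the payload field we want the windows computed over (the output topic name is again an assumption):

```sql
SET defaults.topic.autocreate=true;

-- Same aggregation, but windowed on the event_time payload field
-- instead of the Kafka record timestamp
INSERT INTO electricity_events_avg_eventtime
SELECT STREAM
    customer_id,
    AVG(meter_reading) AS meter_reading_avg
FROM electricity_events
EVENTTIME BY event_time
WINDOW BY HOP 10s, 5s
GROUP BY customer_id;
```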
As you can see, the results are now windowed using `event_time` as the timestamp.