This page describes how to control the event time of records in your Kafka topics when using Lenses SQL Processors.
Every Kafka message carries a timestamp, and by default Lenses Engine Streaming mode uses it for time-dependent operations such as aggregations and joins.
Sometimes, though, that timestamp is not what you need, and you would rather use a field from the record key or value as the timestamp.
In Lenses SQL, the special EVENTTIME BY ... syntax lets you control which timestamp each record uses.
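Conceptually, EVENTTIME BY swaps the timestamp source that time-dependent operations read. The Python sketch below models only that choice; it is not Lenses code. The record shape (a Kafka `timestamp` plus a `value` dict), the helper name `effective_timestamp`, and the use of epoch milliseconds are all hypothetical assumptions for illustration.

```python
from typing import Any, Optional


def effective_timestamp(record: dict[str, Any], eventtime_field: Optional[str] = None) -> int:
    """Return the timestamp a windowing step would use for this record.

    Hypothetical model: without EVENTTIME BY the Kafka record timestamp is used;
    with it, the named field of the record value is used instead.
    """
    if eventtime_field is None:
        return record["timestamp"]
    return record["value"][eventtime_field]


# A hypothetical record inserted ten minutes after the reading it describes.
record = {
    "timestamp": 1_700_000_600_000,  # set by Kafka when the record was written
    "value": {"customer_id": 1, "KW": 2.5, "event_time": 1_700_000_000_000},
}

print(effective_timestamp(record))                # Kafka record timestamp
print(effective_timestamp(record, "event_time"))  # payload event time
```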
Setting up our example
In our toy example, we have a simple topic, electricity_events, that collects electricity meter reading events. Each event carries a customer_id, a KW reading, and an event_time field.
If you query the events, you can see that Kafka sets a timestamp on each record. In our case, that timestamp is the time the record was inserted, and it is unrelated to the event_time field in the payload.
We want to transform the original stream by aggregating events in hopping windows that are 10s wide and advance every 5s, computing the average KW per customer for each window.
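To make "10s width, 5s increment" concrete, the Python sketch below lists the windows a single timestamp falls into. It assumes windows are aligned to the epoch, start at multiples of the increment, and are half-open [start, end), which is how Kafka Streams defines hopping time windows; we assume Lenses behaves the same way. `hopping_windows` is a hypothetical helper, not a Lenses API.

```python
def hopping_windows(ts_ms: int, size_ms: int = 10_000, advance_ms: int = 5_000) -> list[tuple[int, int]]:
    """Return every [start, end) hopping window that contains ts_ms.

    Assumes epoch-aligned windows whose start times are multiples of advance_ms.
    """
    # Latest window start at or before ts_ms.
    start = ts_ms - ts_ms % advance_ms
    windows = []
    # Step back one increment at a time while the window still covers ts_ms.
    while start > ts_ms - size_ms:
        windows.append((start, start + size_ms))
        start -= advance_ms
    return sorted(windows)


# A reading at t = 12s belongs to two overlapping windows.
print(hopping_windows(12_000))  # [(5000, 15000), (10000, 20000)]
```

Because the width is exactly twice the increment, every timestamp lands in exactly two windows.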
You can create a new processor that streams those averages, using the special WINDOW BY ... syntax:
```sql
SET defaults.topic.autocreate=true;

INSERT INTO electricity_events_avg_wrong
SELECT STREAM
    customer_id,
    AVG(KW) AS KW
FROM electricity_events
WINDOW BY HOP 10s,5s
GROUP BY customer_id
```
For customer 1, the input contains three events spaced 5s apart. Each event falls into two overlapping 10s windows, and neighbouring events share one of them, so the three events span four hopping windows and we expect four output records for that customer.
However, checking the emitted records shows that only two are produced.
This is because, by default, windowing operations use the record timestamp. In our case all the record timestamps are practically identical, since they coincide with the time the records were inserted, so every event falls into the same two windows.
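The expected count of four can be checked with a short Python sketch that takes the union of the windows touched by three event times 5s apart. The timestamps are hypothetical, and the window definition carries the same assumption as Kafka Streams hopping windows (epoch-aligned, half-open).

```python
def hopping_windows(ts_ms, size_ms=10_000, advance_ms=5_000):
    """Return every epoch-aligned [start, end) hopping window containing ts_ms."""
    start = ts_ms - ts_ms % advance_ms
    windows = []
    while start > ts_ms - size_ms:
        windows.append((start, start + size_ms))
        start -= advance_ms
    return windows


# Hypothetical event_time values for customer 1, 5s apart (epoch milliseconds).
t0 = 1_700_000_000_000
event_times = [t0, t0 + 5_000, t0 + 10_000]

# Each event falls into two windows; neighbouring events share one of them.
windows = sorted({w for t in event_times for w in hopping_windows(t)})
print(len(windows))  # 4
```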
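The same sketch explains the two records: if the windowing uses insertion timestamps that are only milliseconds apart, all three events map to the same pair of windows. The insertion times below are hypothetical; had they straddled a 5s boundary, a third window would appear, which shows the count depends on when records were written rather than on event_time.

```python
def hopping_windows(ts_ms, size_ms=10_000, advance_ms=5_000):
    """Return every epoch-aligned [start, end) hopping window containing ts_ms."""
    start = ts_ms - ts_ms % advance_ms
    windows = []
    while start > ts_ms - size_ms:
        windows.append((start, start + size_ms))
        start -= advance_ms
    return windows


# Hypothetical Kafka timestamps: three records written within a few milliseconds.
insert_times = [1_700_000_001_000, 1_700_000_001_010, 1_700_000_001_020]

windows = sorted({w for t in insert_times for w in hopping_windows(t)})
print(len(windows))  # 2
```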
Fortunately, we can change this behavior with the special EVENTTIME BY ... syntax, specifying an expression to use as the timestamp:
```sql
SET defaults.topic.autocreate=true;

INSERT INTO electricity_events_avg
SELECT STREAM
    customer_id,
    AVG(KW) AS KW
FROM electricity_events
EVENTTIME BY event_time
WINDOW BY HOP 10s,5s
GROUP BY customer_id
```
The results are now windowed using event_time as the timestamp.
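As a rough model of what the corrected processor computes, the Python sketch below groups readings by customer and by hopping window of their event_time, then averages KW. The KW values and timestamps are hypothetical, since the original data is not reproduced here. It is a batch model that shows only the final value per window; a streaming processor may also emit intermediate updates as records arrive.

```python
from collections import defaultdict


def hopping_windows(ts_ms, size_ms=10_000, advance_ms=5_000):
    """Return every epoch-aligned [start, end) hopping window containing ts_ms."""
    start = ts_ms - ts_ms % advance_ms
    windows = []
    while start > ts_ms - size_ms:
        windows.append((start, start + size_ms))
        start -= advance_ms
    return windows


def windowed_avg(events, size_ms=10_000, advance_ms=5_000):
    """Average KW per (customer_id, window), windowing on event_time."""
    readings = defaultdict(list)
    for e in events:
        # Like EVENTTIME BY event_time: window on the payload field, not the Kafka timestamp.
        for w in hopping_windows(e["event_time"], size_ms, advance_ms):
            readings[(e["customer_id"], w)].append(e["KW"])
    return {key: sum(v) / len(v) for key, v in sorted(readings.items())}


t0 = 1_700_000_000_000
events = [  # hypothetical readings for customer 1
    {"customer_id": 1, "KW": 1.0, "event_time": t0},
    {"customer_id": 1, "KW": 2.0, "event_time": t0 + 5_000},
    {"customer_id": 1, "KW": 3.0, "event_time": t0 + 10_000},
]

for (customer, (start, end)), avg in windowed_avg(events).items():
    print(customer, start, end, avg)
```

With these inputs the four windows average to 1.0, 1.5, 2.5 and 3.0: the outer windows each hold one reading, and the inner two each hold two.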