Rekeying data

This tutorial shows how to rekey the data in a Kafka topic with Lenses SQL Processors.

Sometimes you have a topic that is almost exactly what you need, except that the key of the record requires a bit of massaging.

In Lenses SQL you can use the special SELECT ... as _key syntax to quickly re-key your data.
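
In its simplest form, a rekeying query is just a projection aliased to _key; a minimal sketch, with placeholder topic and field names:

SELECT STREAM some_field as _key
FROM some_topic;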

In our example, we have a topic containing events coming from temperature sensors.

Each record contains the sensor's measured temperature, the time of the measurement, and the id of the sensor. The key is a unique string (for example, a UUID) that the upstream system assigns to the event.
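
For instance, one such record (taken from the data we will insert below) could look like this:

Key:   "9E6F72C8-6C9F-4EC2-A7B2-C8AC5A95A26C"
Value: { "sensor_id": "sensor-1", "temperature": 12, "event_time": 1591970345 }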

You can replicate the example by creating a topic in SQL Studio; the FORMAT clause sets the key format (string) and the value format (AVRO):

CREATE TABLE temperature_events(
    sensor_id string
    , temperature int
    , event_time long
)
FORMAT (string, avro);

We can also insert some example data for our experiments:

INSERT INTO temperature_events(
    _key
    , sensor_id
    , temperature
    , event_time
) VALUES
("9E6F72C8-6C9F-4EC2-A7B2-C8AC5A95A26C", "sensor-1", 12, 1591970345),
("0958C1E2-804F-4110-B463-7BEA25DA680C", "sensor-2", 13, 1591970346),
("552E9A77-D23D-4EC7-96AB-7F106637ADBC", "sensor-3", 28, 1591970347),
("00F44EDD-1BF1-4AE8-A7FE-622F720CA3BC", "sensor-1", 13, 1591970348),
("A7F19576-6EFA-40A0-91C3-D8999A41F453", "sensor-1", 12, 1591970348),
("5273803E-7F34-44F8-9A3B-B6CB9D24C1F9", "sensor-2", 31, 1591970349),
("8401CD82-ABFF-4D1B-A564-986DC971F913", "sensor-2", 30, 1591970349),
("F1CF77A0-6CFB-472F-B368-28F011F46C64", "sensor-1", 13, 1591970349);

You can explore the topic in Lenses to check the content and the shape of what you just inserted.
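
For example, a simple query in SQL Studio will list what was just inserted:

SELECT *
FROM temperature_events;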

Let’s say that what you need is that same stream of events, but the record key should be the sensor_id instead of the UUID.

With the special SELECT ... as _key syntax, a few lines are enough to define our new re-keying processor:

SET defaults.topic.autocreate=true;

INSERT INTO events_by_sensor
SELECT STREAM sensor_id as _key
FROM temperature_events;

The query above will take the sensor_id from the value of the record and use it as the new key. All the value fields will remain untouched.
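
Given the example data above, the re-keyed topic should contain records shaped like this (only the key changes):

Key: "sensor-1"   Value: { "sensor_id": "sensor-1", "temperature": 12, "event_time": 1591970345 }
Key: "sensor-2"   Value: { "sensor_id": "sensor-2", "temperature": 13, "event_time": 1591970346 }
...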

Maybe the sensor_id is not enough, and for some reason, you also need the hour of the measurement in the key. In this case, the key will become a composite object with two fields: the sensor_id and the event_hour:

SET defaults.topic.autocreate=true;

INSERT INTO events_by_sensor_and_hour
SELECT STREAM
    temperature_events.sensor_id as _key.sensor_id
    , temperature_events.event_time / 3600 as _key.event_hour
FROM temperature_events;
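
Since event_time holds epoch seconds, dividing by 3600 buckets each measurement into an hour. Assuming the division truncates (the AVRO example below uses FLOOR to make the rounding explicit), all the example records fall into the same hour, so the new keys look like this:

Key: { "sensor_id": "sensor-1", "event_hour": 442213 }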

As you can see, you can easily build composite objects in Lenses just by listing the structure's fields, one after the other.

In the last example, the _key output storage format will be inferred automatically by the system as JSON. If you need more control, you can set it explicitly with a STORE KEY AS clause before the SELECT.

The following example will create a topic like the previous one, but where the keys will be stored as AVRO:

SET defaults.topic.autocreate=true;

INSERT INTO events_by_sensor_and_hour_avro_key
STORE KEY AS AVRO
SELECT STREAM
    temperature_events.sensor_id as _key.sensor_id
    , FLOOR(temperature_events.event_time / 3600) as _key.event_hour
FROM temperature_events;
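
Once the processor is running, you can check the result in SQL Studio; a sketch, assuming key fields can be projected with the _key prefix as in the queries above:

SELECT
    _key.sensor_id
    , _key.event_hour
FROM events_by_sensor_and_hour_avro_key;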

Happy re-keying!
