Changing the shape of data

This page describes a tutorial on changing the shape (fields) of data in a Kafka topic using Lenses SQL Processors.

In this tutorial, we will see how to use Lenses SQL to alter the shape of your records.

In Lenses SQL you can quickly reshape your data with a simple SELECT and some built-in functions.

We will learn how to:

  • put value fields into the key

  • lift key fields into the value

  • call functions to transform your data

  • build nested structures for your keys/values

  • unwrap singleton objects into primitive types

Setting up our example

In our example, we are collecting data from the speed sensors of a car racing circuit.

The upstream system registers speed measurement events as records in a Kafka topic.

An example of such an event is the following:

KEY: "9E6F72C8-6C9F-4EC2-A7B2-C8AC5A95A26C"
VALUE: {
  "car_id": "car_2",
  "speedMph": 84,
  "sensor": {
    "id": "sensor_1",
    "lat": 45,
    "long": 0
  },
  "event_time": 159230614
}

We can replicate this structure by running the following query in SQL Studio:

CREATE TABLE car_speed_events(
    car_id string
    , speedMph int
    , sensor.id string
    , sensor.lat double
    , sensor.long double
    , event_time long
)
FORMAT (string, avro);

Each event is keyed by a unique string generated by the upstream system.

We can again use SQL Studio to insert some data to play with:

INSERT INTO car_speed_events(
    _key
    , car_id
    , speedMph
    , sensor.id
    , sensor.lat
    , sensor.long
    , event_time
) VALUES
("9E6F72C8-6C9F-4EC2-A7B2-C8AC5A95A26C", "car-1", 50, "sensor_1", 45.1241, 1.177, 1591970345),
("0958C1E2-804F-4110-B463-7BEA25DA680C", "car-2", 54, "sensor_2", 45.122, 1.75, 1591970346),
("552E9A77-D23D-4EC7-96AB-7F106637ADBC", "car-3", 60, "sensor_1", 45.1241, 1.177, 1591970347),
("00F44EDD-1BF1-4AE8-A7FE-622F720CA3BC", "car-1", 55, "sensor_2", 45.122, 1.75, 1591970348),
("A7F19576-6EFA-40A0-91C3-D8999A41F453", "car-1", 56, "sensor_1", 45.1241, 1.177, 1591970348),
("5273803E-7F34-44F8-9A3B-B6CB9D24C1F9", "car-2", 58, "sensor_3", 45.123, 1.176, 1591970349),
("8401CD82-ABFF-4D1B-A564-986DC971F913", "car-2", 60, "sensor_1", 45.1241, 1.177, 1591970349),
("F1CF77A0-6CFB-472F-B368-28F011F46C64", "car-1", 53, "sensor_3", 45.123, 1.176, 1591970349);
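The dotted column names used in CREATE TABLE and INSERT INTO (such as sensor.id) describe nested fields of the record Value, while _key targets the record Key. The following is a minimal Python sketch of that mapping, not Lenses code. It assumes records are modelled as a (key, value) pair of plain dicts, and to_record is a hypothetical helper. The sketch turns the first inserted row back into the nested shape of the example event:

```python
from typing import Any

# Column list of the INSERT statement above: "_key" targets the record Key,
# every other column targets a (possibly nested) field of the record Value.
COLUMNS = ["_key", "car_id", "speedMph", "sensor.id", "sensor.lat", "sensor.long", "event_time"]


def to_record(columns: list[str], row: tuple) -> tuple[Any, dict]:
    """Hypothetical helper: turn one flat row into a (key, nested value) pair."""
    key = None
    value: dict = {}
    for column, cell in zip(columns, row):
        if column == "_key":
            key = cell
            continue
        *parents, leaf = column.split(".")
        target = value
        for part in parents:
            # Create intermediate objects (e.g. "sensor") the first time they are needed.
            target = target.setdefault(part, {})
        target[leaf] = cell
    return key, value


key, value = to_record(
    COLUMNS,
    ("9E6F72C8-6C9F-4EC2-A7B2-C8AC5A95A26C", "car-1", 50, "sensor_1", 45.1241, 1.177, 1591970345),
)
print(key)
print(value)
```

All three sensor.* columns land inside a single "sensor" object, which is why the Value has the same nesting as the example event shown earlier.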

Simple projections

In this section, we are only interested in the speed of individual cars, and we can drop all the other fields.

We want the car id, which is currently part of the record Value, to become the new Key (using the special AS _key syntax), and the car speed to become the new record Value.

To achieve that, we can create a new SQL Processor using some simple projections.

SET defaults.topic.autocreate=true;

INSERT INTO only_car_speeds
SELECT STREAM 
    car_id AS _key
    , speedMph
FROM car_speed_events;

Checking the records emitted by the processor, we see that they have the following shape:

KEY: "car-1"
VALUE: { "speedMph": 50 }
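To make the effect of the projection concrete, here is a minimal Python sketch that applies the same reshaping to one record. It is an illustration only and does not show how Lenses executes the query. It assumes the Value is a plain dict, and project_car_speed is a hypothetical name:

```python
# Value of the first inserted record, modelled as a plain dict (assumption for illustration).
record_value = {
    "car_id": "car-1",
    "speedMph": 50,
    "sensor": {"id": "sensor_1", "lat": 45.1241, "long": 1.177},
    "event_time": 1591970345,
}


def project_car_speed(value: dict) -> tuple[str, dict]:
    """Mimic: SELECT STREAM car_id AS _key, speedMph FROM car_speed_events."""
    # car_id AS _key: the field moves from the Value into the Key,
    # replacing (and discarding) the original UUID key.
    new_key = value["car_id"]
    # speedMph has no special alias, so it stays a named field
    # inside a one-field object.
    new_value = {"speedMph": value["speedMph"]}
    return new_key, new_value


new_key, new_value = project_car_speed(record_value)
print(new_key, new_value)  # car-1 {'speedMph': 50}
```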

We want to avoid wrapping speedMph inside an intermediate object. To do that, we can tell Lenses to unwrap the value with the special AS _value syntax, which also saves some bytes and CPU cycles:

SET defaults.topic.autocreate=true;

INSERT INTO only_car_speeds_unwrapped
SELECT STREAM 
    car_id AS _key
    , speedMph AS _value
FROM car_speed_events;

Now the shape of the records is what we had in mind:

KEY: "car-1"
VALUE: 50
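In the same dict-based model as before (an assumption, not Lenses internals), AS _value means the selected field becomes the whole Value instead of a field inside an object. Here is a minimal sketch, with project_unwrapped as a hypothetical name:

```python
# Value of the first inserted record, modelled as a plain dict (assumption for illustration).
record_value = {
    "car_id": "car-1",
    "speedMph": 50,
    "sensor": {"id": "sensor_1", "lat": 45.1241, "long": 1.177},
    "event_time": 1591970345,
}


def project_unwrapped(value: dict) -> tuple[str, int]:
    """Mimic: SELECT STREAM car_id AS _key, speedMph AS _value FROM car_speed_events."""
    # Both aliases target the record itself, so Key and Value are primitives
    # rather than objects with named fields.
    return value["car_id"], value["speedMph"]


new_key, new_value = project_unwrapped(record_value)
print(new_key, new_value)  # car-1 50
```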

Using built-in functions

This time we want to do some more complex manipulation: convert the speed from Mph to Kmph, and build a human-readable string describing the event.

An example of an output record would be:

{
    "speedKmph": 160.93,
    "description": "Car car_1 was running at 100Mph"
}

In this case, we are using CONCATENATE to concatenate multiple strings, CAST to convert an expression to another type, and *, the usual multiplication operator.

SET defaults.topic.autocreate=true;

INSERT INTO car_speeds_kmph
SELECT STREAM
    speedMph * 1.60934 AS speedKmph
    , CONCATENATE(
        'Car ',
        car_id,
        ' was running at ',
        CAST(speedMph AS STRING),
        'Mph') AS description
FROM car_speed_events;
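As a rough Python equivalent of this query, the multiplication maps to *, CONCATENATE maps to joining strings, and CAST(speedMph AS STRING) is approximated with str(). The str() stand-in assumes that casting an integer renders its plain decimal digits, which matches the output shown below. The sketch also assumes dict-based records, and to_kmph is a hypothetical name:

```python
MPH_TO_KMPH = 1.60934  # same conversion factor as in the query

# First inserted record, modelled as a (key, value) pair (assumption for illustration).
record_key = "9E6F72C8-6C9F-4EC2-A7B2-C8AC5A95A26C"
record_value = {
    "car_id": "car-1",
    "speedMph": 50,
    "sensor": {"id": "sensor_1", "lat": 45.1241, "long": 1.177},
    "event_time": 1591970345,
}


def to_kmph(key: str, value: dict) -> tuple[str, dict]:
    new_value = {
        # speedMph * 1.60934 AS speedKmph
        "speedKmph": value["speedMph"] * MPH_TO_KMPH,
        # CONCATENATE('Car ', car_id, ' was running at ', CAST(speedMph AS STRING), 'Mph')
        "description": "".join(
            ["Car ", value["car_id"], " was running at ", str(value["speedMph"]), "Mph"]
        ),
    }
    # The query has no _key alias, so the original key passes through unchanged.
    return key, new_value


new_key, new_value = to_kmph(record_key, record_value)
print(new_key)                           # 9E6F72C8-6C9F-4EC2-A7B2-C8AC5A95A26C
print(new_value["description"])          # Car car-1 was running at 50Mph
print(round(new_value["speedKmph"], 3))  # 80.467
```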

If we check the resulting records, we can see that we obtained the shape we were looking for. Note that the keys are left untouched, because the query does not project anything into _key:

KEY: "9E6F72C8-6C9F-4EC2-A7B2-C8AC5A95A26C"
VALUE: {
    "speedKmph": 80.467,
    "description": "Car car-1 was running at 50Mph"
}

Composite objects

In this last example, we will show how to create composite keys and values in our projections.

We want both the sensor id and the event_time as the record Key. For the record Value, we want the car_id and the speed, expressed both as Mph and Kmph.

KEY: {
    "sensor_id": "sensor_1",
    "event_time": 1591970345
}
VALUE: {
    "car_id": "car_1",
    "speed": {
        "mph": 100,
        "kmph": 161
    } 
}

Lenses SQL allows us to use nested aliases to build nested structures: just put dots in your aliases, as in speed.mph or _key.sensor_id.

SET defaults.topic.autocreate=true;

INSERT INTO car_speeds_by_sensor_and_time
SELECT STREAM
    sensor.id AS _key.sensor_id
    , event_time AS _key.event_time
    , car_id
    , speedMph AS speed.mph
    , speedMph * 1.60934 AS speed.kmph
FROM car_speed_events;
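The dotted aliases can be read as paths. In the Python sketch below, a _key. prefix sends a field into the Key and any other alias goes into the Value, with each dot opening a nested object. This is an illustration only, assuming dict-based records; set_path and reshape are hypothetical names:

```python
# First inserted record's Value, modelled as a plain dict (assumption for illustration).
record_value = {
    "car_id": "car-1",
    "speedMph": 50,
    "sensor": {"id": "sensor_1", "lat": 45.1241, "long": 1.177},
    "event_time": 1591970345,
}


def set_path(target: dict, dotted: str, cell) -> None:
    """Store cell under a dotted path such as "speed.kmph", creating objects as needed."""
    *parents, leaf = dotted.split(".")
    for part in parents:
        target = target.setdefault(part, {})
    target[leaf] = cell


def reshape(value: dict) -> tuple[dict, dict]:
    new_key: dict = {}
    new_value: dict = {}
    # (alias, expression) pairs mirroring the SELECT list of the query.
    projections = [
        ("_key.sensor_id", value["sensor"]["id"]),
        ("_key.event_time", value["event_time"]),
        ("car_id", value["car_id"]),
        ("speed.mph", value["speedMph"]),
        ("speed.kmph", value["speedMph"] * 1.60934),
    ]
    for alias, cell in projections:
        if alias.startswith("_key."):
            set_path(new_key, alias[len("_key."):], cell)
        else:
            set_path(new_value, alias, cell)
    return new_key, new_value


new_key, new_value = reshape(record_value)
print(new_key)  # {'sensor_id': 'sensor_1', 'event_time': 1591970345}
print(new_value["car_id"], new_value["speed"]["mph"])  # car-1 50
```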

The resulting shape of the record is what we were aiming for:

KEY: {
    "sensor_id": "sensor_1",
    "event_time": 1591970345
}
VALUE: {
    "car_id": "car-1",
    "speed": {
        "mph": 50,
        "kmph": 80.467
    }
}

Happy re-shaping!

2024 © Lenses.io Ltd. Apache, Apache Kafka, Kafka and associated open source project names are trademarks of the Apache Software Foundation.