
Data formats

Controlling AVRO record names and namespaces

This page is a tutorial on controlling AVRO record names and namespaces with Lenses SQL Processors.

When writing output as AVRO, Lenses creates schemas for you, automatically generating AVRO record names.

In this tutorial we will learn how to override the default record naming strategy.

In Lenses SQL you can use a SET statement to control the record and namespace name generated for the AVRO schema.
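As a quick preview, the statements we will use later in this tutorial have the following general shape (the record name and namespace values here are placeholders):

SET defaults.topic.value.avro.record="<recordName>";
SET defaults.topic.value.avro.namespace="<namespace>";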

Setting up our input topic

We are going to create and populate a topic that we will later use in a couple of SQL Processors.

In SQL Studio, create the topic by running the following query:
CREATE TABLE mytopic(a string, b string, c string) FORMAT (STRING, AVRO);

For the purposes of our tutorial, it is enough to insert a single record:
INSERT INTO mytopic(a, b, c) VALUES ("a", "b", "c");

Create a simple SQL Processor

We are now going to create a processor that will show the default behavior of AVRO record naming in Lenses.

The processor does not do much; it just reshapes the fields of the original topic, putting some of them into a nested field:
SET defaults.topic.autocreate=true;

INSERT INTO mytopic_2
SELECT STREAM a as x.a, b as x.y.b, c as x.y.c
FROM mytopic;

We then start the processor. Lenses will create the new topic mytopic_2, and a new schema will be registered in the Schema Registry as soon as the first (and only) record is processed.

If we inspect the value schema of mytopic_2, we see the schema that was generated:
{
  "type": "record",
  "name": "record1",
  "fields": [
    {
      "name": "x",
      "type": {
        "type": "record",
        "name": "record0",
        "fields": [
          {
            "name": "a",
            "type": "string"
          },
          {
            "name": "y",
            "type": {
              "type": "record",
              "name": "record",
              "fields": [
                {
                  "name": "b",
                  "type": "string"
                },
                {
                  "name": "c",
                  "type": "string"
                }
              ]
            }
          }
        ]
      }
    }
  ]
}

As we can see, each record type has a name (a name is mandatory in AVRO), and Lenses has generated those names automatically for us (record, record0, record1, etc.).

Set the record name and the namespace of the value schema

We are now going to see how to override that default behavior.

Let’s create and start the new processor with the following SQL:
SET defaults.topic.autocreate=true;
SET defaults.topic.value.avro.record="myRecordName";
SET defaults.topic.value.avro.namespace="myNamespace";

INSERT INTO mytopic_3
SELECT STREAM a as x.a, b as x.y.b, c as x.y.c
FROM mytopic;

Notice how we added the new SET statements to the query:
SET defaults.topic.value.avro.record="myRecordName";
SET defaults.topic.value.avro.namespace="myNamespace";

These settings are telling Lenses to set the root record name and namespace to the values specified.

If we now check the value schema for mytopic_3 we get:
{
  "type": "record",
  "name": "myRecordName",
  "namespace": "myNamespace",
  "fields": [
    {
      "name": "x",
      "type": {
        "type": "record",
        "name": "record0",
        "fields": [
          {
            "name": "a",
            "type": "string"
          },
          {
            "name": "y",
            "type": {
              "type": "record",
              "name": "record",
              "fields": [
                {
                  "name": "b",
                  "type": "string"
                },
                {
                  "name": "c",
                  "type": "string"
                }
              ]
            }
          }
        ]
      }
    }
  ]
}

As we can see, the root record element now has the name myRecordName and the namespace myNamespace.

Notice how the settings did not affect nested records.

Set the record name and the namespace of the key schema

If the key of the generated topic has AVRO format as well, you can use the following analogous settings to control the key record name and namespace:
SET defaults.topic.key.avro.record="myRecordName";
SET defaults.topic.key.avro.namespace="myNamespace";

More control over the topics affected by the setting

A setting like the one we used before for the value schema:
SET defaults.topic.value.avro.record="myRecordName"

will affect all the topics used by the processor.

If you want instead to target a single topic, you can use the topic-specific version:
SET topic.mytopic_3.value.avro.record="myRecordName"

The setting above will override the record name only for topic mytopic_3. Other topics will not be affected and will keep using the default naming strategy.
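To illustrate, here is a minimal sketch of a processor combining a default with a topic-specific override; the record names are just example values reused from this tutorial. Under these settings, mytopic_3 gets the record name myRecordName, while any other output topic of the processor would fall back to defaultRecordName:

SET defaults.topic.autocreate=true;
SET defaults.topic.value.avro.record="defaultRecordName";
SET topic.mytopic_3.value.avro.record="myRecordName";

INSERT INTO mytopic_3
SELECT STREAM a as x.a, b as x.y.b, c as x.y.c
FROM mytopic;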


Rekeying data

This page describes a tutorial to rekey data in a Kafka topic with Lenses SQL Processors.

Sometimes you have a topic that is almost exactly what you need, except that the key of the record requires a bit of massaging.

In Lenses SQL you can use the special SELECT ... as _key syntax to quickly re-key your data.

In our example, we have a topic containing events coming from temperature sensors.

Each record contains the sensor’s measured temperature, the time of the measurement, and the id of the sensor. The key is a unique string (for example, a UUID) that the upstream system assigns to the event.

You can replicate the example by creating a topic in SQL Studio:

CREATE TABLE temperature_events(
    sensor_id string
    , temperature int
    , event_time long
)
FORMAT (string, avro);

We can also insert some example data to do our experiments:
INSERT INTO temperature_events(
    _key
    , sensor_id
    , temperature
    , event_time
) VALUES
("9E6F72C8-6C9F-4EC2-A7B2-C8AC5A95A26C", "sensor-1", 12, 1591970345),
("0958C1E2-804F-4110-B463-7BEA25DA680C", "sensor-2", 13, 1591970346),
("552E9A77-D23D-4EC7-96AB-7F106637ADBC", "sensor-3", 28, 1591970347),
("00F44EDD-1BF1-4AE8-A7FE-622F720CA3BC", "sensor-1", 13, 1591970348),
("A7F19576-6EFA-40A0-91C3-D8999A41F453", "sensor-1", 12, 1591970348),
("5273803E-7F34-44F8-9A3B-B6CB9D24C1F9", "sensor-2", 31, 1591970349),
("8401CD82-ABFF-4D1B-A564-986DC971F913", "sensor-2", 30, 1591970349),
("F1CF77A0-6CFB-472F-B368-28F011F46C64", "sensor-1", 13, 1591970349);

You can explore the topic in Lenses to check the content and the shape of what you just inserted.
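For instance, the first record we inserted should look roughly like this (a sketch of the key/value shape based on the data above, not captured output):

KEY: "9E6F72C8-6C9F-4EC2-A7B2-C8AC5A95A26C"
VALUE: {
    "sensor_id": "sensor-1",
    "temperature": 12,
    "event_time": 1591970345
}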

Let’s say that what you need is that same stream of events, but the record key should be the sensor_id instead of the UUID.

With the special SELECT ... as _key syntax, a few lines are enough to define our new re-keying processor:
SET defaults.topic.autocreate=true;

INSERT INTO events_by_sensor
SELECT STREAM sensor_id as _key
FROM temperature_events;

The query above will take the sensor_id from the value of the record and put it as the new key. All the value fields will remain untouched.
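For instance, given the sample data above, the first emitted record would look like this (an illustration derived from the inserted data, not captured output):

KEY: "sensor-1"
VALUE: {
    "sensor_id": "sensor-1",
    "temperature": 12,
    "event_time": 1591970345
}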

Maybe the sensor_id is not enough, and for some reason, you also need the hour of the measurement in the key. In this case, the key will become a composite object with two fields: the sensor_id and the event_hour:
SET defaults.topic.autocreate=true;

INSERT INTO events_by_sensor_and_hour
SELECT STREAM
    temperature_events.sensor_id as _key.sensor_id
    , temperature_events.event_time / 3600 as _key.event_hour
FROM temperature_events;

As you can see, you can build composite objects in Lenses with ease, just by listing all the structure’s fields one after the other.
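With the sample data above, a record key would then have this shape (an illustration, assuming the division truncates to whole hours: 1591970345 / 3600 is 442213 when truncated):

KEY: {
    "sensor_id": "sensor-1",
    "event_hour": 442213
}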

In the last example, the _key output storage format will be inferred automatically by the system as JSON. If you need more control, you can use the STORE AS clause before the SELECT.

The following example will create a topic like the previous one, but where the keys will be stored as AVRO:
SET defaults.topic.autocreate=true;

INSERT INTO events_by_sensor_and_hour_avro_key
STORE KEY AS AVRO
SELECT STREAM
    temperature_events.sensor_id as _key.sensor_id
    , FLOOR(temperature_events.event_time / 3600) as _key.event_hour
FROM temperature_events;

Happy re-keying!

Changing data formats

Tutorial on how to change the format of data in a Kafka topic from JSON to AVRO with Lenses SQL Processors.

In this example, we will show how to create an AVRO topic from an existing JSON topic.

Requirements

For this to work, Lenses has to know what the source topic schema is. Lenses can do this in one of three ways:

  • through direct user action, where the schema is manually set

  • through inference; Lenses will try to infer the schema of a topic by looking at the topic data

  • and lastly, if the topic is created through Lenses, the schema will be automatically set

Creating the JSON data

With the following SQL we can create our initial JSON topic:
CREATE TABLE car_speed_events_json(
    _key.plate string
    , speedMph int
    , sensor.id string
    , sensor.lat double
    , sensor.long double
    , event_time long
)
FORMAT (json, json);

to which we can add data using:
INSERT INTO car_speed_events_json (
    _key.plate
    , speedMph
    , sensor.id
    , sensor.lat
    , sensor.long
    , event_time
) VALUES
("20-GH-38", 50, "sensor_1", 45.1241, 1.177, 1591970345),
("20-VL-12", 30, "sensor_1", 45.1241, 1.177, 1591970345),
("20-JH-98", 90, "sensor_1", 45.1241, 1.177, 1591970345);

It can be quickly verified that the format of our newly created topic is JSON for both key and value by searching for our topic car_speed_events_json in the Explore view.

Creating the AVRO topic

To create a new topic with format AVRO, we can create a processor that will copy the data from our original topic to a new topic, changing the format in the process.

To do this, we start by going to “SQL Processor”, clicking “New SQL Processor”, and defining our processor with the following code:
SET defaults.topic.autocreate=true;

INSERT INTO car_speed_events_avro
STORE
    KEY AS AVRO
    VALUE AS AVRO
SELECT STREAM *
FROM car_speed_events_json;

Notice the addition of STORE KEY AS AVRO VALUE AS AVRO. This statement tells our processor which format we want each facet (key or value) to be stored as.

Hitting “Create New Processor” will start the new processor.

We can see the events were added, and from now on Lenses will keep pushing any new events added to car_speed_events_json into car_speed_events_avro.

We can also verify that the format of our new topic is AVRO for both the key and value facets.


Changing the shape of data

This page describes a tutorial to change the shape (fields) of data in a Kafka topic using Lenses SQL Processors.

In this tutorial, we will see how to use Lenses SQL to alter the shape of your records.

In Lenses SQL you can quickly reshape your data with a simple SELECT and some built-in functions.

We will learn how to:

• put value fields into the key

• lift key fields into the value

• call functions to transform your data

• build nested structures for your keys/values

• unwrap singleton objects into primitive types

Setting up our example

In our example, we are getting data from speed sensors on a car racing circuit.

The upstream system registers speed measurement events as records in a Kafka topic.

An example of such an event is the following:
KEY: "9E6F72C8-6C9F-4EC2-A7B2-C8AC5A95A26C"
VALUE: {
  "car_id": "car_2",
  "speedMph": 84,
  "sensor": {
    "id": "sensor_1",
    "lat": 45,
    "long": 0
  },
  "event_time": 159230614
}

We can replicate such a structure by running the following query in SQL Studio:
CREATE TABLE car_speed_events(
    car_id string
    , speedMph int
    , sensor.id string
    , sensor.lat double
    , sensor.long double
    , event_time long
)
FORMAT (string, avro);

Each event is keyed by a unique string generated by the upstream system.

We can again use SQL Studio to insert some data to play with:
INSERT INTO car_speed_events(
    _key
    , car_id
    , speedMph
    , sensor.id
    , sensor.lat
    , sensor.long
    , event_time
) VALUES
("9E6F72C8-6C9F-4EC2-A7B2-C8AC5A95A26C", "car-1", 50, "sensor_1", 45.1241, 1.177, 1591970345),
("0958C1E2-804F-4110-B463-7BEA25DA680C", "car-2", 54, "sensor_2", 45.122, 1.75, 1591970346),
("552E9A77-D23D-4EC7-96AB-7F106637ADBC", "car-3", 60, "sensor_1", 45.1241, 1.177, 1591970347),
("00F44EDD-1BF1-4AE8-A7FE-622F720CA3BC", "car-1", 55, "sensor_2", 45.122, 1.75, 1591970348),
("A7F19576-6EFA-40A0-91C3-D8999A41F453", "car-1", 56, "sensor_1", 45.1241, 1.177, 1591970348),
("5273803E-7F34-44F8-9A3B-B6CB9D24C1F9", "car-2", 58, "sensor_3", 45.123, 1.176, 1591970349),
("8401CD82-ABFF-4D1B-A564-986DC971F913", "car-2", 60, "sensor_1", 45.1241, 1.177, 1591970349),
("F1CF77A0-6CFB-472F-B368-28F011F46C64", "car-1", 53, "sensor_3", 45.123, 1.176, 1591970349);

Simple projections

In this section, we are only interested in the speed of single cars, and we do not care about all the other fields.

We want to use the car id, which is now part of the record Value, to become the new key (using the special as _key syntax). We also want the car speed as the new record Value.

To achieve that, we can create a new SQL Processor using some simple projections:
SET defaults.topic.autocreate=true;

INSERT INTO only_car_speeds
SELECT STREAM
    car_id AS _key
    , speedMph
FROM car_speed_events;

Checking the records emitted by the processor, we see that the shape of the records is:
KEY: "car-1"
VALUE: { "speedMph": 50 }

We want to avoid that intermediate wrapping of speedMph inside an object. To do that, we can tell Lenses to unwrap the value with the special as _value syntax, saving some bytes and CPU cycles:
SET defaults.topic.autocreate=true;

INSERT INTO only_car_speeds_unwrapped
SELECT STREAM
    car_id AS _key
    , speedMph AS _value
FROM car_speed_events;

Now the shape of the records is what we had in mind:
KEY: "car-1"
VALUE: 50

Using built-in functions

This time we want to do some more complex manipulation: we want to convert the speed from Mph to Kmph, and we also want to build a nice string describing the event.

An example of an output record would be:
{
    "speedKmph": 160.93,
    "description": "Car car_1 was running at 100Mph"
}

We can achieve this with the following processor:

SET defaults.topic.autocreate=true;

INSERT INTO car_speeds_kmph
SELECT STREAM
    speedMph * 1.60934 AS speedKmph
    , CONCATENATE(
        'Car ',
        car_id,
        ' was running at ',
        CAST(speedMph AS STRING),
        'Mph') AS description
FROM car_speed_events;

In this case, we are using CONCATENATE to concatenate multiple strings, CAST to convert an expression to another type, and *, the usual multiplication operator.

If we check the resulting records, we can see that we obtained the shape we were looking for. Please note that the keys have been left untouched:
KEY: "9E6F72C8-6C9F-4EC2-A7B2-C8AC5A95A26C"
VALUE: {
    "speedKmph": "80.467",
    "description": "Car car-1 was running at 50Mph"
}

Composite objects

In this last example, we will show how to create composite keys and values in our projections.

We want both the sensor id and the event_time as the record Key. For the record Value, we want the car_id and the speed, expressed both as Mph and Kmph:
KEY: {
    "sensor_id": "sensor_1",
    "event_time": 1591970345
}
VALUE: {
    "car_id": "car_1",
    "speed": {
        "mph": 100,
        "kmph": 161
    }
}

Lenses SQL allows us to use nested aliases to build nested structures: you just have to put some dots in your aliases:
SET defaults.topic.autocreate=true;

INSERT INTO car_speeds_by_sensor_and_time
SELECT STREAM
    sensor.id AS _key.sensor_id
    , event_time AS _key.event_time
    , car_id
    , speedMph AS speed.mph
    , speedMph * 1.60934 AS speed.kmph
FROM car_speed_events;

The resulting shape of the record is what we were aiming for:
KEY: {
    "sensor_id": "sensor_1",
    "event_time": 1591970345
}
VALUE: {
    "car_id": "car_1",
    "speed": {
        "mph": 100,
        "kmph": 160
    }
}

Happy re-shaping!

    KEY: "9E6F72C8-6C9F-4EC2-A7B2-C8AC5A95A26C"
    VALUE: {
      "car_id": "car_2",
      "speedMph": 84,
      "sensor": {
        "id": "sensor_1",
        "lat": 45,
        "long": 0
      },
      "event_time": 159230614
    }
    CREATE TABLE car_speed_events(
        car_id string
        , speedMph int
        , sensor.id string
        , sensor.lat double
        , sensor.long double
        , event_time long
    )
    FORMAT (string, avro);
    INSERT INTO car_speed_events(
        _key
        , car_id
        , speedMph
        , sensor.id
        , sensor.lat
        , sensor.long
        , event_time
    ) VALUES
    ("9E6F72C8-6C9F-4EC2-A7B2-C8AC5A95A26C", "car-1", 50, "sensor_1", 45.1241, 1.177, 1591970345),
    ("0958C1E2-804F-4110-B463-7BEA25DA680C", "car-2", 54, "sensor_2", 45.122, 1.75, 1591970346),
    ("552E9A77-D23D-4EC7-96AB-7F106637ADBC", "car-3", 60, "sensor_1", 45.1241, 1.177, 1591970347),
    ("00F44EDD-1BF1-4AE8-A7FE-622F720CA3BC", "car-1", 55, "sensor_2", 45.122, 1.75, 1591970348),
    ("A7F19576-6EFA-40A0-91C3-D8999A41F453", "car-1", 56, "sensor_1", 45.1241, 1.177, 1591970348),
    ("5273803E-7F34-44F8-9A3B-B6CB9D24C1F9", "car-2", 58, "sensor_3", 45.123, 1.176, 1591970349),
    ("8401CD82-ABFF-4D1B-A564-986DC971F913", "car-2", 60, "sensor_1", 45.1241, 1.177, 1591970349),
    ("F1CF77A0-6CFB-472F-B368-28F011F46C64", "car-1", 53, "sensor_3", 45.123, 1.176, 1591970349);
    SET defaults.topic.autocreate=true;
    
    INSERT INTO only_car_speeds
    SELECT STREAM 
        car_id AS _key
        , speedMph
    FROM car_speed_events;
    KEY: "car-1"
    VALUE: { "speedMph": 50 }
    SET defaults.topic.autocreate=true;
    
    INSERT INTO only_car_speeds_unwrapped
    SELECT STREAM 
        car_id AS _key
        , speedMph AS _value
    FROM car_speed_events;
    KEY: "car-1"
    VALUE: 50
    {
        "speedKmph": 160.93,
        "description": "Car car_1 was running at 100Mph "
    }
    SET defaults.topic.autocreate=true;
    
    INSERT INTO car_speeds_kmph
    SELECT STREAM
        speedMph * 1.60934 AS speedKmph
        , CONCATENATE(
            'Car ',
            car_id,
            ' was running at ',
            CAST(speedMph AS STRING),
            'Mph') AS description
    FROM car_speed_events;
    KEY: "9E6F72C8-6C9F-4EC2-A7B2-C8AC5A95A26C"
    VALUE: {
        speedKmph:"80.467",
        description:"Car car-1 was running at 50Mph"
    }
    KEY: {
        "sensor_id": "sensor_1",
        "event_time": 1591970345
    }
    VALUE: {
        "car_id": "car_1",
        "speed": {
            "mph": 100,
            "kmph": 161
        } 
    }
    SET defaults.topic.autocreate=true;
    
    INSERT INTO car_speds_by_sensor_and_time
    SELECT STREAM
        sensor.id AS _key.sensor_id
        , event_time AS _key.event_time
        , car_id
        , speedMph AS speed.mph
        , speedMph * 1.60934 AS speed.kmph
    FROM car_speed_events;
    KEY: {
        "sensor_id": "sensor_1",
        "event_time": 1591970345
    }
    VALUE: {
        "car_id": "car_1",
        "speed": {
            "mph": 100,
            "kmph": 160
        } 
    }