Changing data formats

Tutorial on how to change the format of data in a Kafka topic from JSON to AVRO with Lenses SQL Processors.

In this example, we will show how to create an AVRO topic from an existing JSON topic.

Requirements

For this to work, Lenses has to know what the source topic schema is. Lenses can do this in one of three ways:

  • through direct user action where the schema is manually set

  • through inference; Lenses will try to infer the schema of a topic by looking at the topic data

  • and lastly, if the topic is created through Lenses, the schema will be automatically set

Creating the JSON data

With the following SQL we can create our initial JSON topic:

CREATE TABLE car_speed_events_json(
    _key.plate string
    , speedMph int
    , sensor.id string
    , sensor.lat double
    , sensor.long double
    , event_time long
)
FORMAT (json, json);

to which we can add data using:

INSERT INTO car_speed_events_json (
    _key.plate
    , speedMph
    , sensor.id 
    , sensor.lat
    , sensor.long 
    , event_time 
) VALUES
("20-GH-38", 50, "sensor_1", 45.1241, 1.177, 1591970345),
("20-VL-12", 30, "sensor_1", 45.1241, 1.177, 1591970345),
("20-JH-98", 90, "sensor_1", 45.1241, 1.177, 1591970345);

We can quickly verify that the format of the newly created topic is JSON for both key and value by searching for car_speed_events_json in the Explore view.
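
As a complementary check, the inserted rows themselves can be read back with a plain query. This is only a sketch: it assumes the query is run in the same place as the statements above (for example SQL Studio), and the LIMIT value is arbitrary.

```sql
-- Read back a handful of records from the JSON topic.
-- The three rows inserted above should be among the results.
SELECT *
FROM car_speed_events_json
LIMIT 10;
```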

Creating the AVRO topic

To create a new topic with format AVRO, we can create a processor that will copy the data from our original topic to a new topic changing the format in the process.

To do this, we start by going to “SQL Processor”, clicking “New SQL Processor”, and defining our processor with the following code:

SET defaults.topic.autocreate=true;

INSERT INTO car_speed_events_avro
STORE 
    KEY AS AVRO 
    VALUE AS AVRO
SELECT STREAM *
FROM car_speed_events_json;

Notice the addition of STORE KEY AS AVRO VALUE AS AVRO. This statement tells our processor which format we want each facet (key or value) to be stored as.
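
The same STORE clause should also work when the processor projects only some of the fields instead of using SELECT *. The following is a hedged sketch, not part of the original tutorial: the target topic name car_speed_events_avro_slim and the alias sensorId are hypothetical, chosen only for illustration.

```sql
SET defaults.topic.autocreate=true;

-- Hypothetical target topic; only a subset of the value fields is copied.
INSERT INTO car_speed_events_avro_slim
STORE
    KEY AS AVRO
    VALUE AS AVRO
SELECT STREAM
    speedMph
    , sensor.id AS sensorId   -- illustrative alias for the nested field
    , event_time
FROM car_speed_events_json;
```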

Hitting “Create New Processor” will start a new processor.

Once the processor is running, we can see that the events were added. From now on, Lenses will keep pushing any new events added to car_speed_events_json into car_speed_events_avro.

We can also verify in the Explore view that the format of our new topic is AVRO for both the key and value facets.
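
One way to confirm that the processor keeps copying data is to add another event to the source topic and then query the target topic. This sketch reuses the INSERT syntax shown earlier; the plate, speed, and timestamp values are made up for illustration.

```sql
-- Add one more (made-up) event to the JSON source topic.
INSERT INTO car_speed_events_json (
    _key.plate
    , speedMph
    , sensor.id
    , sensor.lat
    , sensor.long
    , event_time
) VALUES
("20-XX-00", 70, "sensor_1", 45.1241, 1.177, 1591970400);

-- With the processor running, the new event should show up in the AVRO topic.
SELECT *
FROM car_speed_events_avro
LIMIT 10;
```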

2024 © Lenses.io Ltd. Apache, Apache Kafka, Kafka and associated open source project names are trademarks of the Apache Software Foundation.