Change data format


In this example, we will show how to create an AVRO topic from an existing JSON topic.

Requirements 

For this to work, Lenses has to know the schema of the source topic. It can obtain it in one of three ways:

  • through direct user action where the schema is manually set
  • through inference; Lenses will try to infer the schema of a topic by looking at the topic data
  • and lastly, if the topic is created through Lenses, the schema will be automatically set
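To give an intuition for the second option, here is a minimal sketch of what schema inference could look like. This is a hypothetical illustration, not Lenses' actual implementation: it samples one JSON record and maps each value to a primitive type name.

```python
import json

# Hypothetical sketch of type inference: map each JSON value in a sampled
# record to a schema type name (bool must be checked before int in Python).
def infer_schema(record: dict) -> dict:
    def type_of(v):
        if isinstance(v, bool):
            return "boolean"
        if isinstance(v, int):
            return "long"
        if isinstance(v, float):
            return "double"
        if isinstance(v, dict):
            return {k: type_of(x) for k, x in v.items()}
        return "string"

    return {k: type_of(v) for k, v in record.items()}

# One sampled value payload from the topic used in this example
sample = json.loads(
    '{"speedMph": 50,'
    ' "sensor": {"id": "sensor_1", "lat": 45.1241, "long": 1.177},'
    ' "event_time": 1591970345}'
)
print(infer_schema(sample))
```

A real implementation would sample many records and reconcile conflicting types, but the principle is the same: the schema is derived from the data itself.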

Creating the JSON data 

With the following SQL we can create our initial JSON topic:

CREATE TABLE car_speed_events_json(
    _key.plate string
    , speedMph int
    , sensor.id string
    , sensor.lat double
    , sensor.long double
    , event_time long
)
FORMAT (json, json);

to which we can add data using:

INSERT INTO car_speed_events_json (
    _key.plate
    , speedMph
    , sensor.id 
    , sensor.lat
    , sensor.long 
    , event_time 
) VALUES
("20-GH-38", 50, "sensor_1", 45.1241, 1.177, 1591970345),
("20-VL-12", 30, "sensor_1", 45.1241, 1.177, 1591970345),
("20-JH-98", 90, "sensor_1", 45.1241, 1.177, 1591970345);

We can quickly verify that the format of our newly created topic is JSON for both key and value by searching for car_speed_events_json in the Explore view.


Creating the AVRO topic 

To create a new topic with the AVRO format, we can create a processor that copies the data from our original topic to a new topic, changing the format in the process.

To do this, we start by going to “SQL Processor”, clicking: “New SQL Processor” and defining our processor with the following code:

SET defaults.topic.autocreate=true;

INSERT INTO car_speed_events_avro
STORE 
    KEY AS AVRO 
    VALUE AS AVRO
SELECT STREAM *
FROM car_speed_events_json;

Notice the addition of STORE KEY AS AVRO VALUE AS AVRO. This clause tells our processor which format each facet (key or value) should be stored as.
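For reference, an Avro value schema equivalent to the table definition might look like the following. This is an assumption for illustration; the exact schema Lenses registers (names, namespaces, nullability) may differ, and the key would get its own small record schema for the plate field.

```python
import json

# Hypothetical Avro value schema mirroring the SQL column definitions above;
# the schema Lenses actually registers may differ in naming and nullability.
value_schema = {
    "type": "record",
    "name": "car_speed_events",
    "fields": [
        {"name": "speedMph", "type": "int"},
        {
            "name": "sensor",
            "type": {
                "type": "record",
                "name": "sensor",
                "fields": [
                    {"name": "id", "type": "string"},
                    {"name": "lat", "type": "double"},
                    {"name": "long", "type": "double"},
                ],
            },
        },
        {"name": "event_time", "type": "long"},
    ],
}
print(json.dumps(value_schema, indent=2))
```

Unlike JSON, where each message carries its own structure, the Avro messages reference a schema like this one, so consumers no longer need to guess field types.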


Hitting “Create New Processor” will start the new processor.

We can see the events were copied and, from now on, Lenses will keep pushing any new events added to car_speed_events_json into car_speed_events_avro.


We can also verify that the new topic's format is AVRO for both the key and value facets.

--
Last modified: September 26, 2024