This page describes a tutorial to rekey data in a Kafka topic with Lenses SQL Processors.
Sometimes you have a topic that is almost exactly what you need, except that the key of the record requires a bit of massaging.
In Lenses SQL you can use the special SELECT ... as _key syntax to quickly re-key your data.
In our example, we have a topic containing events coming from temperature sensors.
Each record contains the sensor's measured temperature, the time of the measurement, and the id of the sensor. The key is a unique string (for example, a UUID) that the upstream system assigns to the event.
You can replicate the example by creating a topic in SQL Studio:
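A sketch of such a topic definition, assuming hypothetical field names (sensor_id, temperature, event_time) for the sensor events:

```sql
CREATE TABLE temperature_events(
    sensor_id string,
    temperature double,
    event_time long
)
FORMAT (string, avro);
```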
We can also insert some example data to do our experiments:
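For example, a few records with UUID-style string keys (all values here are made up for illustration):

```sql
INSERT INTO temperature_events(
    _key, sensor_id, temperature, event_time
) VALUES
("7e8d7762-2a4e-4a66-8b2e-b9d0e1b7a6a1", "sensor-1", 22.5, 1596208417000),
("2f62c140-5bd2-40a5-bc95-1f8f7a7d7b1c", "sensor-2", 24.1, 1596208420000),
("3a1cde39-f21b-4b0a-9d4e-0c6f7f9e2d10", "sensor-1", 22.9, 1596208425000);
```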
You can explore the topic in Lenses to check the content and the shape of what you just inserted.
Let’s say that what you need is that same stream of events, but the record key should be the sensor_id instead of the UUID.
With the special SELECT ... as _key syntax, a few lines are enough to define our new re-keying processor:
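A minimal sketch, assuming the hypothetical topic and field names introduced above:

```sql
SET defaults.topic.autocreate=true;

INSERT INTO temperature_events_by_sensor
SELECT STREAM sensor_id AS _key
FROM temperature_events;
```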
The query above will take the sensor_id from the value of the record and use it as the new key. All the value fields will remain untouched:
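With the example data above, a re-keyed record would look roughly like this (illustrative shape, not actual output):

```json
{
  "key": "sensor-1",
  "value": {
    "sensor_id": "sensor-1",
    "temperature": 22.5,
    "event_time": 1596208417000
  }
}
```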
Maybe the sensor_id is not enough, and for some reason, you also need the hour of the measurement in the key. In this case, the key will become a composite object with two fields: the sensor_id and the event_hour:
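A sketch of such a processor; the HOUR() date function and the field names are assumptions, so adjust them to your data and your Lenses version:

```sql
SET defaults.topic.autocreate=true;

INSERT INTO temperature_events_by_sensor_hour
SELECT STREAM
    sensor_id        AS _key.sensor_id,
    HOUR(event_time) AS _key.event_hour
FROM temperature_events;
```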
As you can see, you can build composite objects in Lenses with ease, just by listing all the structure’s fields one after the other.
In the last example, the _key output storage format will be inferred automatically by the system as JSON. If you need more control, you can use the STORE AS clause before the SELECT.
The following example will create a topic like the previous one, but where the keys will be stored as AVRO:
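A sketch under the same assumptions as the previous example, adding the STORE clause before the SELECT:

```sql
SET defaults.topic.autocreate=true;

INSERT INTO temperature_events_by_sensor_avro
STORE KEY AS AVRO
SELECT STREAM
    sensor_id        AS _key.sensor_id,
    HOUR(event_time) AS _key.event_hour
FROM temperature_events;
```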
Happy re-keying!
This page describes a tutorial to control AVRO record names and namespaces with Lenses SQL Processors.
When writing output as AVRO, Lenses creates schemas for you, automatically generating AVRO record names.
In this tutorial we will learn how to override the default record naming strategy.
In Lenses SQL you can use a SET statement to control the record and namespace name generated for the AVRO schema.
We are going to create and populate a topic that we will later use in a couple of SQL Processors.
In SQL Studio, create the topic running the following query:
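A sketch of the topic definition; the topic name mytopic and its fields (id, name, points) are assumptions made for this walkthrough:

```sql
CREATE TABLE mytopic(
    id string,
    name string,
    points int
)
FORMAT (string, avro);
```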
For the purposes of our tutorial, it is enough to insert a single record:
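For example (values are illustrative):

```sql
INSERT INTO mytopic(
    _key, id, name, points
) VALUES ("key-1", "id-1", "Alice", 10);
```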
We are now going to create a processor that will show the default behavior of AVRO record naming in Lenses.
The processor does not do much: it just reshapes the fields of the original topic, putting some of them in a nested field:
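A sketch of such a reshaping processor, using the hypothetical fields assumed above and nesting two of them under a nested field:

```sql
SET defaults.topic.autocreate=true;

INSERT INTO mytopic_2
SELECT STREAM
    id,
    name   AS nested.name,
    points AS nested.points
FROM mytopic;
```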
We then start the processor. Lenses will create the new topic mytopic_2, and a new schema will be registered in the Schema Registry as soon as the first (and only) record is processed.
If we inspect the value schema of mytopic_2, we see that this is the one generated:
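An illustrative value schema, assuming a source topic with fields id (string) and a nested structure holding name (string) and points (int); note the auto-generated record names:

```json
{
  "type": "record",
  "name": "record",
  "fields": [
    { "name": "id", "type": "string" },
    {
      "name": "nested",
      "type": {
        "type": "record",
        "name": "record0",
        "fields": [
          { "name": "name", "type": "string" },
          { "name": "points", "type": "int" }
        ]
      }
    }
  ]
}
```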
As we can see, each record type has a name (it is mandatory in AVRO), and Lenses has generated those names automatically for us (record, record0, record1, etc.).
We are now going to see how to override that default behavior.
Let’s create and start the new processor with the following SQL:
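A sketch of the new processor; the exact setting keys for the record name and namespace are assumptions here, so check them against your Lenses version:

```sql
SET defaults.topic.autocreate=true;
SET defaults.topic.value.avro.record='myRecordName';
SET defaults.topic.value.avro.namespace='myNamespace';

INSERT INTO mytopic_3
SELECT STREAM
    id,
    name   AS nested.name,
    points AS nested.points
FROM mytopic;
```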
Notice how we added the new SET statements to the query:
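The two statements in question (the setting keys are assumptions and may differ across Lenses versions):

```sql
SET defaults.topic.value.avro.record='myRecordName';
SET defaults.topic.value.avro.namespace='myNamespace';
```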
These settings are telling Lenses to set the root record name and namespace to the values specified.
If we now check the value schema for mytopic_3, we get:
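An illustrative schema under the same assumed fields as before, with the root name and namespace overridden:

```json
{
  "type": "record",
  "name": "myRecordName",
  "namespace": "myNamespace",
  "fields": [
    { "name": "id", "type": "string" },
    {
      "name": "nested",
      "type": {
        "type": "record",
        "name": "record0",
        "fields": [
          { "name": "name", "type": "string" },
          { "name": "points", "type": "int" }
        ]
      }
    }
  ]
}
```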
As we can see, the root record element now has the name myRecordName and the namespace myNamespace.
Notice how the settings did not affect nested records.
If the key of the generated topic has AVRO format as well, you can use the following analogous settings to control the key record name and namespace:
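A sketch of the key-facet counterparts; as with the value settings, these keys are assumptions to verify against your Lenses version:

```sql
SET defaults.topic.key.avro.record='myKeyRecordName';
SET defaults.topic.key.avro.namespace='myKeyNamespace';
```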
A setting like the one we used before for the value schema:
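```sql
SET defaults.topic.value.avro.record='myRecordName';
```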
will affect all the topics used by the processor.
If you want instead to target a single topic, you can use the topic-specific version:
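A sketch of the topic-scoped form; the exact key syntax is an assumption:

```sql
SET topic.mytopic_3.value.avro.record='myRecordName';
SET topic.mytopic_3.value.avro.namespace='myNamespace';
```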
The setting above will override the record name only for topic mytopic_3. Other topics will not be affected and will keep using the default naming strategy.
This page describes a tutorial to change the shape (fields) of data in a Kafka topic using Lenses SQL Processors.
In this tutorial, we will see how to use Lenses SQL to alter the shape of your records.
In Lenses SQL you can quickly reshape your data with a simple SELECT and some built-in functions.
We will learn how to
put value fields into the key
lift key fields into the value
call functions to transform your data
build nested structures for your keys/values
unwrap singleton objects into primitive types
In our example, we are getting data from speed sensors from a speed car circuit.
The upstream system registers speed measurement events as records in a Kafka topic.
An example of such an event is the following:
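An illustrative event, assuming flat field names (sensor_id, event_time, car_id, speedMph) for the rest of this walkthrough:

```json
{
  "sensor_id": "sensor-1",
  "event_time": 1596208417000,
  "car_id": "car-1",
  "speedMph": 84
}
```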
We can replicate such a structure running the following query in SQL Studio:
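A sketch of the topic definition, under the same assumed field names:

```sql
CREATE TABLE car_speed_events(
    sensor_id string,
    event_time long,
    car_id string,
    speedMph int
)
FORMAT (string, avro);
```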
Each event is keyed by a unique string generated by the upstream system.
We can again use SQL Studio to insert some data to play with:
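For example (keys and values are made up for illustration):

```sql
INSERT INTO car_speed_events(
    _key, sensor_id, event_time, car_id, speedMph
) VALUES
("e1f9c2d0-0001", "sensor-1", 1596208417000, "car-1", 84),
("e1f9c2d0-0002", "sensor-1", 1596208422000, "car-2", 97),
("e1f9c2d0-0003", "sensor-2", 1596208429000, "car-1", 88);
```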
In this section, we are only interested in the speed of single cars, and we do not care about all the other fields.
We want to use the car id, which is now part of the record Value, as the new key (using the special as _key syntax). We also want the car speed as the new record Value.
To achieve that we can create a new SQL Processor using some simple projections.
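A sketch of such a processor, using the assumed topic and field names from above:

```sql
SET defaults.topic.autocreate=true;

INSERT INTO car_speeds
SELECT STREAM
    car_id AS _key,
    speedMph
FROM car_speed_events;
```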
Checking the records emitted by the processor, we see that the shape of the records is:
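Illustratively:

```json
{
  "key": "car-1",
  "value": {
    "speedMph": 84
  }
}
```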
We want to avoid that intermediate wrapping of speedMph inside an object. To do that we can tell Lenses to unwrap the value with the special as _value syntax, saving some bytes and CPU cycles:
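The same sketch as before, with the value unwrapped to a primitive:

```sql
SET defaults.topic.autocreate=true;

INSERT INTO car_speeds
SELECT STREAM
    car_id   AS _key,
    speedMph AS _value
FROM car_speed_events;
```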
Now the shape of the records is what we had in mind:
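Illustratively:

```json
{
  "key": "car-1",
  "value": 84
}
```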
This time we want to do some more complex manipulation. We want to convert the speed from Mph to Kmph, and we would also want to build a nice string describing the event.
An example of an output record would be:
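For instance (values are illustrative, using a Mph-to-Kmph factor of 1.60934):

```json
{
  "key": "e1f9c2d0-0001",
  "value": {
    "speedKmph": 135.18456,
    "description": "Car car-1 was running at 84 Mph"
  }
}
```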
In this case, we are using CONCATENATE to concatenate multiple strings, CAST to convert an expression to another type, and *, the usual multiplication operator.
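A sketch of a processor along these lines, under the same assumed topic and field names:

```sql
SET defaults.topic.autocreate=true;

INSERT INTO car_speed_descriptions
SELECT STREAM
    speedMph * 1.60934 AS speedKmph,
    CONCATENATE(
        'Car ', car_id,
        ' was running at ',
        CAST(speedMph AS string),
        ' Mph') AS description
FROM car_speed_events;
```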
If we check the resulting records, we can see that we obtained the shape we were looking for. Please note that the keys have been left untouched:
In this last example, we will show how to create composite keys and values in our projections.
We want both the sensor id and the event_time as the record Key. For the record Value, we want the car_id and the speed, expressed both as Mph and Kmph.
Lenses SQL allows us to use nested aliases to build nested structures; you just have to put some dots in your aliases.
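A sketch of such a processor, with dotted aliases building composite structures in both facets (names assumed as before):

```sql
SET defaults.topic.autocreate=true;

INSERT INTO car_speeds_full
SELECT STREAM
    sensor_id          AS _key.sensor_id,
    event_time         AS _key.event_time,
    car_id             AS car_id,
    speedMph           AS speed.mph,
    speedMph * 1.60934 AS speed.kmph
FROM car_speed_events;
```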
The resulting shape of the record is what we were aiming for:
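Illustratively:

```json
{
  "key": {
    "sensor_id": "sensor-1",
    "event_time": 1596208417000
  },
  "value": {
    "car_id": "car-1",
    "speed": {
      "mph": 84,
      "kmph": 135.18456
    }
  }
}
```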
Happy re-shaping!
Tutorial on how to change the format of data in a Kafka topic from JSON to AVRO with Lenses SQL Processors.
In this example, we will show how to create an AVRO topic from an existing JSON topic.
For this to work, Lenses has to know what the source topic schema is. Lenses can do this in one of three ways:
through direct user action where the schema is manually set
through inference; Lenses will try to infer the schema of a topic by looking at the topic data
and lastly, if the topic is created through Lenses, the schema will be automatically set
With the following SQL we can create our initial JSON topic:
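A sketch, reusing the field names assumed earlier for the car-speed events:

```sql
CREATE TABLE car_speed_events_json(
    sensor_id string,
    event_time long,
    car_id string,
    speedMph int
)
FORMAT (json, json);
```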
to which we can add data using:
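For example (values are illustrative):

```sql
INSERT INTO car_speed_events_json(
    _key, sensor_id, event_time, car_id, speedMph
) VALUES
("e1f9c2d0-0001", "sensor-1", 1596208417000, "car-1", 84),
("e1f9c2d0-0002", "sensor-2", 1596208422000, "car-2", 97);
```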
It can be quickly verified that the format of our newly created topic is JSON for both key and value by searching for our topic car_speed_events_json in our explore view:
To create a new topic with format AVRO, we can create a processor that will copy the data from our original topic to a new topic changing the format in the process.
To do this, we start by going to “SQL Processor”, clicking: “New SQL Processor” and defining our processor with the following code:
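A sketch of such a processor, copying every field and switching both facets to AVRO:

```sql
SET defaults.topic.autocreate=true;

INSERT INTO car_speed_events_avro
STORE KEY AS AVRO VALUE AS AVRO
SELECT STREAM *
FROM car_speed_events_json;
```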
Notice the addition of STORE KEY AS AVRO VALUE AS AVRO. This statement tells our processor which format we want each facet (key or value) to be stored as.
Hitting “Create New Processor” will start a new processor.
We can see the events were added and, from now on, Lenses will keep pushing any new events added to car_speed_events_json into car_speed_events_avro.
We can also verify that the topic format of our new topic is also AVRO for both the key and value facets: