This page describes how to use Protobuf with Lenses.
Lenses 5.0 or higher.
Confluent Platform / Schema Registry 5.5.0 or higher.
For Lenses to work with PROTOBUF data, you must use a schema-registry-aware Kafka producer that publishes data encoded according to the Confluent wire format. This format allows Lenses, as well as any other data consumer, to resolve the correct schema from the registry before decoding the data.
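To illustrate how a consumer can resolve the schema, here is a minimal sketch that reads the header of a wire-format record: a magic byte of 0 followed by a 4-byte big-endian schema ID. The class and method names are hypothetical, and the sketch stops at the schema ID; for Protobuf, the message indexes and the payload follow the header and are not decoded here.

```java
import java.nio.ByteBuffer;

/** Sketch only: reads the schema ID from a record in the Confluent wire format. */
final class WireFormatHeader {
    static final byte MAGIC_BYTE = 0x0;

    /** Returns the schema ID stored in bytes 1 to 4 of the record. */
    static int schemaId(byte[] record) {
        if (record.length < 5) {
            throw new IllegalArgumentException("Record too short for the wire format header");
        }
        // ByteBuffer reads multi-byte values in big-endian order by default.
        ByteBuffer buffer = ByteBuffer.wrap(record);
        if (buffer.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unknown magic byte");
        }
        return buffer.getInt();
    }

    public static void main(String[] args) {
        // Hypothetical record: magic byte, schema ID 42, then two payload bytes.
        byte[] record = ByteBuffer.allocate(7)
                .put(MAGIC_BYTE)
                .putInt(42)
                .put((byte) 0)
                .put((byte) 8)
                .array();
        System.out.println(schemaId(record));
    }
}
```

The returned ID is what a consumer would use to look up the schema in the registry before decoding the remaining bytes.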
If you are working with the JVM, we recommend using kafka-protobuf-serializer in conjunction with something like protoc-jar-maven-plugin or any build-time solution to generate classes from a protobuf schema.
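As a rough sketch, the producer settings for such a setup could be assembled as below. The serializer class names are the ones shipped with the Kafka client and kafka-protobuf-serializer; the helper class, the broker address, and the Schema Registry URL are assumptions to adapt to your environment.

```java
import java.util.Properties;

/** Sketch only: producer settings for String keys and Protobuf values. */
final class ProtobufProducerConfig {

    static Properties build(String bootstrapServers, String schemaRegistryUrl) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // Keys are written as plain strings.
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // Values are written as Protobuf, with the schema held in the registry.
        props.put("value.serializer",
                "io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer");
        props.put("schema.registry.url", schemaRegistryUrl);
        return props;
    }

    public static void main(String[] args) {
        // Assumed local addresses, for illustration only.
        Properties props = build("localhost:9092", "http://localhost:8081");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

These properties would then be passed to a `KafkaProducer` whose value type is the class generated from your schema.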
Assuming your build tool is configured to compile one or several .proto files into Java classes, you should be able to produce data with code like the following:
Notice that, in the snippet above, CreditCard is the generated Java class for the following schema:
If you get the source code, run the following command in the folder containing the pom.xml file:
Shortly after the data is persisted into the selected Kafka topic, Lenses will automatically detect its Key/Value formats as STRING/PROTOBUF. From then on, the records just published should be viewable from the topic screen and queryable from the SQL Studio section. Please refer to our reference for directions on how to work with your data programmatically using Lenses SQL Studio and Lenses Apps. Also, head to our data publishing tutorial if you are looking for a quick and easy way to publish a few JSON-encoded records directly from the Lenses topic screen.
Lenses should correctly handle non-trivial schemas expressed in either version 2 or version 3 of the Protobuf syntax. However, it does not yet support a few schema encodings expressible in Protobuf.
Most notably, unsupported schema encodings currently include:
Recursive message structs like google.protobuf.Value.
Well-known types supported are:
google.protobuf.Any
google.protobuf.BoolValue
google.protobuf.BytesValue
google.type.CalendarPeriod
google.type.Color
google.type.DayOfWeek
google.type.Decimal
google.protobuf.DoubleValue
google.protobuf.Duration
google.type.Expr
google.protobuf.FieldMask
google.protobuf.FloatValue
google.type.Fraction
google.protobuf.Int32Value
google.protobuf.Int64Value
google.type.Interval
google.type.LatLng
google.type.LocalizedText
google.type.Month
google.type.PhoneNumber
google.type.PostalAddress
google.type.Quaternion
google.protobuf.StringValue
google.protobuf.Timestamp
google.protobuf.UInt64Value
google.type.Date
The following are not yet supported:
google.protobuf.Struct
google.protobuf.ListValue
google.protobuf.Empty
google.protobuf.NullValue
google.protobuf.Value
non-string map keys (i.e. currently, keys are always parsed as strings).
This page describes the supported data formats for the Lenses SQL engines.
The input and output data for the engines are stored in Kafka. A storage format defines how the information is stored.
Data sent and received from the brokers is a sequence of bytes. Therefore, data which is being sent to Kafka is first transformed into raw bytes. When reading the data, it is paramount to know how it was written to be able to recreate the original information.
There are many ways to store data:
JSON
AVRO
Google Protobuf
TEXT
CSV
XML
or a custom format.
Choosing an appropriate format can have some great benefits:
faster read times
faster write times
schema evolution support
Storage formats can be classified as primitive or complex.
The list of supported formats is as follows:
Primitives and storage types are linked together. For bytes, integers, longs, and strings (text), the storage format and the type representation are directly coupled. For example, an integer is stored as 4 bytes, the same way it is stored at runtime. A string is stored as a sequence of bytes using the platform default charset (UTF-8). Complex types, however, can store raw primitives as well.
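The sketch below shows what these primitive encodings look like in practice. It assumes big-endian byte order, which is what Kafka's built-in integer and long serializers use; the class and method names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Sketch only: byte representations of the primitive storage formats. */
final class PrimitiveEncodings {

    /** INT: 4 bytes, big-endian. */
    static byte[] intBytes(int value) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(value).array();
    }

    /** LONG: 8 bytes, big-endian. */
    static byte[] longBytes(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    /** STRING: the UTF-8 bytes of the text. */
    static byte[] stringBytes(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println("INT    -> " + intBytes(100).length + " bytes");
        System.out.println("LONG   -> " + longBytes(100L).length + " bytes");
        System.out.println("STRING -> " + stringBytes("100").length + " bytes");
    }
}
```

The same value, 100, occupies 4, 8, or 3 bytes depending on the storage format, which is why a reader must know how the data was written.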
Encoding composite data structures requires a different approach to storage format.
AVRO is a compact, fast, binary storage format.
The format relies on schemas. When AVRO data is read, the schema used when writing is required. Having a schema forces the payload to respect an agreed contract; the schema represents the data payload format contract. This helps avoid corrupted data records.
Here is an example of a schema:
Best practices for working with AVRO require the use of a schema manager. The reason is to avoid embedding the schema in each message, since the schema might be bigger than the actual data itself. Instead, the schema identifier is part of the data sent to a Kafka topic, which reduces the disk space used and the network traffic. A schema manager also provides a centralized place for applications to share, use, and evolve schemas.
In a scenario where the raw bytes contain the schema, Lenses can be extended with custom code to allow the engines to process the data. See Custom Format below.
Protocol buffers are Google’s language-neutral, platform-neutral, extensible mechanism for serializing structured data – think XML, but smaller, faster, and simpler. You define how you want your data to be structured once, then you can use specially generated source code to easily write and read your structured data to and from a variety of data streams and using a variety of languages.
Like AVRO, the Protobuf format relies on a schema. When the data is read, the schema used when writing is required. Having a schema forces the payload to respect an agreed contract; the schema represents the data payload format contract. This helps avoid corrupted data records.
Here is an example of a schema:
The AVRO best practices also apply to Google Protobuf.
The JSON format is easy for humans to read and write, and since it is a text format it borrows its storage from the string primitive.
Given this SQL statement:
it produces this text output:
The JSON storage format can write primitives directly as well. For a record Key of type Long and value 100, it will store the bytes representation of the text 100.
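A minimal sketch of that behaviour, assuming UTF-8 as the text encoding; the class and method names are hypothetical:

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Sketch only: a Long key stored as JSON text rather than as 8 raw bytes. */
final class JsonPrimitiveKey {

    /** Writes the key as the bytes of its decimal text form. */
    static byte[] encode(long key) {
        return Long.toString(key).getBytes(StandardCharsets.UTF_8);
    }

    /** Reads the key back by parsing the text. */
    static long decode(byte[] bytes) {
        return Long.parseLong(new String(bytes, StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        byte[] stored = encode(100L);
        // Three bytes, one per character: '1', '0', '0'.
        System.out.println(Arrays.toString(stored)); // prints [49, 48, 48]
        System.out.println(decode(stored));          // prints 100
    }
}
```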
XML is another human-readable text format. Following the query above, the output would be:
It is possible to extend Lenses to work with a different storage format which is not provided out of the box.
This makes it possible to handle storage formats like Google Protobuf (without Schema Registry) or Thrift. This extension has to be able to read and translate the raw record bytes into a meaningful data structure.
A byte array from a record which holds Google Protobuf data will be passed to the custom code. Using the Protobuf marshaller, the custom extension will inflate the raw bytes into an object the Lenses engines can work with.
For example, consider a Kafka topic that contains GPS coordinates. The data consists of a latitude and a longitude, both of which are doubles, stored as latitude:longitude, such as 45.623412:10.419433. Kafka will receive the raw bytes representation of the GPS coordinates text. Providing code that translates this back into an entity Lenses can work with allows the engines to process the records.
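The parsing step for this example can be sketched as follows. This shows only the translation from raw bytes to two doubles; it does not use the Lenses Serde interface, it assumes the text is UTF-8 encoded, and the class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;

/** Sketch only: turns the raw bytes of "latitude:longitude" into two doubles. */
final class GpsParser {

    /** Returns {latitude, longitude} parsed from the record bytes. */
    static double[] parse(byte[] raw) {
        String text = new String(raw, StandardCharsets.UTF_8);
        String[] parts = text.split(":");
        if (parts.length != 2) {
            throw new IllegalArgumentException("Expected latitude:longitude but got: " + text);
        }
        return new double[] {
            Double.parseDouble(parts[0].trim()),
            Double.parseDouble(parts[1].trim())
        };
    }

    public static void main(String[] args) {
        byte[] raw = "45.623412:10.419433".getBytes(StandardCharsets.UTF_8);
        double[] coordinates = parse(raw);
        System.out.println("latitude=" + coordinates[0] + ", longitude=" + coordinates[1]);
    }
}
```

A custom deserializer would wrap the two values in a record structure with one field per coordinate.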
Lenses provides a Java API for hooking up custom storage into the engines.
The following Java code implements the interface Serde. Notice that the return type of the deserializer is a GenericRecord and that is populated with the two fields parsed from the bytes.
To compile the code, the following library dependency must be added to the build:
Once the code is built, it can be deployed in Lenses. Follow the #deploy_custom_serde link for details.
CSV is a text storage format where the payload consists of a delimited sequence of values.
Lenses will lift the data into a structure like the one below:
All the items in the content field are text. If any of the values represent numbers, the CAST function can be used to convert them to the required type.
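As a rough illustration of the idea, the sketch below splits a delimited line into a list of text items and converts one of them to a number, which is the role CAST plays in a query. It is a naive split with no support for quoted values, not how Lenses parses CSV; the class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

/** Sketch only: a delimited payload lifted into a list of text items. */
final class CsvContent {

    /** Splits the line on the delimiter; every item stays a String. */
    static List<String> content(String line, String delimiter) {
        // The -1 limit keeps trailing empty values.
        return Arrays.asList(line.split(Pattern.quote(delimiter), -1));
    }

    public static void main(String[] args) {
        List<String> items = content("abc,42,3.5", ",");
        System.out.println(items); // prints [abc, 42, 3.5]

        // Equivalent of casting the second item to an integer.
        int quantity = Integer.parseInt(items.get(1));
        System.out.println(quantity + 1); // prints 43
    }
}
```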
These are storage formats created by the Streaming mode. When performing windowed aggregations the resulting record key carries the window information with it. There are two storage formats for windowed aggregations:
Time Windowed
Session Windowed
They wrap the Key used during the aggregation. The Key storage format can be any of the other ones mentioned above, apart from custom ones.
For each windowed key the following schema applies:
Format | Read | Write | Additional |
---|---|---|---|
INT | yes | yes | |
BYTES | yes | yes | |
LONG | yes | yes | |
STRING | yes | yes | |
JSON | yes | yes | |
AVRO | yes | yes | Supported via a Schema Registry. For embedded Avro stored records, a custom Lenses plugin is required. |
XML | yes | no | |
CSV | yes | no | |
Google Protobuf | yes | yes | Supported via a Schema Registry. |
Custom | yes | no | Supported via custom code. |
TW[<the_other_formats>] | yes | yes | Used by Streaming mode with hopping or tumbling windows. |
SW[<the_other_formats>] | yes | yes | Used by Streaming mode with session windowing. |