Supported data formats
The input and output data for the engines are stored in Kafka. A storage format is the way to define how the information is stored.
Data sent and received from the brokers is a sequence of bytes. Therefore, data which is being sent to Kafka is first transformed into raw bytes. When reading the data, it is paramount to know how it was written to be able to recreate the original information.
There are many ways to store data:
JSON
AVRO
Google Protobuf
TEXT
CSV
XML
or custom format.
Choosing an appropriate format can have some great benefits:
faster read times
faster write times
schema evolution support
Supported formats
Storage formats can be classified as primitives and complex.
The list of supported formats is as follows:
Format | Read | Write | Additional |
---|---|---|---|
INT | yes | yes | |
BYTES | yes | yes | |
LONG | yes | yes | |
STRING | yes | yes | |
JSON | yes | yes | |
AVRO | yes | yes | Supported via a Schema Registry. For embedded Avro stored records, a custom Lenses plugin is required. |
XML | yes | no | |
CSV | yes | no | |
Google Protobuf | yes | yes | Supported via a Schema Registry. |
Custom | yes | no | Supported via custom code |
TW[<the_other_formats>] | yes | yes | This is used by Streaming mode when using hopping or tumbling windows |
SW[<the_other_formats>] | yes | yes | This is used by Streaming mode when using session windowing |
Primitives
Primitives and storage types are linked together. For bytes, integers, long or string(text) the storage format and the type representation are directly coupled. For example, an integer is stored as 8 bytes, the same way it is stored at runtime. A string is stored as a sequence of bytes using the platform default charset (UTF-8). Complex types, however, can store raw primitives as well.
Complex formats
Encoding composite data structures requires a different approach to storage format.
AVRO
AVRO is a compact, fast, binary storage format.
The format relies on schemas. When AVRO data is read, the schema used when writing is required. Having a schema, enforces the payload to respect an agreed contract; the schema represents the data payload format contract. This avoids having corrupted data records.
Here is an example of a schema example:
Best practices for working with AVRO require to use of a schema manager. The reason is to avoid embedding the schema with each message since the schema payload might be bigger than the actual data itself. The schema identifier will be part of the data sent to a Kafka topic, thus reducing the disk space used and network traffic. Also having a schema manager allows a centralized place for applications to share, use, and evolve schemas.
In a scenario where the raw bytes contain the schema, Lenses can be extended with custom code to allow the engines to process the data. See below Custom Format.
Google Protobuf
Protocol buffers are Google’s language-neutral, platform-neutral, extensible mechanism for serializing structured data – think XML, but smaller, faster, and simpler. You define how you want your data to be structured once, then you can use specially generated source code to easily write and read your structured data to and from a variety of data streams and using a variety of languages.
Like AVRO the Protobuf format relies on schema. When the data is read, the schema used when writing is required. Having a schema, enforces the payload to respect an agreed contract; the schema represents the data payload format contract. This avoids having corrupted data records.
Here is an example of a schema example:
AVRO best practices apply for Google Protobuf.
JSON
The format is easy for humans to read and write, and since it is a text format it borrows the storage from the string primitive.
Given this SQL statement:
it produces this text output:
JSON storage format can write primitives directly as well. Considering a record Key of type Long and value 100 it will store it as the bytes representation for the text 100
.
XML
This is another human-readable text format. Following the query above the output would be:
It is possible to extend Lenses to work with a different storage format which is not provided out of the box.
This makes it possible to handle storage formats like Google Protobuf (without Schema Registry) or Thrift. This extension has to be able to read and translate the raw record bytes into a meaningful data structure.
A byte array from a record which holds Google Protobuf data will be passed to the custom code. Using the Protobuf marshaller the custom extension will inflate the raw bytes to an object Lenses engines can work with..
For example, if a Kafka topic contains GPS coordinates. The data consists of a latitude and a longitude, both of which are doubles and stored as latitude:longitude
, such as 45.623412:10.419433
. Kafka will receive the raw bytes representation for the GPS coordinates text. Providing the code which can translate this back to an entity Lenses can work with, allows the engines to be able to process the records.
Lenses provides a Java API for hooking up custom storage into the engines.
The following Java code implements the interface Serde. Notice that the return type of the deserializer is a GenericRecord and that is populated with the two fields parsed from the bytes.
To be able to compile the code this library dependency is required to be added to the build system:
Once the code is built it can be deployed in Lenses. Follow the #deploy_custom_serde link to get the details.
CSV
CSV is a text storage format where the payload consists of a delimited sequence of values.
Lenses will lift the data into a structure like below:
All the items in the content
field are text. If any of the values represent numbers the CAST
function can be used to convert to the required type.
Windowed formats
These are storage formats created by the Streaming mode. When performing windowed aggregations the resulting record key carries the window information with it. There are two storage formats for windowed aggregations:
Time Windowed
Session Windowed
They wrap the Key used during the aggregation. The Key storage format can be any of the other ones mentioned above, apart from custom ones.
For each windowed key the following schema applies:
Last updated