Supported data formats

The input and output data for the engines are stored in Kafka. A storage format is the way to define how the information is stored.

Data sent and received from the brokers is a sequence of bytes. Therefore, data which is being sent to Kafka is first transformed into raw bytes. When reading the data, it is paramount to know how it was written to be able to recreate the original information.

There are many ways to store data:

  • JSON

  • AVRO

  • Google Protobuf

  • TEXT

  • CSV

  • XML

  • or custom format.

Choosing an appropriate format can have some great benefits:

  • faster read times

  • faster write times

  • schema evolution support

Supported formats

Storage formats can be classified as primitives and complex.

The list of supported formats is as follows:

Format
Read
Write
Additional

INT

yes

yes

BYTES

yes

yes

LONG

yes

yes

STRING

yes

yes

JSON

yes

yes

AVRO

yes

yes

Supported via a Schema Registry. For embedded Avro stored records, a custom Lenses plugin is required.

XML

yes

no

CSV

yes

no

Google Protobuf

yes

yes

Supported via a Schema Registry.

Custom

yes

no

Supported via custom code

TW[<the_other_formats>]

yes

yes

This is used by Streaming mode when using hopping or tumbling windows

SW[<the_other_formats>]

yes

yes

This is used by Streaming mode when using session windowing

Primitives

Primitives and storage types are linked together. For bytes, integers, long or string(text) the storage format and the type representation are directly coupled. For example, an integer is stored as 8 bytes, the same way it is stored at runtime. A string is stored as a sequence of bytes using the platform default charset (UTF-8). Complex types, however, can store raw primitives as well.

Complex formats

Encoding composite data structures requires a different approach to storage format.

AVRO

AVRO is a compact, fast, binary storage format.

The format relies on schemas. When AVRO data is read, the schema used when writing is required. Having a schema, enforces the payload to respect an agreed contract; the schema represents the data payload format contract. This avoids having corrupted data records.

Here is an example of a schema example:

{
  "type": "record",
  "name": "LensesExample",
  "namespace": "io.lenses.example",
  "fields": [
    {
      "name": "id",
      "type": "long"
    },
    {
      "name": "username",
      "type": "string"
    },
    {
      "name": "followers",
      "type": {
        "type": "array",
        "items": "string"
      }
    },
    {
      "name": "settingsMap",
      "type": {
        "type": "map",
        "values": "string"
      }
    },
    {
      "name": "relationship",
      "type": {
        "type": "enum",
        "name": "Relationship",
        "symbols": [
          "MARRIED",
          "FRIEND",
          "COLLEAGUE",
          "STRANGER"
        ]
      }
    },
    {
      "name": "address",
      "type": {
        "type": "record",
        "name": "Address",
        "fields": [
          {
            "name": "number",
            "type": "int"
          },
          {
            "name": "street",
            "type": "string"
          },
          {
            "name": "city",
            "type": "string"
          }
        ]
      }
    }
  ]
}

Best practices for working with AVRO require to use of a schema manager. The reason is to avoid embedding the schema with each message since the schema payload might be bigger than the actual data itself. The schema identifier will be part of the data sent to a Kafka topic, thus reducing the disk space used and network traffic. Also having a schema manager allows a centralized place for applications to share, use, and evolve schemas.

In a scenario where the raw bytes contain the schema, Lenses can be extended with custom code to allow the engines to process the data. See below Custom Format.

Google Protobuf

Protocol buffers are Google’s language-neutral, platform-neutral, extensible mechanism for serializing structured data – think XML, but smaller, faster, and simpler. You define how you want your data to be structured once, then you can use specially generated source code to easily write and read your structured data to and from a variety of data streams and using a variety of languages.

Like AVRO the Protobuf format relies on schema. When the data is read, the schema used when writing is required. Having a schema, enforces the payload to respect an agreed contract; the schema represents the data payload format contract. This avoids having corrupted data records.

Here is an example of a schema example:

message Person {
  string name = 1;
  int32 id = 2;
  string email = 3;
}

AVRO best practices apply for Google Protobuf.

JSON

The format is easy for humans to read and write, and since it is a text format it borrows the storage from the string primitive.

Given this SQL statement:

INSERT INTO <target_topic>
SELECT STREAM 
    country
  , COUNT(*) as total
  , MAXK(points,3) as maxpoints
  , AVG(points) as avgpoints
FROM <source_topic>

it produces this text output:

{
    "country":"UK",
    "total": 1235,
    "maxpoints" : [ 9900, 8291, 8111],
    "avgpoints" : 212345.65
}

JSON storage format can write primitives directly as well. Considering a record Key of type Long and value 100 it will store it as the bytes representation for the text 100.

XML

This is another human-readable text format. Following the query above the output would be:

<root>
    <country>UK</country>
    <total>1235</total>
    <maxpoints>
        <item>9900</item>
        <item>8291</item>
        <item>8111</item>
    </maxpoints>
    <avgpoints>212345.65</avgpoints>
</root>

It is possible to extend Lenses to work with a different storage format which is not provided out of the box.

This makes it possible to handle storage formats like Google Protobuf (without Schema Registry) or Thrift. This extension has to be able to read and translate the raw record bytes into a meaningful data structure.

A byte array from a record which holds Google Protobuf data will be passed to the custom code. Using the Protobuf marshaller the custom extension will inflate the raw bytes to an object Lenses engines can work with..

For example, if a Kafka topic contains GPS coordinates. The data consists of a latitude and a longitude, both of which are doubles and stored as latitude:longitude, such as 45.623412:10.419433. Kafka will receive the raw bytes representation for the GPS coordinates text. Providing the code which can translate this back to an entity Lenses can work with, allows the engines to be able to process the records.

Lenses provides a Java API for hooking up custom storage into the engines.

The following Java code implements the interface Serde. Notice that the return type of the deserializer is a GenericRecord and that is populated with the two fields parsed from the bytes.

public class LatLngSerde implements Serde {
    private Schema schema = SchemaBuilder.builder()
            .record("lat_lng")
            .fields()
            .requiredDouble("lat")
            .requiredDouble("lng")
            .endRecord();

    @Override
    public Schema getSchema() {
        return schema;
    }

    @Override
    public Serializer serializer(Properties properties) {
        return new Serializer() {

            @Override
            public byte[] serialize(GenericRecord record) throws IOException {
                double lat = (double) record.get("lat");
                double lng = (double) record.get("lng");
                String data = lat + ":" + lng;
                return data.getBytes("UTF-8");
            }

            @Override
            public void close() throws IOException {
            }
        };
    }

    @Override
    public Deserializer deserializer(Properties properties) {
        throw new NotImplementedException("Not required for now.");
    }
}

To be able to compile the code this library dependency is required to be added to the build system:

<dependency>
    <groupId>com.landoop</groupId>
    <artifactId>lenses-serde</artifactId>
    <version>1.0.2</version>
</dependency>

Once the code is built it can be deployed in Lenses. Follow the #deploy_custom_serde link to get the details.

CSV

CSV is a text storage format where the payload consists of a delimited sequence of values.

value1,value2,...,valueN 

Lenses will lift the data into a structure like below:

{
    "content":[
        //value1,
        //value2,
        //..
        //valueN
    ]
}

All the items in the content field are text. If any of the values represent numbers the CAST function can be used to convert to the required type.

Windowed formats

These are storage formats created by the Streaming mode. When performing windowed aggregations the resulting record key carries the window information with it. There are two storage formats for windowed aggregations:

  • Time Windowed

  • Session Windowed

They wrap the Key used during the aggregation. The Key storage format can be any of the other ones mentioned above, apart from custom ones.

For each windowed key the following schema applies:

{
  "value": //The Aggregation Key schema. It can be a primitive or complext type
  "window": {
    "start": //epoch time when the window started,
    "end": //epoch time when the window ends. Only for session windows.
  }
}

Last updated

Logo

2024 © Lenses.io Ltd. Apache, Apache Kafka, Kafka and associated open source project names are trademarks of the Apache Software Foundation.