Storage formats

The input and output data for the engines is stored in Kafka. A storage format defines how that information is stored.

Data sent to and received from the brokers is a sequence of bytes. Therefore, data sent to Kafka is first transformed into raw bytes. When reading the data, it is essential to know how it was written in order to recreate the original information.

There are many ways to store data:

  • JSON
  • XML
  • AVRO
  • TEXT
  • Google Protobuf
  • or custom format.

Choosing an appropriate format can bring benefits such as:

  • faster read times
  • faster write times
  • schema evolution support

Supported formats 

Storage formats can be classified as primitive or complex.

The supported formats are:

Format                    Read  Write    Additional
INT                       yes   yes
BYTES                     yes   yes
LONG                      yes   yes
STRING                    yes   yes
JSON                      yes   yes
AVRO                      yes   yes      Supported via a Schema Registry. For embedded Avro stored records, a custom Lenses plugin is required.
XML                       yes   yes
CSV                       yes   not yet
Google Protobuf           yes   no       Supported via custom code
Custom                    yes   no       Supported via custom code
TW[<the_other_formats>]   yes   yes      This is used by Streaming mode when using hopping or tumbling windows
SW[<the_other_formats>]   yes   yes      This is used by Streaming mode when using session windowing

Primitives 

Primitives and storage types are linked together. For bytes, integer, long or string (text), the storage format and the type representation are directly coupled. For example, an integer is stored as 4 bytes and a long as 8 bytes, the same way they are stored at runtime. A string is stored as a sequence of bytes using the platform default charset (UTF-8). Complex formats, however, can store raw primitives as well.
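
As a rough illustration of the sizes involved, the sketch below encodes an integer, a long and a string in plain Java. It assumes big-endian byte order, as used by Kafka's standard integer and long serializers. The class and method names are hypothetical and not part of Lenses.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of Lenses: encodes primitives to raw bytes.
final class PrimitiveBytes {

    // An int occupies 4 bytes (big-endian order assumed).
    static byte[] intToBytes(int value) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(value).array();
    }

    // A long occupies 8 bytes (big-endian order assumed).
    static byte[] longToBytes(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    // A string is stored as its UTF-8 encoded bytes.
    static byte[] stringToBytes(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    // Reading back requires knowing that the bytes hold an int.
    static int bytesToInt(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getInt();
    }

    public static void main(String[] args) {
        System.out.println(intToBytes(42).length);      // 4
        System.out.println(longToBytes(42L).length);    // 8
        System.out.println(stringToBytes("UK").length); // 2
        System.out.println(bytesToInt(intToBytes(42))); // 42
    }
}
```

The same 4 bytes could be misread as text or as half of a long, which is why the reader has to know the storage format used by the writer.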

Complex formats 

Encoding composite data structures requires a different approach to storage format.

AVRO 

Avro is a compact, fast, binary storage format.

The format relies on schemas. When Avro data is read, the schema used to write it is required. A schema forces the payload to respect an agreed contract: the schema is the contract for the data payload format. This helps prevent corrupted data records.

Here is an example schema:

{
  "type": "record",
  "name": "LensesExample",
  "namespace": "io.lenses.example",
  "fields": [
    {
      "name": "id",
      "type": "long"
    },
    {
      "name": "username",
      "type": "string"
    },
    {
      "name": "followers",
      "type": {
        "type": "array",
        "items": "string"
      }
    },
    {
      "name": "settingsMap",
      "type": {
        "type": "map",
        "values": "string"
      }
    },
    {
      "name": "relationship",
      "type": {
        "type": "enum",
        "name": "Relationship",
        "symbols": [
          "MARRIED",
          "FRIEND",
          "COLLEAGUE",
          "STRANGER"
        ]
      }
    },
    {
      "name": "address",
      "type": {
        "type": "record",
        "name": "Address",
        "fields": [
          {
            "name": "number",
            "type": "int"
          },
          {
            "name": "street",
            "type": "string"
          },
          {
            "name": "city",
            "type": "string"
          }
        ]
      }
    }
  ]
}

Best practice when working with Avro is to use a schema manager. This avoids embedding the schema in each message, since the schema might be bigger than the actual data. Only the schema identifier is sent with the data to a Kafka topic, which reduces the disk space used and the network traffic. A schema manager also provides a centralized place for applications to share, use, and evolve schemas.
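
To illustrate how a schema identifier can travel with each record instead of the full schema, here is a minimal sketch. It assumes the framing used by the Confluent Schema Registry serializers: one zero "magic" byte, a 4-byte big-endian schema id, then the Avro binary payload. The class name, the schema id and the payload bytes are hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical sketch: frames an Avro payload with a schema id.
// Assumed layout: [magic byte 0][4-byte schema id][Avro binary payload].
final class SchemaIdFraming {
    private static final byte MAGIC_BYTE = 0x0;
    private static final int HEADER_SIZE = 1 + Integer.BYTES;

    // Prepends the 5-byte header to the payload.
    static byte[] frame(int schemaId, byte[] avroPayload) {
        return ByteBuffer.allocate(HEADER_SIZE + avroPayload.length)
                .put(MAGIC_BYTE)
                .putInt(schemaId)
                .put(avroPayload)
                .array();
    }

    // Reads the schema id back, rejecting messages with another first byte.
    static int schemaId(byte[] message) {
        ByteBuffer buffer = ByteBuffer.wrap(message);
        if (buffer.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unknown magic byte");
        }
        return buffer.getInt();
    }

    // Returns the bytes that follow the header.
    static byte[] payload(byte[] message) {
        return Arrays.copyOfRange(message, HEADER_SIZE, message.length);
    }

    public static void main(String[] args) {
        byte[] avroPayload = {0x02, 0x04};       // placeholder bytes, not real Avro output
        byte[] message = frame(17, avroPayload); // 17 is a made-up schema id
        System.out.println(message.length);      // 7: a 5-byte header plus 2 payload bytes
        System.out.println(schemaId(message));   // 17
    }
}
```

With this layout the per-record overhead is a fixed 5 bytes, regardless of how large the schema is.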

When the raw bytes contain the schema, Lenses can be extended with custom code to allow the engines to process the data. See the Google Protobuf and custom format section below.

JSON (JavaScript Object Notation) 

The format is easy for humans to read and write, and since it is a text format it borrows its storage from the string primitive.

Given this SQL statement:

INSERT INTO <target_topic>
SELECT STREAM 
    country
  , COUNT(*) as total
  , MAXK(points,3) as maxpoints
  , AVG(points) as avgpoints
FROM <source_topic>
GROUP BY country

it produces this text output:

{
    "country":"UK",
    "total": 1235,
    "maxpoints" : [ 9900, 8291, 8111],
    "avgpoints" : 212345.65
}

The JSON storage format can write primitives directly as well. For example, a record key of type Long with the value 100 is stored as the byte representation of the text 100.
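
The difference between the two representations of the same key can be sketched as follows. The class and method names are hypothetical, and big-endian order is assumed for the binary form.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical sketch: the same key written as JSON text and as a binary long.
final class JsonPrimitiveKey {

    // JSON storage: the number is written as text, then encoded as UTF-8.
    static byte[] asJsonText(long key) {
        return Long.toString(key).getBytes(StandardCharsets.UTF_8);
    }

    // LONG storage, for comparison: a fixed 8 bytes (big-endian order assumed).
    static byte[] asLong(long key) {
        return ByteBuffer.allocate(Long.BYTES).putLong(key).array();
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(asJsonText(100L))); // [49, 48, 48]
        System.out.println(Arrays.toString(asLong(100L)));     // [0, 0, 0, 0, 0, 0, 0, 100]
    }
}
```

The text form uses one byte per digit, so its size depends on the value, while the binary form always takes 8 bytes.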

XML (eXtensible Markup Language) 

This is another human-readable text format. For the query above, the output would be:

<root>
    <country>UK</country>
    <total>1235</total>
    <maxpoints>
        <item>9900</item>
        <item>8291</item>
        <item>8111</item>
    </maxpoints>
    <avgpoints>212345.65</avgpoints>
</root>

Google Protobuf and custom format 

It is possible to extend Lenses to work with a different storage format which is not provided out of the box.

This makes it possible to handle storage formats like Google Protobuf or Thrift. This extension has to be able to read and translate the raw record bytes into a meaningful data structure.

A byte array from a record which holds Google Protobuf data is passed to the custom code. Using the Protobuf marshaller, the custom extension inflates the raw bytes into an object the Lenses engines can work with.

For example, consider a Kafka topic that contains GPS coordinates. The data consists of a latitude and a longitude, both of which are doubles, stored as latitude:longitude, such as 45.623412:10.419433. Kafka receives the raw byte representation of the coordinates text. Providing code that translates these bytes back into an entity Lenses can work with allows the engines to process the records.
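
The translation from raw bytes back to a latitude and a longitude can be sketched in plain Java, independently of any Lenses API. The class name is hypothetical, and the sketch assumes the text was written as UTF-8.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical sketch: parses "latitude:longitude" text bytes.
final class LatLngText {

    // Returns {latitude, longitude} parsed from UTF-8 text bytes.
    static double[] parse(byte[] bytes) {
        String text = new String(bytes, StandardCharsets.UTF_8);
        String[] tokens = text.split(":");
        if (tokens.length != 2) {
            throw new IllegalArgumentException("Expected latitude:longitude but got: " + text);
        }
        return new double[] {Double.parseDouble(tokens[0]), Double.parseDouble(tokens[1])};
    }

    public static void main(String[] args) {
        byte[] raw = "45.623412:10.419433".getBytes(StandardCharsets.UTF_8);
        double[] latLng = parse(raw);
        System.out.println("lat=" + latLng[0] + ", lng=" + latLng[1]);
    }
}
```

A custom extension would do the same parsing and then place the two values into the structure the engines expect.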

Lenses provides a Java API for hooking up custom storage into the engines.

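The following Java code implements the Serde interface for this format. A deserializer returns a GenericRecord populated with the fields parsed from the bytes. This example leaves the deserializer unimplemented and shows the opposite direction: a serializer that writes the two fields of a GenericRecord as latitude:longitude text.

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericRecord;
// Serde, Serializer and Deserializer are provided by the lenses-serde library.

public class LatLngSerde implements Serde {
    // Avro schema describing the record exposed to the engines.
    private final Schema schema = SchemaBuilder.builder()
                        .record("lat_lng")
                        .fields()
                        .requiredDouble("lat")
                        .requiredDouble("lng")
                        .endRecord();

    @Override
    public Schema getSchema() {
        return schema;
    }

    @Override
    public Serializer serializer(Properties properties) {
        return new Serializer() {

            // Writes the record as UTF-8 text in the form latitude:longitude.
            @Override
            public byte[] serialize(GenericRecord record) throws IOException {
                double lat = (double) record.get("lat");
                double lng = (double) record.get("lng");
                String data = lat + ":" + lng;
                return data.getBytes(StandardCharsets.UTF_8);
            }

            @Override
            public void close() throws IOException {
                // No resources to release.
            }
        };
    }

    @Override
    public Deserializer deserializer(Properties properties) {
        // Left unimplemented in this example.
        throw new UnsupportedOperationException("Not required for now.");
    }
}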

To compile the code, add this library dependency to the build:

<dependency>
    <groupId>com.landoop</groupId>
    <artifactId>lenses-serde</artifactId>
    <version>1.0.2</version>
</dependency>

Once the code is built, it can be deployed in Lenses. Follow the #deploy_custom_serde link for the details.

CSV (Comma separated Values) 

CSV is a text storage format where the payload consists of a delimited sequence of values.

value1,value2,...,valueN 

Lenses will lift the data into a structure like below:

{
    "content":[
        value1,
        value2,
        ..
        valueN
    ]
}

All the items in the content field are text. If any of the values represent numbers, the CAST function can be used to convert them to the required type.
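
The idea of lifting a delimited line into a list of text items, and converting an item only when a number is needed, can be sketched as follows. This is not how Lenses parses CSV internally: the class name is hypothetical, the sample line is made up, and the naive split ignores quoting and escaping.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: lifts one CSV line into a list of text items.
final class CsvContent {

    // Splits on commas; quoting and escaping are deliberately not handled.
    static List<String> parseLine(String line) {
        return Arrays.asList(line.split(",", -1));
    }

    public static void main(String[] args) {
        List<String> content = parseLine("UK,1235,212345.65");
        System.out.println(content);                  // [UK, 1235, 212345.65]

        // Every item is text, so a number needs an explicit conversion,
        // which is the role CAST plays in a query.
        int total = Integer.parseInt(content.get(1));
        System.out.println(total + 1);                // 1236
    }
}
```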

Windowed formats 

These are storage formats created by the Streaming mode. When performing windowed aggregations the resulting record key carries the window information with it. There are two storage formats for windowed aggregations:

  • Time Windowed
  • Session Windowed

They wrap the Key used during the aggregation. The Key storage format can be any of the other ones mentioned above, apart from custom ones.

For each windowed key the following schema applies:

{
  "value": <the aggregation key schema; it can be a primitive or complex type>,
  "window": {
    "start": <epoch time when the window started>,
    "end": <epoch time when the window ends; only for session windows>
  }
}
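
As a reading aid, the shape of a windowed key can be modelled with a small Java class. This is only a sketch of the structure above, not the Lenses implementation. The class name, the factory methods and the sample values are hypothetical.

```java
import java.util.OptionalLong;

// Hypothetical model of a windowed key: the aggregation key plus window bounds.
final class WindowedKey<K> {
    final K value;          // the aggregation key
    final long start;       // epoch time when the window started
    final OptionalLong end; // epoch time when the window ends; session windows only

    private WindowedKey(K value, long start, OptionalLong end) {
        this.value = value;
        this.start = start;
        this.end = end;
    }

    // Time windowed keys carry only the window start.
    static <K> WindowedKey<K> timeWindowed(K value, long start) {
        return new WindowedKey<>(value, start, OptionalLong.empty());
    }

    // Session windowed keys carry both the window start and end.
    static <K> WindowedKey<K> sessionWindowed(K value, long start, long end) {
        return new WindowedKey<>(value, start, OptionalLong.of(end));
    }

    @Override
    public String toString() {
        String window = end.isPresent()
                ? "{start=" + start + ", end=" + end.getAsLong() + "}"
                : "{start=" + start + "}";
        return "{value=" + value + ", window=" + window + "}";
    }

    public static void main(String[] args) {
        System.out.println(timeWindowed("UK", 1000L));          // {value=UK, window={start=1000}}
        System.out.println(sessionWindowed("UK", 1000L, 4000L)); // {value=UK, window={start=1000, end=4000}}
    }
}
```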