Google Protobuf and custom formats

By default, Lenses can interpret messages written in AVRO, JSON, and common primitive formats such as strings and longs. However, you may have topics whose messages are written in another format, such as Google Protobuf, Thrift, or even your own proprietary format. To let Lenses handle these messages, you can supply a serde.

A serde is responsible for two things. First, it reads the raw record bytes and translates them into a meaningful data structure. Second, it translates that data structure back into raw bytes according to the storage format. For example, a custom deserializer may be passed a byte array from a record that holds Google Protobuf data. Only the custom serde knows how to interpret that byte array and inflate it into an object using the Protobuf marshaller. It then translates that object into an AVRO record, which is returned to Lenses.

Let us consider a simple example of a topic that contains GPS coordinates. Each message consists of a latitude and a longitude, both of which are doubles. We encode this as a string “latitude:longitude”, such as “45.623412:10.419433”. From this string, we take the UTF-8 bytes, and that is what is published onto the topic. A system cannot interpret this data without the code required to decode and encode the format. Therefore we must provide a custom serde which can convert the bytes back into the latitude and longitude parts. Similarly, to allow Lenses to publish data, the serde must be able to take a record that contains a latitude and longitude and return the correctly encoded byte array.
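
Before involving the Lenses API, the wire format itself can be sketched with the Java standard library alone. The class name LatLngCodec and its two methods are hypothetical helpers introduced only for illustration; they are not part of Lenses, and the sketch assumes well-formed input.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of the Lenses API: it only illustrates the
// "latitude:longitude" wire format described above.
final class LatLngCodec {

    private LatLngCodec() {
    }

    // Encodes the pair as the UTF-8 bytes of "latitude:longitude".
    static byte[] encode(double lat, double lng) {
        String data = lat + ":" + lng;
        return data.getBytes(StandardCharsets.UTF_8);
    }

    // Decodes the bytes back into {latitude, longitude}.
    // Assumes well-formed input; no validation is performed here.
    static double[] decode(byte[] bytes) {
        String data = new String(bytes, StandardCharsets.UTF_8);
        String[] tokens = data.split(":");
        return new double[] {Double.parseDouble(tokens[0]), Double.parseDouble(tokens[1])};
    }
}
```

Calling LatLngCodec.decode(LatLngCodec.encode(lat, lng)) should give back the original pair, which is the round trip a serde has to guarantee in both directions.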

Lenses provides a simple API for hooking custom storage formats into its SQL engine.

The following Java code implements the Serde interface by overriding its getSchema, serializer, and deserializer methods. Notice that the deserializer returns a GenericRecord, which is populated with the two fields parsed from the bytes. In this simple example, we do not worry about error handling, such as ensuring that the tokens have valid double values.

First, add the library dependency to your project. Here is a Maven example:

<dependency>
    <groupId>com.landoop</groupId>
    <artifactId>lenses-serde</artifactId>
    <version>1.0.2</version>
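</dependency>

import java.io.IOException;
import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
// Serde, Serializer and Deserializer are provided by the lenses-serde dependency above.

public class LatLngSerde implements Serde {
    // Avro schema describing the two fields exposed to Lenses.
    private final Schema schema = SchemaBuilder.builder()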
                        .record("lat_lng")
                        .fields()
                        .requiredDouble("lat")
                        .requiredDouble("lng")
                        .endRecord();

    @Override
    public Schema getSchema() {
        return schema;
    }

    @Override
    public Serializer serializer(Properties properties) {
        return new Serializer() {

            @Override
            public byte[] serialize(GenericRecord record) throws IOException {
                double lat = (double) record.get("lat");
                double lng = (double) record.get("lng");
                String data = lat + ":" + lng;
                return data.getBytes("UTF-8");
            }

            @Override
            public void close() throws IOException {
            }
        };
    }

    @Override
    public Deserializer deserializer(Properties properties) {
        return new Deserializer() {
            @Override
            public GenericRecord deserialize(byte[] bytes) throws IOException {
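                // Decode explicitly as UTF-8, matching the serializer, rather than the platform default charset.
                String data = new String(bytes, "UTF-8");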
                String[] tokens = data.split(":");
                double lat = Double.parseDouble(tokens[0]);
                double lng = Double.parseDouble(tokens[1]);

                GenericRecord record = new GenericData.Record(schema);
                record.put("lat", lat);
                record.put("lng", lng);
                return record;
            }

            @Override
            public void close() throws IOException {
            }
        };
    }
}

Note: It is not necessary to implement the serializer method if you will not publish data in this format. In that case, simply return null.
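
The deserializer above skips error handling. One way to add it is sketched below as a standalone helper that uses only the Java standard library. The name LatLngParser and the wording of the error messages are assumptions made for illustration, not part of the Lenses API. The helper throws IOException because that is the exception type the deserialize method above already declares.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of the Lenses API: validates a
// "latitude:longitude" payload before it is turned into a record.
final class LatLngParser {

    private LatLngParser() {
    }

    // Returns {latitude, longitude}, or throws IOException for malformed input.
    static double[] parse(byte[] bytes) throws IOException {
        if (bytes == null) {
            throw new IOException("Record payload is null");
        }
        String data = new String(bytes, StandardCharsets.UTF_8);
        // A limit of -1 keeps trailing empty tokens, so "45.6:" is seen as two tokens.
        String[] tokens = data.split(":", -1);
        if (tokens.length != 2) {
            throw new IOException("Expected 'latitude:longitude' but got: " + data);
        }
        try {
            double lat = Double.parseDouble(tokens[0]);
            double lng = Double.parseDouble(tokens[1]);
            return new double[] {lat, lng};
        } catch (NumberFormatException e) {
            throw new IOException("Invalid coordinate in: " + data, e);
        }
    }
}
```

Inside deserialize, the two values returned by such a helper could be put into the GenericRecord in place of the unchecked Double.parseDouble calls. Whether a malformed record should fail loudly or be skipped depends on how the topic is used, so treat this as one possible policy.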

Installation of Custom Serdes

Once a serde has been written, it must be deployed to the Lenses instance. To do this, package the serde as a jar, together with any dependencies it requires, and install the jar into %LENSES_HOME/serdes. Any jars deployed here are picked up after a ten-second delay, at which point Lenses scans the jar to discover any serde implementations. Lenses then makes those serde implementations available as a decoder type in the UI.

The following screenshot shows such a serde that has been detected and is available for selection.

Important

Only a user with TableStorageWrite permission can set up the table/topic storage format information in Lenses.

../../_images/serde_selection.png

The following screenshot shows a topic which contains credit-card messages written in a Google Protobuf format. As the serde is not being used, Lenses interprets the data as raw bytes.

../../_images/raw_protobuf_data.png

In the final screenshot, we can see that once the serde is selected, the data is correctly decoded as credit-card data.

../../_images/converted_protobuf_data.png