4.2

Deserializers

Lenses works out of the box with messages in Avro, JSON, XML and primitive formats.

For messages in other formats, such as Google Protobuf or Thrift, you need to provide a deserialization envelope, or serde. A serde reads raw bytes into a meaningful data structure and translates that structure back into raw bytes according to the storage format.

Google Protobuf example 

For example, suppose a Kafka topic contains GPS coordinates, each consisting of a latitude and a longitude stored as doubles. Each “latitude:longitude” string, such as “45.623412:10.419433”, is converted to UTF-8 bytes and published to a topic in Protobuf format.
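
To make the payload concrete, here is a minimal sketch of the “latitude:longitude” byte encoding described above, using only the Java standard library. The class name LatLngWireFormat and its two methods are hypothetical, introduced purely for illustration; they are not part of Lenses.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of Lenses: illustrates the raw payload layout.
final class LatLngWireFormat {

    // Encode a coordinate pair as the UTF-8 bytes of "lat:lng".
    static byte[] encode(double lat, double lng) {
        return (lat + ":" + lng).getBytes(StandardCharsets.UTF_8);
    }

    // Decode the UTF-8 bytes back into {lat, lng}; assumes well-formed input.
    static double[] decode(byte[] bytes) {
        String[] tokens = new String(bytes, StandardCharsets.UTF_8).split(":");
        return new double[] {
            Double.parseDouble(tokens[0]),
            Double.parseDouble(tokens[1])
        };
    }
}
```

Calling LatLngWireFormat.encode(45.623412, 10.419433) yields the bytes of the string “45.623412:10.419433”, and decode reverses the step.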

A serde can decode and encode this format through a simple API. First, add the following library dependency:

<dependency>
    <groupId>com.landoop</groupId>
    <artifactId>lenses-serde</artifactId>
    <version>1.0.3</version>
</dependency>

The following Java code implements the Serde interface by providing a schema, a serializer and a deserializer. The deserializer returns a GenericRecord containing the two fields parsed from the bytes.

In this simple example, we do not worry about error handling, such as ensuring that the tokens have valid double values.
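
If error handling does matter, the parsing step could be hardened along the following lines. This is only a sketch under stated assumptions: the class LatLngParser is hypothetical and not part of the Lenses API, and the latitude/longitude range checks are an assumption about what counts as valid GPS data. It reports malformed payloads as IOException, the exception type the deserialize method is declared to throw.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Hypothetical helper (not part of the Lenses API): validates a "lat:lng" payload.
final class LatLngParser {

    // Returns {lat, lng}, or throws IOException explaining why the payload is invalid.
    static double[] parse(byte[] bytes) throws IOException {
        if (bytes == null) {
            throw new IOException("Payload is null");
        }
        String data = new String(bytes, StandardCharsets.UTF_8).trim();
        // Limit -1 keeps trailing empty tokens, so "45.6:10.4:" is not silently accepted.
        String[] tokens = data.split(":", -1);
        if (tokens.length != 2) {
            throw new IOException("Expected 'lat:lng' but got: " + data);
        }
        try {
            double lat = Double.parseDouble(tokens[0]);
            double lng = Double.parseDouble(tokens[1]);
            // Assumed valid ranges; written so that NaN also fails the check.
            if (!(lat >= -90 && lat <= 90) || !(lng >= -180 && lng <= 180)) {
                throw new IOException("Coordinates out of range: " + data);
            }
            return new double[] { lat, lng };
        } catch (NumberFormatException e) {
            throw new IOException("Non-numeric coordinate in: " + data, e);
        }
    }
}
```

A deserializer could call such a helper in place of the bare split and Double.parseDouble calls, so that a bad message fails with a descriptive error rather than an ArrayIndexOutOfBoundsException or NumberFormatException.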

public class LatLngSerde implements Serde {
  private Schema schema = SchemaBuilder.builder()
                          .record("lat_lng")
                          .fields()
                          .requiredDouble("lat")
                          .requiredDouble("lng")
                          .endRecord();

  @Override
  public Schema getSchema() {
    return schema;
  }

  @Override
  public Serializer serializer(Properties properties) {
    return new Serializer() {

      @Override
      public byte[] serialize(GenericRecord record) throws IOException {
        double lat = (double) record.get("lat");
        double lng = (double) record.get("lng");
        String data = lat + ":" + lng;
        return data.getBytes("UTF-8");
      }

      @Override
      public void close() throws IOException {
      }
    };
  }

  @Override
  public Deserializer deserializer(Properties properties) {
    return new Deserializer() {
      @Override
      public GenericRecord deserialize(byte[] bytes) throws IOException {
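        // Decode explicitly as UTF-8 (matching the serializer) rather than the platform default charset
        String data = new String(bytes, "UTF-8");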
        String[] tokens = data.split(":");
        double lat = Double.parseDouble(tokens[0]);
        double lng = Double.parseDouble(tokens[1]);

        GenericRecord record = new GenericData.Record(schema);
        record.put("lat", lat);
        record.put("lng", lng);
        return record;
      }

      @Override
      public void close() throws IOException {
      }
    };
  }
}

Note that it is not necessary to implement the serializer method if you will not publish data in this format. In that case, simply return null.

Serde installation 

Build your serde code into a JAR file (without bundling any dependencies) and copy it into the $LENSES_HOME/serde folder (or mount it at that location when running in Docker). If the serde needs additional dependency libraries, bring only those that are not already included in $LENSES_HOME/lib.

Lenses will pick it up automatically. You can then choose the correct decoder for a Kafka topic by manually editing the topic's schema and selecting the above deserializer as the key or value decoder.