Source record converters


We provide four converters out of the box but you can plug in your own. See an example here. The WITHCONVERTER keyword supports this option.

AvroConverter 

com.datamountaineer.streamreactor.connect.converters.source.AvroConverter

The payload is an Avro message. In this case you need to provide a path for the Avro schema file to be able to decode it.

JsonSimpleConverter 

com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter

The payload is a JSON message. This converter will parse the JSON and create an Avro record for it which will be sent over to Kafka.

JsonConverterWithSchemaEvolution 

An experimental converter for translating JSON messages to Avro. The resulting Avro schema is fully compatible as new fields are added as the JSON payload evolves.

BytesConverter 

com.datamountaineer.streamreactor.connect.converters.source.BytesConverter

Provide your own Converter 

You can always provide your own logic for converting the JMS message to your an Avro record. If you have messages coming in Protobuf format you can deserialize the message based on the schema and create the Avro record. All you have to do is create a new project and add our dependency:


Maven:

<dependency>
    <groupId>com.datamountaineer</groupId>
    <artifactId>kafka-connect-common</artifactId>
    <version>2.0.0</version>
</dependency>

Then all you have to do is implement com.datamountaineer.streamreactor.connect.converters.source.Converter.

Here is our BytesConverter class code:


class BytesConverter extends Converter {
  override def convert(kafkaTopic: String, sourceTopic: String, messageId: String, bytes: Array[Byte]): SourceRecord = {
    new SourceRecord(Collections.singletonMap(Converter.TopicKey, sourceTopic),
      null,
      kafkaTopic,
      MsgKey.schema,
      MsgKey.getStruct(sourceTopic, messageId),
      Schema.BYTES_SCHEMA,
      bytes)
  }
}

This is the default implementation. The payload is taken as is: an array of bytes and sent over Kafka as an Avro record with Schema.BYTES. You don’t have to provide a mapping for the source to get this converter.

--
Last modified: September 26, 2024