4.3
Source record converters
We provide four converters out of the box but you can plug your own. See an example here. The WITHCONVERTER keyword supports this option.
AvroConverter
com.datamountaineer.streamreactor.connect.converters.source.AvroConverter
The payload is an AVRO message. In this case you need to provide a path for the AVRO schema file to be able to decode it.
JsonSimpleConverter
com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter
The payload is a JSON message. This converter will parse the JSON and create an AVRO record for it which will be sent over to Kafka.
JsonConverterWithSchemaEvolution
An experimental converter for translating JSON messages to AVRO. The resulting AVRO schema is fully compatible as new fields are added as the JSON payload evolves.
BytesConverter
com.datamountaineer.streamreactor.connect.converters.source.BytesConverter
Provide your own Converter
You can always provide your own logic for converting the JMS message to your an AVRO record. If you have messages coming in Protobuf format you can deserialize the message based on the schema and create the AVRO record. All you have to do is create a new project and add our dependency:
Maven:
<dependency>
<groupId>com.datamountaineer</groupId>
<artifactId>kafka-connect-common</artifactId>
<version>2.0.0</version>
</dependency>
Then all you have to do is implement com.datamountaineer.streamreactor.connect.converters.source.Converter.
Here is our BytesConverter class code:
class BytesConverter extends Converter {
override def convert(kafkaTopic: String, sourceTopic: String, messageId: String, bytes: Array[Byte]): SourceRecord = {
new SourceRecord(Collections.singletonMap(Converter.TopicKey, sourceTopic),
null,
kafkaTopic,
MsgKey.schema,
MsgKey.getStruct(sourceTopic, messageId),
Schema.BYTES_SCHEMA,
bytes)
}
}
This is the default implementation. The payload is taken as is: an array of bytes and sent over Kafka as an AVRO record with Schema.BYTES. You don’t have to provide a mapping for the source to get this converter.