JMS Source

Download connector: JMS Connector for Kafka 2.1.0

The JMS Source connector allows you to subscribe to messages on JMS queues and topics and write them to a Kafka topic. Each JMS message is committed only once it has been written to Kafka. If a failure happens when writing to Kafka, e.g. the message is too large, then that JMS message will not be acknowledged. It will stay in the queue so it can be acted upon later.

Prerequisites

  • Apache Kafka 0.11.x or above
  • Kafka Connect 0.11.x or above
  • A JMS framework (ActiveMQ for example)
  • Java 1.8

Features

  1. Pluggable converters for JMS payloads. If no converter is specified, an AVRO message is created representing the JMS message, with the payload stored as a byte array in the payload field of the AVRO record.
  2. Out-of-the-box converters for JSON, AVRO and binary payloads
  3. KCQL routing and querying - JMS destination to Kafka topic mapping
  4. Extra connection properties for specialized connections such as SOLACE_VPN.

KCQL Support

INSERT INTO kafka_topic SELECT * FROM jms_destination WITHTYPE { TOPIC|QUEUE } [WITHCONVERTER=`myclass`]

Tip

You can specify multiple KCQL statements separated by ; to have the connector map multiple JMS destinations to Kafka topics.
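For example, two statements routing a queue and a topic to two Kafka topics (the destination and topic names are illustrative):

INSERT INTO topicA SELECT * FROM queue_a WITHTYPE QUEUE; INSERT INTO topicB SELECT * FROM topic_b WITHTYPE TOPIC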

The JMS source supports KCQL, the Kafka Connect Query Language. The following KCQL support is available:

  1. Selection of JMS topic or queue
  2. Selecting converter types for JMS message payloads.

Example:

-- Select from a JMS queue and write to a Kafka topic
INSERT INTO topicA SELECT * FROM jms_queue WITHTYPE QUEUE

-- Select from a JMS topic and write to a Kafka topic with a JSON converter
INSERT INTO topicA SELECT * FROM jms_topic WITHTYPE TOPIC WITHCONVERTER=`com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter`

JMS Destination Type

JMS destination types can be either topics or queues; the type is set via the WITHTYPE keyword.

Converters

We provide four converters out of the box, but you can also plug in your own (see Provide your own Converter below). The converter is set per statement in the connect.jms.kcql configuration via the WITHCONVERTER keyword.

AvroConverter

com.datamountaineer.streamreactor.connect.converters.source.AvroConverter

The payload is an AVRO message. In this case you need to provide a path to the AVRO schema file so the connector can decode it (see connect.converter.avro.schemas below).
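A minimal sketch pairing the converter with a schema file, assuming a queue named jms_queue (the destination name and file path are illustrative):

connect.jms.kcql=INSERT INTO topicA SELECT * FROM jms_queue WITHTYPE QUEUE WITHCONVERTER=`com.datamountaineer.streamreactor.connect.converters.source.AvroConverter`
connect.converter.avro.schemas=jms_queue=/opt/connector/schemas/jms_queue.avsc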

JsonSimpleConverter

com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter

The payload is a JSON message. This converter will parse the JSON and create an AVRO record for it which will be sent over to Kafka.

JsonConverterWithSchemaEvolution

com.datamountaineer.streamreactor.connect.converters.source.JsonConverterWithSchemaEvolution

An experimental converter for translating JSON messages to AVRO. The resulting AVRO schema evolves in a fully compatible manner as new fields appear in the JSON payload.

BytesConverter

com.datamountaineer.streamreactor.connect.converters.source.BytesConverter

This is the default implementation. The payload is taken as is, an array of bytes, and sent over Kafka as an AVRO record with Schema.BYTES. You don’t have to provide a converter mapping in KCQL to get this behaviour.

Lenses QuickStart

The easiest way to try this connector is with Lenses Box, the pre-configured Docker image that comes with this connector pre-installed. Navigate to Connectors –> New Connector –> Source –> JMS and paste your configuration.


ActiveMQ Setup

For ActiveMQ, follow http://activemq.apache.org/getting-started.html for instructions on setting it up. You will also need the ActiveMQ client jar; add it to the plugin.path of each Connect worker.
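A sketch of copying the client jar, assuming ActiveMQ 5.15.x and the plugins/lib folder described in the next section (paths and versions are illustrative):

# copy the ActiveMQ client jar next to the connector jars on each Connect worker
cp apache-activemq-5.15.0/activemq-all-5.15.0.jar /opt/connectors/plugins/lib/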

Installing the Connector

In production, Connect should be run in distributed mode:

  1. Install and configure a Kafka Connect cluster
  2. Create a folder on each server called plugins/lib
  3. Copy the required connector jars from the Stream Reactor download into the above folder
  4. Edit connect-avro-distributed.properties in the etc/schema-registry folder and uncomment the plugin.path option. Set it to the root directory, i.e. plugins, of the folder created in step 2, as shown in the sketch after this list.
  5. Start Connect, bin/connect-distributed etc/schema-registry/connect-avro-distributed.properties
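A minimal sketch of the relevant line, assuming the connector jars were deployed under /opt/connectors/plugins/lib (the path is illustrative):

# etc/schema-registry/connect-avro-distributed.properties
plugin.path=/opt/connectors/plugins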

Connect workers are long running processes, so set up an init.d or systemd service accordingly.
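A minimal systemd unit sketch, assuming a Confluent installation under /opt/confluent (all paths are illustrative):

[Unit]
Description=Kafka Connect distributed worker
After=network.target

[Service]
ExecStart=/opt/confluent/bin/connect-distributed /opt/confluent/etc/schema-registry/connect-avro-distributed.properties
Restart=on-failure

[Install]
WantedBy=multi-user.target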

Source Connector QuickStart

Start Kafka Connect in distributed mode (see install). In this mode a REST endpoint on port 8083 is exposed to accept connector configurations. We developed a Command Line Interface to make interacting with the Connect REST API easier. The CLI can be found in the Stream Reactor download under the bin folder. Alternatively, the jar can be pulled from our GitHub releases page.

Starting the Connector (Distributed)

Download and install Stream Reactor. Follow the instructions here if you haven’t already done so. All paths in the quickstart are based on the location where you installed Stream Reactor.

Once Connect has started we can use the kafka-connect-tools CLI to post our distributed properties file for JMS. For the CLI to work, including when using the Docker images, you will have to set the following environment variable to point to the Kafka Connect REST API.

export KAFKA_CONNECT_REST="http://myserver:myport"
➜  bin/connect-cli create jms-source < conf/jms-source.properties

name=jms-source
connector.class=com.datamountaineer.streamreactor.connect.jms.source.JMSSourceConnector
tasks.max=1
connect.jms.kcql=INSERT INTO topic SELECT * FROM jms-queue
connect.jms.initial.context.factory=org.apache.activemq.jndi.ActiveMQInitialContextFactory
connect.jms.url=tcp://localhost:61616
connect.jms.connection.factory=ConnectionFactory
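Alternatively, the same configuration can be posted directly to the Connect REST API as JSON; a sketch (the host and port are illustrative):

curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors \
  -d '{
    "name": "jms-source",
    "config": {
      "connector.class": "com.datamountaineer.streamreactor.connect.jms.source.JMSSourceConnector",
      "tasks.max": "1",
      "connect.jms.kcql": "INSERT INTO topic SELECT * FROM jms-queue",
      "connect.jms.initial.context.factory": "org.apache.activemq.jndi.ActiveMQInitialContextFactory",
      "connect.jms.url": "tcp://localhost:61616",
      "connect.jms.connection.factory": "ConnectionFactory"
    }
  }'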

We can use the CLI to check if the connector is up, but you should be able to see this in the logs as well.

#check for running connectors with the CLI
➜ bin/connect-cli ps
jms-source

name=jms-source
connector.class=com.datamountaineer.streamreactor.connect.jms.source.JMSSourceConnector
tasks.max=1
connect.jms.kcql=INSERT INTO topic SELECT * FROM jms-queue
connect.jms.initial.context.factory=org.apache.activemq.jndi.ActiveMQInitialContextFactory
connect.jms.url=tcp://localhost:61616
connect.jms.connection.factory=ConnectionFactory
  INFO
    __                    __
   / /   ____ _____  ____/ /___  ____  ____
  / /   / __ `/ __ \/ __  / __ \/ __ \/ __ \
 / /___/ /_/ / / / / /_/ / /_/ / /_/ / /_/ /
/_____/\__,_/_/ /_/\__,_/\____/\____/ .___/
                                   /_/
           ____  _____________
          / /  |/  / ___/ ___/____  __  _______________
         / / /|_/ /\__ \\__ \/ __ \/ / / / ___/ ___/ _ \  By Andrew Stevenson
    / /_/ / /  / /___/ /__/ / /_/ / /_/ / /  / /__/  __/
    \____/_/  /_//____/____/\____/\__,_/_/   \___/\___/
   (com.datamountaineer.streamreactor.connect.jms.source.JMSSourceTask:22)

Test Records

Now we need to send some records to the ActiveMQ broker for the source connector to pick up. We can do this with the ActiveMQ command line producer. In the bin folder of the ActiveMQ installation, run the following to insert 1000 messages into a queue called jms-queue.

activemq producer --destination queue://jms-queue --message "hello DataMountaineer"

We should immediately see the records coming through the source and into our Kafka topic:

bin/kafka-avro-console-consumer \
--zookeeper localhost:2181 \
--topic topic \
--from-beginning
{"message_timestamp":{"long":1490799748984},"correlation_id":null,"redelivered":{"boolean":false},"reply_to":null,"destination":{"string":"queue://jms-queue"},"message_id":{"string":"ID:Andrews-MacBook-Pro.local-49870-1490799747943-1:1:1:1:997"},"mode":{"int":2},"type":null,"priority":{"int":4},"bytes_payload":{"bytes":"hello"},"properties":null}
{"message_timestamp":{"long":1490799748985},"correlation_id":null,"redelivered":{"boolean":false},"reply_to":null,"destination":{"string":"queue://jms-queue"},"message_id":{"string":"ID:Andrews-MacBook-Pro.local-49870-1490799747943-1:1:1:1:998"},"mode":{"int":2},"type":null,"priority":{"int":4},"bytes_payload":{"bytes":"hello"},"properties":null}
{"message_timestamp":{"long":1490799748986},"correlation_id":null,"redelivered":{"boolean":false},"reply_to":null,"destination":{"string":"queue://jms-queue"},"message_id":{"string":"ID:Andrews-MacBook-Pro.local-49870-1490799747943-1:1:1:1:999"},"mode":{"int":2},"type":null,"priority":{"int":4},"bytes_payload":{"bytes":"hello"},"properties":null}
{"message_timestamp":{"long":1490799748987},"correlation_id":null,"redelivered":{"boolean":false},"reply_to":null,"destination":{"string":"queue://jms-queue"},"message_id":{"string":"ID:Andrews-MacBook-Pro.local-49870-1490799747943-1:1:1:1:1000"},"mode":{"int":2},"type":null,"priority":{"int":4},"bytes_payload":{"bytes":"hello"},"properties":null}

Configurations

The Kafka Connect framework requires the following in addition to any connector-specific configurations:

Config Description Type Value
name Name of the connector string  
tasks.max The number of tasks to scale output int 1
connector.class Name of the connector class string com.datamountaineer.streamreactor.connect.jms.source.JMSSourceConnector

Connector Configurations

Config Description Type
connect.jms.url The JMS broker url string
connect.jms.username The user for the JMS connection string
connect.jms.password The password for the JMS connection string
connect.jms.initial.context.factory Initial Context Factory, e.g: org.apache.activemq.jndi.ActiveMQInitialContextFactory string
connect.jms.connection.factory The ConnectionFactory implementation to use string
connect.jms.destination.selector Selector to use for destination lookup. Either CDI or JNDI (default) string
connect.jms.kcql KCQL expression describing field selection and routes. The KCQL expression also handles setting the JMS destination type, i.e. TOPIC or QUEUE, via the WITHTYPE keyword, and the converter via the WITHCONVERTER keyword. If no converter is specified the source defaults to the BytesConverter, which sends an AVRO message over Kafka using Schema.BYTES. string

Optional configurations

Config Description Type
connect.jms.initial.context.extra.params List (comma separated) of extra properties as key=value pairs to supply to the initial context, e.g. SOLACE_JMS_VPN=my_solace_vp string
connect.converter.avro.schemas If the AvroConverter is used you need to provide an avro schema to be able to read and translate the raw bytes to an avro record. The format is $JMS_TOPIC=$PATH_TO_AVRO_SCHEMA_FILE string
connect.jms.batch.size The batch size to take from the JMS destination on each poll of Kafka Connect. Default is 100 int
connect.progress.enabled Enables the output for how many records have been processed boolean
connect.jms.evict.interval.minutes Removes the uncommitted messages from the internal cache. Each JMS message is linked to the Kafka record to be published. Failure to publish a record to Kafka will mean the JMS message will not be acknowledged. int
connect.jms.evict.threshold.minutes The number of minutes after which an uncommitted entry becomes evictable from the connector cache. int
connect.jms.scale.type Controls the scale type for the connector. If set to ‘kcql’ it will scale based on the number of KCQL statements; otherwise it will respect the tasks.max value. string
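A sketch combining several of the optional settings above (the values are illustrative):

connect.jms.kcql=INSERT INTO topicA SELECT * FROM jms_queue WITHTYPE QUEUE
connect.jms.batch.size=200
connect.jms.evict.interval.minutes=5
connect.jms.evict.threshold.minutes=30
connect.progress.enabled=true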

Provide your own Converter

You can always provide your own logic for converting the JMS message to an AVRO record. For example, if your messages arrive in Protobuf format, you can deserialize each message based on its schema and create the AVRO record yourself. All you have to do is create a new project and add our dependency:

Gradle:

compile "com.datamountaineer:kafka-connect-common:0.7.1"

Maven:

<dependency>
    <groupId>com.datamountaineer</groupId>
    <artifactId>kafka-connect-common</artifactId>
    <version>0.7.1</version>
</dependency>

Then all you have to do is implement com.datamountaineer.streamreactor.connect.converters.source.Converter.

Here is our BytesConverter class code:

import java.util.Collections

import org.apache.kafka.connect.data.Schema
import org.apache.kafka.connect.source.SourceRecord

// Converter and MsgKey are provided by the kafka-connect-common dependency added above
class BytesConverter extends Converter {
  override def convert(kafkaTopic: String, sourceTopic: String, messageId: String, bytes: Array[Byte]): SourceRecord = {
    // Key the record by the source destination and message id, and pass the raw bytes through unchanged
    new SourceRecord(Collections.singletonMap(Converter.TopicKey, sourceTopic),
      null,
      kafkaTopic,
      MsgKey.schema,
      MsgKey.getStruct(sourceTopic, messageId),
      Schema.BYTES_SCHEMA,
      bytes)
  }
}
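Once compiled, place the jar containing your converter on the worker's plugin.path (or classpath) and reference its fully qualified class name in KCQL; a sketch with an illustrative class name:

connect.jms.kcql=INSERT INTO topicA SELECT * FROM jms_queue WITHTYPE QUEUE WITHCONVERTER=`com.example.ProtobufConverter`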

Schema Evolution

Not applicable.

Kubernetes

Helm charts are provided at our repo; add the repo to your Helm instance and install. We recommend using the Landscaper to manage Helm values, since typically each connector instance has its own deployment.

Add the Helm charts to your Helm instance:

helm repo add landoop https://lensesio.github.io/kafka-helm-charts/

Troubleshooting

Please review the FAQs and join our Slack channel.