JMS

A Kafka Connect JMS source connector to subscribe to messages on JMS queues and topics and write them to a Kafka topic.

KCQL support 

The following KCQL is supported:

INSERT INTO kafka_topic 
SELECT * 
FROM jms_destination 
WITHTYPE [TOPIC|QUEUE]
[WITHCONVERTER=`myclass`]

Selection of fields from the JMS message is not supported.

Examples:

-- Select from a JMS queue and write to a Kafka topic
INSERT INTO topicA SELECT * FROM jms_queue WITHTYPE QUEUE

-- Select from a JMS topic and write to a Kafka topic with a json converter
INSERT INTO topicA 
SELECT * FROM jms_queue 
WITHTYPE TOPIC 
WITHCONVERTER=`com.datamountaineer.streamreactor.connect.converters.source.AvroConverter`

Concepts 

The connector uses the standard JMS proctocols and has been tested against ActiveMQ.

The connector allows for the JMS inital.context.factory and connection.factory to be set according to your JMS provider. The appropriate implementation jars must be added to the CLASSPATH of the connect workers or placed in the plugin.path of the connector.

Each JMS message is committed only when it has been written to Kafka. If a failure happens when writing to Kafka, i.e. the message is too large, then that JMS message will not be acknowledged. It will stay in the queue so it can be actioned upon.

The schema of the messages is fixed and can found Data Types unless a converter is used.

Destination Types 

The connector support both TOPICS and QUEUES, controlled by the WITHTYPE KCQL clause.

Message Converters 

The connector supports converters to handle different messages payload format in the source topic or queue. See source record converters.

If no converter is provide the JMS message is converter to a Kafka Struct representation. See Data Types.

Quickstart 

Launch the stack 


  1. Copy the docker-compose file.
  2. Bring up the stack.
export CONNECTOR=jms
docker-compose up -d activemq

Inserting test data 

Login into the activemq container:

docker exec -ti activemq /bin/bash 

Run the following command to generate messages:

bin/activemq producer --destination queue://jms-queue --message "hello Lenses!"

Start the connector 

If you are using Lenses, login into Lenses and navigate to the connectors page, select JMS as the source and paste the following:

name=jms-source
connector.class=com.datamountaineer.streamreactor.connect.jms.source.JMSSourceConnector
tasks.max=1
connect.jms.kcql=INSERT INTO jms SELECT * FROM jms-queue WITHTYPE QUEUE
connect.jms.initial.context.factory=org.apache.activemq.jndi.ActiveMQInitialContextFactory
connect.jms.url=tcp://activemq:61616
connect.jms.connection.factory=ConnectionFactory

To start the connector without using Lenses, log into the fastdatadev container:


docker exec -ti fastdata /bin/bash

and create a connector.properties file containing the properties above.

Create the connector, with the connect-cli:

connect-cli create jms < connector.properties

Wait a for the connector to start and check its running:

connect-cli status jms

Check for records in Kafka 

Check the records in Lenses or with via the console:

kafka-avro-console-consumer \
    --bootstrap-server localhost:9092 \
    --topic jms \
    --from-beginning 

Data type conversion 

NameType
message_timestampOptional int64
correlation_idOptional string
redeliveredOptional boolean
reply_toOptional string
destinationOptional string
message_idOptional string
modeOptional int32
typeOptional string
priorityOptional int32
bytes_payloadOptional bytes
propertiesMap of string

Clean up 

Bring down the stack:

docker-compose down

Options 

NameDescriptionTypeDefault Value
connect.jms.urlProvides the JMS broker urlstring
connect.jms.initial.context.factoryInitial Context Factory, e.g: org.apache.activemq.jndi.ActiveMQInitialContextFactorystring
connect.jms.connection.factoryProvides the full class name for the ConnectionFactory compile to use, e.gorg.apache.activemq.ActiveMQConnectionFactorystringConnectionFactory
connect.jms.kcqlconnect.jms.kcqlstring
connect.jms.subscription.namesubscription name to use when subscribing to a topic, specifying this makes a durable subscription for topicsstring
connect.jms.passwordProvides the password for the JMS connectionpassword
connect.jms.usernameProvides the user for the JMS connectionstring
connect.jms.error.policySpecifies the action to be taken if an error occurs while inserting the data. There are two available options: NOOP - the error is swallowed THROW - the error is allowed to propagate. RETRY - The exception causes the Connect framework to retry the message. The number of retries is based on The error will be logged automaticallystringTHROW
connect.jms.retry.intervalThe time in milliseconds between retries.int60000
connect.jms.max.retriesThe maximum number of times to try the write again.int20
connect.jms.destination.selectorSelector to use for destination lookup. Either CDI or JNDI.stringCDI
connect.jms.initial.context.extra.paramsList (comma separated) of extra properties as key/value pairs with a colon delimiter to supply to the initial context e.g. SOLACE_JMS_VPN:my_solace_vplist[]
connect.jms.batch.sizeThe number of records to poll for on the target JMS destination in each Connect poll.int100
connect.jms.polling.timeoutProvides the timeout to poll incoming messageslong1000
connect.jms.source.default.converterContains a canonical class name for the default converter of a raw JMS message bytes to a SourceRecord. Overrides to the default can be done by using connect.jms.source.converters still. i.e. com.datamountaineer.streamreactor.connect.source.converters.AvroConverterstring
connect.jms.converter.throw.on.errorIf set to false the conversion exception will be swallowed and everything carries on BUT the message is lost!!; true will throw the exception.Default is false.booleanfalse
connect.converter.avro.schemasIf the AvroConverter is used you need to provide an avro Schema to be able to read and translate the raw bytes to an avro record. The format is $MQTT_TOPIC=$PATH_TO_AVRO_SCHEMA_FILEstring
connect.jms.headersContains collection of static JMS headers included in every SinkRecord The format is connect.jms.headers="$MQTT_TOPIC=rmq.jms.message.type:TextMessage,rmq.jms.message.priority:2;$MQTT_TOPIC2=rmq.jms.message.type:JSONMessage”string
connect.progress.enabledEnables the output for how many records have been processedbooleanfalse
connect.jms.evict.interval.minutesRemoves the uncommitted messages from the internal cache. Each JMS message is linked to the Kafka record to be published. Failure to publish a record to Kafka will mean the JMS message will not be acknowledged.int10
connect.jms.evict.threshold.minutesThe number of minutes after which an uncommitted entry becomes evictable from the connector cache.int10
connect.jms.scale.typeHow the connector tasks parallelization is decided. Available values are kcql and default. If kcql is provided it will be based on the number of KCQL statements written; otherwise it will be driven based on the connector tasks.maxstringkcql