JMS

This page describes the usage of the Stream Reactor JMS Source Connector.

The connector subscribes to messages on JMS queues and topics and writes them to a Kafka topic.

The connector uses the standard JMS protocols and has been tested against ActiveMQ.

The JMS initial.context.factory and connection.factory can be set to match your JMS provider. The provider's implementation jars must be added to the CLASSPATH of the Connect workers or placed alongside the connector under the workers' plugin.path.

Each JMS message is acknowledged only after it has been written to Kafka. If writing to Kafka fails, for example because the message is too large, the JMS message is not acknowledged. It stays in the queue so that it can be acted upon.
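
The acknowledge-after-write behaviour described above can be illustrated with a small, self-contained simulation. This is only a sketch and not the connector's implementation: FakeJmsMessage, write_to_kafka and MAX_KAFKA_BYTES are hypothetical stand-ins for a real JMS message, a real Kafka producer call and a broker size limit.

```python
from dataclasses import dataclass

MAX_KAFKA_BYTES = 10  # hypothetical size limit, for illustration only


@dataclass
class FakeJmsMessage:
    """Hypothetical stand-in for a JMS message."""
    body: bytes
    acknowledged: bool = False

    def acknowledge(self) -> None:
        self.acknowledged = True


def write_to_kafka(message: FakeJmsMessage) -> None:
    """Hypothetical Kafka write that rejects oversized messages."""
    if len(message.body) > MAX_KAFKA_BYTES:
        raise ValueError("message too large")


def process(messages):
    """Acknowledge each message only after a successful write.

    Returns the messages that were left unacknowledged.
    """
    unacknowledged = []
    for message in messages:
        try:
            write_to_kafka(message)
        except ValueError:
            # The write failed, so the message is not acknowledged
            # and remains available on the JMS side.
            unacknowledged.append(message)
            continue
        message.acknowledge()
    return unacknowledged


queue = [FakeJmsMessage(b"small"), FakeJmsMessage(b"x" * 50)]
pending = process(queue)
print([m.acknowledged for m in queue])  # [True, False]
```

The oversized message is never acknowledged, which mirrors why a failed Kafka write leaves the JMS message in the queue.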

Unless a converter is used, the schema of the messages is fixed and can be found under Data Types.

You must provide the JMS implementation jars for your JMS service.

Connector Class

io.lenses.streamreactor.connect.jms.source.JMSSourceConnector

Example

For more examples see the tutorials.

name=jms-source
connector.class=io.lenses.streamreactor.connect.jms.source.JMSSourceConnector
tasks.max=1
connect.jms.kcql=INSERT INTO jms SELECT * FROM jms-queue WITHTYPE QUEUE
connect.jms.initial.context.factory=org.apache.activemq.jndi.ActiveMQInitialContextFactory
connect.jms.url=tcp://activemq:61616
connect.jms.connection.factory=ConnectionFactory
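
When Kafka Connect runs in distributed mode, connectors are usually created through the Connect REST API (POST /connectors), which takes a JSON body with a name and a config map rather than a properties file. The following sketch converts the properties above into that shape. The helper name properties_to_connect_payload is hypothetical, and the worker address you would post the result to depends on your deployment and is not shown.

```python
import json

# The example configuration from above, verbatim.
PROPERTIES = """\
name=jms-source
connector.class=io.lenses.streamreactor.connect.jms.source.JMSSourceConnector
tasks.max=1
connect.jms.kcql=INSERT INTO jms SELECT * FROM jms-queue WITHTYPE QUEUE
connect.jms.initial.context.factory=org.apache.activemq.jndi.ActiveMQInitialContextFactory
connect.jms.url=tcp://activemq:61616
connect.jms.connection.factory=ConnectionFactory
"""


def properties_to_connect_payload(text: str) -> dict:
    """Turn key=value lines into a {"name": ..., "config": {...}} payload."""
    config = {}
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and comments
        # Split on the first "=" only, so values may contain "=".
        key, _, value = line.partition("=")
        config[key.strip()] = value.strip()
    name = config.pop("name")
    return {"name": name, "config": config}


payload = properties_to_connect_payload(PROPERTIES)
print(json.dumps(payload, indent=2))
```

All values stay strings, which is what the Connect REST API expects for connector configuration.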

KCQL support

You can specify multiple KCQL statements, separated by ;, to have the connector write to multiple Kafka topics.

The following KCQL is supported:

INSERT INTO kafka_topic
SELECT *
FROM jms_destination
WITHTYPE [TOPIC|QUEUE]
[WITHCONVERTER=`myclass`]

The selection of fields from the JMS message is not supported.

Examples:

-- Select from a JMS queue and write to a Kafka topic
INSERT INTO topicA SELECT * FROM jms_queue WITHTYPE QUEUE

-- Select from a JMS topic and write to a Kafka topic with an Avro converter
INSERT INTO topicA
SELECT * FROM jms_topic
WITHTYPE TOPIC
WITHCONVERTER=`com.datamountaineer.streamreactor.connect.converters.source.AvroConverter`
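
As a sketch of how several statements combine into one connect.jms.kcql value, the snippet below builds statements that follow the supported syntax and joins them with ;. The helpers build_kcql and join_kcql and the topic name topicB are hypothetical and exist only for this illustration; they are not part of the connector.

```python
from typing import Iterable, Optional


def build_kcql(kafka_topic: str, jms_destination: str, dest_type: str,
               converter: Optional[str] = None) -> str:
    """Build one KCQL statement in the form shown above."""
    dest_type = dest_type.upper()
    if dest_type not in ("TOPIC", "QUEUE"):
        raise ValueError("WITHTYPE must be TOPIC or QUEUE")
    statement = (
        f"INSERT INTO {kafka_topic} SELECT * FROM {jms_destination} "
        f"WITHTYPE {dest_type}"
    )
    if converter:
        # The converter class name is wrapped in backticks.
        statement += f" WITHCONVERTER=`{converter}`"
    return statement


def join_kcql(statements: Iterable[str]) -> str:
    """Join multiple KCQL statements with ';'."""
    return ";".join(statements)


kcql = join_kcql([
    build_kcql("topicA", "jms_queue", "QUEUE"),
    build_kcql(
        "topicB",  # hypothetical second Kafka topic
        "jms_topic",
        "TOPIC",
        converter="com.datamountaineer.streamreactor.connect.converters.source.AvroConverter",
    ),
])
print(f"connect.jms.kcql={kcql}")
```

Only SELECT * is generated, since selecting individual fields from the JMS message is not supported.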

Destination types

The connector supports both TOPICS and QUEUES, controlled by the WITHTYPE KCQL clause.

Message Converters

The connector supports converters to handle different message payload formats in the source topic or queue.

If no converter is provided, the JMS message is converted to a Kafka Struct representation.

See source record converters.

Data type conversion

Option Reference


2024 © Lenses.io Ltd. Apache, Apache Kafka, Kafka and associated open source project names are trademarks of the Apache Software Foundation.