JMS


Kafka Connect sink connector for writing data from Kafka to JMS.

JMS driver for your JMS distribution - The connector does not package an client jars

KCQL support 

The following KCQL is supported:

INSERT INTO <jms-destination>
SELECT FIELD, ...
FROM <your-kafka-topic>
[WITHFORMAT AVRO|JSON|MAP|OBJECT]
WITHTYPE TOPIC|QUEUE

Examples:

-- Select all fields from topicA and write to jmsA queue
INSERT INTO jmsA SELECT * FROM topicA WITHTYPE QUEUE

-- Select 3 fields and rename from topicB and write
-- to jmsB topic as JSON in a TextMessage
INSERT INTO jmsB SELECT x AS a, y, z FROM topicB WITHFORMAT JSON WITHTYPE TOPIC

Concepts 

JMS Topics and Queues 

The sink can write to either topics or queues, specified by the WITHTYPE clause.

JMS Payload 

When a message is sent to a JMS target it can be one of the following:

  • JSON - Send a TextMessage
  • AVRO - Send a BytesMessage
  • MAP - Send a MapMessage
  • OBJECT - Send an ObjectMessage

This is set by the WITHFORMAT keyword.

Kafka payload support 

This sink supports the following Kafka payloads:

  • Schema.Struct and Struct (Avro)
  • Schema.Struct and JSON
  • No Schema and JSON

See connect payloads for more information.

Error polices 

The connector supports Error polices.

Quickstart 

Launch the stack 


  1. Copy the docker-compose file.
  2. Bring up the stack.
export CONNECTOR=jms
docker-compose up -d activemq

Start the connector 

If you are using Lenses, login into Lenses and navigate to the connectors page, select JMS as the sink and paste the following:

name=jms
connector.class=io.lenses.streamreactor.connect.jms.sink.JMSSinkConnector
tasks.max=1
topics=orders
connect.jms.url=tcp://activemq:61616
connect.jms.initial.context.factory=org.apache.activemq.jndi.ActiveMQInitialContextFactory
connect.jms.connection.factory=ConnectionFactory
connect.jms.kcql=INSERT INTO orders SELECT * FROM orders WITHTYPE QUEUE WITHFORMAT JSON

To start the connector using the command line, log into the lenses-box container:


docker exec -ti lenses-box /bin/bash

and create a connector.properties file containing the properties above.

Create the connector, with the connect-cli:

connect-cli create jms-sink < connector.properties

connect-cli create jms-sink < connector.properties

Wait for the connector to start and check it’s running:

connect-cli status jms

Inserting test data 

In the to lenses-box container start the kafka producer shell:


kafka-avro-console-producer \
 --broker-list localhost:9092 --topic orders \
 --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"},{"name":"created","type":"string"},{"name":"product","type":"string"},{"name":"price","type":"double"}, {"name":"qty", "type":"int"}]}'

the console is now waiting for your input, enter the following:


{
  "id": 1,
  "created": "2016-05-06 13:53:00",
  "product": "OP-DAX-P-20150201-95.7",
  "price": 94.2,
  "qty": 100
}

Check for data in JMS 

docker exec -ti activemq bin/activemq consumer --destination queue://orders

Clean up 

Bring down the stack:

docker-compose down

Options 

NameDescriptionTypeDefault Value
connect.jms.urlProvides the JMS broker urlstring
connect.jms.initial.context.factoryInitial Context Factory, e.g: org.apache.activemq.jndi.ActiveMQInitialContextFactorystring
connect.jms.connection.factoryProvides the full class name for the ConnectionFactory compile to use, e.gorg.apache.activemq.ActiveMQConnectionFactorystringConnectionFactory
connect.jms.kcqlconnect.jms.kcqlstring
connect.jms.subscription.namesubscription name to use when subscribing to a topic, specifying this makes a durable subscription for topicsstring
connect.jms.passwordProvides the password for the JMS connectionpassword
connect.jms.usernameProvides the user for the JMS connectionstring
connect.jms.error.policySpecifies the action to be taken if an error occurs while inserting the data. There are two available options: NOOP - the error is swallowed THROW - the error is allowed to propagate. RETRY - The exception causes the Connect framework to retry the message. The number of retries is based on The error will be logged automaticallystringTHROW
connect.jms.retry.intervalThe time in milliseconds between retries.int60000
connect.jms.max.retriesThe maximum number of times to try the write again.int20
connect.jms.destination.selectorSelector to use for destination lookup. Either CDI or JNDI.stringCDI
connect.jms.initial.context.extra.paramsList (comma-separated) of extra properties as key/value pairs with a colon delimiter to supply to the initial context e.g. SOLACE_JMS_VPN:my_solace_vplist[]
connect.jms.batch.sizeThe number of records to poll for on the target JMS destination in each Connect poll.int100
connect.jms.polling.timeoutProvides the timeout to poll incoming messageslong1000
connect.jms.source.default.converterContains a canonical class name for the default converter of a raw JMS message bytes to a SourceRecord. Overrides to the default can be done by using connect.jms.source.converters still. i.e. io.lenses.streamreactor.connect.source.converters.AvroConverterstring
connect.jms.converter.throw.on.errorIf set to false the conversion exception will be swallowed and everything carries on BUT the message is lost!!; true will throw the exception.Default is false.booleanfalse
connect.converter.avro.schemasIf the AvroConverter is used you need to provide an avro Schema to be able to read and translate the raw bytes to an avro record. The format is $MQTT_TOPIC=$PATH_TO_AVRO_SCHEMA_FILEstring
connect.jms.headersContains collection of static JMS headers included in every SinkRecord The format is connect.jms.headers="$MQTT_TOPIC=rmq.jms.message.type:TextMessage,rmq.jms.message.priority:2;$MQTT_TOPIC2=rmq.jms.message.type:JSONMessage"string
connect.progress.enabledEnables the output for how many records have been processedbooleanfalse
connect.jms.evict.interval.minutesRemoves the uncommitted messages from the internal cache. Each JMS message is linked to the Kafka record to be published. Failure to publish a record to Kafka will mean the JMS message will not be acknowledged.int10
connect.jms.evict.threshold.minutesThe number of minutes after which an uncommitted entry becomes evictable from the connector cache.int10
connect.jms.scale.typeHow the connector tasks parallelization is decided. Available values are kcql and default. If kcql is provided it will be based on the number of KCQL statements written; otherwise it will be driven based on the connector tasks.maxstringkcql
--
Last modified: September 26, 2024