# JMS

A Kafka Connect JMS source connector to subscribe to messages on JMS queues and topics and write them to a Kafka topic.

The connector uses the standard JMS API and has been tested against ActiveMQ.

The connector allows the JMS **`initial.context.factory`** and **`connection.factory`** to be set according to your JMS provider. The appropriate implementation jars must be added to the CLASSPATH of the Connect workers or placed in the **`plugin.path`** of the connector.

Each JMS message is acknowledged only once it has been written to Kafka. If writing to Kafka fails, e.g. because the message is too large, the JMS message is not acknowledged; it remains on the queue so it can be acted upon later.

Unless a converter is used, the schema of the messages is fixed and can be found under Data type conversion below.

{% hint style="warning" %}
You must provide the JMS implementation jars for your JMS service.
{% endhint %}

## Connector Class

```
io.lenses.streamreactor.connect.jms.source.JMSSourceConnector
```

## Example

{% hint style="success" %}
For more examples see the [tutorials](https://docs.lenses.io/latest/connectors/tutorials).
{% endhint %}

{% code fullWidth="true" %}

```bash
name=jms-source
connector.class=io.lenses.streamreactor.connect.jms.source.JMSSourceConnector
tasks.max=1
connect.jms.kcql=INSERT INTO jms SELECT * FROM jms-queue WITHTYPE QUEUE
connect.jms.initial.context.factory=org.apache.activemq.jndi.ActiveMQInitialContextFactory
connect.jms.url=tcp://activemq:61616
connect.jms.connection.factory=ConnectionFactory
```

{% endcode %}

## KCQL support <a href="#kcql-support" id="kcql-support"></a>

{% hint style="success" %}
You can specify multiple KCQL statements separated by `;` to have the connector write to multiple Kafka topics.

However, you cannot route the same source to different topics; to do this, use a separate connector instance.
{% endhint %}
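
As an illustrative sketch (the topic and queue names are assumptions, not part of the connector), two statements can be combined in a single `connect.jms.kcql` property:

```bash
connect.jms.kcql=INSERT INTO orders SELECT * FROM orders-queue WITHTYPE QUEUE;INSERT INTO alerts SELECT * FROM alerts-topic WITHTYPE TOPIC
```

Each statement maps one JMS destination to one Kafka topic; a source destination must not appear in more than one statement of the same connector.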

The following KCQL is supported:

```sql
INSERT INTO kafka_topic
SELECT *
FROM jms_destination
WITHTYPE [TOPIC|QUEUE]
[WITHCONVERTER=`myclass`]
```

The selection of fields from the JMS message is **not** supported.

Examples:

{% code fullWidth="true" %}

```sql
-- Select from a JMS queue and write to a Kafka topic
INSERT INTO topicA SELECT * FROM jms_queue WITHTYPE QUEUE

-- Select from a JMS topic and write to a Kafka topic with an Avro converter
INSERT INTO topicA
SELECT * FROM jms_topic
WITHTYPE TOPIC
WITHCONVERTER=`io.lenses.streamreactor.connect.converters.source.AvroConverter`
```

{% endcode %}

## Destination types <a href="#destination-types" id="destination-types"></a>

The connector supports both topics and queues, controlled by the **WITHTYPE** KCQL clause.

## Message Converters <a href="#message-converters" id="message-converters"></a>

The connector supports converters to handle different message payload formats on the source topic or queue.

If no converter is provided, the JMS message is converted to a Kafka Connect Struct representation.

See [source record converters](https://docs.lenses.io/latest/connectors/tutorials/source-converters-with-incoming-json-or-avro).
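
As a hedged sketch (the queue name and schema file path are assumptions), an Avro converter can be set per KCQL statement, with the schema file supplied via `connect.converter.avro.schemas`:

```bash
connect.jms.kcql=INSERT INTO orders SELECT * FROM orders-queue WITHTYPE QUEUE WITHCONVERTER=`io.lenses.streamreactor.connect.converters.source.AvroConverter`
connect.converter.avro.schemas=orders-queue=/path/to/orders.avsc
```

The key on the left of the schema mapping is assumed here to be the JMS source destination; check the Option Reference below for the exact format expected by your connector version.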

### Data type conversion <a href="#jms-schema" id="jms-schema"></a>

| Name               | Type             |
| ------------------ | ---------------- |
| message\_timestamp | Optional int64   |
| correlation\_id    | Optional string  |
| redelivered        | Optional boolean |
| reply\_to          | Optional string  |
| destination        | Optional string  |
| message\_id        | Optional string  |
| mode               | Optional int32   |
| type               | Optional string  |
| priority           | Optional int32   |
| bytes\_payload     | Optional bytes   |
| properties         | Map of string    |

## Option Reference <a href="#options" id="options"></a>

<table data-full-width="true"><thead><tr><th width="367">Name</th><th width="392">Description</th><th width="107.5">Type</th><th>Default Value</th></tr></thead><tbody><tr><td>connect.jms.url</td><td>Provides the JMS broker URL</td><td>string</td><td></td></tr><tr><td>connect.jms.initial.context.factory</td><td>Initial Context Factory, e.g. org.apache.activemq.jndi.ActiveMQInitialContextFactory</td><td>string</td><td></td></tr><tr><td>connect.jms.connection.factory</td><td>Provides the full class name of the ConnectionFactory to use, e.g. org.apache.activemq.ActiveMQConnectionFactory</td><td>string</td><td>ConnectionFactory</td></tr><tr><td>connect.jms.kcql</td><td>The KCQL statement(s) mapping JMS destinations to Kafka topics</td><td>string</td><td></td></tr><tr><td>connect.jms.subscription.name</td><td>The subscription name to use when subscribing to a topic; specifying this creates a durable subscription for topics</td><td>string</td><td></td></tr><tr><td>connect.jms.password</td><td>Provides the password for the JMS connection</td><td>password</td><td></td></tr><tr><td>connect.jms.username</td><td>Provides the user for the JMS connection</td><td>string</td><td></td></tr><tr><td>connect.jms.error.policy</td><td>Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP, the error is swallowed; THROW, the error is allowed to propagate; RETRY, the exception causes the Connect framework to retry the message, with the number of retries set by connect.jms.max.retries. All errors will be logged automatically, even if swallowed.</td><td>string</td><td>THROW</td></tr><tr><td>connect.jms.retry.interval</td><td>The time in milliseconds between retries.</td><td>int</td><td>60000</td></tr><tr><td>connect.jms.max.retries</td><td>The maximum number of times to try the write again.</td><td>int</td><td>20</td></tr><tr><td>connect.jms.destination.selector</td><td>Selector to use for destination lookup. Either CDI or JNDI.</td><td>string</td><td>CDI</td></tr><tr><td>connect.jms.initial.context.extra.params</td><td>List (comma-separated) of extra properties as key/value pairs with a colon delimiter to supply to the initial context, e.g. SOLACE_JMS_VPN:my_solace_vp</td><td>list</td><td>[]</td></tr><tr><td>connect.jms.batch.size</td><td>The number of records to poll for on the target JMS destination in each Connect poll.</td><td>int</td><td>100</td></tr><tr><td>connect.jms.polling.timeout</td><td>Provides the timeout, in milliseconds, to poll incoming messages</td><td>long</td><td>1000</td></tr><tr><td>connect.jms.source.default.converter</td><td>Contains a canonical class name for the default converter of raw JMS message bytes to a SourceRecord, e.g. io.lenses.streamreactor.connect.converters.source.AvroConverter. Overrides to the default can still be done using connect.jms.source.converters.</td><td>string</td><td></td></tr><tr><td>connect.jms.converter.throw.on.error</td><td>If set to false, the conversion exception is swallowed and processing carries on, but the message is lost; if set to true, the exception is thrown.</td><td>boolean</td><td>false</td></tr><tr><td>connect.converter.avro.schemas</td><td>If the AvroConverter is used, you need to provide an Avro schema to be able to read and translate the raw bytes to an Avro record. The format is $MQTT_TOPIC=$PATH_TO_AVRO_SCHEMA_FILE</td><td>string</td><td></td></tr><tr><td>connect.jms.headers</td><td>Contains a collection of static JMS headers included in every SinkRecord. The format is connect.jms.headers="$MQTT_TOPIC=rmq.jms.message.type:TextMessage,rmq.jms.message.priority:2;$MQTT_TOPIC2=rmq.jms.message.type:JSONMessage"</td><td>string</td><td></td></tr><tr><td>connect.progress.enabled</td><td>Enables the output for how many records have been processed</td><td>boolean</td><td>false</td></tr><tr><td>connect.jms.evict.interval.minutes</td><td>Removes the uncommitted messages from the internal cache. Each JMS message is linked to the Kafka record to be published; failure to publish a record to Kafka means the JMS message will not be acknowledged.</td><td>int</td><td>10</td></tr><tr><td>connect.jms.evict.threshold.minutes</td><td>The number of minutes after which an uncommitted entry becomes evictable from the connector cache.</td><td>int</td><td>10</td></tr><tr><td>connect.jms.scale.type</td><td>How the connector tasks' parallelization is decided. Available values are kcql and default. If kcql is provided, it is based on the number of KCQL statements written; otherwise it is driven by the connector's tasks.max</td><td></td><td></td></tr></tbody></table>
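
Tying several of these options together, here is a hedged sketch of a retrying configuration with a durable topic subscription (the subscription name and values are illustrative, not defaults):

```bash
connect.jms.error.policy=RETRY
connect.jms.max.retries=5
connect.jms.retry.interval=30000
connect.jms.subscription.name=connect-durable-sub
connect.jms.batch.size=200
```

With RETRY, a failed write is retried up to `connect.jms.max.retries` times at `connect.jms.retry.interval` milliseconds apart before the task fails; the durable subscription ensures topic messages published while the connector is down are still delivered on restart.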
