# JMS

## Connector Class

```
io.lenses.streamreactor.connect.jms.sink.JMSSinkConnector
```

## Example

{% hint style="success" %}
For more examples see the [tutorials](https://docs.lenses.io/latest/connectors/tutorials).
{% endhint %}

{% code fullWidth="true" %}

```bash
name=jms
connector.class=io.lenses.streamreactor.connect.jms.sink.JMSSinkConnector
tasks.max=1
topics=orders
connect.jms.url=tcp://activemq:61616
connect.jms.initial.context.factory=org.apache.activemq.jndi.ActiveMQInitialContextFactory
connect.jms.connection.factory=ConnectionFactory
connect.jms.kcql=INSERT INTO orders SELECT * FROM orders WITHTYPE QUEUE WITHFORMAT JSON
```

{% endcode %}

## KCQL support <a href="#kcql-support" id="kcql-support"></a>

{% hint style="success" %}
You can specify multiple KCQL statements separated by **`;`** to have a single connector sink multiple topics. The connector properties **topics** or **topics.regex** must be set to match the source topics referenced in the KCQL statements.
{% endhint %}
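
For example, one connector can fan two topics out to two JMS queues with two `;`-separated statements (the topic and destination names below are illustrative):

```bash
name=jms-multi
connector.class=io.lenses.streamreactor.connect.jms.sink.JMSSinkConnector
tasks.max=1
# Both topics referenced in the KCQL must be listed here
topics=orders,payments
connect.jms.url=tcp://activemq:61616
connect.jms.initial.context.factory=org.apache.activemq.jndi.ActiveMQInitialContextFactory
connect.jms.connection.factory=ConnectionFactory
# Two KCQL statements separated by ';'
connect.jms.kcql=INSERT INTO orders SELECT * FROM orders WITHTYPE QUEUE WITHFORMAT JSON;INSERT INTO payments SELECT * FROM payments WITHTYPE QUEUE WITHFORMAT JSON
```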

The following KCQL is supported:

```sql
INSERT INTO <jms-destination>
SELECT FIELD, ...
FROM <your-kafka-topic>
[WITHFORMAT AVRO|JSON|MAP|OBJECT]
WITHTYPE TOPIC|QUEUE
```

Examples:

```sql
-- Select all fields from topicA and write to jmsA queue
INSERT INTO jmsA SELECT * FROM topicA WITHTYPE QUEUE

-- Select 3 fields and rename from topicB and write
-- to jmsB topic as JSON in a TextMessage
INSERT INTO jmsB SELECT x AS a, y, z FROM topicB WITHFORMAT JSON WITHTYPE TOPIC
```

## JMS Topics and Queues <a href="#jms-topics-and-queues" id="jms-topics-and-queues"></a>

The sink can write to either topics or queues, specified by the **WITHTYPE** clause.

## JMS Payload <a href="#jms-payload" id="jms-payload"></a>

When a record is sent to a JMS destination, it is written as one of the following JMS message types:

* JSON - Send a TextMessage
* AVRO - Send a BytesMessage
* MAP - Send a MapMessage
* OBJECT - Send an ObjectMessage

This is set by the **WITHFORMAT** keyword.
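
For instance, the same topic could be delivered as a MapMessage or an ObjectMessage simply by changing the **WITHFORMAT** clause (destination and topic names below are illustrative):

```sql
-- Send each record as a JMS MapMessage
INSERT INTO jmsMap SELECT * FROM topicA WITHFORMAT MAP WITHTYPE QUEUE

-- Send each record as a JMS ObjectMessage
INSERT INTO jmsObj SELECT * FROM topicA WITHFORMAT OBJECT WITHTYPE QUEUE
```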

## Kafka payload support <a href="#kafka-payload-support" id="kafka-payload-support"></a>

This sink supports the following Kafka payloads:

* Schema.Struct and Struct (Avro)
* Schema.Struct and JSON
* No Schema and JSON

## Error policies <a href="#error-polices" id="error-polices"></a>

The connector supports [Error policies](https://docs.lenses.io/latest/connectors/tutorials/using-error-policies).
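
As a sketch, a RETRY policy combined with the related retry options from the option reference below might be configured as follows (the values are illustrative):

```bash
# Retry failed writes before failing the task
connect.jms.error.policy=RETRY
# Wait 30 seconds between retries
connect.jms.retry.interval=30000
# Give up after 10 attempts
connect.jms.max.retries=10
```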

## Option Reference <a href="#storage-to-output-matrix" id="storage-to-output-matrix"></a>

<table data-full-width="true"><thead><tr><th width="342">Name</th><th width="446">Description</th><th width="96.5">Type</th><th>Default Value</th></tr></thead><tbody><tr><td>connect.jms.url</td><td>Provides the JMS broker URL</td><td>string</td><td></td></tr><tr><td>connect.jms.initial.context.factory</td><td>Initial Context Factory, e.g. org.apache.activemq.jndi.ActiveMQInitialContextFactory</td><td>string</td><td></td></tr><tr><td>connect.jms.connection.factory</td><td>Provides the full class name of the ConnectionFactory implementation to use, e.g. org.apache.activemq.ActiveMQConnectionFactory</td><td>string</td><td>ConnectionFactory</td></tr><tr><td>connect.jms.kcql</td><td>The KCQL statement(s) describing how to map Kafka topics to JMS destinations</td><td>string</td><td></td></tr><tr><td>connect.jms.subscription.name</td><td>The subscription name to use when subscribing to a topic. Specifying this creates a durable subscription for topics</td><td>string</td><td></td></tr><tr><td>connect.jms.password</td><td>Provides the password for the JMS connection</td><td>password</td><td></td></tr><tr><td>connect.jms.username</td><td>Provides the user for the JMS connection</td><td>string</td><td></td></tr><tr><td>connect.jms.error.policy</td><td>Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP - the error is swallowed; THROW - the error is allowed to propagate; RETRY - the exception causes the Connect framework to retry the message, with the number of retries based on connect.jms.max.retries. The error will be logged automatically</td><td>string</td><td>THROW</td></tr><tr><td>connect.jms.retry.interval</td><td>The time in milliseconds between retries</td><td>int</td><td>60000</td></tr><tr><td>connect.jms.max.retries</td><td>The maximum number of times to retry the write</td><td>int</td><td>20</td></tr><tr><td>connect.jms.destination.selector</td><td>Selector to use for destination lookup. Either CDI or JNDI</td><td>string</td><td>CDI</td></tr><tr><td>connect.jms.initial.context.extra.params</td><td>List (comma-separated) of extra properties as key/value pairs with a colon delimiter to supply to the initial context, e.g. SOLACE_JMS_VPN:my_solace_vp</td><td>list</td><td>[]</td></tr><tr><td>connect.jms.batch.size</td><td>The number of records to poll for on the target JMS destination in each Connect poll</td><td>int</td><td>100</td></tr><tr><td>connect.jms.polling.timeout</td><td>The timeout, in milliseconds, when polling for incoming messages</td><td>long</td><td>1000</td></tr><tr><td>connect.jms.source.default.converter</td><td>The canonical class name of the default converter used to translate raw JMS message bytes into a SourceRecord, e.g. io.lenses.streamreactor.connect.converters.source.AvroConverter. Per-destination overrides can still be made via connect.jms.source.converters</td><td>string</td><td></td></tr><tr><td>connect.jms.converter.throw.on.error</td><td>If set to false, a conversion exception is swallowed and processing continues, but the message is lost; if set to true, the exception is thrown</td><td>boolean</td><td>false</td></tr><tr><td>connect.converter.avro.schemas</td><td>If the AvroConverter is used, you need to provide an Avro schema to be able to read and translate the raw bytes into an Avro record. The format is $MQTT_TOPIC=$PATH_TO_AVRO_SCHEMA_FILE</td><td>string</td><td></td></tr><tr><td>connect.jms.headers</td><td>A collection of static JMS headers included in every SinkRecord. The format is connect.jms.headers="$MQTT_TOPIC=rmq.jms.message.type:TextMessage,rmq.jms.message.priority:2;$MQTT_TOPIC2=rmq.jms.message.type:JSONMessage"</td><td>string</td><td></td></tr><tr><td>connect.progress.enabled</td><td>Enables logging of how many records have been processed</td><td>boolean</td><td>false</td></tr><tr><td>connect.jms.evict.interval.minutes</td><td>The interval at which uncommitted messages are removed from the internal cache. Each JMS message is linked to the Kafka record to be published; failure to publish a record to Kafka means the JMS message will not be acknowledged</td><td>int</td><td>10</td></tr><tr><td>connect.jms.evict.threshold.minutes</td><td>The number of minutes after which an uncommitted entry becomes evictable from the connector cache</td><td>int</td><td>10</td></tr><tr><td>connect.jms.scale.type</td><td>How connector task parallelization is decided. Available values are kcql and default. If kcql is provided, it is based on the number of KCQL statements written; otherwise it is driven by the connector's tasks.max</td><td></td><td></td></tr></tbody></table>
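
Putting several of the options above together, a configuration for a broker that requires authentication might look like the following sketch (the broker host and credentials are illustrative placeholders):

```bash
name=jms-secured
connector.class=io.lenses.streamreactor.connect.jms.sink.JMSSinkConnector
tasks.max=1
topics=orders
connect.jms.url=tcp://broker:61616
connect.jms.initial.context.factory=org.apache.activemq.jndi.ActiveMQInitialContextFactory
connect.jms.connection.factory=ConnectionFactory
# Credentials for the JMS connection
connect.jms.username=connect-user
connect.jms.password=connect-secret
connect.jms.kcql=INSERT INTO orders SELECT * FROM orders WITHTYPE QUEUE WITHFORMAT JSON
```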

