This page describes the usage of the Stream Reactor JMS Sink Connector.
For more examples see the tutorials.
You can specify multiple KCQL statements separated by ;
to have a connector sink multiple topics. The connector properties topics or topics.regex are required to be set to a value that matches the KCQL statements.
The following KCQL is supported:
Examples:
The sink can write to either topics or queues, specified by the WITHTYPE clause.
When a message is sent to a JMS target it can be one of the following:
JSON - Send a TextMessage
AVRO - Send a BytesMessage
MAP - Send a MapMessage
OBJECT - Send an ObjectMessage
This is set by the WITHFORMAT keyword.
This sink supports the following Kafka payloads:
Schema.Struct and Struct (Avro)
Schema.Struct and JSON
No Schema and JSON
The connector supports Error policies.
Name | Description | Type | Default Value |
---|---|---|---|
connect.jms.url
Provides the JMS broker url
string
connect.jms.initial.context.factory
Initial Context Factory, e.g: org.apache.activemq.jndi.ActiveMQInitialContextFactory
string
connect.jms.connection.factory
Provides the full class name for the ConnectionFactory compile to use, e.gorg.apache.activemq.ActiveMQConnectionFactory
string
ConnectionFactory
connect.jms.kcql
connect.jms.kcql
string
connect.jms.subscription.name
subscription name to use when subscribing to a topic, specifying this makes a durable subscription for topics
string
connect.jms.password
Provides the password for the JMS connection
password
connect.jms.username
Provides the user for the JMS connection
string
connect.jms.error.policy
Specifies the action to be taken if an error occurs while inserting the data. There are two available options: NOOP - the error is swallowed THROW - the error is allowed to propagate. RETRY - The exception causes the Connect framework to retry the message. The number of retries is based on The error will be logged automatically
string
THROW
connect.jms.retry.interval
The time in milliseconds between retries.
int
60000
connect.jms.max.retries
The maximum number of times to try the write again.
int
20
connect.jms.destination.selector
Selector to use for destination lookup. Either CDI or JNDI.
string
CDI
connect.jms.initial.context.extra.params
List (comma-separated) of extra properties as key/value pairs with a colon delimiter to supply to the initial context e.g. SOLACE_JMS_VPN:my_solace_vp
list
[]
connect.jms.batch.size
The number of records to poll for on the target JMS destination in each Connect poll.
int
100
connect.jms.polling.timeout
Provides the timeout to poll incoming messages
long
1000
connect.jms.source.default.converter
Contains a canonical class name for the default converter of a raw JMS message bytes to a SourceRecord. Overrides to the default can be done by using connect.jms.source.converters still. i.e. com.datamountaineer.streamreactor.connect.source.converters.AvroConverter
string
connect.jms.converter.throw.on.error
If set to false the conversion exception will be swallowed and everything carries on BUT the message is lost!!; true will throw the exception.Default is false.
boolean
false
connect.converter.avro.schemas
If the AvroConverter is used you need to provide an avro Schema to be able to read and translate the raw bytes to an avro record. The format is $MQTT_TOPIC=$PATH_TO_AVRO_SCHEMA_FILE
string
connect.jms.headers
Contains collection of static JMS headers included in every SinkRecord The format is connect.jms.headers="$MQTT_TOPIC=rmq.jms.message.type:TextMessage,rmq.jms.message.priority:2;$MQTT_TOPIC2=rmq.jms.message.type:JSONMessage"
string
connect.progress.enabled
Enables the output for how many records have been processed
boolean
false
connect.jms.evict.interval.minutes
Removes the uncommitted messages from the internal cache. Each JMS message is linked to the Kafka record to be published. Failure to publish a record to Kafka will mean the JMS message will not be acknowledged.
int
10
connect.jms.evict.threshold.minutes
The number of minutes after which an uncommitted entry becomes evictable from the connector cache.
int
10
connect.jms.scale.type
How the connector tasks parallelization is decided. Available values are kcql and default. If kcql is provided it will be based on the number of KCQL statements written; otherwise it will be driven based on the connector tasks.max