LogoLogo
HomeProductsDownload Community Edition
  • Lenses DevX
  • Kafka Connectors
  • Overview
  • Understanding Kafka Connect
  • Connectors
    • Install
    • Sources
      • AWS S3
      • Azure Data Lake Gen2
      • Azure Event Hubs
      • Azure Service Bus
      • Cassandra
      • GCP PubSub
      • GCP Storage
      • FTP
      • JMS
      • MQTT
    • Sinks
      • AWS S3
      • Azure CosmosDB
      • Azure Data Lake Gen2
      • Azure Event Hubs
      • Azure Service Bus
      • Cassandra
      • Elasticsearch
      • GCP PubSub
      • GCP Storage
      • HTTP
      • InfluxDB
      • JMS
      • MongoDB
      • MQTT
      • Redis
  • Secret Providers
    • Install
    • AWS Secret Manager
    • Azure KeyVault
    • Environment
    • Hashicorp Vault
    • AES256
  • Single Message Transforms
    • Overview
    • InsertFieldTimestampHeaders
    • InsertRecordTimestampHeaders
    • InsertRollingFieldTimestampHeaders
    • InsertRollingRecordTimestampHeaders
    • InsertRollingWallclock
    • InsertRollingWallclockHeaders
    • InsertSourcePartitionOrOffsetValue
    • InsertWallclock
    • InsertWallclockHeaders
    • InsertWallclockDateTimePart
    • TimestampConverter
  • Tutorials
    • Backup & Restore
    • Creating & managing a connector
    • Cloud Storage Examples
      • AWS S3 Source Examples
      • AWS S3 Sink Time Based Partitioning
      • GCP Source
      • GCP Sink Time Based Partitioning
    • Http Sink Templating
    • Sink converters & different data formats
    • Source converters with incoming JSON or Avro
    • Loading XML from Cloud storage
    • Loading ragged width files
    • Using the MQTT Connector with RabbitMQ
    • Using Error Policies
    • Using dead letter queues
  • Contributing
    • Developing a connector
    • Utilities
    • Testing
  • Lenses Connectors Support
  • Downloads
  • Release notes
    • Stream Reactor
    • Secret Providers
    • Single Message Transforms
Powered by GitBook
LogoLogo

Resources

  • Privacy
  • Cookies
  • Terms & Conditions
  • Community EULA

2024 © Lenses.io Ltd. Apache, Apache Kafka, Kafka and associated open source project names are trademarks of the Apache Software Foundation.

On this page
  • Connector Class
  • Example
  • KCQL support
  • JMS Topics and Queues
  • JMS Payload
  • Kafka payload support
  • Error policies
  • Option Reference

Was this helpful?

Export as PDF
  1. Connectors
  2. Sinks

JMS

This page describes the usage of the Stream Reactor JMS Sink Connector.

PreviousInfluxDBNextMongoDB

Last updated 2 months ago

Was this helpful?

Connector Class

io.lenses.streamreactor.connect.jms.sink.JMSSinkConnector

Example

For more examples see the .

name=jms
connector.class=io.lenses.streamreactor.connect.jms.sink.JMSSinkConnector
tasks.max=1
topics=orders
connect.jms.url=tcp://activemq:61616
connect.jms.initial.context.factory=org.apache.activemq.jndi.ActiveMQInitialContextFactory
connect.jms.connection.factory=ConnectionFactory
connect.jms.kcql=INSERT INTO orders SELECT * FROM orders WITHTYPE QUEUE WITHFORMAT JSON

KCQL support

You can specify multiple KCQL statements separated by ; to have a connector sink multiple topics. The connector properties topics or topics.regex are required to be set to a value that matches the KCQL statements.

The following KCQL is supported:

INSERT INTO <jms-destination>
SELECT FIELD, ...
FROM <your-kafka-topic>
[WITHFORMAT AVRO|JSON|MAP|OBJECT]
WITHTYPE TOPIC|QUEUE

Examples:

-- Select all fields from topicA and write to jmsA queue
INSERT INTO jmsA SELECT * FROM topicA WITHTYPE QUEUE

-- Select 3 fields and rename from topicB and write
-- to jmsB topic as JSON in a TextMessage
INSERT INTO jmsB SELECT x AS a, y, z FROM topicB WITHFORMAT JSON WITHTYPE TOPIC

JMS Topics and Queues

The sink can write to either topics or queues, specified by the WITHTYPE clause.

JMS Payload

When a message is sent to a JMS target it can be one of the following:

  • JSON - Send a TextMessage

  • AVRO - Send a BytesMessage

  • MAP - Send a MapMessage

  • OBJECT - Send an ObjectMessage

This is set by the WITHFORMAT keyword.

Kafka payload support

This sink supports the following Kafka payloads:

  • Schema.Struct and Struct (Avro)

  • Schema.Struct and JSON

  • No Schema and JSON

Error policies

Option Reference

Name
Description
Type
Default Value

connect.jms.url

Provides the JMS broker url

string

connect.jms.initial.context.factory

Initial Context Factory, e.g: org.apache.activemq.jndi.ActiveMQInitialContextFactory

string

connect.jms.connection.factory

Provides the full class name for the ConnectionFactory compile to use, e.gorg.apache.activemq.ActiveMQConnectionFactory

string

ConnectionFactory

connect.jms.kcql

connect.jms.kcql

string

connect.jms.subscription.name

subscription name to use when subscribing to a topic, specifying this makes a durable subscription for topics

string

connect.jms.password

Provides the password for the JMS connection

password

connect.jms.username

Provides the user for the JMS connection

string

connect.jms.error.policy

Specifies the action to be taken if an error occurs while inserting the data. There are two available options: NOOP - the error is swallowed THROW - the error is allowed to propagate. RETRY - The exception causes the Connect framework to retry the message. The number of retries is based on The error will be logged automatically

string

THROW

connect.jms.retry.interval

The time in milliseconds between retries.

int

60000

connect.jms.max.retries

The maximum number of times to try the write again.

int

20

connect.jms.destination.selector

Selector to use for destination lookup. Either CDI or JNDI.

string

CDI

connect.jms.initial.context.extra.params

List (comma-separated) of extra properties as key/value pairs with a colon delimiter to supply to the initial context e.g. SOLACE_JMS_VPN:my_solace_vp

list

[]

connect.jms.batch.size

The number of records to poll for on the target JMS destination in each Connect poll.

int

100

connect.jms.polling.timeout

Provides the timeout to poll incoming messages

long

1000

connect.jms.source.default.converter

Contains a canonical class name for the default converter of a raw JMS message bytes to a SourceRecord. Overrides to the default can be done by using connect.jms.source.converters still. i.e. io.lenses.streamreactor.connect.converters.source.AvroConverter

string

connect.jms.converter.throw.on.error

If set to false the conversion exception will be swallowed and everything carries on BUT the message is lost!!; true will throw the exception.Default is false.

boolean

false

connect.converter.avro.schemas

If the AvroConverter is used you need to provide an avro Schema to be able to read and translate the raw bytes to an avro record. The format is $MQTT_TOPIC=$PATH_TO_AVRO_SCHEMA_FILE

string

connect.jms.headers

Contains collection of static JMS headers included in every SinkRecord The format is connect.jms.headers="$MQTT_TOPIC=rmq.jms.message.type:TextMessage,rmq.jms.message.priority:2;$MQTT_TOPIC2=rmq.jms.message.type:JSONMessage"

string

connect.progress.enabled

Enables the output for how many records have been processed

boolean

false

connect.jms.evict.interval.minutes

Removes the uncommitted messages from the internal cache. Each JMS message is linked to the Kafka record to be published. Failure to publish a record to Kafka will mean the JMS message will not be acknowledged.

int

10

connect.jms.evict.threshold.minutes

The number of minutes after which an uncommitted entry becomes evictable from the connector cache.

int

10

connect.jms.scale.type

How the connector tasks parallelization is decided. Available values are kcql and default. If kcql is provided it will be based on the number of KCQL statements written; otherwise it will be driven based on the connector tasks.max

The connector supports .

tutorials
Error policies