MQTT
This page describes the usage of the Stream Reactor MQTT Source Connector.
A Kafka Connect source connector to read events from MQTT and push them to Kafka.
Connector Class
Example
For more examples see the tutorials.
KCQL support
You can specify multiple KCQL statements separated by ;
to have the connector sink into multiple topics.
The following KCQL is supported:
The selection of fields from the JMS message is not supported.
Examples:
Keyed JSON format
To facilitate scenarios like retaining the latest value for a given device identifier, or support Kafka Streams joins without having to re-map the topic data the connector supports WITHKEY in the KCQL syntax.
Multiple key fields are supported using a delimiter:
The resulting Kafka record key content will be the string concatenation for the values of the fields specified. Optionally the delimiter can be set via the KEYDELIMITER keyword.
Shared and Wildcard Subscriptions
The connector supports both wildcard and shared subscriptions but the KCQL command must be placed inside single quotes.
Message converters
The connector supports converters to handle different messages payload format in the source topic. See source record converters.
Option Reference
connect.mqtt.hosts
Contains the MQTT connection end points.
string
connect.mqtt.username
Contains the Mqtt connection user name
string
connect.mqtt.password
Contains the Mqtt connection password
password
connect.mqtt.service.quality
Specifies the Mqtt quality of service
int
connect.mqtt.timeout
Provides the time interval to establish the mqtt connection
int
3000
connect.mqtt.clean
connect.mqtt.clean
boolean
true
connect.mqtt.keep.alive
The keep alive functionality assures that the connection is still open and both broker and client are connected to the broker during the establishment of the connection. The interval is the longest possible period of time, which broker and client can endure without sending a message.
int
5000
connect.mqtt.client.id
Contains the Mqtt session client id
string
connect.mqtt.error.policy
Specifies the action to be taken if an error occurs while inserting the data. There are two available options: NOOP - the error is swallowed THROW - the error is allowed to propagate. RETRY - The exception causes the Connect framework to retry the message. The number of retries is based on The error will be logged automatically
string
THROW
connect.mqtt.retry.interval
The time in milliseconds between retries.
int
60000
connect.mqtt.max.retries
The maximum number of times to try the write again.
int
20
connect.mqtt.retained.messages
Specifies the Mqtt retained flag.
boolean
false
connect.mqtt.converter.throw.on.error
If set to false the conversion exception will be swallowed and everything carries on BUT the message is lost!!; true will throw the exception.Default is false.
boolean
false
connect.converter.avro.schemas
If the AvroConverter is used you need to provide an avro Schema to be able to read and translate the raw bytes to an avro record. The format is $MQTT_TOPIC=$PATH_TO_AVRO_SCHEMA_FILE in case of source converter, or $KAFKA_TOPIC=PATH_TO_AVRO_SCHEMA in case of sink converter
string
connect.mqtt.log.message
Logs received MQTT messages
boolean
false
connect.mqtt.kcql
Contains the Kafka Connect Query Language describing the sourced MQTT source and the target Kafka topics
string
connect.mqtt.polling.timeout
Provides the timeout to poll incoming messages
int
1000
connect.mqtt.share.replicate
Replicate the shared subscriptions to all tasks instead of distributing them
boolean
false
connect.progress.enabled
Enables the output for how many records have been processed
boolean
false
connect.mqtt.ssl.ca.cert
Provides the path to the CA certificate file to use with the Mqtt connection
string
connect.mqtt.ssl.cert
Provides the path to the certificate file to use with the Mqtt connection
string
connect.mqtt.ssl.key
Certificate private [config] key file path.
string
connect.mqtt.process.duplicates
Process duplicate messages
boolean
false
Last updated