MQTT

This page describes the usage of the Stream Reactor MQTT Source Connector.

A Kafka Connect source connector to read events from MQTT and push them to Kafka.

Connector Class

io.lenses.streamreactor.connect.mqtt.source.MqttSourceConnector

Example

For more examples see the tutorials.

name=mqtt-source
connector.class=io.lenses.streamreactor.connect.mqtt.source.MqttSourceConnector
tasks.max=1
connect.mqtt.kcql=INSERT INTO mqtt SELECT * FROM /mjson WITHCONVERTER=`com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter`
connect.mqtt.client.id=dm_source_id
connect.mqtt.hosts=tcp://mqtt:1883
connect.mqtt.service.quality=1

KCQL support

You can specify multiple KCQL statements separated by ; to have the connector write to multiple Kafka topics.

The following KCQL is supported:

INSERT INTO <your-kafka-topic>
SELECT *
FROM <your-mqtt-topic>
[WITHCONVERTER=`myclass`]

The selection of fields from the MQTT message is not supported.

Examples:

-- Insert mode: select all fields from /mqttTopicA
-- and write them to the Kafka topic named topic, using converter myclass
INSERT INTO topic SELECT * FROM /mqttTopicA WITHCONVERTER=`myclass`

-- wildcard
INSERT INTO topic SELECT * FROM /mqttTopicA/+/sensors WITHCONVERTER=`myclass`
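
As a sketch of the multi-statement form described at the start of this section, two statements separated by ; can be placed in a single connect.mqtt.kcql value. The topic names below are hypothetical placeholders.

```properties
# Hypothetical topic names: each statement maps one MQTT topic to one Kafka topic.
connect.mqtt.kcql=INSERT INTO kafkaTopicA SELECT * FROM /mqttTopicA;INSERT INTO kafkaTopicB SELECT * FROM /mqttTopicB
```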

Keyed JSON format

To support scenarios such as retaining the latest value for a given device identifier, or Kafka Streams joins without re-mapping the topic data, the connector supports WITHKEY in the KCQL syntax.

Multiple key fields are supported using a delimiter:

// `[` enclosed by `]` denotes optional values
WITHKEY(field1 [, field2.A , field3]) [KEYDELIMITER='.']

The resulting Kafka record key is the string concatenation of the values of the specified fields. The delimiter can optionally be set with the KEYDELIMITER keyword.
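
For illustration, a statement using WITHKEY might look like the following sketch. The topic names, the field names deviceId and site.id, and the placement of WITHKEY after the converter clause are assumptions made for this example, and it assumes JSON payloads. With KEYDELIMITER='-', a message whose deviceId is d42 and whose site.id is berlin would be expected to produce the record key d42-berlin.

```properties
# Hypothetical topics and field names; the clause order is an assumption.
connect.mqtt.kcql=INSERT INTO devices SELECT * FROM /devices/+/telemetry WITHCONVERTER=`myclass` WITHKEY(deviceId, site.id) KEYDELIMITER='-'
```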

Shared and Wildcard Subscriptions

The connector supports both wildcard and shared subscriptions, but the KCQL command must be placed inside single quotes.

-- wildcard
INSERT INTO kafkaTopic1
SELECT * FROM /mqttTopicA/+/sensors
WITHCONVERTER=`myclass`

Message converters

The connector supports converters to handle different message payload formats in the source topic. See source record converters.

Option Reference

| Name | Description | Type | Default Value |
|---|---|---|---|
| connect.mqtt.hosts | Contains the MQTT connection endpoints. | string | |
| connect.mqtt.username | Contains the MQTT connection user name. | string | |
| connect.mqtt.password | Contains the MQTT connection password. | password | |
| connect.mqtt.service.quality | Specifies the MQTT quality of service. | int | |
| connect.mqtt.timeout | Provides the time interval to establish the MQTT connection. | int | 3000 |
| connect.mqtt.clean | Sets the MQTT clean session flag. | boolean | true |
| connect.mqtt.keep.alive | The keep-alive mechanism ensures that the connection stays open and that broker and client remain connected. The interval is the longest period that broker and client can go without sending a message. | int | 5000 |
| connect.mqtt.client.id | Contains the MQTT session client ID. | string | |
| connect.mqtt.error.policy | Specifies the action to take if an error occurs while inserting the data. There are three available options: NOOP (the error is swallowed), THROW (the error is allowed to propagate), and RETRY (the exception causes the Connect framework to retry the message; see connect.mqtt.max.retries and connect.mqtt.retry.interval). The error is logged automatically. | string | THROW |
| connect.mqtt.retry.interval | The time in milliseconds between retries. | int | 60000 |
| connect.mqtt.max.retries | The maximum number of times to try the write again. | int | 20 |
| connect.mqtt.retained.messages | Specifies the MQTT retained flag. | boolean | false |
| connect.mqtt.converter.throw.on.error | If set to false, the conversion exception is swallowed and processing continues, but the message is lost. If set to true, the exception is thrown. | boolean | false |
| connect.converter.avro.schemas | If the AvroConverter is used, you need to provide an Avro schema to be able to read and translate the raw bytes to an Avro record. The format is $MQTT_TOPIC=$PATH_TO_AVRO_SCHEMA_FILE for a source converter, or $KAFKA_TOPIC=PATH_TO_AVRO_SCHEMA for a sink converter. | string | |
| connect.mqtt.log.message | Logs received MQTT messages. | boolean | false |
| connect.mqtt.kcql | Contains the Kafka Connect Query Language statements describing the MQTT source topics and the target Kafka topics. | string | |
| connect.mqtt.polling.timeout | Provides the timeout to poll incoming messages. | int | 1000 |
| connect.mqtt.share.replicate | Replicates the shared subscriptions to all tasks instead of distributing them. | boolean | false |
| connect.progress.enabled | Enables the output showing how many records have been processed. | boolean | false |
| connect.mqtt.ssl.ca.cert | Provides the path to the CA certificate file to use with the MQTT connection. | string | |
| connect.mqtt.ssl.cert | Provides the path to the certificate file to use with the MQTT connection. | string | |
| connect.mqtt.ssl.key | Provides the path to the certificate private key file. | string | |
| connect.mqtt.process.duplicates | Processes duplicate messages. | boolean | false |
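
As a sketch of how several of these options might be combined, the fragment below targets a broker that requires authentication and TLS, with retries enabled. The host name, credentials, and file paths are hypothetical, and the ssl:// URI scheme is an assumption about the broker setup. The retry values shown are the documented defaults.

```properties
# Hypothetical endpoint, credentials and certificate paths.
connect.mqtt.hosts=ssl://mqtt.example.com:8883
connect.mqtt.username=connect-user
connect.mqtt.password=change-me
connect.mqtt.ssl.ca.cert=/etc/mqtt/certs/ca.crt
connect.mqtt.ssl.cert=/etc/mqtt/certs/client.crt
connect.mqtt.ssl.key=/etc/mqtt/certs/client.key

# Retry on errors instead of failing immediately (the default policy is THROW).
connect.mqtt.error.policy=RETRY
connect.mqtt.max.retries=20
connect.mqtt.retry.interval=60000
```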

2024 © Lenses.io Ltd. Apache, Apache Kafka, Kafka and associated open source project names are trademarks of the Apache Software Foundation.