MQTT

This page describes the usage of the Stream Reactor MQTT Source Connector.

A Kafka Connect source connector to read events from MQTT and push them to Kafka.

Connector Class

io.lenses.streamreactor.connect.mqtt.source.MqttSourceConnector

Example

For more examples see the tutorials.

name=mqtt-source
connector.class=io.lenses.streamreactor.connect.mqtt.source.MqttSourceConnector
tasks.max=1
connect.mqtt.kcql=INSERT INTO mqtt SELECT * FROM /mjson WITHCONVERTER=`com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter`
connect.mqtt.client.id=dm_source_id
connect.mqtt.hosts=tcp://mqtt:1883
connect.mqtt.service.quality=1
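
When Kafka Connect runs in distributed mode, connector configuration is usually submitted as JSON through the Connect REST API rather than as a properties file. The following is a minimal sketch of converting the properties above into that JSON shape (`name` plus a `config` map). The helper name `properties_to_payload` is hypothetical and not part of Stream Reactor.

```python
import json

# The example configuration from above, as properties text.
PROPERTIES = """
name=mqtt-source
connector.class=io.lenses.streamreactor.connect.mqtt.source.MqttSourceConnector
tasks.max=1
connect.mqtt.kcql=INSERT INTO mqtt SELECT * FROM /mjson WITHCONVERTER=`com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter`
connect.mqtt.client.id=dm_source_id
connect.mqtt.hosts=tcp://mqtt:1883
connect.mqtt.service.quality=1
"""


def properties_to_payload(properties_text: str) -> dict:
    """Hypothetical helper: turn key=value lines into a Connect REST payload."""
    config = {}
    for raw_line in properties_text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and comments
        # Split on the first '=' only: the KCQL value itself contains '='.
        key, _, value = line.partition("=")
        config[key.strip()] = value.strip()
    name = config.pop("name")
    return {"name": name, "config": config}


payload = properties_to_payload(PROPERTIES)
print(json.dumps(payload, indent=2))
```

The printed JSON could then be sent with an HTTP POST to the `/connectors` endpoint of the Kafka Connect REST API, assuming a distributed-mode worker is reachable.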

KCQL support

You can specify multiple KCQL statements separated by ; to have the connector write to multiple Kafka topics.

The following KCQL is supported:

INSERT INTO <your-kafka-topic>
SELECT *
FROM <your-mqtt-topic>
[WITHCONVERTER=`myclass`]

Selecting individual fields from the MQTT message is not supported.

Examples:

-- Insert mode, select all fields from /mqttTopicA
-- and write to the Kafka topic named topic with converter myclass
INSERT INTO topic SELECT * FROM /mqttTopicA [WITHCONVERTER=`myclass`]

-- wildcard
INSERT INTO topic SELECT * FROM /mqttTopicA/+/sensors [WITHCONVERTER=`myclass`]
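
As an illustration of combining several statements with ; into one `connect.mqtt.kcql` value, here is a minimal sketch. The function `build_kcql`, the topic names, and the converter name `myclass` are placeholders for illustration only.

```python
def build_kcql(routes):
    """Hypothetical helper: build one KCQL string from several routes.

    Each route is (kafka_topic, mqtt_topic, converter_or_None).
    """
    statements = []
    for kafka_topic, mqtt_topic, converter in routes:
        statement = f"INSERT INTO {kafka_topic} SELECT * FROM {mqtt_topic}"
        if converter:
            # The converter class name is wrapped in backticks, as in the syntax above.
            statement += f" WITHCONVERTER=`{converter}`"
        statements.append(statement)
    # Multiple statements are separated by ';'.
    return ";".join(statements)


kcql = build_kcql([
    ("kafkaTopicA", "/mqttTopicA", None),
    ("kafkaTopicB", "/mqttTopicB/+/sensors", "myclass"),
])
print(f"connect.mqtt.kcql={kcql}")
```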

Keyed JSON format

To facilitate scenarios such as retaining the latest value for a given device identifier, or to support Kafka Streams joins without having to re-map the topic data, the connector supports WITHKEY in the KCQL syntax.

Multiple key fields are supported using a delimiter:

// values enclosed in `[` and `]` are optional
WITHKEY(field1 [, field2.A , field3]) [KEYDELIMITER='.']

The resulting Kafka record key is the string concatenation of the values of the specified fields. Optionally, the delimiter can be set with the KEYDELIMITER keyword.
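
The key concatenation described above can be pictured with the following sketch. It illustrates the idea only and is not the connector's implementation: the function `build_key`, the sample payload, and the way nested paths such as `meta.site` are resolved are all assumptions made for this example.

```python
import json


def build_key(payload_json, key_fields, delimiter):
    """Hypothetical helper: join the values of the key fields into one string."""
    record = json.loads(payload_json)
    parts = []
    for path in key_fields:
        value = record
        # Assumption: a dotted path such as "meta.site" addresses a nested field.
        for segment in path.split("."):
            value = value[segment]
        parts.append(str(value))
    return delimiter.join(parts)


message = '{"deviceId": "d1", "meta": {"site": "lon"}, "temperature": 21.5}'
key = build_key(message, ["deviceId", "meta.site"], "-")
print(key)
```

With a compound key like this, all messages from the same device and site share one Kafka record key, which is what a compacted topic or a Kafka Streams join needs.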

Shared and Wildcard Subscriptions

The connector supports both wildcard and shared subscriptions, but the KCQL command must be placed inside single quotes.

-- wildcard
INSERT INTO kafkaTopic1
SELECT * FROM /mqttTopicA/+/sensors
WITHCONVERTER=`myclass`
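
To show what these subscription forms mean at the MQTT level, here is a simplified sketch of standard MQTT topic matching: `+` matches exactly one topic level and `#` matches all remaining levels. It also splits a shared subscription written in the MQTT 5 form `$share/<group>/<filter>`. The function names and the group name `group1` are hypothetical, the sketch ignores special cases such as topics that start with `$`, and the matching itself is done by the broker, not by the connector.

```python
def split_shared(subscription):
    """Return (group, topic_filter); group is None for a normal subscription."""
    if subscription.startswith("$share/"):
        # "$share/<group>/<filter>": split off the first two levels only.
        _, group, topic_filter = subscription.split("/", 2)
        return group, topic_filter
    return None, subscription


def topic_matches(topic_filter, topic):
    """Simplified MQTT matching with '+' (one level) and '#' (remaining levels)."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for index, level in enumerate(filter_levels):
        if level == "#":
            return True  # multi-level wildcard covers everything that follows
        if index >= len(topic_levels):
            return False  # filter is longer than the topic
        if level != "+" and level != topic_levels[index]:
            return False  # literal level does not match
    # Without '#', both must have the same number of levels.
    return len(filter_levels) == len(topic_levels)


print(topic_matches("/mqttTopicA/+/sensors", "/mqttTopicA/device1/sensors"))
print(topic_matches("/mqttTopicA/+/sensors", "/mqttTopicA/a/b/sensors"))
print(split_shared("$share/group1/mqttTopicA/+/sensors"))
```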

Message converters

The connector supports converters to handle different message payload formats in the source topic. See source record converters.

Option Reference

2024 © Lenses.io Ltd. Apache, Apache Kafka, Kafka and associated open source project names are trademarks of the Apache Software Foundation.