MQTT


A Kafka Connect source connector to read events from MQTT and push them to Kafka.

KCQL support 

The following KCQL is supported:

INSERT INTO <your-kafka-topic>
SELECT *
FROM <your-mqtt-topic>
[WITHCONVERTER=`myclass`]

Selection of fields from the JMS message is not supported.

Examples:

-- Insert mode, select all fields from topicA
-- and write to topic topic with converter myclass
INSERT INTO topic SELECT * FROM /mqttTopicA [WITHCONVERTER=myclass]

-- wildcard
INSERT INTO topic SELECT * FROM /mqttTopicA/+/sensors [WITHCONVERTER=`myclass`]

Concepts 

Keyed JSON format 

In order to facilitate scenarios like retaining the latest value for a given device identifier, or support Kafka Streams joins without having to re-map the topic data the connector supports WITHKEY in the KCQL syntax.

Multiple keys fields are supported using a delimiter:

// `[` enclosed by `]` denotes optional values
WITHKEY(field1 [, field2.A , field3]) [KEYDELIMITER='.']

The resulting Kafka record key content will be the string concatenation for the values of the fields specified. Optionally the delimiter can be set via the KEYDELIMITER keyword.

Shared and Wildcard Subscriptions 

The connector supports both wildcard and shared subscriptions but the KCQL command must be placed inside single quotes.

-- wildcard
INSERT INTO kafkaTopic1
SELECT * FROM /mqttTopicA/+/sensors
WITHCONVERTER=`myclass`

Message converters 

The connector supports converters to handle different messages payload format in the source topic. See source record converters.

Quickstart 

Launch the stack 


  1. Copy the docker-compose file.
  2. Bring up the stack.
export CONNECTOR=mqtt
docker-compose up -d mqtt

Inserting test data 

Login into the mqtt container:

docker exec \
    -ti mqtt \
    mosquitto_pub \
    -m "{\"deviceId\":1,\"value\":31.1,\"region\":\"EMEA\",\"timestamp\":1482236627236}" \
    -d -r -t /mjson

Start the connector 

If you are using Lenses, login into Lenses and navigate to the connectors page, select MQTT as the source and paste the following:

name=mqtt-source
connector.class=io.lenses.streamreactor.connect.mqtt.source.MqttSourceConnector
tasks.max=1
connect.mqtt.kcql=INSERT INTO mqtt SELECT * FROM /mjson WITHCONVERTER=`io.lenses.streamreactor.connect.converters.source.JsonSimpleConverter`
connect.mqtt.client.id=dm_source_id
connect.mqtt.hosts=tcp://mqtt:1883
connect.mqtt.service.quality=1

To start the connector using the command line, log into the lenses-box container:


docker exec -ti lenses-box /bin/sh

and create a connector.properties file containing the properties above.

Create the connector, with the connect-cli:

connect-cli create mqtt < connector.properties

connect-cli create mqtt < connector.properties

Wait for the connector to start and check it’s running:

connect-cli status mqtt

Check for records in Kafka 

Check the records in Lenses or with via the console:

kafka-avro-console-consumer \
    --bootstrap-server localhost:9092 \
    --topic mqtt \
    --from-beginning

Clean up 

Bring down the stack:

docker-compose down

Options 

NameDescriptionTypeDefault Value
connect.mqtt.hostsContains the MQTT connection end points.string
connect.mqtt.usernameContains the Mqtt connection user namestring
connect.mqtt.passwordContains the Mqtt connection passwordpassword
connect.mqtt.service.qualitySpecifies the Mqtt quality of serviceint
connect.mqtt.timeoutProvides the time interval to establish the mqtt connectionint3000
connect.mqtt.cleanconnect.mqtt.cleanbooleantrue
connect.mqtt.keep.aliveThe keep alive functionality assures that the connection is still open and both broker and client are connected to the broker during the establishment of the connection. The interval is the longest possible period of time, which broker and client can endure without sending a message.int5000
connect.mqtt.client.idContains the Mqtt session client idstring
connect.mqtt.error.policySpecifies the action to be taken if an error occurs while inserting the data. There are two available options: NOOP - the error is swallowed THROW - the error is allowed to propagate. RETRY - The exception causes the Connect framework to retry the message. The number of retries is based on The error will be logged automaticallystringTHROW
connect.mqtt.retry.intervalThe time in milliseconds between retries.int60000
connect.mqtt.max.retriesThe maximum number of times to try the write again.int20
connect.mqtt.retained.messagesSpecifies the Mqtt retained flag.booleanfalse
connect.mqtt.converter.throw.on.errorIf set to false the conversion exception will be swallowed and everything carries on BUT the message is lost!!; true will throw the exception.Default is false.booleanfalse
connect.converter.avro.schemasIf the AvroConverter is used you need to provide an avro Schema to be able to read and translate the raw bytes to an avro record. The format is $MQTT_TOPIC=$PATH_TO_AVRO_SCHEMA_FILE in case of source converter, or $KAFKA_TOPIC=PATH_TO_AVRO_SCHEMA in case of sink converterstring
connect.mqtt.log.messageLogs received MQTT messagesbooleanfalse
connect.mqtt.kcqlContains the Kafka Connect Query Language describing the sourced MQTT source and the target Kafka topicsstring
connect.mqtt.polling.timeoutProvides the timeout to poll incoming messagesint1000
connect.mqtt.share.replicateReplicate the shared subscriptions to all tasks instead of distributing thembooleanfalse
connect.progress.enabledEnables the output for how many records have been processedbooleanfalse
connect.mqtt.ssl.ca.certProvides the path to the CA certificate file to use with the Mqtt connectionstring
connect.mqtt.ssl.certProvides the path to the certificate file to use with the Mqtt connectionstring
connect.mqtt.ssl.keyCertificate private [config] key file path.string
connect.mqtt.process.duplicatesProcess duplicate messagesbooleanfalse
--
Last modified: September 26, 2024