MQTT

This page describes the usage of the Stream Reactor MQTT Sink Connector.

Connector Class

    io.lenses.streamreactor.connect.mqtt.sink.MqttSinkConnector

Example

    name=mqtt
    connector.class=io.lenses.streamreactor.connect.mqtt.sink.MqttSinkConnector
    tasks.max=1
    topics=orders
    connect.mqtt.hosts=tcp://mqtt:1883
    connect.mqtt.clean=true
    connect.mqtt.timeout=1000
    connect.mqtt.keep.alive=1000
    connect.mqtt.service.quality=1
    connect.mqtt.client.id=dm_sink_id
    connect.mqtt.kcql=INSERT INTO /lenses/orders SELECT * FROM orders

For more examples see the tutorials.

KCQL Support

You can specify multiple KCQL statements separated by ; to have a connector sink multiple topics. The connector properties topics or topics.regex are required to be set to a value that matches the KCQL statements.
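For instance, a connector sinking two topics with two KCQL statements might be configured as follows (topic names are illustrative):

```properties
topics=orders,alerts
connect.mqtt.kcql=INSERT INTO /lenses/orders SELECT * FROM orders;INSERT INTO /lenses/alerts SELECT * FROM alerts
```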

The following KCQL is supported:

    INSERT
    INTO <mqtt-topic>
    SELECT * //no field projection supported
    FROM <kafka-topic>
    //no WHERE clause supported

Examples:

    -- Insert into /landoop/demo all fields from kafka_topicA
    INSERT INTO `/landoop/demo` SELECT * FROM kafka_topicA

    -- Insert into /landoop/demo all fields from dynamic field
    INSERT INTO `<field path>` SELECT * FROM control.boxes.test PROPERTIES('mqtt.target.from.field'='true')

Dynamic targets

The connector can route the messages to specific MQTT targets. These are the possible options:

  1. Given Constant Topic:

    Route messages to a specified MQTT topic using a constant value:

        INSERT INTO lenses-io-demo ...

  2. Using a Field Value for Topic:

    Direct messages to an MQTT topic based on a specified field's value. Example expression: fieldA.fieldB.fieldC.

        INSERT INTO `<path to field>`
        SELECT * FROM control.boxes.test
        PROPERTIES('mqtt.target.from.field'='true')

  3. Utilizing Message Key as Topic:

    Determine the MQTT topic from the incoming message key, which is expected to be a string:

        INSERT INTO `_key`
        SELECT ...

  4. Leveraging Kafka Message Topic:

    Set the MQTT topic using the incoming Kafka message's original topic:

        INSERT INTO `_topic`
        SELECT ...
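To illustrate how a dot-separated expression such as fieldA.fieldB.fieldC is resolved against a record, here is a minimal Python sketch (a simplified model, not the connector's actual implementation; the record contents are hypothetical):

```python
import json

def resolve_topic(record, path):
    """Walk a dot-separated field path and return the value found there."""
    value = record
    for segment in path.split("."):
        value = value[segment]
    return value

# Hypothetical record; the nested field holds the MQTT topic to publish to.
record = json.loads('{"fieldA": {"fieldB": {"fieldC": "/lenses/box42"}}}')
print(resolve_topic(record, "fieldA.fieldB.fieldC"))  # /lenses/box42
```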

Kafka payload support

The sink publishes each Kafka record to MQTT according to the type of the record’s value:

    Payload type                                                         | Action taken by the sink
    ---------------------------------------------------------------------|--------------------------
    Binary (byte[])                                                      | Forwarded unchanged ("pass-through").
    String                                                               | Published as its UTF-8 byte sequence.
    Connect Struct produced by Avro, Protobuf, or JSON Schema converters | Converted to JSON, then published as the JSON string's UTF-8 bytes.
    java.util.Map or other Java Collection                               | Serialized to JSON and published as the JSON string's UTF-8 bytes.

In short, non-binary objects are first turned into a JSON string; everything that reaches MQTT is ultimately a sequence of bytes.
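The conversion rules above can be sketched in Python (a simplified model of the behaviour, not the connector's implementation):

```python
import json

def to_mqtt_payload(value):
    """Model of the sink's payload handling: bytes pass through,
    strings become UTF-8, everything else is serialized to JSON first."""
    if isinstance(value, (bytes, bytearray)):
        return bytes(value)           # binary: forwarded unchanged
    if isinstance(value, str):
        return value.encode("utf-8")  # string: UTF-8 byte sequence
    # Struct / Map / Collection: JSON string, then UTF-8 bytes
    return json.dumps(value).encode("utf-8")

print(to_mqtt_payload(b"\x00\x01"))             # b'\x00\x01'
print(to_mqtt_payload("hello"))                 # b'hello'
print(to_mqtt_payload({"id": 1, "temp": 21.5})) # b'{"id": 1, "temp": 21.5}'
```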

Error policies

The connector supports Error policies (NOOP, THROW and RETRY); see connect.mqtt.error.policy in the Option Reference.
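As a sketch, enabling the RETRY policy might look like this in the connector properties (values are illustrative; the option names are listed in the Option Reference below):

```properties
connect.mqtt.error.policy=RETRY
connect.mqtt.max.retries=20
connect.mqtt.retry.interval=60000
```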

Option Reference

    Name                                  | Description | Type | Default Value
    --------------------------------------|-------------|------|--------------
    connect.mqtt.service.quality          | Specifies the MQTT quality of service. | int |
    connect.mqtt.timeout                  | The time interval allowed to establish the MQTT connection. | int | 3000
    connect.mqtt.clean                    | Sets the MQTT clean session flag. | boolean | true
    connect.mqtt.keep.alive               | The keep-alive mechanism ensures the connection stays open and both broker and client remain connected. The interval is the longest period of time broker and client can endure without exchanging a message. | int | 5000
    connect.mqtt.client.id                | Contains the MQTT session client id. | string |
    connect.mqtt.error.policy             | Specifies the action taken if an error occurs while writing the data. Options: NOOP (the error is swallowed), THROW (the error is allowed to propagate), RETRY (the Connect framework retries the message, up to connect.mqtt.max.retries times). The error is logged automatically. | string | THROW
    connect.mqtt.retry.interval           | The time in milliseconds between retries. | int | 60000
    connect.mqtt.max.retries              | The maximum number of times to retry the write. | int | 20
    connect.mqtt.retained.messages        | Specifies the MQTT retained flag. | boolean | false
    connect.mqtt.converter.throw.on.error | If false, a conversion exception is swallowed and processing continues, but the message is lost; if true, the exception is thrown. | boolean | false
    connect.converter.avro.schemas        | If the AvroConverter is used, provides the Avro schema needed to translate the raw bytes into an Avro record. The format is $MQTT_TOPIC=$PATH_TO_AVRO_SCHEMA_FILE for a source converter, or $KAFKA_TOPIC=PATH_TO_AVRO_SCHEMA for a sink converter. | string |
    connect.mqtt.kcql                     | Contains the Kafka Connect Query Language describing the source Kafka topics and the target MQTT topics. | string |
    connect.progress.enabled              | Enables logging of how many records have been processed. | boolean | false
    connect.mqtt.ssl.ca.cert              | The path to the CA certificate file to use with the MQTT connection. | string |
    connect.mqtt.ssl.cert                 | The path to the certificate file to use with the MQTT connection. | string |
    connect.mqtt.ssl.key                  | The path to the certificate private key file. | string |
    connect.mqtt.hosts                    | Contains the MQTT connection end points. | string |
    connect.mqtt.username                 | Contains the MQTT connection user name. | string |
    connect.mqtt.password                 | Contains the MQTT connection password. | password |

Migration

Version 9 introduces two breaking changes that affect how you route data and project fields in KCQL. Review the sections below and update your configurations before restarting the connector.

WITHTARGET is no longer used

Migration step

  1. Delete every WITHTARGET … clause.

  2. Move the placeholder (or literal) that held the target path into the INSERT INTO expression.

  3. Add mqtt.target.from.field=true to the KCQL PROPERTIES list.

Before (v < 9.0.0):

    INSERT INTO SELECT * FROM control.boxes.test WITHTARGET ${path}

After (v ≥ 9.0.0):

    INSERT INTO ${path} SELECT * FROM control.boxes.test PROPERTIES('mqtt.target.from.field'='true')

KCQL field projections are ignored

Migration step

  • Remove field lists and aliases from KCQL.

  • Attach an SMT such as org.apache.kafka.connect.transforms.ExtractField$Value, org.apache.kafka.connect.transforms.MaskField$Value, or your own custom SMT to perform the same logic.

Before (v < 9.0.0): handled directly in the KCQL SELECT list: SELECT id, temp AS temperature …

After (v ≥ 9.0.0): the connector passes the full record. Any projection, renaming, or value transformation must be done with a Kafka Connect Single Message Transformer (SMT), KStreams, or another preprocessing step.
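As a sketch, a projection that previously read SELECT id, temp AS temperature can be approximated with Kafka's stock ReplaceField$Value SMT (the transform alias "project" is illustrative; field names are hypothetical):

```properties
transforms=project
transforms.project.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.project.include=id,temp
transforms.project.renames=temp:temperature
```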
