# MQTT

## Connector Class

```
io.lenses.streamreactor.connect.mqtt.sink.MqttSinkConnector
```

## Example

{% hint style="success" %}
For more examples see the [tutorials](https://docs.lenses.io/latest/connectors/tutorials).
{% endhint %}

```bash
name=mqtt
connector.class=io.lenses.streamreactor.connect.mqtt.sink.MqttSinkConnector
tasks.max=1
topics=orders
connect.mqtt.hosts=tcp://mqtt:1883
connect.mqtt.clean=true
connect.mqtt.timeout=1000
connect.mqtt.keep.alive=1000
connect.mqtt.service.quality=1
connect.mqtt.client.id=dm_sink_id
connect.mqtt.kcql=INSERT INTO /lenses/orders SELECT * FROM orders
```

## KCQL Support

{% hint style="success" %}
You can specify multiple KCQL statements, separated by **`;`**, to have a single connector sink multiple topics. The connector property **topics** or **topics.regex** must be set to a value that matches the source topics in the KCQL statements.
{% endhint %}
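
For example, a single connector instance can fan out two Kafka topics to two MQTT topics. The topic names below are illustrative:

```bash
topics=orders,payments
connect.mqtt.kcql=INSERT INTO /lenses/orders SELECT * FROM orders;INSERT INTO /lenses/payments SELECT * FROM payments
```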

The following KCQL is supported:

```sql
INSERT
INTO <mqtt-topic>
SELECT *            -- no field projection supported
FROM <kafka-topic>
-- no WHERE clause supported
```

Examples:

```sql
-- Insert into /landoop/demo all fields from kafka_topicA
INSERT INTO `/landoop/demo` SELECT * FROM kafka_topicA

-- Route to a dynamic MQTT topic taken from a field value
INSERT INTO `<field path>` SELECT * FROM control.boxes.test PROPERTIES('mqtt.target.from.field'='true')
```

## Dynamic targets <a href="#dynamic-targets" id="dynamic-targets"></a>

The connector can route the messages to specific MQTT targets. These are the possible options:

1. **Given Constant Topic:**

   Route messages to a specified MQTT topic using a constant value:

   ```sql
   INSERT INTO lenses-io-demo ...
   ```
2. **Using a Field Value for Topic:**

   Direct messages to an MQTT topic based on a specified field's value. Example expression: `fieldA.fieldB.fieldC`.

   ```sql
   INSERT INTO `<path to field>` 
   SELECT * FROM control.boxes.test 
   PROPERTIES('mqtt.target.from.field'='true')
   ```
3. **Utilizing Message Key as Topic:**

   Determine the MQTT topic by the incoming message key, expected to be a string.

   ```sql
   INSERT INTO `_key` 
   SELECT ...
   ```
4. **Leveraging Kafka Message Topic:**

   Set the MQTT topic using the incoming Kafka message’s original topic.

   ```sql
   INSERT INTO `_topic` 
   SELECT ...
   ```
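
The routing options above slot into the connector configuration via `connect.mqtt.kcql`. For instance, to mirror each record onto an MQTT topic named after its Kafka topic (reusing the `orders` topic from the earlier example):

```sql
-- Route each record to an MQTT topic matching its Kafka topic name
INSERT INTO `_topic` SELECT * FROM orders
```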

## Kafka payload support <a href="#kafka-payload-support" id="kafka-payload-support"></a>

The sink publishes each Kafka record to MQTT according to the **type of the record’s value**:

| Payload type                                                               | Action taken by the sink                                            |
| -------------------------------------------------------------------------- | ------------------------------------------------------------------- |
| **Binary** (`byte[]`)                                                      | Forwarded unchanged (“pass-through”).                               |
| **String**                                                                 | Published as its UTF-8 byte sequence.                               |
| **Connect `Struct`** produced by Avro, Protobuf, or JSON Schema converters | Converted to JSON, then published as the JSON string’s UTF-8 bytes. |
| **`java.util.Map` or other Java Collection**                               | Serialized to JSON and published as the JSON string’s UTF-8 bytes.  |

In short, non-binary objects are first turned into a JSON string; everything that reaches MQTT is ultimately a sequence of bytes.
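
As a hypothetical illustration, a Connect `Struct` with fields `id` and `temperature` would be converted to the JSON below, and the UTF-8 bytes of that string are what reaches the MQTT broker:

```json
{"id": "box-1", "temperature": 21.5}
```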

## Error policies <a href="#error-polices" id="error-polices"></a>

The connector supports [Error policies](https://docs.lenses.io/latest/connectors/tutorials/using-error-policies).
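
As a sketch, a RETRY policy is configured with the options from the reference below; the interval and retry count shown are the connector defaults:

```bash
connect.mqtt.error.policy=RETRY
connect.mqtt.max.retries=20
connect.mqtt.retry.interval=60000
```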

## Option Reference

<table data-full-width="true"><thead><tr><th>Name</th><th width="465">Description</th><th width="119">Type</th><th>Default Value</th></tr></thead><tbody><tr><td>connect.mqtt.hosts</td><td>Contains the MQTT connection end points.</td><td>string</td><td></td></tr><tr><td>connect.mqtt.username</td><td>Contains the MQTT connection user name.</td><td>string</td><td></td></tr><tr><td>connect.mqtt.password</td><td>Contains the MQTT connection password.</td><td>password</td><td></td></tr><tr><td>connect.mqtt.service.quality</td><td>Specifies the MQTT quality of service.</td><td>int</td><td></td></tr><tr><td>connect.mqtt.timeout</td><td>Provides the time interval, in milliseconds, to establish the MQTT connection.</td><td>int</td><td>3000</td></tr><tr><td>connect.mqtt.clean</td><td>Sets the MQTT clean-session flag. If true, the broker discards any previous session state when the client connects.</td><td>boolean</td><td>true</td></tr><tr><td>connect.mqtt.keep.alive</td><td>The keep-alive functionality ensures that the connection is still open and both broker and client remain connected. The interval is the longest period of time the broker and client can go without sending a message.</td><td>int</td><td>5000</td></tr><tr><td>connect.mqtt.client.id</td><td>Contains the MQTT session client id.</td><td>string</td><td></td></tr><tr><td>connect.mqtt.error.policy</td><td>Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP - the error is swallowed; THROW - the error is allowed to propagate; RETRY - the exception causes the Connect framework to retry the message. The number of retries is based on connect.mqtt.max.retries. The error will be logged automatically.</td><td>string</td><td>THROW</td></tr><tr><td>connect.mqtt.retry.interval</td><td>The time in milliseconds between retries.</td><td>int</td><td>60000</td></tr><tr><td>connect.mqtt.max.retries</td><td>The maximum number of times to try the write again.</td><td>int</td><td>20</td></tr><tr><td>connect.mqtt.retained.messages</td><td>Specifies the MQTT retained flag.</td><td>boolean</td><td>false</td></tr><tr><td>connect.mqtt.converter.throw.on.error</td><td>If set to false, a conversion exception is swallowed and processing continues, but the message is lost. If set to true, the exception is propagated.</td><td>boolean</td><td>false</td></tr><tr><td>connect.converter.avro.schemas</td><td>If the AvroConverter is used you need to provide an Avro schema to be able to read and translate the raw bytes to an Avro record. The format is $MQTT_TOPIC=$PATH_TO_AVRO_SCHEMA_FILE for a source converter, or $KAFKA_TOPIC=$PATH_TO_AVRO_SCHEMA_FILE for a sink converter.</td><td>string</td><td></td></tr><tr><td>connect.mqtt.kcql</td><td>Contains the Kafka Connect Query Language describing the flow from the source Kafka topics to the target MQTT topics.</td><td>string</td><td></td></tr><tr><td>connect.progress.enabled</td><td>Enables the output for how many records have been processed.</td><td>boolean</td><td>false</td></tr><tr><td>connect.mqtt.ssl.ca.cert</td><td>Provides the path to the CA certificate file to use with the MQTT connection.</td><td>string</td><td></td></tr><tr><td>connect.mqtt.ssl.cert</td><td>Provides the path to the certificate file to use with the MQTT connection.</td><td>string</td><td></td></tr><tr><td>connect.mqtt.ssl.key</td><td>Provides the path to the certificate private key file.</td><td>string</td><td></td></tr></tbody></table>
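
For a TLS-secured broker, the SSL options combine as follows, assuming the broker exposes a TLS listener reachable via an `ssl://` endpoint; the host, port, and file paths are placeholders:

```bash
connect.mqtt.hosts=ssl://mqtt:8883
connect.mqtt.ssl.ca.cert=/path/to/ca.crt
connect.mqtt.ssl.cert=/path/to/client.crt
connect.mqtt.ssl.key=/path/to/client.key
```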

## Migration <a href="#migration" id="migration"></a>

Version 9 introduces two **breaking changes** that affect how you route data and project fields in KCQL. Review the sections below and update your configurations before restarting the connector.

#### WITHTARGET is no longer used

| **Before (v < 9.0.0)**                                                     | **After (v ≥ 9.0.0)**                                                                                          |
| -------------------------------------------------------------------------- | -------------------------------------------------------------------------------------------------------------- |
| <p>INSERT INTO SELECT \* FROM control.boxes.test<br>WITHTARGET ${path}</p> | <p>INSERT INTO ${path}<br>SELECT \* FROM control.boxes.test<br>PROPERTIES('mqtt.target.from.field'='true')</p> |

Migration steps

1. Delete every `WITHTARGET …` clause.
2. Move the placeholder (or literal) that held the target path into the `INSERT INTO` expression.
3. Add `mqtt.target.from.field=true` to the KCQL `PROPERTIES` list.

#### KCQL field projections are ignored

| **Before (v < 9.0.0)**                                                                                         | **After (v ≥ 9.0.0)**                                                                                                                                                                                        |
| -------------------------------------------------------------------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
| <p>Handled directly in the KCQL <code>SELECT</code> list:<br><code>SELECT id, temp AS temperature …</code></p> | The connector passes the full record. Any projection, renaming, or value transformation must be done with a **Kafka Connect Single Message Transformer (SMT)**, **KStreams**, or another preprocessing step. |

Migration steps

* Remove field lists and aliases from KCQL.
* Attach an SMT such as `org.apache.kafka.connect.transforms.ExtractField$Value`, `org.apache.kafka.connect.transforms.MaskField$Value`, or your own custom SMT to perform the same logic.
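
For example, a KCQL projection such as `SELECT temperature FROM orders` could be replaced by an ExtractField SMT on the connector configuration; the transform alias and field name below are illustrative:

```bash
transforms=pickField
transforms.pickField.type=org.apache.kafka.connect.transforms.ExtractField$Value
transforms.pickField.field=temperature
```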
