# MQTT

A Kafka Connect source connector to read events from MQTT and push them to Kafka.

## Connector Class

```
io.lenses.streamreactor.connect.mqtt.source.MqttSourceConnector
```

## Example

{% hint style="success" %}
For more examples, see the [tutorials](https://docs.lenses.io/latest/connectors/tutorials).
{% endhint %}

{% code fullWidth="true" %}

```properties
name=mqtt-source
connector.class=io.lenses.streamreactor.connect.mqtt.source.MqttSourceConnector
tasks.max=1
connect.mqtt.kcql=INSERT INTO mqtt SELECT * FROM /mjson WITHCONVERTER=`io.lenses.streamreactor.connect.converters.source.JsonSimpleConverter`
connect.mqtt.client.id=dm_source_id
connect.mqtt.hosts=tcp://mqtt:1883
connect.mqtt.service.quality=1
```

{% endcode %}

## KCQL support <a href="#kcql-support" id="kcql-support"></a>

{% hint style="success" %}
You can specify multiple KCQL statements separated by `;`&#x20;

However, you cannot route the same source topic to different Kafka topics; use a separate connector instance for that.
{% endhint %}

The following KCQL is supported:

```sql
INSERT INTO <your-kafka-topic>
SELECT *
FROM <your-mqtt-topic>
[WITHCONVERTER=`myclass`]
```

Selecting individual fields from the MQTT message is **not** supported.

Examples:

```sql
-- Insert mode: select all fields from /mqttTopicA
-- and write to the Kafka topic `topic` using the converter `myclass`
INSERT INTO topic SELECT * FROM /mqttTopicA WITHCONVERTER=`myclass`

-- wildcard subscription
INSERT INTO topic SELECT * FROM /mqttTopicA/+/sensors WITHCONVERTER=`myclass`
```

## Keyed JSON format <a href="#keyed-json-format" id="keyed-json-format"></a>

To facilitate scenarios such as retaining the latest value for a given device identifier, or supporting Kafka Streams joins without having to re-map the topic data, the connector supports **WITHKEY** in the KCQL syntax.

Multiple key fields are supported using a delimiter:

```sql
-- values enclosed in [ ] are optional
WITHKEY(field1 [, field2.A , field3]) [KEYDELIMITER='.']
```

The resulting Kafka record key content will be the string concatenation of the values of the fields. Optionally, the delimiter can be set via the KEYDELIMITER keyword.
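For example, the following statement keys each record on two fields joined by `-` (the field names `deviceId` and `timestamp` are illustrative, not part of the connector):

```sql
-- key each Kafka record on deviceId and timestamp, e.g. "sensor42-1700000000"
INSERT INTO kafkaTopic
SELECT * FROM /sensors
WITHKEY(deviceId, timestamp) KEYDELIMITER='-'
```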

## Shared and Wildcard Subscriptions <a href="#shared-and-wildcard-subscriptions" id="shared-and-wildcard-subscriptions"></a>

The connector supports wildcard and shared subscriptions, but the KCQL command must be placed inside single quotes.

```sql
-- wildcard
INSERT INTO kafkaTopic1 SELECT * FROM /mqttTopicA/+/sensors WITHCONVERTER=`myclass`
```
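A shared subscription uses the MQTT `$share` syntax as the source; as a sketch (the group name `connect-group` is illustrative), note the single quotes around the source topic:

```sql
-- shared subscription: the broker distributes messages across the group
INSERT INTO kafkaTopic1 SELECT * FROM '$share/connect-group/mqttTopicA/sensors' WITHCONVERTER=`myclass`
```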

## Dynamic target topics <a href="#message-converters" id="message-converters"></a>

When using wildcard subscriptions, you can dynamically route messages to a Kafka topic with the same name as the MQTT topic by using `` `$` `` in the KCQL target statement.

{% hint style="info" %}
You must use a wildcard or a shared subscription format.&#x20;

Kafka topic names do not support `/`. In the resulting topic names, `/` is replaced by `_`. For example:

`/mqttSourceTopic/A/test` would become `mqttSourceTopic_A_test`.
{% endhint %}

```sql
INSERT INTO `$` SELECT * FROM /mqttTopicA/+/sensors
```

## Message converters <a href="#message-converters" id="message-converters"></a>

The connector supports converters to handle different message payload formats in the source topic. See [source record converters](https://docs.lenses.io/latest/connectors/tutorials/source-converters-with-incoming-json-or-avro).
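For example, using the bundled Avro source converter also requires a schema mapping via `connect.converter.avro.schemas` (the MQTT topic and schema path below are illustrative):

```properties
# hypothetical topic and schema file path, for illustration only
connect.mqtt.kcql=INSERT INTO sensors SELECT * FROM /avro/sensors WITHCONVERTER=`io.lenses.streamreactor.connect.converters.source.AvroConverter`
connect.converter.avro.schemas=/avro/sensors=/etc/schemas/sensor.avsc
```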

## Option Reference <a href="#storage-to-output-matrix" id="storage-to-output-matrix"></a>

<table data-full-width="true"><thead><tr><th width="338">Name</th><th width="441">Description</th><th width="138">Type</th><th>Default Value</th></tr></thead><tbody><tr><td>connect.mqtt.hosts</td><td>Contains the MQTT connection endpoints.</td><td>string</td><td></td></tr><tr><td>connect.mqtt.username</td><td>Contains the MQTT connection username.</td><td>string</td><td></td></tr><tr><td>connect.mqtt.password</td><td>Contains the MQTT connection password.</td><td>password</td><td></td></tr><tr><td>connect.mqtt.service.quality</td><td>Specifies the MQTT quality of service.</td><td>int</td><td></td></tr><tr><td>connect.mqtt.timeout</td><td>Provides the time interval, in milliseconds, to establish the MQTT connection.</td><td>int</td><td>3000</td></tr><tr><td>connect.mqtt.clean</td><td>Specifies whether the MQTT connection uses a clean session.</td><td>boolean</td><td>true</td></tr><tr><td>connect.mqtt.keep.alive</td><td>The keep-alive functionality ensures the connection stays open and both broker and client remain connected. The interval is the longest period of time the broker and client can go without sending a message.</td><td>int</td><td>5000</td></tr><tr><td>connect.mqtt.client.id</td><td>Contains the MQTT session client id.</td><td>string</td><td></td></tr><tr><td>connect.mqtt.error.policy</td><td>Specifies the action to take if an error occurs while inserting the data. There are three available options: NOOP - the error is swallowed; THROW - the error is allowed to propagate; RETRY - the exception causes the Connect framework to retry the message, with the number of retries set by connect.mqtt.max.retries. The error is logged automatically.</td><td>string</td><td>THROW</td></tr><tr><td>connect.mqtt.retry.interval</td><td>The time in milliseconds between retries.</td><td>int</td><td>60000</td></tr><tr><td>connect.mqtt.max.retries</td><td>The maximum number of times to retry the write.</td><td>int</td><td>20</td></tr><tr><td>connect.mqtt.retained.messages</td><td>Specifies the MQTT retained flag.</td><td>boolean</td><td>false</td></tr><tr><td>connect.mqtt.converter.throw.on.error</td><td>If set to false, the conversion exception is swallowed and processing continues, but the message is lost! If set to true, the exception is thrown.</td><td>boolean</td><td>false</td></tr><tr><td>connect.converter.avro.schemas</td><td>If the AvroConverter is used, you need to provide an Avro schema to be able to read and translate the raw bytes to an Avro record. The format is $MQTT_TOPIC=$PATH_TO_AVRO_SCHEMA_FILE for a source converter, or $KAFKA_TOPIC=PATH_TO_AVRO_SCHEMA for a sink converter.</td><td>string</td><td></td></tr><tr><td>connect.mqtt.log.message</td><td>Logs received MQTT messages.</td><td>boolean</td><td>false</td></tr><tr><td>connect.mqtt.kcql</td><td>Contains the Kafka Connect Query Language (KCQL) statements describing the source MQTT topics and the target Kafka topics.</td><td>string</td><td></td></tr><tr><td>connect.mqtt.polling.timeout</td><td>Provides the timeout, in milliseconds, to poll incoming messages.</td><td>int</td><td>1000</td></tr><tr><td>connect.mqtt.share.replicate</td><td>Replicate the shared subscriptions to all tasks instead of distributing them.</td><td>boolean</td><td>false</td></tr><tr><td>connect.progress.enabled</td><td>Enables output of how many records have been processed.</td><td>boolean</td><td>false</td></tr><tr><td>connect.mqtt.ssl.ca.cert</td><td>Provides the path to the CA certificate file to use with the MQTT connection.</td><td>string</td><td></td></tr><tr><td>connect.mqtt.ssl.cert</td><td>Provides the path to the certificate file to use with the MQTT connection.</td><td>string</td><td></td></tr><tr><td>connect.mqtt.ssl.key</td><td>Path to the certificate private key file.</td><td>string</td><td></td></tr><tr><td>connect.mqtt.process.duplicates</td><td>Process duplicate messages.</td><td>boolean</td><td>false</td></tr></tbody></table>
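
As a sketch of how the error-handling options combine (the values below are illustrative, not recommendations), a retry-based policy could be configured as:

```properties
# retry each failed message up to 10 times, waiting 30 seconds between attempts
connect.mqtt.error.policy=RETRY
connect.mqtt.max.retries=10
connect.mqtt.retry.interval=30000
```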
