# InfluxDB

{% hint style="info" %}

## This connector has been retired as of version 11.0.0

{% endhint %}

## Connector Class

```
io.lenses.streamreactor.connect.influx.InfluxSinkConnector
```

## Example

{% hint style="success" %}
For more examples see the [tutorials](https://docs.lenses.io/latest/connectors/tutorials).
{% endhint %}

{% code fullWidth="true" %}

```bash
name=influxdb
connector.class=io.lenses.streamreactor.connect.influx.InfluxSinkConnector
tasks.max=1
topics=influx
connect.influx.url=http://influxdb:8086
connect.influx.db=mydb
connect.influx.username=admin
connect.influx.kcql=INSERT INTO influxMeasure SELECT * FROM influx WITHTIMESTAMP sys_time()
```

{% endcode %}

## KCQL support <a href="#kcql-support" id="kcql-support"></a>

{% hint style="success" %}
You can specify multiple KCQL statements separated by **`;`** to have a connector sink multiple topics. The connector property **topics** or **topics.regex** must be set to a value that matches the topics used in the KCQL statements.
{% endhint %}
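
As a rough sketch of the multi-statement form, the fragment below sinks two topics into two measurements. The topic names `orders` and `payments` and the measurement names are hypothetical and only illustrate how the `topics` property lines up with the `FROM` clauses.

```properties
# Hypothetical topic and measurement names, for illustration only.
# Every topic named in a FROM clause also appears in `topics`.
topics=orders,payments
connect.influx.kcql=INSERT INTO orderMeasure SELECT * FROM orders WITHTIMESTAMP sys_time();INSERT INTO paymentMeasure SELECT * FROM payments WITHTIMESTAMP sys_time()
```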

The following KCQL is supported:

```sql
INSERT INTO <your-measure>
SELECT FIELD, ...
FROM kafka_topic_name
[WITHTIMESTAMP FIELD|sys_time()]
[WITHTAG (FIELD|constant_key=constant_value, ...)]
```

Examples:

```sql
-- Insert mode, select all fields from topicA and write to measureA
INSERT INTO measureA SELECT * FROM topicA

-- Insert mode, select 3 fields from topicB (renaming x and y) and write to measureB,
-- use field y as the point timestamp
INSERT INTO measureB SELECT x AS a, y AS b, c FROM topicB WITHTIMESTAMP y

-- Insert mode, select 3 fields from topicB (renaming x and y) and write to measureB,
-- use the current system time as the point timestamp
INSERT INTO measureB SELECT x AS a, y AS b, z FROM topicB WITHTIMESTAMP sys_time()

-- Tagging using constants
INSERT INTO measureA SELECT * FROM topicA WITHTAG (DataMountaineer=awesome, Influx=rulz!)

-- Tagging using fields in the payload. Say we have a Payment structure
-- with these fields: amount, from, to, note
INSERT INTO measureA SELECT * FROM topicA WITHTAG (from, to)

-- Tagging using a combination of fields in the payload and constants.
-- Say we have a Payment structure with these fields: amount, from, to, note
INSERT INTO measureA SELECT * FROM topicA WITHTAG (from, to, provider=DataMountaineer)
```

## Tags <a href="#tags" id="tags"></a>

The InfluxDB client API lets you attach a set of tags (key-value pairs) to each point that is written. The current connector version lets you provide them via KCQL, using the `WITHTAG` clause.

{% hint style="info" %}
Tags can only be taken from value fields. Nested fields, keys, and topic metadata are not supported.
{% endhint %}

## Kafka payload support <a href="#kafka-payload-support" id="kafka-payload-support"></a>

This sink supports the following Kafka payloads:

* Schema.Struct and Struct (Avro)
* Schema.Struct and JSON
* No Schema and JSON
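
The payload type is normally determined by the Kafka Connect converter configured for the connector or worker. The fragment below is a sketch of converter settings for the schemaless JSON case, with the other two cases shown as commented alternatives. The Avro lines assume Confluent's Avro converter and a Schema Registry at a hypothetical address; neither is specified by this page.

```properties
# No Schema and JSON: plain JSON values without an embedded schema
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

# Schema.Struct and JSON: JSON values that carry their schema in an envelope
# value.converter=org.apache.kafka.connect.json.JsonConverter
# value.converter.schemas.enable=true

# Struct (Avro): assumes Confluent's Avro converter and a Schema Registry
# reachable at this hypothetical URL
# value.converter=io.confluent.connect.avro.AvroConverter
# value.converter.schema.registry.url=http://schema-registry:8081
```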

## Error policies <a href="#error-polices" id="error-polices"></a>

The connector supports [Error policies](https://docs.lenses.io/latest/connectors/tutorials/using-error-policies).
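
As a minimal sketch, the fragment below switches the connector from the default `THROW` policy to `RETRY`, using the option names from the Option Reference. The interval and retry count shown are simply the documented defaults, written out explicitly; adjust them for your environment.

```properties
# Retry failed writes instead of failing the task immediately
connect.influx.error.policy=RETRY
# Wait 60 seconds between attempts (documented default)
connect.influx.retry.interval=60000
# Give up after 20 attempts (documented default)
connect.influx.max.retries=20
```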

## Option Reference <a href="#storage-to-output-matrix" id="storage-to-output-matrix"></a>

<table data-full-width="false"><thead><tr><th width="288">Name</th><th width="266.5">Description</th><th width="108">Type</th><th>Default Value</th></tr></thead><tbody><tr><td>connect.influx.url</td><td>The InfluxDB database URL.</td><td>string</td><td></td></tr><tr><td>connect.influx.db</td><td>The database to store the values in.</td><td>string</td><td></td></tr><tr><td>connect.influx.username</td><td>The user to connect to the InfluxDB database as.</td><td>string</td><td></td></tr><tr><td>connect.influx.password</td><td>The password for the InfluxDB user.</td><td>password</td><td></td></tr><tr><td>connect.influx.kcql</td><td>KCQL expression describing field selection and target measurements.</td><td>string</td><td></td></tr><tr><td>connect.progress.enabled</td><td>Enables logging of how many records have been processed by the connector.</td><td>boolean</td><td>false</td></tr><tr><td>connect.influx.error.policy</td><td>Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP - the error is swallowed; THROW - the error is allowed to propagate; RETRY - the exception causes the Connect framework to retry the message. The number of retries is set by connect.influx.max.retries. The error will be logged automatically.</td><td>string</td><td>THROW</td></tr><tr><td>connect.influx.retry.interval</td><td>The time in milliseconds between retries.</td><td>int</td><td>60000</td></tr><tr><td>connect.influx.max.retries</td><td>The maximum number of times to try the write again.</td><td>int</td><td>20</td></tr><tr><td>connect.influx.retention.policy</td><td>Determines how long InfluxDB keeps the data. The duration of a retention policy is specified with the units m (minutes), h (hours), d (days), w (weeks) or INF (infinite). Note that the minimum retention period is one hour. Default retention is <code>autogen</code> from 1.0 onwards or <code>default</code> for any previous version.</td><td>string</td><td>autogen</td></tr><tr><td>connect.influx.consistency.level</td><td>Specifies the write consistency. If any write operations do not meet the configured consistency guarantees, an error will occur and the data will not be indexed. The default consistency level is ALL.</td><td>string</td><td>ALL</td></tr></tbody></table>
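
Putting several of these options together, a configuration might look like the sketch below. The host, database, and user are taken from the example earlier on this page; the password is a placeholder, and the retention policy and consistency level are the documented defaults written out explicitly.

```properties
name=influxdb
connector.class=io.lenses.streamreactor.connect.influx.InfluxSinkConnector
tasks.max=1
topics=influx
connect.influx.url=http://influxdb:8086
connect.influx.db=mydb
connect.influx.username=admin
# Placeholder value; supply your own credentials
connect.influx.password=changeme
connect.influx.kcql=INSERT INTO influxMeasure SELECT * FROM influx WITHTIMESTAMP sys_time()
# Documented defaults, shown explicitly
connect.influx.retention.policy=autogen
connect.influx.consistency.level=ALL
# Log how many records the connector has processed
connect.progress.enabled=true
```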
