# InfluxDB

{% hint style="info" %}

## This connector has been retired as of version 11.0.0

{% endhint %}

## Connector Class

```
io.lenses.streamreactor.connect.influx.InfluxSinkConnector
```

## Example

{% hint style="success" %}
For more examples see the [tutorials](/latest/connectors/tutorials.md).
{% endhint %}

{% code fullWidth="true" %}

```bash
name=influxdb
connector.class=io.lenses.streamreactor.connect.influx.InfluxSinkConnector
tasks.max=1
topics=influx
connect.influx.url=http://influxdb:8086
connect.influx.db=mydb
connect.influx.username=admin
connect.influx.kcql=INSERT INTO influxMeasure SELECT * FROM influx WITHTIMESTAMP sys_time()
```

{% endcode %}

## KCQL support <a href="#kcql-support" id="kcql-support"></a>

{% hint style="success" %}
You can specify multiple KCQL statements separated by **`;`** to have a single connector sink multiple topics. The connector property **topics** or **topics.regex** must be set to match the source topics referenced in the KCQL statements.
{% endhint %}
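As a sketch, a connector sinking two topics with two KCQL statements could be configured as follows (topic and measurement names are illustrative):

```bash
name=influxdb-multi
connector.class=io.lenses.streamreactor.connect.influx.InfluxSinkConnector
tasks.max=1
topics=payments,metrics
connect.influx.url=http://influxdb:8086
connect.influx.db=mydb
connect.influx.username=admin
connect.influx.kcql=INSERT INTO paymentsMeasure SELECT * FROM payments WITHTIMESTAMP sys_time();INSERT INTO metricsMeasure SELECT * FROM metrics WITHTIMESTAMP sys_time()
```

Note that **topics** lists both source topics, matching the `FROM` clauses of the two statements.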

The following KCQL is supported:

```sql
INSERT INTO <your-measure>
SELECT FIELD, ...
FROM kafka_topic_name
[WITHTIMESTAMP FIELD|sys_time]
[WITHTAG(FIELD|(constant_key=constant_value))]
```

Examples:

```sql
-- Insert mode, select all fields from topicA and write to measureA
INSERT INTO measureA SELECT * FROM topicA

-- Insert mode, select 3 fields, renaming two of them, from topicB
-- and write to measureB, using field y as the point timestamp
INSERT INTO measureB SELECT x AS a, y AS b, c FROM topicB WITHTIMESTAMP y

-- Insert mode, select 3 fields, renaming two of them, from topicB
-- and write to measureB, using the current system time as the point timestamp
INSERT INTO measureB SELECT x AS a, y AS b, z FROM topicB WITHTIMESTAMP sys_time()

-- Tagging using constants
INSERT INTO measureA SELECT * FROM topicA WITHTAG (DataMountaineer=awesome, Influx=rulz!)

-- Tagging using fields in the payload. Say we have a Payment structure
-- with these fields: amount, from, to, note
INSERT INTO measureA SELECT * FROM topicA WITHTAG (from, to)

-- Tagging using a combination of fields in the payload and constants.
-- Say we have a Payment structure with these fields: amount, from, to, note
INSERT INTO measureA SELECT * FROM topicA WITHTAG (from, to, provider=DataMountaineer)
```

## Tags <a href="#tags" id="tags"></a>

The InfluxDB client API allows a set of key-value tags to be attached to each point written. The current connector version lets you supply these tags via KCQL.

{% hint style="info" %}
Only applicable to value fields. No support for nested fields, keys or topic metadata.
{% endhint %}
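As an illustration (the record and all values are hypothetical), a Payment record such as `{"amount": 100, "from": "alice", "to": "bob", "note": "rent"}` sunk with `WITHTAG (from, to)` would be written as a point tagged with those two field values, roughly equivalent to this InfluxDB line-protocol entry:

```
measureA,from=alice,to=bob amount=100,note="rent" <timestamp>
```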

## Kafka payload support <a href="#kafka-payload-support" id="kafka-payload-support"></a>

This sink supports the following Kafka payloads:

* Schema.Struct and Struct (Avro)
* Schema.Struct and JSON
* No Schema and JSON
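For instance, with the "No Schema and JSON" option, a plain JSON record on the source topic is sufficient; matching the renaming example above, a record might look like this (field names and values are illustrative):

```json
{"x": 1.5, "y": 1714000000000, "z": "sensor-a"}
```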

## Error policies <a href="#error-polices" id="error-polices"></a>

The connector supports [Error policies](/latest/connectors/tutorials/using-error-policies.md).
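For example, to have failed writes retried by the Connect framework rather than failing the task, the relevant options from the reference below could be combined like this (values are illustrative):

```bash
connect.influx.error.policy=RETRY
connect.influx.max.retries=20
connect.influx.retry.interval=60000
```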

## Option Reference <a href="#storage-to-output-matrix" id="storage-to-output-matrix"></a>

<table data-full-width="false"><thead><tr><th width="288">Name</th><th width="266.5">Description</th><th width="108">Type</th><th>Default Value</th></tr></thead><tbody><tr><td>connect.influx.url</td><td>The InfluxDB database URL.</td><td>string</td><td></td></tr><tr><td>connect.influx.db</td><td>The database to store the values in.</td><td>string</td><td></td></tr><tr><td>connect.influx.username</td><td>The user to connect to the InfluxDB database as.</td><td>string</td><td></td></tr><tr><td>connect.influx.password</td><td>The password for the InfluxDB user.</td><td>password</td><td></td></tr><tr><td>connect.influx.kcql</td><td>KCQL expression describing field selection and target measurements.</td><td>string</td><td></td></tr><tr><td>connect.progress.enabled</td><td>Enables logging of how many records have been processed by the connector.</td><td>boolean</td><td>false</td></tr><tr><td>connect.influx.error.policy</td><td>Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP - the error is swallowed; THROW - the error is allowed to propagate; RETRY - the exception causes the Connect framework to retry the message, up to <code>connect.influx.max.retries</code> times. The error is logged automatically in all cases.</td><td>string</td><td>THROW</td></tr><tr><td>connect.influx.retry.interval</td><td>The time in milliseconds between retries.</td><td>int</td><td>60000</td></tr><tr><td>connect.influx.max.retries</td><td>The maximum number of times to retry the write.</td><td>int</td><td>20</td></tr><tr><td>connect.influx.retention.policy</td><td>Determines how long InfluxDB keeps the data. The minimum retention period is one hour. Duration units: m (minutes), h (hours), d (days), w (weeks), INF (infinite). The default retention policy is <code>autogen</code> from InfluxDB 1.0 onwards, or <code>default</code> for earlier versions.</td><td>string</td><td>autogen</td></tr><tr><td>connect.influx.consistency.level</td><td>Specifies the write consistency. If any write operations do not meet the configured consistency guarantees, an error will occur and the data will not be indexed. The default consistency level is ALL.</td><td>string</td><td>ALL</td></tr></tbody></table>


