InfluxDB

This page describes the usage of the Stream Reactor InfluxDB Sink Connector.

Connector Class

io.lenses.streamreactor.connect.influx.InfluxSinkConnector

Example

For more examples see the tutorials.

name=influxdb
connector.class=io.lenses.streamreactor.connect.influx.InfluxSinkConnector
tasks.max=1
topics=influx
connect.influx.url=http://influxdb:8086
connect.influx.db=mydb
connect.influx.username=admin
connect.influx.kcql=INSERT INTO influxMeasure SELECT * FROM influx WITHTIMESTAMP sys_time()

KCQL support

You can specify multiple KCQL statements separated by ; to have a connector sink multiple topics. The connector properties topics or topics.regex are required to be set to a value that matches the KCQL statements.

The following KCQL is supported:

INSERT INTO <your-measure>
SELECT FIELD, ...
FROM kafka_topic_name
[WITHTIMESTAMP FIELD|sys_time]
[WITHTAG(FIELD|(constant_key=constant_value)]

Examples:

-- Insert mode, select all fields from topicA and write to indexA
INSERT INTO measureA SELECT * FROM topicA

-- Insert mode, select 3 fields and rename from topicB and write to indexB,
-- use field Y as the point measurement
INSERT INTO measureB SELECT x AS a, y AS b, c FROM topicB WITHTIMESTAMP y

-- Insert mode, select 3 fields and rename from topicB and write to indexB,
-- use field Y as the current system time for Point measurement
INSERT INTO measureB SELECT x AS a, y AS b, z FROM topicB WITHTIMESTAMP sys_time()

-- Tagging using constants
INSERT INTO measureA SELECT * FROM topicA WITHTAG (DataMountaineer=awesome, Influx=rulz!)

-- Tagging using fields in the payload. Say we have a Payment structure
-- with these fields: amount, from, to, note
INSERT INTO measureA SELECT * FROM topicA WITHTAG (from, to)

-- Tagging using a combination of fields in the payload and constants.
-- Say we have a Payment structure with these fields: amount, from, to, note
INSERT INTO measureA SELECT * FROM topicA WITHTAG (from, to, provider=DataMountaineer)

Tags

InfluxDB allows via the client API to provide a set of tags (key-value) to each point added. The current connector version allows you to provide them via the KCQL.

Only applicable to value fields. No support for nested fields, keys or topic metadata.

Kafka payload support

This sink supports the following Kafka payloads:

  • Schema.Struct and Struct (Avro)

  • Schema.Struct and JSON

  • No Schema and JSON

Error policies

The connector supports Error policies.

Option Reference

NameDescriptionTypeDefault Value

connect.influx.url

The InfluxDB database url.

string

connect.influx.db

The database to store the values to.

string

connect.influx.username

The user to connect to the influx database

string

connect.influx.password

The password for the influxdb user.

password

connect.influx.kcql

KCQL expression describing field selection and target measurements.

string

connect.progress.enabled

Enables the output for how many records have been processed by the connector

boolean

false

connect.influx.error.policy

Specifies the action to be taken if an error occurs while inserting the data. There are two available options: NOOP - the error is swallowed THROW - the error is allowed to propagate. RETRY - The exception causes the Connect framework to retry the message. The number of retries is based on The error will be logged automatically

string

THROW

connect.influx.retry.interval

The time in milliseconds between retries.

int

60000

connect.influx.max.retries

The maximum number of times to try the write again.

int

20

connect.influx.retention.policy

Determines how long InfluxDB keeps the data - the options for specifying the duration of the retention policy are listed below. Note that the minimum retention period is one hour. DURATION determines how long InfluxDB keeps the data - the options for specifying the duration of the retention policy are listed below. Note that the minimum retention period is one hour. m minutes h hours d days w weeks INF infinite Default retention is autogen from 1.0 onwards or default for any previous version

string

autogen

connect.influx.consistency.level

Specifies the write consistency. If any write operations do not meet the configured consistency guarantees, an error will occur and the data will not be indexed. The default consistency-level is ALL.

string

ALL

Last updated

Logo

2024 © Lenses.io Ltd. Apache, Apache Kafka, Kafka and associated open source project names are trademarks of the Apache Software Foundation.