InfluxDB
This page describes the usage of the Stream Reactor InfluxDB Sink Connector.
Connector Class
io.lenses.streamreactor.connect.influx.InfluxSinkConnector
Example
For more examples see the tutorials.
name=influxdb
connector.class=io.lenses.streamreactor.connect.influx.InfluxSinkConnector
tasks.max=1
topics=influx
connect.influx.url=http://influxdb:8086
connect.influx.db=mydb
connect.influx.username=admin
connect.influx.kcql=INSERT INTO influxMeasure SELECT * FROM influx WITHTIMESTAMP sys_time()
KCQL support
You can specify multiple KCQL statements separated by ;
to have a connector sink multiple topics. The connector properties topics or topics.regex are required to be set to a value that matches the KCQL statements.
The following KCQL is supported:
INSERT INTO <your-measure>
SELECT FIELD, ...
FROM kafka_topic_name
[WITHTIMESTAMP FIELD|sys_time]
[WITHTAG(FIELD|(constant_key=constant_value)]
Examples:
-- Insert mode, select all fields from topicA and write to indexA
INSERT INTO measureA SELECT * FROM topicA
-- Insert mode, select 3 fields and rename from topicB and write to indexB,
-- use field Y as the point measurement
INSERT INTO measureB SELECT x AS a, y AS b, c FROM topicB WITHTIMESTAMP y
-- Insert mode, select 3 fields and rename from topicB and write to indexB,
-- use field Y as the current system time for Point measurement
INSERT INTO measureB SELECT x AS a, y AS b, z FROM topicB WITHTIMESTAMP sys_time()
-- Tagging using constants
INSERT INTO measureA SELECT * FROM topicA WITHTAG (DataMountaineer=awesome, Influx=rulz!)
-- Tagging using fields in the payload. Say we have a Payment structure
-- with these fields: amount, from, to, note
INSERT INTO measureA SELECT * FROM topicA WITHTAG (from, to)
-- Tagging using a combination of fields in the payload and constants.
-- Say we have a Payment structure with these fields: amount, from, to, note
INSERT INTO measureA SELECT * FROM topicA WITHTAG (from, to, provider=DataMountaineer)
Tags
InfluxDB allows via the client API to provide a set of tags (key-value) to each point added. The current connector version allows you to provide them via the KCQL.
Kafka payload support
This sink supports the following Kafka payloads:
Schema.Struct and Struct (Avro)
Schema.Struct and JSON
No Schema and JSON
Error policies
The connector supports Error policies.
Option Reference
connect.influx.url
The InfluxDB database url.
string
connect.influx.db
The database to store the values to.
string
connect.influx.username
The user to connect to the influx database
string
connect.influx.password
The password for the influxdb user.
password
connect.influx.kcql
KCQL expression describing field selection and target measurements.
string
connect.progress.enabled
Enables the output for how many records have been processed by the connector
boolean
false
connect.influx.error.policy
Specifies the action to be taken if an error occurs while inserting the data. There are two available options: NOOP - the error is swallowed THROW - the error is allowed to propagate. RETRY - The exception causes the Connect framework to retry the message. The number of retries is based on The error will be logged automatically
string
THROW
connect.influx.retry.interval
The time in milliseconds between retries.
int
60000
connect.influx.max.retries
The maximum number of times to try the write again.
int
20
connect.influx.retention.policy
Determines how long InfluxDB keeps the data - the options for specifying the duration of the retention policy are listed below. Note that the minimum retention period is one hour. DURATION determines how long InfluxDB keeps the data - the options for specifying the duration of the retention policy are listed below. Note that the minimum retention period is one hour. m minutes h hours d days w weeks INF infinite Default retention is autogen
from 1.0 onwards or default
for any previous version
string
autogen
connect.influx.consistency.level
Specifies the write consistency. If any write operations do not meet the configured consistency guarantees, an error will occur and the data will not be indexed. The default consistency-level is ALL.
string
ALL
Last updated
Was this helpful?