InfluxDB

Kafka Connect sink connector for writing data from Kafka to InfluxDB.

KCQL support 

The following KCQL is supported:

INSERT INTO <your-measure>
SELECT FIELD, ...
FROM kafka_topic_name
[WITHTIMESTAMP FIELD|sys_time()]
[WITHTAG (FIELD|constant_key=constant_value, ...)]


Examples:

-- Insert mode, select all fields from topicA and write to measureA
INSERT INTO measureA SELECT * FROM topicA

-- Insert mode, select 3 fields from topicB, renaming x and y, and write to measureB;
-- use field y as the point timestamp
INSERT INTO measureB SELECT x AS a, y AS b, c FROM topicB WITHTIMESTAMP y

-- Insert mode, select 3 fields from topicB, renaming x and y, and write to measureB;
-- use the current system time as the point timestamp
INSERT INTO measureB SELECT x AS a, y AS b, z FROM topicB WITHTIMESTAMP sys_time()

-- Tagging using constants
INSERT INTO measureA SELECT * FROM topicA WITHTAG (DataMountaineer=awesome, Influx=rulz!)

-- Tagging using fields in the payload. Say we have a Payment structure 
-- with these fields: amount, from, to, note
INSERT INTO measureA SELECT * FROM topicA WITHTAG (from, to)

-- Tagging using a combination of fields in the payload and constants. 
-- Say we have a Payment structure with these fields: amount, from, to, note
INSERT INTO measureA SELECT * FROM topicA WITHTAG (from, to, provider=DataMountaineer)
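The following Python sketch illustrates what the SELECT and WITHTIMESTAMP clauses mean for a single record. It is not the connector's code: the record is modelled as a plain dict, `kcql_to_point` is a hypothetical helper that takes an already-parsed statement, and two details are assumptions made for illustration: the WITHTIMESTAMP field is looked up by its original (un-renamed) name, and sys_time() yields epoch milliseconds.

```python
from __future__ import annotations

import time
from typing import Any, Optional


def kcql_to_point(record: dict[str, Any],
                  measurement: str,
                  projection: Optional[list[tuple[str, str]]],
                  timestamp_field: Optional[str]) -> dict[str, Any]:
    """Map one record to a point for an already-parsed KCQL statement (hypothetical helper).

    projection:      (source_field, alias) pairs, or None for SELECT *
    timestamp_field: field named in WITHTIMESTAMP, or None for sys_time()
    """
    if projection is None:
        # SELECT *: every field is written under its own name
        fields = dict(record)
    else:
        # SELECT x AS a, c: each selected field is written under its alias
        fields = {alias: record[source] for source, alias in projection}

    if timestamp_field is None:
        # WITHTIMESTAMP sys_time(): use the current wall-clock time
        # (epoch milliseconds is an assumption of this sketch)
        ts = int(time.time() * 1000)
    else:
        # WITHTIMESTAMP y: assumed to be looked up by the field's original name
        ts = record[timestamp_field]

    return {"measurement": measurement, "fields": fields, "time": ts}


# INSERT INTO measureB SELECT x AS a, y AS b, c FROM topicB WITHTIMESTAMP y
record = {"x": 1.5, "y": 1700000000000, "c": "blue", "d": True}
point = kcql_to_point(record, "measureB", [("x", "a"), ("y", "b"), ("c", "c")], "y")
print(point)
# {'measurement': 'measureB', 'fields': {'a': 1.5, 'b': 1700000000000, 'c': 'blue'}, 'time': 1700000000000}
```

Field d is not selected, so it does not reach the point.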

Concepts 

Kafka payload support 

This sink supports the following Kafka payloads:

  • Schema.Struct and Struct (Avro)
  • Schema.Struct and JSON
  • No Schema and JSON

See connect payloads for more information.

Tags 

Through its client API, InfluxDB lets you attach a set of tags (key-value pairs) to each point written. The connector lets you specify these tags in the KCQL with WITHTAG, either as fields taken from the payload or as constant key=value pairs.
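As an illustration of the two WITHTAG forms, here is a minimal Python sketch, assuming the tag list has already been parsed out of the KCQL into entries such as `from` or `provider=DataMountaineer`. `resolve_tags` and `tag_set` are hypothetical helpers, not the connector's API. Because InfluxDB stores tag values as strings, a payload field used as a tag is converted with `str()`; `tag_set` renders the tags in InfluxDB line-protocol form, where commas, equals signs and spaces must be backslash-escaped.

```python
from __future__ import annotations

from typing import Any


def resolve_tags(record: dict[str, Any], tag_specs: list[str]) -> dict[str, str]:
    """Turn parsed WITHTAG entries into tag key/value pairs (hypothetical helper)."""
    tags: dict[str, str] = {}
    for spec in tag_specs:
        if "=" in spec:
            # constant_key=constant_value: split on the first '=' only
            key, value = spec.split("=", 1)
            tags[key.strip()] = value.strip()
        else:
            # field name: the tag takes the field's value from the payload, as a string
            name = spec.strip()
            tags[name] = str(record[name])
    return tags


def _escape(text: str) -> str:
    # line protocol: commas, equals signs and spaces in tag keys/values need a backslash
    return text.replace(",", "\\,").replace("=", "\\=").replace(" ", "\\ ")


def tag_set(tags: dict[str, str]) -> str:
    """Render tags as the comma-separated tag part of a line-protocol point, sorted by key."""
    return ",".join(f"{_escape(k)}={_escape(v)}" for k, v in sorted(tags.items()))


# WITHTAG (from, to, provider=DataMountaineer) applied to a Payment record
payment = {"amount": 12.5, "from": "alice", "to": "bob", "note": "lunch"}
tags = resolve_tags(payment, ["from", "to", "provider=DataMountaineer"])
print(tags)           # {'from': 'alice', 'to': 'bob', 'provider': 'DataMountaineer'}
print(tag_set(tags))  # from=alice,provider=DataMountaineer,to=bob
```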

Error policies

The connector supports error policies, configured with connect.influx.error.policy (see Options).
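The behaviour of the three policies described for connect.influx.error.policy under Options can be sketched in Python as follows. This is a hypothetical, single-process model: in the real connector a RETRY hands the message back to the Connect framework for redelivery, whereas the sketch collapses that into a local loop, and it assumes that once connect.influx.max.retries is exhausted the error propagates as it would under THROW. The defaults (60000 ms interval, 20 retries) are the ones listed in the Options table.

```python
import logging
import time
from typing import Callable

log = logging.getLogger("influx-sink-sketch")


def write_with_policy(write: Callable[[], None],
                      policy: str = "THROW",
                      retry_interval_ms: int = 60000,
                      max_retries: int = 20,
                      sleep: Callable[[float], None] = time.sleep) -> bool:
    """Run one write under NOOP, THROW or RETRY semantics (hypothetical model).

    Returns True if the write succeeded, False if NOOP swallowed an error.
    """
    retries_left = max_retries
    while True:
        try:
            write()
            return True
        except Exception as exc:
            # the error is logged whatever the policy
            log.error("write failed (%s policy): %s", policy, exc)
            if policy == "NOOP":
                # swallow the error and move on
                return False
            if policy == "RETRY" and retries_left > 0:
                retries_left -= 1
                # wait connect.influx.retry.interval before trying again
                sleep(retry_interval_ms / 1000)
                continue
            # THROW, or RETRY with no retries left (assumed): propagate the error
            raise


# A write that fails twice before InfluxDB becomes reachable
calls = {"n": 0}

def flaky_write() -> None:
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("InfluxDB unavailable")

ok = write_with_policy(flaky_write, policy="RETRY", max_retries=5, sleep=lambda s: None)
print(ok, calls["n"])  # True 3
```

Injecting `sleep` keeps the example fast; a real deployment would wait the full interval between attempts.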

Quickstart 

Launch the stack 


  1. Copy the docker-compose file.
  2. Bring up the stack.
export CONNECTOR=influx
docker-compose up -d influxdb

Preparing the target system

Log into the influx container:

docker exec -ti influxdb influx

Execute the following:

CREATE DATABASE mydb

Start the connector 

If you are using Lenses, log into Lenses, navigate to the connectors page, select InfluxDB as the sink and paste the following:

name=influxdb
connector.class=com.datamountaineer.streamreactor.connect.influx.InfluxSinkConnector
tasks.max=1
topics=influx
connect.influx.url=http://influxdb:8086
connect.influx.db=mydb
connect.influx.username=admin
connect.influx.kcql=INSERT INTO influxMeasure SELECT * FROM influx WITHTIMESTAMP sys_time()

To start the connector without using Lenses, log into the fastdata container:


docker exec -ti fastdata /bin/bash

and create a connector.properties file containing the properties above.

Create the connector with the connect-cli:

connect-cli create influxdb < connector.properties

Wait for the connector to start and check that it is running:

connect-cli status influxdb
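A connector can also be created through the Kafka Connect REST API by POSTing its name and configuration as JSON to the worker's `/connectors` endpoint. The Python sketch below turns a simple connector.properties file into such a request without sending it. The worker address `http://localhost:8083` (Kafka Connect's default REST port) is an assumption about your setup, and the hypothetical `parse_properties` helper only handles plain `key=value` lines, not the escaping or line-continuation rules of full Java properties files.

```python
from __future__ import annotations

import json
import urllib.request


def parse_properties(text: str) -> dict[str, str]:
    """Parse simple key=value lines, skipping blanks and # or ! comments."""
    props: dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue
        # split on the first '=' only: KCQL values may themselves contain '='
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def connector_request(props: dict[str, str],
                      worker: str = "http://localhost:8083") -> urllib.request.Request:
    """Build (but do not send) a POST /connectors request for a Kafka Connect worker."""
    config = dict(props)
    name = config.pop("name")  # the name goes at the top level; the rest is the config
    body = json.dumps({"name": name, "config": config}).encode("utf-8")
    return urllib.request.Request(f"{worker}/connectors", data=body,
                                  headers={"Content-Type": "application/json"},
                                  method="POST")


SAMPLE = """\
name=influxdb
connector.class=com.datamountaineer.streamreactor.connect.influx.InfluxSinkConnector
tasks.max=1
topics=influx
connect.influx.kcql=INSERT INTO influxMeasure SELECT * FROM influx WITHTIMESTAMP sys_time()
"""
req = connector_request(parse_properties(SAMPLE))
# urllib.request.urlopen(req) would submit it to the worker; it is not called here
```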

Inserting test data 

In the fastdata container, start the Kafka Avro console producer:


kafka-avro-console-producer \
  --broker-list localhost:9092 \
  --topic influx \
  --property value.schema='{"type":"record","name":"User",
  "fields":[{"name":"company","type":"string"},{"name":"address","type":"string"},{"name":"latitude","type":"float"},{"name":"longitude","type":"float"}]}'

The console is now waiting for your input. Enter the following:


{"company": "DataMountaineer","address": "MontainTop","latitude": -49.817964,"longitude": -141.645812}

Check for data in InfluxDB

In the influx shell (docker exec -ti influxdb influx), run:

USE mydb
SELECT * FROM influxMeasure
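The same query can be run over InfluxDB 1.x's HTTP API, whose `/query` endpoint takes the database name in the `db` parameter and the InfluxQL statement in `q`. This Python sketch assumes that port 8086 of the influxdb container is published on `localhost` by the docker-compose file; `query_url` and `fetch_series` are hypothetical helper names.

```python
import json
import urllib.request
from urllib.parse import urlencode


def query_url(base: str, db: str, q: str) -> str:
    # InfluxDB 1.x: GET /query?db=<database>&q=<InfluxQL statement>
    return f"{base}/query?{urlencode({'db': db, 'q': q})}"


def fetch_series(url: str) -> list:
    # Response shape: {"results": [{"statement_id": 0, "series": [{"name": ..., "columns": [...], "values": [[...]]}]}]}
    with urllib.request.urlopen(url) as resp:
        body = json.load(resp)
    # "series" is absent when the measurement has no points yet
    return body["results"][0].get("series", [])


url = query_url("http://localhost:8086", "mydb", "SELECT * FROM influxMeasure")
print(url)  # http://localhost:8086/query?db=mydb&q=SELECT+%2A+FROM+influxMeasure
# fetch_series(url) returns the series once the stack is running
```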

Clean up 

Bring down the stack:

docker-compose down

Options 

| Name | Description | Type | Default Value |
| --- | --- | --- | --- |
| connect.influx.url | The InfluxDB database URL. | string | |
| connect.influx.db | The database to store the values to. | string | |
| connect.influx.username | The user to connect to the InfluxDB database. | string | |
| connect.influx.password | The password for the InfluxDB user. | password | |
| connect.influx.kcql | KCQL expression describing field selection and target measurements. | string | |
| connect.progress.enabled | Enables output of how many records have been processed by the connector. | boolean | false |
| connect.influx.error.policy | Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP, the error is swallowed; THROW, the error is allowed to propagate; RETRY, the exception causes the Connect framework to retry the message, up to connect.influx.max.retries times. The error will be logged automatically. | string | THROW |
| connect.influx.retry.interval | The time in milliseconds between retries. | int | 60000 |
| connect.influx.max.retries | The maximum number of times to try the write again. | int | 20 |
| connect.influx.retention.policy | The retention policy, which determines how long InfluxDB keeps the data. Retention durations are specified in m (minutes), h (hours), d (days) or w (weeks), or INF (infinite); the minimum retention period is one hour. The default retention policy is autogen from InfluxDB 1.0 onwards, or default for earlier versions. | string | autogen |
| connect.influx.consistency.level | Specifies the write consistency. If a write does not meet the configured consistency guarantee, an error occurs and the data is not indexed. | string | ALL |
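To see how the defaults in this table combine with a connector's own properties, here is a hypothetical Python helper that overlays user settings on the listed defaults and rejects a few invalid values. Treating connect.influx.url, connect.influx.db and connect.influx.kcql as mandatory is an assumption of the sketch, as is the exact-case matching of enum values; ANY, ONE and QUORUM are the other write consistency levels InfluxDB 1.x defines alongside ALL.

```python
from __future__ import annotations

# Defaults as listed in the Options table
DEFAULTS = {
    "connect.progress.enabled": "false",
    "connect.influx.error.policy": "THROW",
    "connect.influx.retry.interval": "60000",
    "connect.influx.max.retries": "20",
    "connect.influx.retention.policy": "autogen",
    "connect.influx.consistency.level": "ALL",
}
# Assumption: without these three the sink has nowhere to write or nothing to select
REQUIRED = ("connect.influx.url", "connect.influx.db", "connect.influx.kcql")
ERROR_POLICIES = {"NOOP", "THROW", "RETRY"}
# InfluxDB 1.x write consistency levels
CONSISTENCY_LEVELS = {"ALL", "ANY", "ONE", "QUORUM"}


def effective_config(props: dict[str, str]) -> dict[str, str]:
    """Merge user properties over the documented defaults and check a few values."""
    missing = [k for k in REQUIRED if not props.get(k)]
    if missing:
        raise ValueError(f"missing required properties: {', '.join(missing)}")
    config = {**DEFAULTS, **props}
    if config["connect.influx.error.policy"] not in ERROR_POLICIES:
        raise ValueError("connect.influx.error.policy must be NOOP, THROW or RETRY")
    if config["connect.influx.consistency.level"] not in CONSISTENCY_LEVELS:
        raise ValueError("connect.influx.consistency.level must be ALL, ANY, ONE or QUORUM")
    for key in ("connect.influx.retry.interval", "connect.influx.max.retries"):
        # int() itself raises ValueError for non-numeric input
        if int(config[key]) < 0:
            raise ValueError(f"{key} must be a non-negative integer")
    return config


cfg = effective_config({
    "connect.influx.url": "http://influxdb:8086",
    "connect.influx.db": "mydb",
    "connect.influx.kcql": "INSERT INTO influxMeasure SELECT * FROM influx WITHTIMESTAMP sys_time()",
})
print(cfg["connect.influx.error.policy"], cfg["connect.influx.max.retries"])  # THROW 20
```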