VoltDB

Kafka Connect sink connector for writing data from Kafka to VoltDB.

KCQL support 

The following KCQL is supported:

INSERT | UPSERT 
INTO <table_name> 
SELECT FIELD, ... 
FROM <kafka-topic> 

Examples:

-- Insert mode, select all fields from topicA and write to tableA
INSERT INTO tableA SELECT * FROM topicA

-- Insert mode, select 3 fields from topicB, rename them and write to tableB
INSERT INTO tableB SELECT x AS a, y AS b, z AS c FROM topicB

-- Upsert mode, select 3 fields from topicB, rename x to a and write to tableB
UPSERT INTO tableB SELECT x AS a, y, z FROM topicB
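To make the projection rules concrete, here is a minimal Python sketch of what the SELECT list does to a single record. It is not the connector's KCQL parser: parse_select and project are hypothetical helpers that only understand the simple forms shown above (INSERT or UPSERT, either * or a comma-separated field list with optional AS aliases, and a single topic).

```python
import re


def parse_select(kcql):
    """Hypothetical, simplified parser for the KCQL forms shown above.

    Returns (mode, table, fields, topic); fields is None for SELECT *,
    otherwise a list of (source_field, target_column) pairs.
    """
    m = re.match(
        r"\s*(INSERT|UPSERT)\s+INTO\s+(\w+)\s+SELECT\s+(.+?)\s+FROM\s+(\S+)\s*$",
        kcql,
        re.IGNORECASE,
    )
    if m is None:
        raise ValueError(f"unsupported KCQL: {kcql!r}")
    mode, table, select, topic = m.groups()
    if select.strip() == "*":
        return mode.upper(), table, None, topic
    fields = []
    for item in select.split(","):
        # "x AS a" renames x to a; a bare "y" keeps its own name.
        parts = re.split(r"\s+AS\s+", item.strip(), flags=re.IGNORECASE)
        source = parts[0]
        target = parts[1] if len(parts) == 2 else source
        fields.append((source, target))
    return mode.upper(), table, fields, topic


def project(record, fields):
    """Apply the SELECT list to one record: keep only chosen fields, renamed."""
    if fields is None:
        return dict(record)
    return {target: record[source] for source, target in fields}
```

For the upsert example, parse_select returns the pairs (x, a), (y, y) and (z, z), and project drops any field that is not selected.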

Concepts 

Insert Mode 

Insert is the default write mode of the sink.

Kafka can currently provide exactly-once delivery semantics. However, to ensure that no errors are produced when unique constraints exist on the target tables (for example, when a record is delivered more than once), the sink can run in UPSERT mode. If the error policy is set to NOOP, the sink discards the error and continues processing; however, it currently makes no attempt to distinguish violations of integrity constraints from other exceptions, such as casting issues.
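The sketch below illustrates how INSERT mode and a unique constraint can clash when the same record arrives twice, and what NOOP does about it. Table, DuplicateKeyError and write_insert are hypothetical stand-ins written for illustration, not connector or VoltDB classes; applying the policy per row is also an assumption of the sketch.

```python
class DuplicateKeyError(Exception):
    """Stands in for a unique or primary-key constraint violation."""


class Table:
    """Hypothetical in-memory table keyed by its primary key columns."""

    def __init__(self, key_columns):
        self.key_columns = key_columns
        self.rows = {}

    def key(self, row):
        return tuple(row[c] for c in self.key_columns)

    def insert(self, row):
        k = self.key(row)
        if k in self.rows:
            raise DuplicateKeyError(k)
        self.rows[k] = dict(row)


def write_insert(table, rows, error_policy="THROW"):
    """Insert each row; under NOOP any exception is discarded, not only key violations."""
    for row in rows:
        try:
            table.insert(row)
        except Exception:
            if error_policy != "NOOP":
                raise
```

Writing the same John Smith row twice raises DuplicateKeyError under THROW, while under NOOP the duplicate is skipped and the remaining rows are still written.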

Upsert Mode 

The connector supports VoltDB upserts, which replace the existing row if a match is found on the primary key. If a record arrives whose primary key field (or group of fields) matches an existing row in the target table but whose other values differ, the existing row in the target table is updated.
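Upsert semantics can be pictured as a map keyed by the primary key columns. In this minimal sketch the dict person stands in for the quickstart's person table, whose primary key is (firstname, lastname); upsert is a hypothetical helper, not the connector's code.

```python
def upsert(rows_by_key, key_columns, row):
    """Replace the stored row whose primary key matches, or add the row if none does."""
    key = tuple(row[c] for c in key_columns)
    rows_by_key[key] = dict(row)


person = {}
pk = ("firstname", "lastname")
upsert(person, pk, {"firstname": "John", "lastname": "Smith", "age": 30, "salary": 4830.0})
# Same primary key, different values: the existing row is replaced, not duplicated.
upsert(person, pk, {"firstname": "John", "lastname": "Smith", "age": 31, "salary": 5000.0})
```

After both calls the table still holds a single John Smith row, now with age 31.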

Error policies 

The connector supports error policies.
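The three values of connect.volt.error.policy listed under Options can be summarised as a small decision function. This is a hypothetical sketch: handle_error and RetriableError are illustrative names, RetriableError is only modelled loosely on Kafka Connect's retriable exceptions, and the wait of connect.volt.retry.interval between redeliveries is left out.

```python
class RetriableError(Exception):
    """Hypothetical signal asking the framework to redeliver the batch later."""


def handle_error(error, policy, attempt, max_retries=20):
    """Decide what happens to a failed write under a given error policy.

    attempt is how many times this batch has already been retried.
    """
    if policy == "NOOP":
        return None  # error swallowed; processing continues
    if policy == "THROW":
        raise error  # error propagates and the task fails
    if policy == "RETRY":
        if attempt < max_retries:
            raise RetriableError(f"retry {attempt + 1} of {max_retries}") from error
        raise error  # retries exhausted: give up like THROW
    raise ValueError(f"unknown error policy: {policy}")
```

With the default connect.volt.max.retries of 20, a batch that keeps failing under RETRY is redelivered 20 times before the original error finally propagates.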

Quickstart 

Launch the stack 


  1. Copy the docker-compose file.
  2. Bring up the stack.
export CONNECTOR=volt
docker-compose up -d voltdb

Preparing the target system 

Log into the VoltDB container and start the VoltDB shell (sqlcmd):

docker exec -ti voltdb sqlcmd

CREATE TABLE person(
    firstname VARCHAR(128)
    , lastname VARCHAR(128)
    , age INT
    , salary FLOAT
    , PRIMARY KEY (firstname, lastname)
);

SELECT * FROM person;

Start the connector 

If you are using Lenses, log into Lenses, navigate to the connectors page, select VoltDB as the sink and paste the following:

name=voltdb
connector.class=com.datamountaineer.streamreactor.connect.voltdb.VoltSinkConnector
tasks.max=1
topics=voltdb
connect.volt.servers=voltdb:21212
connect.volt.password=
connect.volt.username=
connect.volt.kcql=INSERT INTO person SELECT * FROM voltdb

To start the connector without using Lenses, log into the fastdata container:

docker exec -ti fastdata /bin/bash

and create a connector.properties file containing the properties above.
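Before submitting the file, a quick check can catch a common copy-paste mistake: a KCQL statement that reads from a topic the connector is not subscribed to. This is a hypothetical Python helper, not part of connect-cli, and it only understands simple key=value lines rather than the full Java properties syntax (line continuations, escapes, ':' separators).

```python
import re


def load_properties(text):
    """Parse simple key=value lines, skipping blanks and # or ! comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def check_topics(props):
    """Return KCQL source topics that are missing from the topics property."""
    topics = {t.strip() for t in props.get("topics", "").split(",") if t.strip()}
    kcql = props.get("connect.volt.kcql", "")
    # Topic names end at whitespace or a ';' separating KCQL statements.
    kcql_topics = re.findall(r"\bFROM\s+([^\s;]+)", kcql, re.IGNORECASE)
    return [t for t in kcql_topics if t not in topics]
```

For the properties above, check_topics returns an empty list; if topics were changed to another topic while the KCQL still reads FROM voltdb, it would return ["voltdb"].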

Create the connector with connect-cli:

connect-cli create voltdb < connector.properties

Wait for the connector to start and check that it is running:

connect-cli status voltdb

Inserting test data 

In the fastdata container, start the Kafka Avro console producer:

kafka-avro-console-producer \
  --broker-list localhost:9092 --topic voltdb \
  --property value.schema='{"type":"record","name":"User","namespace":"com.datamountaineer.streamreactor.connect.voltdb"
  ,"fields":[{"name":"firstName","type":"string"},{"name":"lastName","type":"string"},{"name":"age","type":"int"},{"name":"salary","type":"double"}]}'

The console is now waiting for input. Enter the following:

{"firstName": "John", "lastName": "Smith", "age":30, "salary": 4830}

Check for data in VoltDB 

In the VoltDB shell (sqlcmd), run:

SELECT * FROM person;

Clean up 

Bring down the stack:

docker-compose down

Options 

Name | Description | Type | Default Value
connect.volt.retry.interval | The time in milliseconds between retries | int | 60000
connect.volt.servers | Comma-separated list of server[:port] | string |
connect.volt.username | The user to connect to the VoltDB database | string |
connect.volt.password | The password for the VoltDB user | password |
connect.volt.kcql | KCQL expression describing field selection and routes | string |
connect.volt.error.policy | Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP, the error is swallowed; THROW, the error is allowed to propagate; RETRY, the exception causes the Connect framework to retry the message, with the number of retries based on connect.volt.max.retries. The error will be logged automatically. | string | THROW
connect.volt.max.retries | The maximum number of times to try the write again | int | 20
connect.progress.enabled | Enables output of how many records have been processed | boolean | false
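As a compact restatement of the table, the sketch below merges user settings over the documented defaults. resolve is a hypothetical helper, not the connector's configuration class; treating connect.volt.servers and connect.volt.kcql as mandatory is an assumption based on their having no default, and the real connector performs its own validation.

```python
DEFAULTS = {
    "connect.volt.retry.interval": 60000,
    "connect.volt.error.policy": "THROW",
    "connect.volt.max.retries": 20,
    "connect.progress.enabled": False,
}
REQUIRED = ("connect.volt.servers", "connect.volt.kcql")  # assumption, see above
ERROR_POLICIES = {"NOOP", "THROW", "RETRY"}


def resolve(props):
    """Overlay string properties on the documented defaults and coerce their types."""
    missing = [k for k in REQUIRED if not props.get(k)]
    if missing:
        raise ValueError(f"missing required settings: {missing}")
    cfg = dict(DEFAULTS)
    cfg.update(props)
    cfg["connect.volt.retry.interval"] = int(cfg["connect.volt.retry.interval"])
    cfg["connect.volt.max.retries"] = int(cfg["connect.volt.max.retries"])
    enabled = cfg["connect.progress.enabled"]
    if isinstance(enabled, str):
        enabled = enabled.strip().lower() == "true"
    cfg["connect.progress.enabled"] = enabled
    if cfg["connect.volt.error.policy"] not in ERROR_POLICIES:
        raise ValueError(f"unknown error policy: {cfg['connect.volt.error.policy']}")
    return cfg
```

With the quickstart properties, resolve keeps THROW as the error policy, 20 retries and a 60000 ms retry interval, since none of them is overridden.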