4.2

RethinkDB

Kafka Connect sink connector for writing data from Kafka to RethinkDB.

KCQL support 

The following KCQL is supported:

INSERT | UPSERT 
INTO <table_name> 
SELECT FIELD, ... 
FROM <kafka-topic> 
[AUTOCREATE] 
[PK FIELD, ...]
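
As an illustration of this grammar, the statements below are a sketch only. They reuse the topic (rethink), table (lenses) and field names (firstName, lastName) from the quickstart further down; the choice of firstName as the primary key is an assumption made for the example, not a recommendation.

```sql
INSERT INTO lenses SELECT * FROM rethink
INSERT INTO lenses SELECT firstName, lastName FROM rethink AUTOCREATE PK firstName
UPSERT INTO lenses SELECT * FROM rethink PK firstName
```

The first statement copies every field and relies on the default key, the second selects two fields and creates the table if needed, and the third uses upsert mode instead of insert mode.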

Concepts 

Autocreate 

The connector can autocreate a table in RethinkDB using the AUTOCREATE clause.

Primary Key 

When creating tables and inserting records, the primary keys can be defined with the PK clause.

If none are specified, a concatenation of the topic name, partition and offset is used as the key when inserting records. When the connector creates a table with no keys set, the primary key field is called id.

Insert mode 

The connector supports an insert mode. This corresponds to RethinkDB’s ERROR conflict policy.

Upsert mode 

The connector supports an upsert mode. This corresponds to RethinkDB’s REPLACE conflict policy.

Kafka payload support 

This sink supports the following Kafka payloads:

  • Schema.Struct and Struct (Avro)
  • Schema.Struct and JSON
  • No Schema and JSON

See connect payloads for more information.

Error policies 

The connector supports Error policies.

Quickstart 

Launch the stack 


  1. Copy the docker-compose file.
  2. Bring up the stack.
export CONNECTOR=rethinkdb
docker-compose up -d rethink

Start the connector 

If you are using Lenses, log in to Lenses, navigate to the connectors page, select RethinkDB as the sink and paste the following:

name=rethink
connector.class=com.datamountaineer.streamreactor.connect.rethink.sink.ReThinkSinkConnector
tasks.max=1
topics=rethink
connect.rethink.db=test
connect.rethink.host=rethink
connect.rethink.port=28015
connect.rethink.kcql=INSERT INTO lenses SELECT * FROM rethink

To start the connector without using Lenses, log into the fastdata container:


docker exec -ti fastdata /bin/bash

and create a connector.properties file containing the properties above.
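
One way to create the file from inside the container is a shell here-document. This is a minimal sketch that reuses the quickstart properties; it sets topics to rethink so that it matches the topic named in the KCQL statement and the topic the producer writes to later in this quickstart.

```shell
# Write the quickstart connector settings to connector.properties
# in the current directory, overwriting any existing file.
# The quoted 'EOF' delimiter stops the shell from expanding anything in the body.
cat > connector.properties <<'EOF'
name=rethink
connector.class=com.datamountaineer.streamreactor.connect.rethink.sink.ReThinkSinkConnector
tasks.max=1
topics=rethink
connect.rethink.db=test
connect.rethink.host=rethink
connect.rethink.port=28015
connect.rethink.kcql=INSERT INTO lenses SELECT * FROM rethink
EOF
```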

Create the connector with the connect-cli:

connect-cli create rethink < connector.properties

Wait for the connector to start and check that it is running:

connect-cli status rethink

Inserting test data 

In the fastdata container, start the Kafka producer shell:


kafka-avro-console-producer \
  --broker-list localhost:9092 --topic rethink \
  --property value.schema='{"type":"record","name":"User","namespace":"com.datamountaineer.streamreactor.connect.rethink"
  ,"fields":[{"name":"firstName","type":"string"},{"name":"lastName","type":"string"},{"name":"age","type":"int"},{"name":"salary","type":"double"}]}'

The console is now waiting for your input. Enter the following:


{"firstName": "John", "lastName": "Smith", "age":30, "salary": 4830}

Check for data in RethinkDB 

Go to the RethinkDB admin console at http://localhost:8080/#tables.

  • In the Data Explorer tab, enter the following and hit Run:
r.table('lenses')

Clean up 

Bring down the stack:

docker-compose down

Options 

| Name | Description | Type | Default Value |
|---|---|---|---|
| connect.rethink.host | RethinkDB server host. | string | localhost |
| connect.rethink.db | The RethinkDB database to write to. | string | connect_rethink_sink |
| connect.rethink.port | Client port of the RethinkDB server to connect to. | int | 28015 |
| connect.rethink.kcql | The KCQL expression for the connector. | string | |
| connect.rethink.rethink.username | The user name to connect to RethinkDB with. | string | |
| connect.rethink.password | The password for the user. | password | |
| connect.rethink.rethink.auth.key | The authorization key to use in combination with the certificate file. | password | |
| connect.rethink.rethink.cert.file | Certificate file to use for a secure TLS connection to the RethinkDB servers. Cannot be used with username/password. | string | |
| connect.rethink.error.policy | Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP - the error is swallowed; THROW - the error is allowed to propagate; RETRY - the exception causes the Connect framework to retry the message. The number of retries is based on ... The error will be logged automatically. | string | THROW |
| connect.rethink.retry.interval | The time in milliseconds between retries. | int | 60000 |
| connect.rethink.max.retries | The maximum number of times to try the write again. | int | 20 |
| connect.progress.enabled | Enables the output for how many records have been processed. | boolean | false |
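
As a sketch of how the error handling options fit together, the fragment below switches the connector to the RETRY policy. The values 5 and 10000 are illustrative assumptions, not recommended settings; only the option names and the RETRY keyword come from the table above.

```properties
# Retry failed writes instead of failing the task immediately (default policy is THROW).
connect.rethink.error.policy=RETRY
# Try the write again at most 5 times (default 20).
connect.rethink.max.retries=5
# Wait 10 seconds between retries (default 60000 ms).
connect.rethink.retry.interval=10000
```

These lines would be added to the same connector.properties file used in the quickstart.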