4.0


RethinkDB

A Kafka Connect source connector that writes events from RethinkDB to Kafka.

KCQL support 

The following KCQL is supported:

INSERT INTO kafka_topic 
SELECT * 
FROM rethink_table 
[INITIALIZE] 
[BATCH ...]

Selection of fields from the RethinkDB record is not supported.

Examples:

-- Insert into Kafka the change feed from tableA
INSERT INTO topicA SELECT * FROM tableA

-- Insert into topicA the change feed from tableA, read from start
INSERT INTO topicA SELECT * FROM tableA INITIALIZE

-- Insert into topicA the change feed from tableA, read from start,
-- and batch 100 rows to send to Kafka
INSERT INTO topicA SELECT * FROM tableA INITIALIZE BATCH 100

Concepts 

RethinkDB supports changefeeds. This connector listens to the changefeed of each table named in the KCQL statement.

Initialization 

The connector can read from the start of the changefeed to bootstrap a Kafka topic. To initialize, use the INITIALIZE clause in the KCQL statement.

Batching 

The connector can batch records when writing to a Kafka topic. To set the batch size, use the BATCH clause in the KCQL statement.
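
The two optional clauses can be combined in a single statement. The sketch below assembles such a statement in the shell and prints it as a connector property. The variable names (TOPIC, TABLE, BATCH_SIZE, KCQL) are illustrative only, and the topic and table names are borrowed from the Quickstart.

```shell
# Illustrative values; adjust to your own topic, table and batch size.
TOPIC=rethink
TABLE=lenses
BATCH_SIZE=100

# INITIALIZE reads the changefeed from the start; BATCH sets the batch size.
KCQL="INSERT INTO ${TOPIC} SELECT * FROM ${TABLE} INITIALIZE BATCH ${BATCH_SIZE}"

# Print the statement in the form expected in the connector properties.
printf 'connect.rethink.kcql=%s\n' "$KCQL"
```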

Quickstart 

Launch the stack 


  1. Copy the docker-compose file.
  2. Bring up the stack.
export CONNECTOR=rethinkdb
docker-compose up -d rethink

Inserting test data 

Go to the RethinkDB Admin console at http://localhost:8080/#tables.

  • Add a table called lenses.
  • In the Data Explorer tab, paste the following and click Run to insert the record into the table.
r.table('lenses').insert([
    { name: "datamountaineers-rule", tv_show: "Battlestar Galactica",
      posts: [
        {title: "Decommissioning speech3", content: "The Cylon War is long over..."},
        {title: "We are at war", content: "Moments ago, this ship received word..."},
        {title: "The new Earth", content: "The discoveries of the past few days..."}
      ]
    }
])

Start the connector 

If you are using Lenses, log in to Lenses and navigate to the connectors page, select RethinkDB as the source, and paste the following:

name=rethink
connector.class=com.datamountaineer.streamreactor.connect.rethink.source.ReThinkSourceConnector
tasks.max=1
connect.rethink.host=rethink
connect.rethink.port=28015
connect.rethink.db=test
connect.rethink.kcql=INSERT INTO rethink SELECT * FROM lenses INITIALIZE

To start the connector without using Lenses, log into the fastdatadev container:


docker exec -ti fastdata /bin/bash

and create a connector.properties file containing the properties above.
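
As a minimal sketch, the file could be written from the shell with a here-document. The property values are the ones listed above; the file is created in the current working directory.

```shell
# Write the connector properties shown above to connector.properties.
# The quoted 'EOF' delimiter stops the shell from expanding anything in the body.
cat > connector.properties <<'EOF'
name=rethink
connector.class=com.datamountaineer.streamreactor.connect.rethink.source.ReThinkSourceConnector
tasks.max=1
connect.rethink.host=rethink
connect.rethink.port=28015
connect.rethink.db=test
connect.rethink.kcql=INSERT INTO rethink SELECT * FROM lenses INITIALIZE
EOF
```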

Create the connector with the connect-cli:

connect-cli create rethink < connector.properties

Wait for the connector to start and check that it is running:

connect-cli status rethink
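
If the check needs to be scripted, a polling loop along these lines may help. This is a sketch under assumptions: wait_for_running and STATUS_CMD are hypothetical names introduced here, and the loop assumes the status output contains the word RUNNING once the connector is up, which should be verified against the actual connect-cli output.

```shell
# Command used to query status; defaults to the connect-cli call shown above.
STATUS_CMD="${STATUS_CMD:-connect-cli status}"

# wait_for_running NAME [ATTEMPTS] [INTERVAL_SECONDS]
# Polls the status command until its output mentions RUNNING (assumed marker).
# Returns 0 on success, 1 if all attempts are used up.
wait_for_running() {
  name="$1"
  attempts="${2:-30}"
  interval="${3:-2}"
  i=0
  while [ "$i" -lt "$attempts" ]; do
    if $STATUS_CMD "$name" 2>/dev/null | grep -q RUNNING; then
      return 0
    fi
    sleep "$interval"
    i=$((i + 1))
  done
  return 1
}

# Usage: wait_for_running rethink && echo "connector is running"
```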

Check for records in Kafka 

Check the records in Lenses or via the console:

kafka-avro-console-consumer \
    --bootstrap-server localhost:9092 \
    --topic rethink \
    --from-beginning

Clean up 

Bring down the stack:

docker-compose down

Options 

| Name | Description | Type | Default Value |
|---|---|---|---|
| connect.rethink.host | Rethink server host. | string | localhost |
| connect.rethink.db | The reThink database to read from. | string | connect_rethink_sink |
| connect.rethink.port | Client port of rethink server to connect to. | int | 28015 |
| connect.rethink.kcql | The KCQL expression for the connector. | string | |
| connect.rethink.rethink.username | The user name to connect to rethink with. | string | |
| connect.rethink.password | The password for the user. | password | |
| connect.rethink.rethink.auth.key | The authorization key to use in combination with the certificate file. | password | |
| connect.rethink.rethink.cert.file | Certificate file to use for secure TLS connection to the rethinkdb servers. Cannot be used with username/password. | string | |
| connect.rethink.linger.ms | The number of milliseconds to wait before flushing the received messages to Kafka. The records will be flushed if the batch size is reached before the linger period has expired. | long | 5000 |
| connect.progress.enabled | Enables the output for how many records have been processed | boolean | false |