Cassandra


Kafka Connect Cassandra is a Source Connector for reading data from Cassandra and writing to Kafka.

KCQL support 

The following KCQL is supported:

INSERT INTO <your-topic>
SELECT FIELD,...
FROM <your-cassandra-table>
[PK FIELD]
[WITHFORMAT JSON]
[INCREMENTALMODE=TIMESTAMP|TIMEUUID|TOKEN|DSESEARCHTIMESTAMP]
[WITHKEY(<your-key-field>)]

Examples:

-- Select all columns from table orders and insert into a topic
-- called orders-topic, use column created to track new rows.
-- Incremental mode set to TIMEUUID
INSERT INTO orders-topic SELECT * FROM orders PK created INCREMENTALMODE=TIMEUUID

-- Select created, product, price from table orders and insert
-- into a topic called orders-topic, use column created to track new rows.
INSERT INTO orders-topic SELECT created, product, price FROM orders PK created
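
The grammar above can also be assembled programmatically. The following is a minimal sketch, not part of the connector: the helper name build_source_kcql and its parameters are hypothetical, and it only concatenates the clauses in the order the grammar lists them.

```python
from typing import Optional, Sequence

# Incremental modes listed in the KCQL grammar above.
ALLOWED_MODES = {"TIMESTAMP", "TIMEUUID", "TOKEN", "DSESEARCHTIMESTAMP"}


def build_source_kcql(
    topic: str,
    table: str,
    fields: Sequence[str] = ("*",),
    pk: Optional[str] = None,
    with_format_json: bool = False,
    incremental_mode: Optional[str] = None,
    with_key: Sequence[str] = (),
) -> str:
    """Hypothetical helper: join KCQL clauses in the order the grammar lists them."""
    if incremental_mode is not None and incremental_mode not in ALLOWED_MODES:
        raise ValueError(f"unsupported incremental mode: {incremental_mode}")

    parts = [f"INSERT INTO {topic}", f"SELECT {', '.join(fields)}", f"FROM {table}"]
    if pk is not None:
        parts.append(f"PK {pk}")
    if with_format_json:
        parts.append("WITHFORMAT JSON")
    if incremental_mode is not None:
        parts.append(f"INCREMENTALMODE={incremental_mode}")
    if with_key:
        parts.append(f"WITHKEY({', '.join(with_key)})")
    return " ".join(parts)


# Reproduces the first example statement above.
kcql = build_source_kcql(
    "orders-topic", "orders", pk="created", incremental_mode="TIMEUUID"
)
print(kcql)
```

Rejecting unknown modes up front keeps a typo from reaching the connector configuration.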

Concepts 

The connector can load data in two ways:

  1. Bulk - The connector constantly loads the entire table.
  2. Incremental - The connector performs an initial bulk load, then runs incremental queries for new records.

This behavior is determined by the mode clause (INCREMENTALMODE) of the KCQL statement.
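
The difference between the two loading strategies can be illustrated with a toy model. This sketch uses an in-memory list in place of a Cassandra table and an integer in place of the tracked column. The function names are hypothetical and the connector's actual queries are not shown here.

```python
from typing import Dict, List, Optional, Tuple

Row = Dict[str, int]


def bulk_poll(table: List[Row]) -> List[Row]:
    """Bulk: every poll returns the whole table."""
    return list(table)


def incremental_poll(
    table: List[Row], tracked: str, last_seen: Optional[int]
) -> Tuple[List[Row], Optional[int]]:
    """Incremental: return only rows whose tracked column is past the stored offset."""
    new_rows = [r for r in table if last_seen is None or r[tracked] > last_seen]
    if new_rows:
        last_seen = max(r[tracked] for r in new_rows)
    return new_rows, last_seen


table = [{"id": 1, "created": 100}, {"id": 2, "created": 200}]

# The first incremental poll has no offset yet, so it behaves like a bulk load.
first, offset = incremental_poll(table, "created", None)

# A row arrives later; only that row is returned by the next incremental poll.
table.append({"id": 3, "created": 300})
second, offset = incremental_poll(table, "created", offset)

print(len(bulk_poll(table)), len(first), len(second), offset)
```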

Keyed JSON Format 

The connector can write JSON to your Kafka topic using the WITHFORMAT JSON clause, but the key and value converters must be set:

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

To support scenarios such as retaining the latest value for a given device identifier, or Kafka Streams joins without re-mapping the topic data, the connector supports WITHKEY in the KCQL syntax:

INSERT INTO <topic>
SELECT <fields>
FROM <column_family>
PK <PK_field>
WITHFORMAT JSON
WITHUNWRAP INCREMENTALMODE=<mode>
WITHKEY(<key_field>)

Multiple key fields are supported using a delimiter:

// `[` enclosed by `]` denotes optional values
WITHKEY(field1 [, field2.A , field3]) [KEYDELIMITER='.']

The resulting Kafka record key is the string concatenation of the values of the specified fields. Optionally, the delimiter can be set via the KEYDELIMITER keyword.
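
As an illustration of the concatenation described above, here is a minimal sketch. The record, the field names and the helper names are hypothetical. The delimiter is passed explicitly because the connector's default is not stated here, and how the connector renders non-string values is likewise an assumption (plain str() is used).

```python
from typing import Any, Mapping, Sequence


def resolve_path(record: Mapping[str, Any], path: str) -> Any:
    """Follow a dotted path such as 'site.region' into nested mappings."""
    value: Any = record
    for part in path.split("."):
        value = value[part]
    return value


def compose_key(
    record: Mapping[str, Any], key_fields: Sequence[str], delimiter: str
) -> str:
    """Join the string form of each key field with the delimiter."""
    return delimiter.join(str(resolve_path(record, f)) for f in key_fields)


record = {"device": "sensor-7", "site": {"region": "eu"}, "reading": 21.5}

# Comparable in spirit to WITHKEY(device, site.region) KEYDELIMITER='.'
key = compose_key(record, ["device", "site.region"], delimiter=".")
print(key)  # sensor-7.eu
```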

Incremental mode 

This mode tracks new records added to a table. The column to track is identified by the PK clause in the KCQL statement. Only one column can be used to track new records. The supported Cassandra column data types are:

  1. TIMESTAMP
  2. TIMEUUID
  3. TOKEN
  4. DSESEARCHTIMESTAMP

If set to TOKEN, this column value is wrapped inside Cassandra's token function, which needs unwrapping with the WITHUNWRAP command.

DSESEARCHTIMESTAMP makes DSE Search queries using Solr instead of a native Cassandra query.

INSERT INTO <topic>
SELECT a, b, c, d
FROM keyspace.table
WHERE solr_query= 'pkCol:{2020-03-23T15:02:21Z TO 2020-03-23T15:30:12.989Z]'
INCREMENTALMODE=DSESEARCHTIMESTAMP
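
A range expression of this shape can be built as follows. This is a sketch that assumes standard Solr range syntax, where { opens an exclusive bound and ] closes an inclusive one. The helper names are hypothetical, and the exact string the connector generates may differ.

```python
from datetime import datetime, timezone


def solr_timestamp(ts: datetime) -> str:
    """Format a timezone-aware datetime as ISO-8601 UTC with milliseconds."""
    ts = ts.astimezone(timezone.utc)
    return ts.strftime("%Y-%m-%dT%H:%M:%S.") + f"{ts.microsecond // 1000:03d}Z"


def solr_range_query(column: str, lower: datetime, upper: datetime) -> str:
    """Build 'column:{lower TO upper]': exclusive lower bound, inclusive upper bound."""
    return f"{column}:{{{solr_timestamp(lower)} TO {solr_timestamp(upper)}]"


lower = datetime(2020, 3, 23, 15, 2, 21, tzinfo=timezone.utc)
upper = datetime(2020, 3, 23, 15, 30, 12, 989000, tzinfo=timezone.utc)
query = solr_range_query("pkCol", lower, upper)
print(query)
```

An exclusive lower bound means a row sitting exactly on the previous upper bound is not read twice.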

Controlling throughput 

The connector can be configured to:

  • Start from a particular offset - connect.cassandra.initial.offset
  • Increase or decrease the poll interval - connect.cassandra.import.poll.interval
  • Set a slice duration to query for in milliseconds - connect.cassandra.slice.duration

For a more detailed explanation of these Cassandra to Kafka settings, see the Options section.
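
One way to reason about how these settings interact is the rough mental model below. It is an assumption drawn from the option descriptions, not the connector's actual implementation: each query covers at most one slice duration starting at the current offset, and never reaches closer to the present than the configured slice delay (connect.cassandra.slice.delay.ms). The function name is hypothetical.

```python
from typing import Optional, Tuple


def next_slice(
    offset_ms: int, now_ms: int, slice_duration_ms: int, slice_delay_ms: int
) -> Optional[Tuple[int, int]]:
    """Return the (start, end) of the next query window, or None if it is too early."""
    latest_allowed = now_ms - slice_delay_ms  # stay this far behind the current time
    if offset_ms >= latest_allowed:
        return None  # nothing safe to query yet; wait for the next poll
    end = min(offset_ms + slice_duration_ms, latest_allowed)
    return offset_ms, end


# With a 10 s slice and a 30 s delay, a poll at t=100 s can read up to t=70 s.
print(next_slice(0, 100_000, 10_000, 30_000))
print(next_slice(65_000, 100_000, 10_000, 30_000))
print(next_slice(70_000, 100_000, 10_000, 30_000))
```

Under this model, a shorter poll interval or a longer slice duration lets the connector catch up on a backlog faster, at the cost of heavier queries.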

Quickstart 

Launch the stack 


  1. Copy the docker-compose file.
  2. Bring up the stack.
export CONNECTOR=cassandra
docker-compose up -d cassandra

Inserting test data 

Log in to your container:


docker exec -ti cassandra cqlsh

and create the following keyspace, table and insert test data:


CREATE KEYSPACE demo
WITH REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor' : 3};

USE demo;

CREATE TABLE orders (
    id int
    , created timeuuid
    , product text
    , qty int
    , price float
    , PRIMARY KEY (id, created))
WITH CLUSTERING ORDER BY (created asc);

INSERT INTO orders (id, created, product, qty, price)
VALUES (1, now(), 'OP-DAX-P-20150201-95.7', 100, 94.2);

INSERT INTO orders (id, created, product, qty, price)
VALUES (2, now(), 'OP-DAX-C-20150201-100', 100, 99.5);

INSERT INTO orders (id, created, product, qty, price)
VALUES (3, now(), 'FU-KOSPI-C-20150201-100', 200, 150);

Start the connector 

If you are using Lenses, log in to Lenses and navigate to the connectors page, select Cassandra as the source and paste the following:

name=cassandra
connector.class=io.lenses.streamreactor.connect.cassandra.source.CassandraSourceConnector
connect.cassandra.key.space=demo
connect.cassandra.kcql=INSERT INTO orders-topic SELECT * FROM orders PK created INCREMENTALMODE=TIMEUUID
connect.cassandra.contact.points=cassandra

To start the connector using the command line, log into the lenses-box container:


docker exec -ti lenses-box /bin/bash

and create a connector.properties file containing the properties above.
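
If you prefer to generate the file rather than type it, a short script can render it. This is an optional sketch that assumes Python is available wherever you run it. The helper render_properties is hypothetical, and the values are the ones listed above.

```python
from pathlib import Path
from typing import Mapping


def render_properties(props: Mapping[str, str]) -> str:
    """Render one key=value line per property, in insertion order."""
    return "\n".join(f"{key}={value}" for key, value in props.items()) + "\n"


connector_props = {
    "name": "cassandra",
    "connector.class": "io.lenses.streamreactor.connect.cassandra.source.CassandraSourceConnector",
    "connect.cassandra.key.space": "demo",
    "connect.cassandra.kcql": "INSERT INTO orders-topic SELECT * FROM orders PK created INCREMENTALMODE=TIMEUUID",
    "connect.cassandra.contact.points": "cassandra",
}

# Write the file into the current working directory.
Path("connector.properties").write_text(
    render_properties(connector_props), encoding="utf-8"
)
```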

Create the connector with the connect-cli:

connect-cli create cassandra-source < connector.properties

Wait for the connector to start and check it’s running:

connect-cli status cassandra-source

Check for records in Kafka 

Check the records in Lenses or via the console:

kafka-avro-console-consumer \
    --bootstrap-server localhost:9092 \
    --topic orders-topic \
    --from-beginning

Source Data Type Mapping 

The following CQL data types are supported:

| CQL Type | Connect Data Type |
|----------|-------------------|
| TimeUUID | Optional String |
| UUID | Optional String |
| Inet | Optional String |
| Ascii | Optional String |
| Text | Optional String |
| Timestamp | Optional String |
| Date | Optional String |
| Tuple | Optional String |
| UDT | Optional String |
| Boolean | Optional Boolean |
| TinyInt | Optional Int8 |
| SmallInt | Optional Int16 |
| Int | Optional Int32 |
| Decimal | Optional String |
| Float | Optional Float32 |
| Counter | Optional Int64 |
| BigInt | Optional Int64 |
| VarInt | Optional Int64 |
| Double | Optional Int64 |
| Time | Optional Int64 |
| Blob | Optional Bytes |
| Map | Optional [String -> MAP] |
| List | Optional [String -> ARRAY] |
| Set | Optional [String -> ARRAY] |

Options 

| Name | Description | Type | Default Value |
|------|-------------|------|---------------|
| connect.cassandra.contact.points | Initial contact point host for Cassandra including port. | string | localhost |
| connect.cassandra.port | Cassandra native port. | int | 9042 |
| connect.cassandra.key.space | Keyspace to read from. | string | |
| connect.cassandra.username | Username to connect to Cassandra with. | string | |
| connect.cassandra.password | Password for the username to connect to Cassandra with. | password | |
| connect.cassandra.ssl.enabled | Secure Cassandra driver connection via SSL. | boolean | false |
| connect.cassandra.trust.store.path | Path to the client Trust Store. | string | |
| connect.cassandra.trust.store.password | Password for the client Trust Store. | password | |
| connect.cassandra.trust.store.type | Type of the Trust Store, defaults to JKS. | string | JKS |
| connect.cassandra.key.store.type | Type of the Key Store, defaults to JKS. | string | JKS |
| connect.cassandra.ssl.client.cert.auth | Enable client certification authentication by Cassandra. Requires KeyStore options to be set. | boolean | false |
| connect.cassandra.key.store.path | Path to the client Key Store. | string | |
| connect.cassandra.key.store.password | Password for the client Key Store. | password | |
| connect.cassandra.consistency.level | Consistency refers to how up-to-date and synchronized a row of Cassandra data is on all of its replicas. Cassandra offers tunable consistency. For any given read or write operation, the client application decides how consistent the requested data must be. | string | |
| connect.cassandra.fetch.size | The number of records the Cassandra driver will return at once. | int | 5000 |
| connect.cassandra.load.balancing.policy | Cassandra load balancing policy: ROUND_ROBIN, TOKEN_AWARE, LATENCY_AWARE or DC_AWARE_ROUND_ROBIN. TOKEN_AWARE and LATENCY_AWARE use DC_AWARE_ROUND_ROBIN. | string | TOKEN_AWARE |
| connect.cassandra.error.policy | Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP - the error is swallowed. THROW - the error is allowed to propagate. RETRY - the exception causes the Connect framework to retry the message. The number of retries is set by connect.cassandra.max.retries. All errors will be logged automatically, even if the code swallows them. | string | THROW |
| connect.cassandra.max.retries | The maximum number of times to try the write again. | int | 20 |
| connect.cassandra.retry.interval | The time in milliseconds between retries. | int | 60000 |
| connect.cassandra.task.buffer.size | The size of the queue that the reader writes to. | int | 10000 |
| connect.cassandra.assigned.tables | The tables a task has been assigned. | string | |
| connect.cassandra.batch.size | The number of records the source task should drain from the reader queue. | int | 100 |
| connect.cassandra.import.poll.interval | The polling interval between queries against tables for bulk mode. | long | 1000 |
| connect.cassandra.time.slice.ms | The range of time in milliseconds that the source task's timestamp/timeuuid query will cover. | long | 10000 |
| connect.cassandra.import.allow.filtering | Enable ALLOW FILTERING in incremental selects. | boolean | true |
| connect.cassandra.slice.duration | Duration to query for in the target Cassandra table. Used to restrict the query timestamp span. | long | 10000 |
| connect.cassandra.slice.delay.ms | The delay between the current time and the time range of the query. Used to ensure all of the data in the time slice is available. | long | 30000 |
| connect.cassandra.initial.offset | The initial timestamp to start querying in Cassandra from (yyyy-MM-dd HH:mm:ss.SSS'Z'). Default 1900-01-01 00:00:00.0000000Z. | string | 1900-01-01 00:00:00.0000000Z |
| connect.cassandra.mapping.collection.to.json | Map columns of type Map, List and Set as JSON. | boolean | true |
| connect.cassandra.kcql | KCQL expression describing field selection and routes. | string | |
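
The three values of connect.cassandra.error.policy can be pictured with a small local model. This is only a sketch of the described semantics. In the connector, retries are driven by the Connect framework rather than by a loop like this one, and the function name run_with_policy is hypothetical. The defaults mirror connect.cassandra.max.retries and connect.cassandra.retry.interval.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def run_with_policy(
    action: Callable[[], T],
    policy: str,
    max_retries: int = 20,
    retry_interval_ms: int = 60000,
    sleep: Callable[[float], None] = time.sleep,
) -> Optional[T]:
    """Run action under a NOOP, THROW or RETRY error policy."""
    if policy not in {"NOOP", "THROW", "RETRY"}:
        raise ValueError(f"unknown error policy: {policy}")
    attempts = 0
    while True:
        try:
            return action()
        except Exception as exc:
            print(f"error logged: {exc}")  # errors are logged under every policy
            if policy == "NOOP":
                return None  # swallow the error
            if policy == "THROW":
                raise  # let the error propagate
            attempts += 1  # RETRY: try again until the retry budget is spent
            if attempts > max_retries:
                raise
            sleep(retry_interval_ms / 1000)


calls = {"count": 0}


def flaky_read() -> str:
    """Fails twice, then succeeds."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("read timed out")
    return "rows"


# A no-op sleep keeps the demonstration instant.
result = run_with_policy(
    flaky_read, "RETRY", max_retries=5, retry_interval_ms=10, sleep=lambda s: None
)
print(result, calls["count"])
```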
--
Last modified: September 15, 2024