Cassandra

This page describes the usage of the Stream Reactor Cassandra Source Connector.

Kafka Connect Cassandra is a Source Connector for reading data from Cassandra and writing to Kafka.

Connector Class

io.lenses.streamreactor.connect.cassandra.source.CassandraSourceConnector

Example

For more examples see the tutorials.

name=cassandra
connector.class=io.lenses.streamreactor.connect.cassandra.source.CassandraSourceConnector
connect.cassandra.key.space=demo
connect.cassandra.kcql=INSERT INTO orders-topic SELECT * FROM orders PK created INCREMENTALMODE=TIMEUUID
connect.cassandra.contact.points=cassandra
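The same configuration can be submitted to a Kafka Connect worker's REST API as JSON; this is a sketch of the equivalent payload (the connector name and property values mirror the properties example above):

```json
{
  "name": "cassandra",
  "config": {
    "connector.class": "io.lenses.streamreactor.connect.cassandra.source.CassandraSourceConnector",
    "connect.cassandra.key.space": "demo",
    "connect.cassandra.kcql": "INSERT INTO orders-topic SELECT * FROM orders PK created INCREMENTALMODE=TIMEUUID",
    "connect.cassandra.contact.points": "cassandra"
  }
}
```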

KCQL support

You can specify multiple KCQL statements separated by ; to have the connector write to multiple topics.

The following KCQL is supported:

INSERT INTO <your-topic>
SELECT FIELD,...
FROM <your-cassandra-table>
[PK FIELD]
[WITHFORMAT JSON]
[INCREMENTALMODE=TIMESTAMP|TIMEUUID|TOKEN|DSESEARCHTIMESTAMP]
[WITHKEY(<your-key-field>)]

Examples:

-- Select all columns from table orders and insert into a topic
-- called orders-topic, use column created to track new rows.
-- Incremental mode set to TIMEUUID
INSERT INTO orders-topic SELECT * FROM orders PK created INCREMENTALMODE=TIMEUUID

-- Select created, product, price from table orders and insert
-- into a topic called orders-topic, use column created to track new rows.
INSERT INTO orders-topic SELECT created, product, price FROM orders PK created
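As noted above, several KCQL statements can be chained with ; to read several tables into several topics. A hedged example (the customers table and topic are illustrative, not part of the demo schema):

```properties
connect.cassandra.kcql=INSERT INTO orders-topic SELECT * FROM orders PK created INCREMENTALMODE=TIMEUUID;INSERT INTO customers-topic SELECT * FROM customers PK created INCREMENTALMODE=TIMEUUID
```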

Keyed JSON Format

The connector can write JSON to your Kafka topic using the WITHFORMAT JSON clause, but the key and value converters must be set:

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

To facilitate scenarios such as retaining the latest value for a given device identifier, or supporting Kafka Streams joins without having to re-map the topic data, the connector supports WITHKEY in the KCQL syntax.

INSERT INTO <topic>
SELECT <fields>
FROM <column_family>
PK <PK_field>
WITHFORMAT JSON
WITHUNWRAP INCREMENTALMODE=<mode>
WITHKEY(<key_field>)

Multiple key fields are supported using a delimiter:

// values enclosed in `[` and `]` are optional
WITHKEY(field1 [, field2.A , field3]) [KEYDELIMITER='.']

The resulting Kafka record key content will be the string concatenation for the values of the fields specified. Optionally the delimiter can be set via the KEYDELIMITER keyword.
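The key construction described above amounts to a delimited string concatenation of the selected field values. A minimal sketch of the idea (the field names and record are illustrative, not connector API):

```python
def build_key(record, key_fields, delimiter="."):
    """Join the values of the key fields with the delimiter, mirroring
    the WITHKEY(field1, field2) KEYDELIMITER='...' behaviour."""
    return delimiter.join(str(record[f]) for f in key_fields)

# Hypothetical row read from Cassandra.
row = {"device": "sensor-1", "region": "eu", "reading": 21.4}

# WITHKEY(device, region) KEYDELIMITER='-' would produce this record key:
print(build_key(row, ["device", "region"], delimiter="-"))  # sensor-1-eu
```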

Keying is only supported in conjunction with the WITHFORMAT JSON clause.

Incremental mode

This mode tracks new records added to a table. The column to track is identified by the PK clause in the KCQL statement; only one column can be used to track new records. The supported Cassandra column data types are:

  1. TIMESTAMP

  2. TIMEUUID

  3. TOKEN

  4. DSESEARCHTIMESTAMP

If set to TOKEN, the column value is wrapped inside Cassandra's token function, which needs unwrapping with the WITHUNWRAP command.

You must use the ByteOrderedPartitioner for TOKEN mode to work correctly, or data will be missing from the Kafka topic. This is not recommended due to the creation of hotspots in Cassandra.
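For TIMESTAMP or TIMEUUID tracking, the PK column must carry the corresponding CQL type. A hypothetical table shaped for the orders examples above might look like this (schema is illustrative only; column names match the KCQL examples):

```sql
-- Illustrative schema: `created` is the timeuuid column tracked by
-- PK created INCREMENTALMODE=TIMEUUID in the KCQL examples.
CREATE TABLE demo.orders (
    id int,
    created timeuuid,
    product text,
    qty int,
    price float,
    PRIMARY KEY (id, created)
) WITH CLUSTERING ORDER BY (created ASC);
```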

DSESEARCHTIMESTAMP makes DSE Search queries using Solr instead of a native Cassandra query.

INSERT INTO <topic>
SELECT a, b, c, d
FROM keyspace.table
WHERE solr_query = 'pkCol:{2020-03-23T15:02:21Z TO 2020-03-23T15:30:12.989Z]'
INCREMENTALMODE=DSESEARCHTIMESTAMP

Bulk Mode

In bulk mode the connector loads the entire table on each poll interval.

Controlling throughput

The connector can be configured to:

  • Start from a particular offset - connect.cassandra.initial.offset

  • Increase or decrease the poll interval - connect.cassandra.import.poll.interval

  • Set a slice duration to query for in milliseconds - connect.cassandra.slice.duration
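Combining the three knobs, a hedged example of a throttled incremental source (the values are illustrative, not recommendations):

```properties
connect.cassandra.initial.offset=2024-01-01 00:00:00.0000000Z
connect.cassandra.import.poll.interval=5000
connect.cassandra.slice.duration=60000
```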

For a more detailed explanation of how to use these options, see the option reference below.

Source Data Type Mapping

The following CQL data types are supported:

| CQL Type | Connect Data Type |
|----------|-------------------|
| TimeUUID | Optional String |
| UUID | Optional String |
| Inet | Optional String |
| Ascii | Optional String |
| Text | Optional String |
| Timestamp | Optional String |
| Date | Optional String |
| Tuple | Optional String |
| UDT | Optional String |
| Boolean | Optional Boolean |
| TinyInt | Optional Int8 |
| SmallInt | Optional Int16 |
| Int | Optional Int32 |
| Decimal | Optional String |
| Float | Optional Float32 |
| Counter | Optional Int64 |
| BigInt | Optional Int64 |
| VarInt | Optional Int64 |
| Double | Optional Float64 |
| Time | Optional Int64 |
| Blob | Optional Bytes |
| Map | Optional [String -> MAP] |
| List | Optional [String -> ARRAY] |
| Set | Optional [String -> ARRAY] |
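Since Map, List and Set columns are delivered as JSON-encoded strings when connect.cassandra.mapping.collection.to.json is left at its default of true, a consumer can decode them with any standard JSON parser. A sketch (the column value shown is illustrative):

```python
import json

# Illustrative payload of a Cassandra map<text, int> column as it might
# arrive in the record value: a JSON-encoded string, not a native map.
tags_field = '{"priority": 1, "retries": 2}'

tags = json.loads(tags_field)  # decode back into a dictionary
print(tags["retries"])  # 2
```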

Option Reference

| Name | Description | Type | Default Value |
|------|-------------|------|---------------|
| connect.cassandra.contact.points | Initial contact point host for Cassandra, including port. | string | localhost |
| connect.cassandra.port | Cassandra native port. | int | 9042 |
| connect.cassandra.key.space | Keyspace to read from. | string | |
| connect.cassandra.username | Username to connect to Cassandra with. | string | |
| connect.cassandra.password | Password for the username to connect to Cassandra with. | password | |
| connect.cassandra.ssl.enabled | Secure the Cassandra driver connection via SSL. | boolean | false |
| connect.cassandra.trust.store.path | Path to the client Trust Store. | string | |
| connect.cassandra.trust.store.password | Password for the client Trust Store. | password | |
| connect.cassandra.trust.store.type | Type of the Trust Store. | string | JKS |
| connect.cassandra.key.store.type | Type of the Key Store. | string | JKS |
| connect.cassandra.ssl.client.cert.auth | Enable client certificate authentication by Cassandra. Requires the Key Store options to be set. | boolean | false |
| connect.cassandra.key.store.path | Path to the client Key Store. | string | |
| connect.cassandra.key.store.password | Password for the client Key Store. | password | |
| connect.cassandra.consistency.level | Consistency refers to how up-to-date and synchronized a row of Cassandra data is on all of its replicas. Cassandra offers tunable consistency: for any given read or write operation, the client application decides how consistent the requested data must be. | string | |
| connect.cassandra.fetch.size | The number of records the Cassandra driver will return at once. | int | 5000 |
| connect.cassandra.load.balancing.policy | Cassandra load balancing policy: ROUND_ROBIN, TOKEN_AWARE, LATENCY_AWARE or DC_AWARE_ROUND_ROBIN. TOKEN_AWARE and LATENCY_AWARE wrap DC_AWARE_ROUND_ROBIN. | string | TOKEN_AWARE |
| connect.cassandra.error.policy | The action to take if an error occurs while processing the data. NOOP: the error is swallowed. THROW: the error is allowed to propagate. RETRY: the exception causes the Connect framework to retry the message; the number of retries is set by connect.cassandra.max.retries. All errors are logged automatically, even when swallowed. | string | THROW |
| connect.cassandra.max.retries | The maximum number of times to retry on error. | int | 20 |
| connect.cassandra.retry.interval | The time in milliseconds between retries. | int | 60000 |
| connect.cassandra.task.buffer.size | The size of the queue the reader writes records to. | int | 10000 |
| connect.cassandra.assigned.tables | The tables a task has been assigned. | string | |
| connect.cassandra.batch.size | The number of records the source task should drain from the reader queue. | int | 100 |
| connect.cassandra.import.poll.interval | The polling interval in milliseconds between queries against tables in bulk mode. | long | 1000 |
| connect.cassandra.time.slice.ms | The range of time in milliseconds that the timestamp/timeuuid query will cover. | long | 10000 |
| connect.cassandra.import.allow.filtering | Enable ALLOW FILTERING in incremental selects. | boolean | true |
| connect.cassandra.slice.duration | Duration in milliseconds to query for in the target Cassandra table; used to restrict the timestamp span of a query. | long | 10000 |
| connect.cassandra.slice.delay.ms | The delay in milliseconds between the current time and the end of the queried time range; used to ensure all of the data in the time slice is available. | long | 30000 |
| connect.cassandra.initial.offset | The initial timestamp to start querying Cassandra from (yyyy-MM-dd HH:mm:ss.SSS'Z'). | string | 1900-01-01 00:00:00.0000000Z |
| connect.cassandra.mapping.collection.to.json | Map columns of type Map, List and Set to JSON strings. | boolean | true |
| connect.cassandra.kcql | KCQL expression describing field selection and routes. | string | |
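The connect.cassandra.initial.offset value follows the yyyy-MM-dd HH:mm:ss.SSS'Z' pattern shown above (the documented default pads the fraction to seven digits). A sketch of producing a compatible millisecond-precision string from Python (the helper name is illustrative):

```python
from datetime import datetime, timezone

def to_initial_offset(dt: datetime) -> str:
    """Format a datetime as 'yyyy-MM-dd HH:mm:ss.SSSZ' (millisecond precision)."""
    utc = dt.astimezone(timezone.utc)
    return utc.strftime("%Y-%m-%d %H:%M:%S.") + f"{utc.microsecond // 1000:03d}Z"

ts = datetime(2024, 3, 23, 15, 2, 21, 989000, tzinfo=timezone.utc)
print(to_initial_offset(ts))  # 2024-03-23 15:02:21.989Z
```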


2024 © Lenses.io Ltd. Apache, Apache Kafka, Kafka and associated open source project names are trademarks of the Apache Software Foundation.