Cassandra
This page describes the usage of the Stream Reactor Cassandra Source Connector.
Kafka Connect Cassandra is a Source Connector for reading data from Cassandra and writing to Kafka.
Connector Class
Example
For more examples see the tutorials.
KCQL support
You can specify multiple KCQL statements separated by a semicolon (;) to have the connector write into multiple topics.
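The Python snippet below is a hedged illustration that assembles two statements into one connect.cassandra.kcql value. Only the ; separator comes from this page. The INSERT INTO <topic> SELECT * FROM <table> shape and the table and topic names are assumptions for illustration.

```python
# Sketch: combine several KCQL statements into the connect.cassandra.kcql property.
# Table and topic names are hypothetical placeholders.
from typing import Dict, List


def build_kcql(statements: List[str]) -> str:
    """Join statements with ';', dropping surrounding whitespace and empty entries."""
    return ";".join(s.strip() for s in statements if s.strip())


def split_kcql(value: str) -> List[str]:
    """Naive inverse of build_kcql; assumes no statement contains a literal ';'."""
    return [s.strip() for s in value.split(";") if s.strip()]


config: Dict[str, str] = {
    "connect.cassandra.kcql": build_kcql([
        "INSERT INTO orders_topic SELECT * FROM orders",
        "INSERT INTO customers_topic SELECT * FROM customers",
    ]),
}

if __name__ == "__main__":
    for statement in split_kcql(config["connect.cassandra.kcql"]):
        print(statement)
```

The split helper is deliberately naive. If a statement itself contains a semicolon, for example as a KEYDELIMITER value, a real parser would be needed.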
The following KCQL is supported:
Examples:
Keyed JSON Format
The connector can write JSON to your Kafka topic using the WITHFORMAT JSON clause, but the key and value converters must also be set.
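Here is a minimal sketch of those settings, written as a Python dict of connector properties and rendered as a properties file body. It assumes that Kafka's built-in org.apache.kafka.connect.storage.StringConverter suits both key and value, because the payload is already JSON text. Check the connector's own examples before relying on this. The table and topic names in the KCQL are hypothetical.

```python
# Sketch of connector properties for JSON output (converter choice is an assumption).
json_output_config = {
    # Assumption: plain string converters, since the connector already emits JSON text.
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    # Hypothetical table/topic names; WITHFORMAT JSON is the clause described above.
    "connect.cassandra.kcql": "INSERT INTO orders_topic SELECT * FROM orders WITHFORMAT JSON",
}


def to_properties(config: dict) -> str:
    """Render the dict as key=value lines; no escaping, fine for these simple values."""
    return "\n".join(f"{key}={value}" for key, value in sorted(config.items()))


if __name__ == "__main__":
    print(to_properties(json_output_config))
```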
To facilitate scenarios such as retaining the latest value for a given device identifier, or supporting Kafka Streams joins without having to re-map the topic data, the connector supports WITHKEY in the KCQL syntax.
Multiple key fields are supported using a delimiter.
The resulting Kafka record key is the string concatenation of the values of the specified fields. Optionally, the delimiter can be set via the KEYDELIMITER keyword.
Keying is only supported in conjunction with the WITHFORMAT JSON clause.
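The key concatenation described above can be sketched in Python as follows. build_record_key is a hypothetical helper, and the row is a plain dict standing in for a Cassandra row. This page does not state the default KEYDELIMITER, so the sketch requires the delimiter to be passed explicitly.

```python
# Sketch: build a Kafka record key by concatenating the values of the WITHKEY fields.
# Field names are hypothetical; the delimiter plays the role of KEYDELIMITER.
from typing import Any, Mapping, Sequence


def build_record_key(row: Mapping[str, Any], key_fields: Sequence[str], delimiter: str) -> str:
    """Join the string form of each key field's value, in the order given."""
    missing = [f for f in key_fields if f not in row]
    if missing:
        raise KeyError(f"key fields not present in row: {missing}")
    return delimiter.join(str(row[f]) for f in key_fields)


if __name__ == "__main__":
    row = {"device_id": "sensor-7", "region": "eu-west", "reading": 21.5}
    print(build_record_key(row, ["device_id", "region"], "|"))
```

Field order matters. Swapping the WITHKEY fields changes the key, and with it the partition the record lands in.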
Incremental mode
This mode tracks new records added to a table. The column to track is identified by the PK clause in the KCQL statement, and only one column can be used to track new records. The supported Cassandra column data types are:
TIMESTAMP
TIMEUUID
TOKEN
DSESEARCHTIMESTAMP
If set to TOKEN, the column value is wrapped inside Cassandra's token function, which needs unwrapping with the WITHUNWRAP clause.
You must use the ByteOrderedPartitioner for TOKEN mode to work correctly, otherwise data will be missing from the Kafka topic. Using this partitioner is not recommended, because it creates hotspots in Cassandra.
DSESEARCHTIMESTAMP makes DSE Search queries using Solr instead of native Cassandra queries.
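The hedged Python sketch below makes the difference between the tracking modes concrete. It builds the kind of parameterised CQL SELECT an incremental reader might issue. The function name and the exact query shapes are assumptions, not the connector's actual queries:

- The range modes select rows after the last offset, up to a slice end.
- TOKEN compares token(column) against token(?).
- ALLOW FILTERING is appended to range queries when connect.cassandra.import.allow.filtering is true, which is its default.

DSESEARCHTIMESTAMP is left out because it goes through Solr.

```python
# Illustrative sketch only: the CQL the connector really issues may differ.
from typing import List

RANGE_MODES = {"TIMESTAMP", "TIMEUUID"}


def build_incremental_query(
    table: str,
    columns: List[str],
    pk: str,
    mode: str,
    allow_filtering: bool = True,  # mirrors connect.cassandra.import.allow.filtering
) -> str:
    """Return a parameterised CQL SELECT fetching rows newer than the last offset."""
    mode = mode.upper()
    select = f"SELECT {', '.join(columns)} FROM {table}"
    if mode in RANGE_MODES:
        # Bounded slice: strictly after the last offset, up to and including the slice end.
        where = f" WHERE {pk} > ? AND {pk} <= ?"
    elif mode == "TOKEN":
        # Token order matches key order only under the ByteOrderedPartitioner.
        where = f" WHERE token({pk}) > token(?)"
    elif mode == "DSESEARCHTIMESTAMP":
        raise NotImplementedError("DSE Search (Solr) queries are not sketched here")
    else:
        raise ValueError(f"unsupported incremental mode: {mode}")
    suffix = " ALLOW FILTERING" if allow_filtering and mode in RANGE_MODES else ""
    return select + where + suffix


if __name__ == "__main__":
    print(build_incremental_query("orders", ["*"], "created", "TIMEUUID"))
```

The ? markers would be bound through a prepared statement. They take the last stored offset, plus the slice end for the range modes.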
Bulk Mode
In bulk mode the connector repeatedly loads the entire table, polling at the interval set by connect.cassandra.import.poll.interval.
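Bulk mode can be pictured as the loop below, a minimal Python sketch. read_table and emit are hypothetical callables standing in for the full-table CQL query and the hand-off to Kafka. The sleep between polls mirrors connect.cassandra.import.poll.interval, whose default is 1000; milliseconds is an assumed unit here.

```python
# Sketch of bulk mode: the whole table is re-read on every poll.
# read_table and emit are hypothetical stand-ins for the CQL query and the Kafka hand-off.
import time
from typing import Callable, Iterable, Optional


def run_bulk_mode(
    read_table: Callable[[], Iterable[dict]],
    emit: Callable[[dict], None],
    poll_interval_ms: int = 1000,  # default of connect.cassandra.import.poll.interval (unit assumed ms)
    max_polls: Optional[int] = None,  # None polls forever; a bound keeps the sketch testable
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Emit every row of the table on each poll and return the total number emitted."""
    emitted = 0
    polls = 0
    while max_polls is None or polls < max_polls:
        for row in read_table():  # full scan: no offset is tracked in bulk mode
            emit(row)
            emitted += 1
        polls += 1
        sleep(poll_interval_ms / 1000.0)
    return emitted
```

Every poll re-reads the entire table, so in this sketch unchanged rows are emitted again on each pass. That is the main cost of bulk mode compared with incremental mode.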
Controlling throughput
The connector can be configured to:
Start from a particular offset: connect.cassandra.initial.offset
Increase or decrease the poll interval: connect.cassandra.import.poll.interval
Set a slice duration to query for, in milliseconds: connect.cassandra.slice.duration
For a more detailed explanation, see the guide on how to use the Cassandra to Kafka options.
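The Python sketch below pictures how the slice options interact by picking the next query window from the last stored offset. The windowing rule is an assumption based on the option descriptions, not the connector's verified algorithm: advance by the slice duration, but never past now minus the slice delay. The defaults match connect.cassandra.slice.duration and connect.cassandra.slice.delay.ms. parse_offset accepts the yyyy-MM-dd HH:mm:ss.SSS'Z' layout with up to six fractional digits.

```python
# Sketch of choosing the next time-slice window for an incremental query.
# The windowing rule is an assumption; defaults mirror the documented option defaults.
from datetime import datetime, timedelta, timezone
from typing import Optional, Tuple

OFFSET_FORMAT = "%Y-%m-%d %H:%M:%S.%fZ"  # Python spelling of yyyy-MM-dd HH:mm:ss.SSS'Z'


def parse_offset(text: str) -> datetime:
    """Parse an offset such as '2024-01-01 00:00:00.000Z' as a UTC datetime."""
    return datetime.strptime(text, OFFSET_FORMAT).replace(tzinfo=timezone.utc)


def next_slice(
    last_offset: datetime,
    now: datetime,
    slice_duration_ms: int = 10000,  # connect.cassandra.slice.duration
    slice_delay_ms: int = 30000,  # connect.cassandra.slice.delay.ms
) -> Optional[Tuple[datetime, datetime]]:
    """Return (start, end] for the next query, or None if that window would be empty."""
    # Stay slice_delay_ms behind "now" so writes still in flight are not skipped.
    horizon = now - timedelta(milliseconds=slice_delay_ms)
    end = min(last_offset + timedelta(milliseconds=slice_duration_ms), horizon)
    if end <= last_offset:
        return None
    return last_offset, end


if __name__ == "__main__":
    start = parse_offset("2024-01-01 00:00:00.000Z")
    print(next_slice(start, start + timedelta(minutes=5)))
```

A shorter slice duration means smaller, more frequent queries. A longer delay trades latency for a lower chance of missing late writes.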
Source Data Type Mapping
The following CQL data types are supported:
| CQL Type | Connect Data Type |
|---|---|
| TimeUUID | Optional String |
| UUID | Optional String |
| Inet | Optional String |
| Ascii | Optional String |
| Text | Optional String |
| Timestamp | Optional String |
| Date | Optional String |
| Tuple | Optional String |
| UDT | Optional String |
| Boolean | Optional Boolean |
| TinyInt | Optional Int8 |
| SmallInt | Optional Int16 |
| Int | Optional Int32 |
| Decimal | Optional String |
| Float | Optional Float32 |
| Counter | Optional Int64 |
| BigInt | Optional Int64 |
| VarInt | Optional Int64 |
| Double | Optional Int64 |
| Time | Optional Int64 |
| Blob | Optional Bytes |
| Map | Optional [String -> MAP] |
| List | Optional [String -> ARRAY] |
| Set | Optional [String -> ARRAY] |
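The table can be read as one conversion rule per column type. The Python sketch below applies a subset of those rules to values shaped the way a Python Cassandra driver might return them, which is an assumption. It also renders Map, List and Set columns as JSON, the behaviour controlled by connect.cassandra.mapping.collection.to.json. The isoformat() string used for Timestamp and Date and the sorting of Set elements are both assumptions. Types the sketch does not handle are returned unchanged.

```python
# Sketch: convert Cassandra values following a subset of the mapping table above.
# String formats for Timestamp/Date and Set ordering are assumptions.
import json
import uuid
from datetime import date, datetime
from decimal import Decimal
from ipaddress import ip_address
from typing import Any

STRING_TYPES = {"timeuuid", "uuid", "inet", "ascii", "text", "decimal"}
COLLECTION_TYPES = {"map", "list", "set"}


def to_connect_value(cql_type: str, value: Any, collections_to_json: bool = True) -> Any:
    """Return the value as it might appear in the Connect record for this CQL type."""
    t = cql_type.lower()
    if value is None:
        return None  # every mapped type is optional
    if t in STRING_TYPES:
        return str(value)
    if t in {"timestamp", "date"}:
        return value.isoformat()  # assumed string format
    if t in COLLECTION_TYPES:
        if not collections_to_json:
            return value
        # JSON has no set type, so sets are emitted as sorted arrays in this sketch.
        data = sorted(value) if isinstance(value, (set, frozenset)) else value
        return json.dumps(data)
    return value  # other types are not converted in this sketch


if __name__ == "__main__":
    print(to_connect_value("uuid", uuid.UUID(int=1)))
    print(to_connect_value("decimal", Decimal("12.50")))
    print(to_connect_value("inet", ip_address("10.0.0.1")))
    print(to_connect_value("timestamp", datetime(2024, 1, 1, 12, 0)))
    print(to_connect_value("set", {"b", "a"}))
```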
Option Reference
| Name | Description | Type | Default Value |
|---|---|---|---|
| connect.cassandra.contact.points | Initial contact point host for Cassandra, including port. | string | localhost |
| connect.cassandra.port | Cassandra native port. | int | 9042 |
| connect.cassandra.key.space | Keyspace to read from. | string | |
| connect.cassandra.username | Username to connect to Cassandra with. | string | |
| connect.cassandra.password | Password for the username to connect to Cassandra with. | password | |
| connect.cassandra.ssl.enabled | Secure the Cassandra driver connection via SSL. | boolean | false |
| connect.cassandra.trust.store.path | Path to the client Trust Store. | string | |
| connect.cassandra.trust.store.password | Password for the client Trust Store. | password | |
| connect.cassandra.trust.store.type | Type of the Trust Store. Defaults to JKS. | string | JKS |
| connect.cassandra.key.store.type | Type of the Key Store. Defaults to JKS. | string | JKS |
| connect.cassandra.ssl.client.cert.auth | Enable client certificate authentication by Cassandra. Requires the Key Store options to be set. | boolean | false |
| connect.cassandra.key.store.path | Path to the client Key Store. | string | |
| connect.cassandra.key.store.password | Password for the client Key Store. | password | |
| connect.cassandra.consistency.level | Consistency refers to how up-to-date and synchronized a row of Cassandra data is on all of its replicas. Cassandra offers tunable consistency: for any given read or write operation, the client application decides how consistent the requested data must be. | string | |
| connect.cassandra.fetch.size | The number of records the Cassandra driver will return at once. | int | 5000 |
| connect.cassandra.load.balancing.policy | Cassandra load balancing policy: ROUND_ROBIN, TOKEN_AWARE, LATENCY_AWARE or DC_AWARE_ROUND_ROBIN. TOKEN_AWARE and LATENCY_AWARE use DC_AWARE_ROUND_ROBIN. | string | TOKEN_AWARE |
| connect.cassandra.error.policy | Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP, where the error is swallowed; THROW, where the error is allowed to propagate; and RETRY, where the exception causes the Connect framework to retry the message, with the number of retries set by connect.cassandra.max.retries. All errors are logged automatically, even if the code swallows them. | string | THROW |
| connect.cassandra.max.retries | The maximum number of times to try the write again. | int | 20 |
| connect.cassandra.retry.interval | The time in milliseconds between retries. | int | 60000 |
| connect.cassandra.task.buffer.size | The size of the queue the reader writes to. | int | 10000 |
| connect.cassandra.assigned.tables | The tables a task has been assigned. | string | |
| connect.cassandra.batch.size | The number of records the source task should drain from the reader queue. | int | 100 |
| connect.cassandra.import.poll.interval | The polling interval between queries against tables for bulk mode. | long | 1000 |
| connect.cassandra.time.slice.ms | The range of time, in milliseconds, that the source task uses when querying by timestamp/timeuuid. | long | 10000 |
| connect.cassandra.import.allow.filtering | Enable ALLOW FILTERING in incremental selects. | boolean | true |
| connect.cassandra.slice.duration | Duration to query for in the target Cassandra table. Used to restrict the query timestamp span. | long | 10000 |
| connect.cassandra.slice.delay.ms | The delay between the current time and the time range of the query. Used to ensure all of the data in the time slice is available. | long | 30000 |
| connect.cassandra.initial.offset | The initial timestamp to start querying in Cassandra from (yyyy-MM-dd HH:mm:ss.SSS'Z'). | string | 1900-01-01 00:00:00.0000000Z |
| connect.cassandra.mapping.collection.to.json | Map columns of type Map, List and Set to JSON. | boolean | true |
| connect.cassandra.kcql | KCQL expression describing field selection and routes. | string | |
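The three error policies can be summarised in a short Python sketch. RetriableError is a hypothetical stand-in for Kafka Connect's RetriableException, which lives in the Java API. Two behaviours are assumptions: re-raising the original error once RETRY has used up connect.cassandra.max.retries, and rejecting unknown policy names. The wait of connect.cassandra.retry.interval between attempts is left to the framework and is not modelled.

```python
# Sketch of the NOOP / THROW / RETRY semantics of connect.cassandra.error.policy.
# RetriableError stands in for Kafka Connect's (Java) RetriableException; names are hypothetical.
import logging
from typing import Callable, Optional, TypeVar

T = TypeVar("T")
log = logging.getLogger("cassandra-source-sketch")
POLICIES = {"NOOP", "THROW", "RETRY"}


class RetriableError(Exception):
    """Asks the surrounding framework to retry the failed work."""


def apply_error_policy(
    policy: str,
    action: Callable[[], T],
    attempt: int = 0,
    max_retries: int = 20,  # documented default of connect.cassandra.max.retries
) -> Optional[T]:
    """Run action and handle a failure according to the configured error policy."""
    policy = policy.upper()
    if policy not in POLICIES:
        raise ValueError(f"unknown error policy: {policy}")
    try:
        return action()
    except Exception as exc:
        log.error("operation failed: %s", exc)  # every error is logged, even if swallowed
        if policy == "NOOP":
            return None  # swallow the error
        if policy == "RETRY" and attempt < max_retries:
            raise RetriableError(f"retry {attempt + 1} of {max_retries}") from exc
        raise  # THROW, or RETRY with retries exhausted (assumed behaviour)
```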