Cassandra
This page describes the usage of the Stream Reactor Cassandra Sink Connector.
The connector converts the value of Kafka messages to JSON and uses the Cassandra JSON insert feature to write records.
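As a rough illustration of the JSON insert path, the sketch below pairs a serialised record value with a CQL `INSERT ... JSON` statement. The keyspace, table, and the `build_json_insert` helper are hypothetical names chosen for this example; this is not the connector's actual code. The optional `DEFAULT UNSET` clause is standard CQL and corresponds to the behaviour described for connect.cassandra.default.value.

```python
import json


def build_json_insert(keyspace, table, value, default_unset=False):
    """Return a (cql, json_payload) pair for a Cassandra JSON insert.

    Hypothetical helper for illustration only; not the connector's code.
    """
    cql = f"INSERT INTO {keyspace}.{table} JSON ?"
    if default_unset:
        # DEFAULT UNSET keeps existing values for columns missing from the JSON.
        cql += " DEFAULT UNSET"
    # The record value is serialised to a JSON string and bound to the '?' marker.
    payload = json.dumps(value, sort_keys=True)
    return cql, payload


cql, payload = build_json_insert("demo", "orders", {"id": 1, "product": "book"})
print(cql)
print(payload)
```

Without `DEFAULT UNSET`, Cassandra writes NULL for any column that is absent from the JSON document.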
Connector Class
Example
For more examples see the tutorials.
KCQL support
You can specify multiple KCQL statements separated by ; to have a connector sink multiple topics. The connector properties topics or topics.regex must be set to a value that matches the KCQL statements.
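The relationship between the KCQL statements and the topics property can be checked before deploying a connector. The sketch below is a simplified illustration: the KCQL statements, topic names, and helper functions are hypothetical, and the regular expression only picks out the name following FROM rather than parsing the full KCQL grammar.

```python
import re


def split_kcql(kcql):
    """Split a connect.cassandra.kcql value into individual statements."""
    return [s.strip() for s in kcql.split(";") if s.strip()]


def source_topics(kcql):
    """Extract the topic named after FROM in each statement (simplified parse)."""
    topics = []
    for stmt in split_kcql(kcql):
        match = re.search(r"\bFROM\s+`?([^\s`]+)`?", stmt, re.IGNORECASE)
        if match:
            topics.append(match.group(1))
    return topics


def missing_topics(kcql, topics_property):
    """Return KCQL topics that are absent from the comma-separated topics property."""
    configured = {t.strip() for t in topics_property.split(",") if t.strip()}
    return [t for t in source_topics(kcql) if t not in configured]


# Hypothetical statements: two topics routed to two tables.
kcql = (
    "INSERT INTO orders SELECT * FROM orders_topic; "
    "INSERT INTO payments SELECT * FROM payments_topic"
)
print(split_kcql(kcql))
print(missing_topics(kcql, "orders_topic"))
```

An empty result from `missing_topics` means every topic referenced in the KCQL is also listed in the topics property. A topics.regex value would need a pattern match instead of the set lookup used here.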
The following KCQL is supported:
Examples:
Deletion in Cassandra
Compacted topics in Kafka retain the last message per key. Deletion in Kafka occurs by tombstoning: if compaction is enabled on the topic and a message is sent with a null payload, Kafka flags the record for deletion and removes it from the topic during compaction.
Deletion in Cassandra is supported based on fields in the key of messages with an empty/null payload. A Cassandra CQL delete statement must be provided, with parameters to which the field values from the key are bound. For example, with the delete statement of:
If a message is received with an empty/null value and key fields key.id and key.product, the final bound Cassandra statement would be:
Deletion will only occur if a message with an empty payload is received from Kafka.
Ensure the ordinal position of the fields in connect.cassandra.delete.struct_flds matches the binding order in the Cassandra delete statement!
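The ordering requirement can be made concrete with a small sketch. The delete statement, table name, key values, and helper functions below are hypothetical and chosen only for illustration; the connector's own binding logic may differ. Key field values are paired with the `?` placeholders strictly by position, so the order of connect.cassandra.delete.struct_flds decides which value lands in which placeholder.

```python
def bind_delete(statement, struct_flds, key):
    """Pair the '?' placeholders of a delete statement with key field values.

    Values are taken in the order listed in struct_flds (positional binding).
    """
    fields = [f.strip() for f in struct_flds.split(",") if f.strip()]
    placeholders = statement.count("?")
    if placeholders != len(fields):
        raise ValueError(
            f"statement has {placeholders} placeholders but {len(fields)} fields were listed"
        )
    return statement, tuple(key[f] for f in fields)


def handle_record(key, value, statement, struct_flds):
    """Return the bound delete for a tombstone (None value), else None."""
    if value is None:
        return bind_delete(statement, struct_flds, key)
    return None


# Hypothetical statement and key, for illustration only.
statement = "DELETE FROM orders WHERE id = ? AND product = ?"
key = {"id": 1, "product": "book"}

print(handle_record(key, None, statement, "id,product"))
# Listing the fields in the wrong order silently swaps the bound values:
print(bind_delete(statement, "product,id", key))
```

The second call shows why the order matters: the statement is still valid, but `product` is bound to the `id` placeholder, so the wrong row (or no row) would be targeted.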
Kafka payload support
This sink supports the following Kafka payloads:
Schema.Struct and Struct (Avro)
Schema.Struct and JSON
No Schema and JSON
Error policies
The connector supports Error policies.
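The three policies described under connect.cassandra.error.policy (NOOP, THROW, and RETRY) can be sketched as a small in-process loop. This is a simplified model under stated assumptions: in the connector, RETRY hands the message back to the Connect framework for redelivery, whereas the hypothetical `write_with_policy` function here just loops locally. The `max_retries` and `retry_interval_ms` parameters mirror connect.cassandra.max.retries and connect.cassandra.retry.interval.

```python
import time


def write_with_policy(write, policy="THROW", max_retries=20,
                      retry_interval_ms=60000, sleep=time.sleep):
    """Run write() under a NOOP, THROW, or RETRY policy (simplified sketch)."""
    attempts = 0
    while True:
        try:
            return write()
        except Exception as err:
            print(f"write failed: {err}")  # errors are logged under every policy
            if policy == "NOOP":
                return None  # swallow the error and carry on
            if policy == "THROW":
                raise  # let the error propagate
            if policy == "RETRY":
                attempts += 1
                if attempts > max_retries:
                    raise  # retries exhausted
                sleep(retry_interval_ms / 1000)
                continue
            raise ValueError(f"unknown error policy: {policy}")


# Hypothetical write that fails twice before succeeding.
calls = {"n": 0}


def flaky_write():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("Cassandra unavailable")
    return "written"


print(write_with_policy(flaky_write, policy="RETRY", max_retries=5, retry_interval_ms=0))
```

With NOOP the failed record is dropped after logging, so that policy only suits pipelines that can tolerate data loss.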
Option Reference
| Name | Description | Type | Default Value |
| --- | --- | --- | --- |
| connect.cassandra.contact.points | Initial contact point host for Cassandra including port. | string | localhost |
| connect.cassandra.port | Cassandra native port. | int | 9042 |
| connect.cassandra.key.space | Keyspace to write to. | string | |
| connect.cassandra.username | Username to connect to Cassandra with. | string | |
| connect.cassandra.password | Password for the username to connect to Cassandra with. | password | |
| connect.cassandra.ssl.enabled | Secure Cassandra driver connection via SSL. | boolean | false |
| connect.cassandra.trust.store.path | Path to the client Trust Store. | string | |
| connect.cassandra.trust.store.password | Password for the client Trust Store. | password | |
| connect.cassandra.trust.store.type | Type of the Trust Store, defaults to JKS. | string | JKS |
| connect.cassandra.key.store.type | Type of the Key Store, defaults to JKS. | string | JKS |
| connect.cassandra.ssl.client.cert.auth | Enable client certificate authentication by Cassandra. Requires KeyStore options to be set. | boolean | false |
| connect.cassandra.key.store.path | Path to the client Key Store. | string | |
| connect.cassandra.key.store.password | Password for the client Key Store. | password | |
| connect.cassandra.consistency.level | Consistency refers to how up-to-date and synchronized a row of Cassandra data is on all of its replicas. Cassandra offers tunable consistency. For any given read or write operation, the client application decides how consistent the requested data must be. | string | |
| connect.cassandra.fetch.size | The number of records the Cassandra driver will return at once. | int | 5000 |
| connect.cassandra.load.balancing.policy | Cassandra load balancing policy: ROUND_ROBIN, TOKEN_AWARE, LATENCY_AWARE or DC_AWARE_ROUND_ROBIN. TOKEN_AWARE and LATENCY_AWARE use DC_AWARE_ROUND_ROBIN. | string | TOKEN_AWARE |
| connect.cassandra.error.policy | Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP (the error is swallowed), THROW (the error is allowed to propagate), and RETRY (the exception causes the Connect framework to retry the message). The number of retries is set by connect.cassandra.max.retries. All errors are logged automatically, even if the code swallows them. | string | THROW |
| connect.cassandra.max.retries | The maximum number of times to try the write again. | int | 20 |
| connect.cassandra.retry.interval | The time in milliseconds between retries. | int | 60000 |
| connect.cassandra.threadpool.size | The sink inserts all the data concurrently. To fail fast in case of an error, the sink has its own thread pool. Set the value to zero and the thread pool defaults to 4 * NO_OF_CPUs. Set a value greater than 0 and that value is used as the size of the thread pool. | int | 0 |
| connect.cassandra.delete.struct_flds | Fields in the key struct data type used in the delete statement. Comma-separated, in the order they appear in connect.cassandra.delete.statement. Keep the default value to use the record key as a primitive type. | list | [] |
| connect.cassandra.delete.statement | Delete statement for Cassandra. | string | |
| connect.cassandra.kcql | KCQL expression describing field selection and routes. | string | |
| connect.cassandra.default.value | By default, a column omitted from the JSON map is set to NULL. Alternatively, if set to UNSET, the pre-existing value is preserved. | string | |
| connect.cassandra.delete.enabled | Enables row deletion from Cassandra. | boolean | false |
| connect.progress.enabled | Enables the output for how many records have been processed. | boolean | false |
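The sizing rule for connect.cassandra.threadpool.size can be expressed as a short calculation. The `resolve_threadpool_size` function below is a hypothetical helper that only illustrates the documented rule (zero means 4 * NO_OF_CPUs, any positive value is used as given); it is not taken from the connector.

```python
import os


def resolve_threadpool_size(configured, cpus=None):
    """Return the effective thread pool size for a configured value."""
    if configured < 0:
        raise ValueError("thread pool size must be zero or greater")
    if cpus is None:
        # os.cpu_count() can return None, so fall back to a single CPU.
        cpus = os.cpu_count() or 1
    return 4 * cpus if configured == 0 else configured


print(resolve_threadpool_size(0, cpus=8))   # default rule: 4 * 8
print(resolve_threadpool_size(10, cpus=8))  # explicit value wins
```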