2024 © Lenses.io Ltd. Apache, Apache Kafka, Kafka and associated open source project names are trademarks of the Apache Software Foundation.

Cassandra

This page describes the usage of the Stream Reactor Cassandra Sink Connector.


Last updated 7 months ago


The connector converts the value of Kafka messages to JSON and uses the Cassandra JSON insert feature to write records.

Connector Class

io.lenses.streamreactor.connect.cassandra.sink.CassandraSinkConnector

Example

For more examples see the tutorials.

name=cassandra-sink
connector.class=io.lenses.streamreactor.connect.cassandra.sink.CassandraSinkConnector
tasks.max=1
topics=orders
connect.cassandra.kcql=INSERT INTO orders SELECT * FROM orders
connect.cassandra.port=9042
connect.cassandra.key.space=demo
connect.cassandra.contact.points=cassandra
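The properties above can also be deployed through the Kafka Connect REST API, which expects a JSON payload instead of a properties file. As a minimal sketch (the helper below is illustrative, not part of the connector), this is how the properties translate into that payload:

```python
import json

def properties_to_connect_payload(props_text: str) -> dict:
    """Convert a .properties connector definition into the JSON body
    expected by the Kafka Connect REST API (POST /connectors)."""
    config = {}
    for line in props_text.strip().splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blanks and comments
        key, _, value = line.partition("=")
        config[key.strip()] = value.strip()
    # The connector name moves to the top level; everything else is config.
    name = config.pop("name")
    return {"name": name, "config": config}

PROPS = """
name=cassandra-sink
connector.class=io.lenses.streamreactor.connect.cassandra.sink.CassandraSinkConnector
tasks.max=1
topics=orders
connect.cassandra.kcql=INSERT INTO orders SELECT * FROM orders
connect.cassandra.port=9042
connect.cassandra.key.space=demo
connect.cassandra.contact.points=cassandra
"""

print(json.dumps(properties_to_connect_payload(PROPS), indent=2))
```

The resulting document can be POSTed to a Connect worker's `/connectors` endpoint.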

KCQL support

You can specify multiple KCQL statements separated by a semicolon (;) to have a single connector sink multiple topics. The connector property topics or topics.regex must be set to match the topics referenced in the KCQL statements.

The following KCQL is supported:

INSERT INTO <your-cassandra-table>
SELECT FIELD, ...
FROM <your-kafka-topic>
[TTL=Time to live]

Examples:

-- Insert mode, select all fields from topicA and
-- write to tableA
INSERT INTO tableA SELECT * FROM topicA

-- Insert mode, select 3 fields, renaming one,
-- from topicB and write to tableB
INSERT INTO tableB SELECT x AS a, y, c FROM topicB

-- Insert mode, select 2 fields from topicB
-- and write to tableB with a TTL
INSERT INTO tableB SELECT x, y FROM topicB TTL=100000

Deletion in Cassandra

Compacted topics in Kafka retain the last message per key. Deletion in Kafka occurs by tombstoning: when compaction is enabled on the topic and a message is sent with a null payload, Kafka flags the record for deletion and removes it from the topic during compaction.

Deletion in Cassandra is supported based on fields in the key of messages with an empty/null payload. You must provide a Cassandra CQL delete statement with bind parameters that are populated from field values in the key, for example:

DELETE FROM orders WHERE id = ? and product = ?

If a message is received with an empty/null value, and its key contains the fields id and product, the final bound Cassandra statement would be:

# Message
# { "key": { "id": 999, "product": "DATAMOUNTAINEER" }, "value": null }
# DELETE FROM orders WHERE id = 999 and product = 'DATAMOUNTAINEER'

# connect.cassandra.delete.enabled=true
# connect.cassandra.delete.statement=DELETE FROM orders WHERE id = ? and product = ?
# connect.cassandra.delete.struct_flds=id,product

Deletion will only occur if a message with an empty payload is received from Kafka.

Ensure the ordinal position of each field in connect.cassandra.delete.struct_flds matches the binding order in the Cassandra delete statement!
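To make the binding order concrete, here is a minimal sketch (illustrative only, not the connector's actual implementation) of how the key fields named in connect.cassandra.delete.struct_flds map onto the ? placeholders:

```python
def bind_delete_values(key: dict, struct_flds: list) -> list:
    """Extract key fields in the order given by
    connect.cassandra.delete.struct_flds, ready to bind against the
    '?' placeholders of the configured delete statement."""
    return [key[field] for field in struct_flds]

# Tombstone record: null value, the key carries the primary-key fields.
key = {"id": 999, "product": "DATAMOUNTAINEER"}

# connect.cassandra.delete.statement=DELETE FROM orders WHERE id = ? and product = ?
print(bind_delete_values(key, ["id", "product"]))  # [999, 'DATAMOUNTAINEER']
```

Reordering the list would silently bind values to the wrong placeholders, which is why the ordinal positions must match.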

Kafka payload support

This sink supports the following Kafka payloads:

  • Schema.Struct and Struct (Avro)

  • Schema.Struct and JSON

  • No Schema and JSON
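Whatever the input format, the record value ends up as a JSON document handed to Cassandra's JSON insert feature. A rough sketch of that final step (illustrative; the connector performs this through the Cassandra driver):

```python
import json

def to_json_insert(table: str, value: dict) -> str:
    """Render a record value as a CQL JSON insert statement,
    e.g. INSERT INTO orders JSON '{"id": 1}'.
    Real code would also escape embedded single quotes or use a bind marker."""
    return f"INSERT INTO {table} JSON '{json.dumps(value)}'"

print(to_json_insert("orders", {"id": 1, "product": "widget"}))
```

Columns present in the JSON map are written; see connect.cassandra.default.value below for how omitted columns are handled.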

Error policies

The connector supports Error policies. The action taken when a write fails is set by connect.cassandra.error.policy: NOOP swallows the error, THROW (the default) propagates it, and RETRY hands the message back to the Connect framework for retrying, up to connect.cassandra.max.retries times with connect.cassandra.retry.interval milliseconds between attempts. All errors are logged even when swallowed.
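The RETRY policy's behaviour can be sketched as follows (an illustrative Python sketch, not the connector's code; the defaults mirror connect.cassandra.max.retries and connect.cassandra.retry.interval):

```python
import time

def write_with_retry(write, max_retries: int = 20, retry_interval_ms: int = 60000):
    """Attempt the write, retrying up to max_retries times with a fixed
    pause between attempts; the final failure propagates (as THROW would)."""
    for attempt in range(max_retries + 1):
        try:
            return write()
        except Exception:
            if attempt == max_retries:
                raise  # retries exhausted: let the error propagate
            time.sleep(retry_interval_ms / 1000.0)
```

With NOOP the except branch would simply log and return; with THROW the first exception propagates immediately.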
Option Reference

| Name | Description | Type | Default |
| --- | --- | --- | --- |
| connect.cassandra.contact.points | Initial contact point host for Cassandra, including port. | string | localhost |
| connect.cassandra.port | Cassandra native port. | int | 9042 |
| connect.cassandra.key.space | Keyspace to write to. | string | |
| connect.cassandra.username | Username to connect to Cassandra with. | string | |
| connect.cassandra.password | Password for the username to connect to Cassandra with. | password | |
| connect.cassandra.ssl.enabled | Secure the Cassandra driver connection via SSL. | boolean | false |
| connect.cassandra.trust.store.path | Path to the client Trust Store. | string | |
| connect.cassandra.trust.store.password | Password for the client Trust Store. | password | |
| connect.cassandra.trust.store.type | Type of the Trust Store. | string | JKS |
| connect.cassandra.key.store.type | Type of the Key Store. | string | JKS |
| connect.cassandra.ssl.client.cert.auth | Enable client certificate authentication by Cassandra. Requires the Key Store options to be set. | boolean | false |
| connect.cassandra.key.store.path | Path to the client Key Store. | string | |
| connect.cassandra.key.store.password | Password for the client Key Store. | password | |
| connect.cassandra.consistency.level | Consistency refers to how up-to-date and synchronized a row of Cassandra data is on all of its replicas. Cassandra offers tunable consistency; for any given read or write operation, the client application decides how consistent the requested data must be. | string | |
| connect.cassandra.fetch.size | The number of records the Cassandra driver will return at once. | int | 5000 |
| connect.cassandra.load.balancing.policy | Cassandra load balancing policy: ROUND_ROBIN, TOKEN_AWARE, LATENCY_AWARE or DC_AWARE_ROUND_ROBIN. TOKEN_AWARE and LATENCY_AWARE use DC_AWARE_ROUND_ROBIN. | string | TOKEN_AWARE |
| connect.cassandra.error.policy | The action taken if an error occurs while inserting the data. NOOP: the error is swallowed. THROW: the error is allowed to propagate. RETRY: the exception causes the Connect framework to retry the message; the number of retries is set by connect.cassandra.max.retries. All errors are logged automatically, even when swallowed. | string | THROW |
| connect.cassandra.max.retries | The maximum number of times to retry the write. | int | 20 |
| connect.cassandra.retry.interval | The time in milliseconds between retries. | int | 60000 |
| connect.cassandra.threadpool.size | The sink inserts all the data concurrently using its own thread pool so it can fail fast on error. Set to 0 to default the pool size to 4 × the number of CPUs; set a value greater than 0 to use that as the pool size. | int | 0 |
| connect.cassandra.delete.struct_flds | Fields in the key struct used in the delete statement, comma-separated in the order they appear in connect.cassandra.delete.statement. Keep the default value to use the record key as a primitive type. | list | [] |
| connect.cassandra.delete.statement | Delete statement for Cassandra. | string | |
| connect.cassandra.kcql | KCQL expression describing field selection and routes. | string | |
| connect.cassandra.default.value | By default, a column omitted from the JSON map is set to NULL. If set to UNSET, the pre-existing value is preserved instead. | string | |
| connect.cassandra.delete.enabled | Enables row deletion from Cassandra. | boolean | false |
| connect.progress.enabled | Enables logging of how many records have been processed. | boolean | false |
