Cassandra
This page describes the usage of the Stream Reactor Cassandra Sink Connector bundled in the kafka-connect-cassandra-sink artifact.
The Stream Reactor Cassandra Sink Connector is designed to move data from Apache Kafka to Apache Cassandra. It supports Apache Cassandra 3.0 and later, DataStax Enterprise 4.7 and later, and DataStax Astra databases.
Connector Class
io.lenses.streamreactor.connect.cassandra.sink.CassandraSinkConnector
Features
Database authentication: User-password, LDAP, Kerberos
Input formats: The connector supports Avro, JSON Schema, Protobuf, JSON (schemaless) and primitive data formats. Schema Registry based formats, such as Avro, Protobuf and JSON Schema, require the Schema Registry settings (see the converter sketch after this list).
At least once semantics: The Connect framework stores record offsets in Kafka and resumes from the last committed offset on restart. This ensures reliable delivery but may occasionally result in duplicate record processing during failures.
Kafka topic to database table mapping: Controls which Kafka message fields are written to which database table columns.
Multi-table mapping: Enables writing a topic to multiple database tables.
Date/Time/Timestamp formats: Allows control of the date, time, and timestamp format conversion between Kafka messages and database columns, supporting custom parsing and formatting patterns.
Consistency Level control: Allows configuring the write consistency level per table mapping.
Row Level Time-to-Live: Allows configuring the row-level TTL per table mapping.
Deletes: Allows configuring deletes per table mapping.
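For Schema Registry based formats, the converter settings are supplied alongside the connector configuration. A minimal sketch, assuming Avro payloads and a hypothetical Schema Registry reachable at http://schema-registry:8081:
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://schema-registry:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://schema-registry:8081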
Example
For more examples see the tutorials.
name=cassandra-sink
connector.class=io.lenses.streamreactor.connect.cassandra.sink.CassandraSinkConnector
tasks.max=1
topics=orders
connect.cassandra.kcql=INSERT INTO mykeyspace.orders SELECT _key.bigint as bigintcol, _value.boolean as booleancol, _key.double as doublecol, _value.float as floatcol, _key.int as intcol, _value.smallint as smallintcol, _key.text as textcol, _value.tinyint as tinyintcol FROM orders
connect.cassandra.port=9042
connect.cassandra.contact.points=cassandra
Connection
The connector requires a connection to the database. To enable it, specify the following configuration entries. See Configuration Reference for details.
connect.cassandra.contact.points=[host list]
connect.cassandra.port=9042
connect.cassandra.load.balancing.local.dc=datacentre-name
# optional entries
# connect.cassandra.max.concurrent.requests = 100
# connect.cassandra.max.batch.size = 64
# connect.cassandra.connection.pool.size = 4
# connect.cassandra.query.timeout.ms = 30
# connect.cassandra.compression = None
Security
SSL connection
When the database cluster has enabled client encryption, configure the SSL Keys and certificates:
connect.cassandra.ssl.provider=
connect.cassandra.ssl.cipher.suites=
connect.cassandra.ssl.hostname.verification=true
connect.cassandra.ssl.keystore.path=
connect.cassandra.ssl.keystore.password=
connect.cassandra.ssl.truststore.password=
connect.cassandra.ssl.truststore.path=
# Path to the SSL certificate file, when using OpenSSL.
connect.cassandra.ssl.openssl.key.cert.chain=
# Path to the private key file, when using OpenSSL.
connect.cassandra.ssl.openssl.private.key=
User-Password or LDAP authentication
When the database is configured with user-password or LDAP authentication, configure the following:
connect.cassandra.auth.provider=
connect.cassandra.auth.username=
connect.cassandra.auth.password=
# For GSSAPI (SASL) authentication, which requires
# connect.cassandra.auth.provider set to GSSAPI
connect.cassandra.auth.gssapi.keytab=
connect.cassandra.auth.gssapi.principal=
connect.cassandra.auth.gssapi.service=
Kerberos authentication
When the database cluster has Kerberos authentication enabled, configure the following:
connect.cassandra.auth.provider=GSSAPI
connect.cassandra.auth.gssapi.keytab=
connect.cassandra.auth.gssapi.principal=
connect.cassandra.auth.gssapi.service=
KCQL support
You can specify multiple KCQL statements separated by ; to have a connector sink multiple topics. The connector properties topics or topics.regex must be set to a value that matches the KCQL statements.
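For instance, a minimal sketch sinking two hypothetical topics, orders and payments, each to its own table (topic, table and field names are illustrative):
topics=orders,payments
connect.cassandra.kcql=INSERT INTO mykeyspace.orders SELECT _value.id as id, _value.amount as amount FROM orders;INSERT INTO mykeyspace.payments SELECT _value.id as id, _value.amount as amount FROM payments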
KCQL (Kafka Connect Query Language) is a SQL-like syntax that provides a declarative way to configure connector behavior. It serves as the primary interface for defining how data flows from source topics to target Cassandra tables.
Purpose and Functionality
KCQL statements define three core aspects of data pipeline configuration:
Data Source and Target: Specifies which Kafka topic serves as the data source and which Cassandra table receives the data.
Field Mapping: Controls how fields from the source topic map to columns in the target table, including transformations and selective field inclusion.
Operational Parameters: Configures advanced settings such as consistency levels, delete operations, time-to-live (TTL) settings, and timestamp handling.
Key Capabilities
Flexible Field Mapping: Map source fields to target columns with optional transformations
Consistency Control: Set read and write consistency levels for Cassandra operations
Delete Operations: Enable or disable delete functionality based on message content
TTL Management: Configure automatic data expiration using time-to-live settings
Timestamp Handling: Control how timestamps are processed and stored
Conditional Logic: Apply filtering and conditional processing to incoming data
Syntax Template
The basic KCQL statement follows this structure:
INSERT INTO <keyspace>.<your-cassandra-table>
SELECT <field projection, ...>
FROM <your-kafka-topic>
PROPERTIES(<a set of keys to control the connector behaviour>)
INSERT INTO: Specifies the target Cassandra keyspace and table where data will be written.
SELECT: Defines field projection and mapping from the source.
FROM: Identifies the source Kafka topic containing the data to be processed.
PROPERTIES: Optional clause for configuring connector behavior such as consistency levels, TTL settings, and operational parameters.
Examples
INSERT INTO mykeyspace.types
SELECT
_key.bigint as bigintcol
, _value.boolean as booleancol
, _key.double as doublecol
, _value.float as floatcol
, _header.int as intcol
FROM myTopic
INSERT INTO mykeyspace.types
SELECT
_value.bigint as bigintcol
, _value.double as doublecol
, _value.ttlcol as message_internal_ttl
, _value.timestampcol as message_internal_timestamp
FROM myTopic
PROPERTIES('ttlTimeUnit'='MILLISECONDS', 'timestampTimeUnit'='MICROSECONDS')
INSERT INTO mykeyspace.CASE_SENSITIVE
SELECT
`_key.'bigint field'` as 'bigint col',
`_key.'boolean-field'` as 'boolean-col',
`_value.'INT FIELD'` as 'INT COL',
`_value.'TEXT.FIELD'` as 'TEXT.COL'
FROM mytopic
INSERT INTO mykeyspace.tableA
SELECT
_key.my_pk as my_pk
, _value.my_value as my_value
FROM topicA
PROPERTIES(
'query'='INSERT INTO mykeyspace.pk_value (my_pk, my_value) VALUES (:my_pk, :my_value)',
'deletesEnabled' ='true'
)
Table mapping
Kafka to Database Table Mapping
The connector enables mapping fields from Kafka records to database table columns. A single connector can handle multiple topics, where each topic can map to one or more tables. Kafka messages consist of a Key, Value, and Header.
Generic Mapping Syntax
Use the following syntax to map Kafka message fields to database columns:
INSERT INTO ${target}
SELECT _value/_key/_header.{field_path} AS ${table_column}
FROM mytopic
Prefix the field with:
_value: For fields from the Kafka message's Value component.
_key: For fields from the Kafka message's Key component.
_header: For fields from the Kafka message's Header.
Note: If no prefix is provided, _value is assumed, mapping to the Kafka record's Value component.
The record Key and Value can be mapped to specific columns like this, considering a table with row_key and content columns:
INSERT INTO ${target}
SELECT
_key as row_key
, _value as content
FROM mytopic
To map fields or database columns whose names contain whitespace or special characters, follow this format:
INSERT INTO ${target}
SELECT
`_key.'bigint field'` as 'bigint col',
`_key.'boolean-field'` as 'boolean-col',
`_value.'INT FIELD'` as 'INT COL',
`_value.'TEXT.FIELD'` as 'TEXT.COL'
FROM myTopic
Fan-out
You can map a topic's data to multiple database tables using multiple KCQL statements in the same connect.cassandra.kcql configuration. For example:
INSERT INTO stocks_keyspace.stocks_by_symbol
SELECT _value.symbol AS ticker,
_value.ts AS ts,
_value.exchange AS exchange,
_value.value AS value
FROM stocks;
INSERT INTO stocks_keyspace.stocks_by_exchange
SELECT _value.symbol AS ticker,
_value.ts AS ts,
_value.exchange AS exchange,
_value.value AS value
FROM stocks;
User defined types
The connector can map complex types from a Kafka record payload into a user-defined type (UDT) column in the database. The incoming data field names must match the database field names.
Consider the following Kafka record format:
Key: APPLE
Value:
{"symbol":"APPL",
"value":214.4,
"exchange":"NASDAQ",
"ts":"2025-07-23T14:56:10.009"}
and database definition:
CREATE TYPE stocks_keyspace.stocks_type (
symbol text,
ts timestamp,
exchange text,
value double
);
CREATE TABLE stocks_keyspace.stocks_table (
name text PRIMARY KEY,
stocks FROZEN<stocks_type>
);
The mapping configuration should be:
INSERT INTO stocks_keyspace.stocks_table
SELECT _key AS name, _value AS stocks
FROM stocks
Using now() function
You can leverage the now() function, which returns a TIMEUUID, in the mapping. Here is how to use it in a KCQL projection:
INSERT INTO ${target}
SELECT
`now()` as loaded_at
FROM myTopic
Write Timestamp
To set the database's internal write-time timestamp for a record, map a numeric field from the Kafka record payload. Use the following mapping syntax:
SELECT [_value/_key/_header].{path} AS message_internal_timestamp
FROM myTopic
--You may optionally specify the time unit by using:
PROPERTIES('timestampTimeUnit'='MICROSECONDS')
Examples:
SELECT _header.timestampcolumn AS message_internal_timestamp FROM myTopic
SELECT _value.timestampcol AS message_internal_timestamp FROM myTopic
SELECT _key.timestampcol AS message_internal_timestamp FROM myTopic
Row-Level TTL
To define the time-to-live (TTL) for a database record, you can optionally map a field from the Kafka record payload. Use the following query:
SELECT [_value/_key/_header].{path} as message_internal_ttl
FROM myTopic
-- Optional: Specify the TTL unit
PROPERTIES('ttlTimeUnit'='SECONDS')
Examples:
SELECT _header.ttl as message_internal_ttl
FROM myTopic
PROPERTIES('ttlTimeUnit'='MINUTES')
SELECT _key.expiry as message_internal_ttl
FROM myTopic
PROPERTIES('ttlTimeUnit'='HOURS')
SELECT _value.ttl_duration as message_internal_ttl
FROM myTopic
PROPERTIES('ttlTimeUnit'='DAYS')
These examples demonstrate specifying TTL units such as 'MINUTES', 'HOURS', or 'DAYS', in addition to the default 'SECONDS'.
Converting date and time for a topic
To configure date and time conversion properties for a topic, use the following KCQL statement:
INSERT INTO ... SELECT ... FROM myTopic PROPERTIES(
'codec.locale' = 'en_US',
'codec.timeZone' = 'UTC',
'codec.timestamp' = 'CQL_TIMESTAMP',
'codec.date' = 'ISO_LOCAL_DATE',
'codec.time' = 'ISO_LOCAL_TIME',
'codec.unit' = 'MILLISECONDS'
)
Codec Parameter Descriptions:
codec.timestamp: Defines the pattern for converting strings to CQL timestamps. Options include specific date-time patterns like yyyy-MM-dd HH:mm:ss or pre-defined formats such as ISO_ZONED_DATE_TIME and ISO_INSTANT. The special formatter CQL_TIMESTAMP supports all CQL timestamp formats. Default: CQL_TIMESTAMP
codec.date: Specifies the pattern for converting strings to CQL dates. Options include date patterns like yyyy-MM-dd and formatters such as ISO_LOCAL_DATE. Default: ISO_LOCAL_DATE
codec.time: Sets the pattern for converting strings to CQL time, using patterns like HH:mm:ss or formatters such as ISO_LOCAL_TIME. Default: ISO_LOCAL_TIME
codec.unit: For digit-only inputs not parsed by codec.timestamp, this sets the time unit for conversion. Accepts all TimeUnit enum constants. Default: MILLISECONDS
codec.timeZone: Defines the time zone for conversions without an explicit time zone. Default: UTC
codec.locale: Specifies the locale for locale-sensitive conversions. Default: en_US
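As an illustration, a sketch that parses string timestamps in a hypothetical yyyy-MM-dd HH:mm:ss format from a topic named events (keyspace, table and field names are illustrative):
INSERT INTO mykeyspace.events
SELECT _value.id as id, _value.created_at as created_at
FROM events
PROPERTIES('codec.timestamp'='yyyy-MM-dd HH:mm:ss', 'codec.timeZone'='UTC')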
CQL Queries
Advanced CQL Query Configuration for Kafka Records
When a new Kafka record arrives, you have the option to run a custom CQL query. This feature is designed for advanced use cases; typically, the standard Kafka mapping suffices without needing a query. If you specify a query in the topic-to-table mapping, it will take precedence over the default action.
Important: Every bound variable used in the query must also be present as an alias in the mapping projection.
Example
INSERT INTO ${target}
SELECT
_value.bigint as some_name,
_value.int as some_name_2
FROM myTopic
PROPERTIES('query'='INSERT INTO %s.types (bigintCol, intCol) VALUES (:some_name, :some_name_2)')
Extra settings
consistencyLevel: The query consistency level. One of: ALL, EACH_QUORUM, QUORUM, LOCAL_QUORUM, ONE, TWO, THREE, LOCAL_ONE, ANY.
ttl: The number of seconds before data is automatically deleted from the DSE table. When set, all rows written from the topic share this TTL value. Default: -1
nullToUnset: Whether to treat nulls coming from Kafka as UNSET in DSE. DataStax suggests keeping the default setting to minimize the creation of unnecessary tombstones. Default: true
deletesEnabled: Treat a record as a delete if, after mapping, only the primary key columns have non-null values. This prevents the insertion or update of nulls in regular columns. Default: true
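A sketch combining these settings in a single table mapping (keyspace, table, field names and values are illustrative):
INSERT INTO mykeyspace.orders
SELECT _value.id as id, _value.amount as amount
FROM orders
PROPERTIES('consistencyLevel'='LOCAL_QUORUM', 'ttl'='86400', 'nullToUnset'='true', 'deletesEnabled'='false')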
Java Driver settings
Any specific setting for the Cassandra client can be passed through the connector configuration by prefixing it with connect.cassandra.driver.{driver_setting}. Refer to the DataStax Java driver documentation for more information.
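For example, the driver's request consistency option (basic.request.consistency in the DataStax Java driver) would be set as:
connect.cassandra.driver.basic.request.consistency=ALL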
Configuration Reference
connect.cassandra.contact.points: A comma-separated list of host names or IP addresses. Default: localhost. Type: String
connect.cassandra.port: Cassandra native port. Default: 9042. Type: String
connect.cassandra.max.concurrent.requests: Maximum number of requests to send to the database at the same time. Default: 100. Type: String
connect.cassandra.connection.pool.size: The number of connections to maintain in the connection pool. Default: 2. Type: String
connect.cassandra.compression: Compression algorithm to use for the connection. Default: LZ4. Type: String
connect.cassandra.query.timeout.ms: The Cassandra driver query timeout in milliseconds. Default: 20000. Type: Int
connect.cassandra.max.batch.size: Number of records to include in a write request to the database table. Default: 64. Type: Int
connect.cassandra.load.balancing.local.dc: The case-sensitive datacenter name for the driver to use for load balancing. Default: (no default). Type: String
connect.cassandra.auth.provider: Authentication provider. Default: None. Type: String
connect.cassandra.auth.username: Username for PLAIN (username/password) provider authentication. Default: "". Type: String
connect.cassandra.auth.password: Password for PLAIN (username/password) provider authentication. Default: "". Type: String
connect.cassandra.auth.gssapi.keytab: Kerberos keytab file for GSSAPI provider authentication. Default: "". Type: String
connect.cassandra.auth.gssapi.principal: Kerberos principal for GSSAPI provider authentication. Default: "". Type: String
connect.cassandra.auth.gssapi.service: SASL service name to use for GSSAPI provider authentication. Default: dse. Type: String
connect.cassandra.ssl.enabled: Secure the Cassandra driver connection via SSL. Default: false. Type: String
connect.cassandra.ssl.provider: The SSL provider to use for the connection. Available values are None, JDK or OpenSSL. Default: None. Type: String
connect.cassandra.ssl.truststore.path: Path to the client Trust Store. Default: (no default). Type: String
connect.cassandra.ssl.truststore.password: Password for the client Trust Store. Default: (no default). Type: String
connect.cassandra.ssl.keystore.path: Path to the client Key Store. Default: (no default). Type: String
connect.cassandra.ssl.keystore.password: Password for the client Key Store. Default: (no default). Type: String
connect.cassandra.ssl.cipher.suites: The SSL cipher suites to use for the connection. Default: (no default). Type: String
connect.cassandra.ssl.hostname.verification: Enable hostname verification for the connection. Default: true. Type: String
connect.cassandra.ssl.openssl.key.cert.chain: Path to the SSL certificate chain file, when using OpenSSL. Default: (no default). Type: String
connect.cassandra.ssl.openssl.private.key: Path to the private key file, when using OpenSSL. Default: (no default). Type: String
connect.cassandra.ignore.errors.mode: Can be one of 'none', 'all' or 'driver'. Default: none. Type: String
connect.cassandra.retry.interval: The time in milliseconds between retries. Default: 60000. Type: String
connect.cassandra.error.policy: Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP (the error is swallowed), THROW (the error is allowed to propagate), and RETRY (the exception causes the Connect framework to retry the message; the number of retries is set by connect.cassandra.max.retries). All errors are logged automatically, even when swallowed. Default: THROW. Type: String
connect.cassandra.max.retries: The maximum number of times to try the write again. Default: 20. Type: Int
connect.cassandra.kcql: KCQL expression describing field selection and routes. Default: (no default). Type: String
connect.cassandra.progress.enabled: Enables logging of how many records have been processed. Default: false. Type: Boolean
connect.cassandra.driver.*: Tweaks the Cassandra driver settings, for example connect.cassandra.driver.basic.request.consistency=ALL. Refer to the DataStax Java driver documentation.
Error policies
The connector supports Error policies.
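A sketch of a retry-oriented policy, built from the settings listed in the Configuration Reference above:
connect.cassandra.error.policy=RETRY
connect.cassandra.max.retries=20
connect.cassandra.retry.interval=60000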