Cassandra

This page describes the usage of the Stream Reactor Cassandra Sink Connector bundled in the kafka-connect-cassandra-sink artifact.

Stream Reactor Cassandra Sink Connector is designed to move data from Apache Kafka to Apache Cassandra. It supports Apache Cassandra 3.0 and later, DataStax Enterprise 4.7 and later, and DataStax Astra databases.

Connector Class

The release artifact is called kafka-connect-cassandra-sink. The connector class is io.lenses.streamreactor.connect.cassandra.CassandraSinkConnector.

Features

  • Database authentication: User-password, LDAP, Kerberos

  • Input formats: The connector supports Avro, JSON Schema, Protobuf, JSON (schemaless) and primitive data formats. Schema Registry-based formats (Avro, Protobuf, JSON Schema) require the Schema Registry settings.

  • Kafka topic to database table mapping: Allows control of which Kafka message fields are written to which database table columns

  • Multi-table mapping: Enables writing a topic to multiple database tables

  • Date/Time/Timestamp formats: Allows control of the date, time, and timestamp format conversion between Kafka messages and database columns, supporting custom parsing and formatting patterns

  • Consistency Level control: Allows configuring the write consistency on a per table mapping

  • Row Level Time-to-Live: Allows configuring the row-level TTL on a per table mapping

  • Deletes: Allows configuring deletes on a per table mapping

  • At least once semantics: The Connect framework stores record offsets in Kafka and resumes from the last committed offset on restart. This ensures reliable delivery but may occasionally result in duplicate record processing during failures.

Example
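A minimal sink configuration writing the orders topic to the mykeyspace.orders table:

    name=cassandra-sink
    connector.class=io.lenses.streamreactor.connect.cassandra.CassandraSinkConnector
    tasks.max=1
    topics=orders
    connect.cassandra.kcql=INSERT INTO mykeyspace.orders SELECT _key.bigint as bigintcol, _value.boolean as booleancol, _key.double as doublecol, _value.float as floatcol, _key.int as intcol, _value.smallint as smallintcol, _key.text as textcol, _value.tinyint as tinyintcol FROM orders
    connect.cassandra.port=9042
    connect.cassandra.contact.points=cassandra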

For more examples see the tutorials.

Connection

The connector requires a connection to the database. To enable it, specify the following configuration entries. See Configuration Reference for details.
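    connect.cassandra.contact.points=[host list]
    connect.cassandra.port=9042
    connect.cassandra.load.balancing.local.dc=datacentre-name
    # optional entries
    # connect.cassandra.max.concurrent.requests = 100
    # connect.cassandra.max.batch.size = 64
    # connect.cassandra.connection.pool.size = 4
    # connect.cassandra.query.timeout.ms = 30
    # connect.cassandra.compression = None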

Security

SSL connection

When the database cluster has client encryption enabled, configure the SSL keys and certificates:
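    # enable SSL on the driver connection
    connect.cassandra.ssl.enabled=true
    connect.cassandra.ssl.provider=
    connect.cassandra.ssl.cipher.suites=
    connect.cassandra.ssl.hostname.verification=true

    connect.cassandra.ssl.keystore.path=
    connect.cassandra.ssl.keystore.password=

    connect.cassandra.ssl.truststore.path=
    connect.cassandra.ssl.truststore.password=

    # Path to the SSL certificate file, when using OpenSSL.
    connect.cassandra.ssl.openssl.key.cert.chain=

    # Path to the private key file, when using OpenSSL.
    connect.cassandra.ssl.openssl.private.key=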

User-Password or LDAP authentication

When the database is configured with user-password or LDAP authentication, configure the following:
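    # set to PLAIN for username/password or LDAP authentication
    connect.cassandra.auth.provider=PLAIN
    connect.cassandra.auth.username=
    connect.cassandra.auth.password=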

Kerberos authentication

When the database cluster has Kerberos authentication enabled, configure the following:
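    # SASL/Kerberos authentication requires connect.cassandra.auth.provider set to GSSAPI
    connect.cassandra.auth.provider=GSSAPI
    connect.cassandra.auth.gssapi.keytab=
    connect.cassandra.auth.gssapi.principal=
    connect.cassandra.auth.gssapi.service=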

KCQL support

You can specify multiple KCQL statements separated by ; to have a connector sink multiple topics. The connector properties topics or topics.regex are required to be set to a value that matches the KCQL statements.

KCQL (Kafka Connect Query Language) is a SQL-like syntax that provides a declarative way to configure connector behavior. It serves as the primary interface for defining how data flows from source topics to target Cassandra tables.

Purpose and Functionality

KCQL statements define three core aspects of data pipeline configuration:

Data Source and Target: Specifies which Kafka topic serves as the data source and which Cassandra table receives the data.

Field Mapping: Controls how fields from the source topic map to columns in the target table, including transformations and selective field inclusion.

Operational Parameters: Configures advanced settings such as consistency levels, delete operations, time-to-live (TTL) settings, and timestamp handling.

Key Capabilities

  • Flexible Field Mapping: Map source fields to target columns with optional transformations

  • Consistency Control: Set read and write consistency levels for Cassandra operations

  • Delete Operations: Enable or disable delete functionality based on message content

  • TTL Management: Configure automatic data expiration using time-to-live settings

  • Timestamp Handling: Control how timestamps are processed and stored

  • Conditional Logic: Apply filtering and conditional processing to incoming data

Syntax Template

The basic KCQL statement follows this structure:
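    INSERT INTO <keyspace>.<your-cassandra-table>
    SELECT <field projection, ...>
    FROM <your-kafka-topic>
    PROPERTIES(<a set of keys to control the connector behaviour>)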

INSERT INTO: Specifies the target Cassandra keyspace and table where data will be written.

SELECT: Defines field projection and mapping from the source.

FROM: Identifies the source Kafka topic containing the data to be processed.

PROPERTIES: Optional clause for configuring connector behavior such as consistency levels, TTL settings, and operational parameters.

Examples
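    INSERT INTO mykeyspace.types
    SELECT
      _key.bigint as bigintcol
      , _value.boolean as booleancol
      , _key.double as doublecol
      , _value.float as floatcol
      , _header.int as intcol
    FROM myTopic

    INSERT INTO mykeyspace.types
    SELECT
      _value.bigint as bigintcol
      , _value.double as doublecol
      , _value.ttlcol as message_internal_ttl
      , _value.timestampcol as message_internal_timestamp
    FROM myTopic
    PROPERTIES('ttlTimeUnit'='MILLISECONDS', 'timestampTimeUnit'='MICROSECONDS')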

Table mapping

Kafka to Database Table Mapping

The connector enables mapping fields from Kafka records to database table columns. A single connector can handle multiple topics, where each topic can map to one or more tables. Kafka messages consist of a Key, Value, and Header.

Generic Mapping Syntax

Use the following syntax to map Kafka message fields to database columns:

  • Prefix the field with:

    • _value: For fields from the Kafka message's Value component.

    • _key: For fields from the Kafka message's Key component.

    • _header: For fields from the Kafka message's Header.

Note: If no prefix is provided, _value is assumed, mapping to the Kafka record's Value component.
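    INSERT INTO ${target}
    SELECT _value/_key/_header.{field_path} AS ${table_column}
    FROM mytopic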

The record Key and Value can be mapped to specific columns like this, considering a table with row_key and content columns:
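    INSERT INTO ${target}
    SELECT
      _key as row_key
      , _value as content
    FROM mytopic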

To map fields and database columns whose names contain whitespace, special characters, or mixed case, quote them in the KCQL projection:
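    INSERT INTO mykeyspace.CASE_SENSITIVE
    SELECT
      `_key.'bigint field'` as 'bigint col',
      `_key.'boolean-field'` as 'boolean-col',
      `_value.'INT FIELD'` as 'INT COL',
      `_value.'TEXT.FIELD'` as 'TEXT.COL'
    FROM myTopic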

Fan-out

You can map a topic's data to multiple database tables using multiple KCQL statements in the same connect.cassandra.kcql configuration. For example:
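    INSERT INTO stocks_keyspace.stocks_by_symbol
    SELECT _value.symbol AS ticker,
           _value.ts AS ts,
           _value.exchange AS exchange,
           _value.value AS value
    FROM stocks;
    INSERT INTO stocks_keyspace.stocks_by_exchange
    SELECT _value.symbol AS ticker,
           _value.ts AS ts,
           _value.exchange AS exchange,
           _value.value AS value
    FROM stocks;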

User defined types

The connector can map complex types from a Kafka record payload into a user-defined type (UDT) column in the database. The incoming data field names must match the database field names.

Consider the following Kafka record format, where the record key is a text value holding the stock name and the record value is the JSON document below:
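    {
      "symbol": "APPL",
      "value": 214.4,
      "exchange": "NASDAQ",
      "ts": "2025-07-23T14:56:10.009"
    }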

and database definition:
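    CREATE TYPE stocks_keyspace.stocks_type (
        symbol text,
        ts timestamp,
        exchange text,
        value double
    );

    CREATE TABLE stocks_keyspace.stocks_table (
        name text PRIMARY KEY,
        stocks FROZEN<stocks_type>
    );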

The mapping configuration should be:
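    INSERT INTO stocks_keyspace.stocks_table
    SELECT _key AS name, _value AS stocks
    FROM stocks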

Using now() function

You can leverage the now() function, which returns a TIMEUUID, in the mapping. Here is how to use it in a KCQL projection:
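    INSERT INTO ${target}
    SELECT
      `now()` as loaded_at
    FROM myTopic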

Write Timestamp

To set the database's internal write-time timestamp for a record, choose a numeric field from the Kafka record payload and use the following mapping syntax:
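    SELECT [_value/_key/_header].{path} AS message_internal_timestamp
    FROM myTopic
    -- You may optionally specify the time unit:
    PROPERTIES('timestampTimeUnit'='MICROSECONDS')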

Examples:
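    SELECT _header.timestampcolumn AS message_internal_timestamp FROM myTopic
    SELECT _value.timestampcol AS message_internal_timestamp FROM myTopic
    SELECT _key.timestampcol AS message_internal_timestamp FROM myTopic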

Row-Level TTL

To define the time-to-live (TTL) for a database record, you can optionally map a field from the Kafka record payload. Use the following query:
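    SELECT [_value/_key/_header].{path} as message_internal_ttl
    FROM myTopic
    -- Optional: specify the TTL unit
    PROPERTIES('ttlTimeUnit'='SECONDS')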

Examples:
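    SELECT _header.ttl as message_internal_ttl
    FROM myTopic
    PROPERTIES('ttlTimeUnit'='MINUTES')

    SELECT _key.expiry as message_internal_ttl
    FROM myTopic
    PROPERTIES('ttlTimeUnit'='HOURS')

    SELECT _value.ttl_duration as message_internal_ttl
    FROM myTopic
    PROPERTIES('ttlTimeUnit'='DAYS')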

These examples demonstrate specifying TTL units such as 'MINUTES', 'HOURS', or 'DAYS', in addition to the default 'SECONDS'.

Converting date and time for a topic

To configure date and time conversion properties for a topic, set the codec properties in the PROPERTIES clause of the KCQL statement:
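    INSERT INTO ... SELECT ... FROM myTopic PROPERTIES(
        'codec.locale' = 'en_US',
        'codec.timeZone' = 'UTC',
        'codec.timestamp' = 'CQL_TIMESTAMP',
        'codec.date' = 'ISO_LOCAL_DATE',
        'codec.time' = 'ISO_LOCAL_TIME',
        'codec.unit' = 'MILLISECONDS'
    )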

Codec Parameter Descriptions:

  • codec.locale: Specifies the locale for locale-sensitive conversions. Default: en_US

  • codec.timeZone: Defines the time zone for conversions without an explicit time zone. Default: UTC

  • codec.timestamp: Defines the pattern for converting strings to CQL timestamps. Options include specific date-time patterns like yyyy-MM-dd HH:mm:ss or pre-defined formats such as ISO_ZONED_DATE_TIME and ISO_INSTANT. The special formatter CQL_TIMESTAMP supports all CQL timestamp formats. Default: CQL_TIMESTAMP

  • codec.date: Specifies the pattern for converting strings to CQL dates. Options include date-time patterns like yyyy-MM-dd and formatters such as ISO_LOCAL_DATE. Default: ISO_LOCAL_DATE

  • codec.time: Sets the pattern for converting strings to CQL time, using patterns like HH:mm:ss or formatters such as ISO_LOCAL_TIME. Default: ISO_LOCAL_TIME

  • codec.unit: For digit-only inputs not parsed by codec.timestamp, this sets the time unit for conversion. Accepts all TimeUnit enum constants. Default: MILLISECONDS

CQL Queries

Advanced CQL Query Configuration for Kafka Records

When a new Kafka record arrives, you have the option to run a custom CQL query. This feature is designed for advanced use cases; typically, the standard Kafka mapping suffices without needing a query. If you specify a query in the topic-to-table mapping, it will take precedence over the default action.

Important: The bound variables used in the query must be present in the KCQL field mapping.

Example
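    INSERT INTO mykeyspace.tableA
    SELECT
        _key.my_pk as my_pk
        , _value.my_value as my_value
    FROM topicA
    PROPERTIES(
      'query'='INSERT INTO mykeyspace.pk_value (my_pk, my_value) VALUES (:my_pk, :my_value)',
      'deletesEnabled'='true'
    )

    INSERT INTO ${target}
    SELECT
        _value.bigint as some_name,
        _value.int as some_name2
    FROM myTopic
    PROPERTIES('query'='INSERT INTO %s.types (bigintCol, intCol) VALUES (:some_name, :some_name2)')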

Extra settings

  • consistencyLevel: Query consistency level. One of ALL, EACH_QUORUM, QUORUM, LOCAL_QUORUM, ONE, TWO, THREE, LOCAL_ONE, ANY.

  • ttl: Specify the number of seconds before data is automatically deleted from the DSE table. When set, all rows in the topic table will share this TTL value. Default: -1

  • nullToUnset: When handling nulls in Kafka, it's advisable to treat them as UNSET in DSE. DataStax suggests sticking with the default setting to minimize the creation of unnecessary tombstones. Default: true

  • deletesEnabled: Enable this feature to treat records as deletes if, after mapping, only the primary key columns have non-null values. This prevents the insertion or update of nulls in regular columns. Default: true

Java Driver settings

Using the connector configuration, prefix any specific setting you want to set for the Cassandra client with connect.cassandra.driver.{driver_setting}. Refer to the DataStax Java driver documentation for more information. For example:

    connect.cassandra.driver.basic.request.consistency=ALL

Configuration Reference

  • connect.cassandra.contact.points: A comma-separated list of host names or IP addresses. Default: localhost. Type: String

  • connect.cassandra.port: Cassandra native port. Default: 9042. Type: String

  • connect.cassandra.max.concurrent.requests: Maximum number of requests to send to the database at the same time. Default: 100. Type: String

  • connect.cassandra.connection.pool.size: The number of connections to maintain in the connection pool. Default: 2. Type: String

  • connect.cassandra.compression: Compression algorithm to use for the connection. Default: LZ4. Type: String

  • connect.cassandra.query.timeout.ms: The Cassandra driver query timeout in milliseconds. Default: 20000. Type: Int

  • connect.cassandra.max.batch.size: Number of records to include in a write request to the database table. Default: 64. Type: Int

  • connect.cassandra.load.balancing.local.dc: The case-sensitive datacenter name for the driver to use for load balancing. Default: (no default). Type: String

  • connect.cassandra.auth.provider: Authentication provider. Default: None. Type: String

  • connect.cassandra.auth.username: Username for PLAIN (username/password) provider authentication. Default: "". Type: String

  • connect.cassandra.auth.password: Password for PLAIN (username/password) provider authentication. Default: "". Type: String

  • connect.cassandra.auth.gssapi.keytab: Kerberos keytab file for GSSAPI provider authentication. Default: "". Type: String

  • connect.cassandra.auth.gssapi.principal: Kerberos principal for GSSAPI provider authentication. Default: "". Type: String

  • connect.cassandra.auth.gssapi.service: SASL service name to use for GSSAPI provider authentication. Default: dse. Type: String

  • connect.cassandra.ssl.enabled: Secure Cassandra driver connection via SSL. Default: false. Type: String

  • connect.cassandra.ssl.provider: The SSL provider to use for the connection. Available values are None, JDK or OpenSSL. Default: None. Type: String

  • connect.cassandra.ssl.truststore.path: Path to the client Trust Store. Default: (no default). Type: String

  • connect.cassandra.ssl.truststore.password: Password for the client Trust Store. Default: (no default). Type: String

  • connect.cassandra.ssl.keystore.path: Path to the client Key Store. Default: (no default). Type: String

  • connect.cassandra.ssl.keystore.password: Password for the client Key Store. Default: (no default). Type: String

  • connect.cassandra.ssl.cipher.suites: The SSL cipher suites to use for the connection. Default: (no default). Type: String

  • connect.cassandra.ssl.hostname.verification: Enable hostname verification for the connection. Default: true. Type: String

  • connect.cassandra.ssl.openssl.key.cert.chain: Enable OpenSSL key certificate chain for the connection. Default: (no default). Type: String

  • connect.cassandra.ssl.openssl.private.key: Enable OpenSSL private key for the connection. Default: (no default). Type: String

  • connect.cassandra.ignore.errors.mode: Can be one of 'none', 'all' or 'driver'. Default: none. Type: String

  • connect.cassandra.error.policy: Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP - the error is swallowed; THROW - the error is allowed to propagate; RETRY - the exception causes the Connect framework to retry the message. The number of retries is set by connect.cassandra.max.retries. All errors will be logged automatically, even if the code swallows them. Default: THROW. Type: String

  • connect.cassandra.max.retries: The maximum number of times to try the write again. Default: 20. Type: Int

  • connect.cassandra.retry.interval: The time in milliseconds between retries. Default: 60000. Type: String

  • connect.cassandra.kcql: KCQL expression describing field selection and routes. Default: (no default). Type: String

  • connect.cassandra.progress.enabled: Enables the output for how many records have been processed. Default: false. Type: Boolean

  • connect.cassandra.driver.*: Tweaks the Cassandra driver settings. Refer to the DataStax Java driver documentation.

Error policies

The connector supports Error policies; see connect.cassandra.error.policy, connect.cassandra.max.retries and connect.cassandra.retry.interval in the Configuration Reference.
