# Cassandra

The Stream Reactor Cassandra Sink Connector moves data from Apache Kafka to Apache Cassandra. It supports Apache Cassandra 3.0 and later, DataStax Enterprise 4.7 and later, and DataStax Astra databases.

## Connector Class

```
io.lenses.streamreactor.connect.cassandra.CassandraSinkConnector
```

The [release artifact](https://github.com/lensesio/stream-reactor/releases) is called *kafka-connect-cassandra-sink*.

## Features

* **Database authentication**: username/password, LDAP, Kerberos
* **Input formats**: The connector supports Avro, JSON Schema, Protobuf, JSON (schemaless) and primitive data formats. Schema-Registry-based formats, such as Avro, Protobuf and JSON Schema, require the Schema Registry settings.
* **At least once semantics**: The Connect framework stores record offsets in Kafka and resumes from the last committed offset on restart. This ensures reliable delivery but may occasionally result in duplicate record processing during failures.
* **Kafka topic to database table mapping**: Controls which Kafka message fields are written to which database table columns
* **Multi-table mapping**: Enables writing a topic to multiple database tables
* **Date/Time/Timestamp formats**: Allows control of the date, time, and timestamp format conversion between Kafka messages and database columns, supporting custom parsing and formatting patterns.
* **Consistency Level control**: Allows configuring the write consistency level per table mapping
* **Row-Level Time-to-Live**: Allows configuring a row-level TTL per table mapping
* **Deletes**: Allows configuring deletes per table mapping

## Example

{% hint style="success" %}
For more examples see the [tutorials](https://docs.lenses.io/latest/connectors/tutorials).
{% endhint %}

{% code title="Simple configuration" overflow="wrap" lineNumbers="true" fullWidth="false" %}

```bash
name=cassandra-sink
connector.class=io.lenses.streamreactor.connect.cassandra.CassandraSinkConnector
tasks.max=1
topics=orders
connect.cassandra.kcql=INSERT INTO mykeyspace.orders SELECT _key.bigint as bigintcol, _value.boolean as booleancol, _key.double as doublecol, _value.float as floatcol, _key.int as intcol, _value.smallint as smallintcol, _key.text as textcol, _value.tinyint as tinyintcol FROM orders
connect.cassandra.port=9042 
connect.cassandra.contact.points=cassandra
```

{% endcode %}

## Connection <a href="#storage-to-output-matrix" id="storage-to-output-matrix"></a>

The connector requires a connection to the database. To establish it, specify the following configuration entries. See the Configuration Reference for details.

```bash
connect.cassandra.contact.points=[host list]
connect.cassandra.port=9042
connect.cassandra.load.balancing.local.dc=datacentre-name
# optional entries
# connect.cassandra.max.concurrent.requests = 100
# connect.cassandra.max.batch.size = 64
# connect.cassandra.connection.pool.size = 4
# connect.cassandra.query.timeout.ms = 30
# connect.cassandra.compression = None
```

## Security <a href="#storage-to-output-matrix" id="storage-to-output-matrix"></a>

### SSL connection

When the database cluster has enabled client encryption, configure the SSL Keys and certificates:

```
connect.cassandra.ssl.enabled=true
connect.cassandra.ssl.provider=
connect.cassandra.ssl.cipher.suites=
connect.cassandra.ssl.hostname.verification=true


connect.cassandra.ssl.keystore.path=
connect.cassandra.ssl.keystore.password=

connect.cassandra.ssl.truststore.password=
connect.cassandra.ssl.truststore.path=

# Path to the SSL certificate file, when using OpenSSL.
connect.cassandra.ssl.openssl.key.cert.chain=

# Path to the private key file, when using OpenSSL.
connect.cassandra.ssl.openssl.private.key=
```
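A filled-in sketch using the JDK SSL provider; the keystore and truststore paths and passwords below are placeholders for illustration:

```
connect.cassandra.ssl.enabled=true
connect.cassandra.ssl.provider=JDK
connect.cassandra.ssl.hostname.verification=true
connect.cassandra.ssl.keystore.path=/opt/ssl/client.keystore.jks
connect.cassandra.ssl.keystore.password=changeit
connect.cassandra.ssl.truststore.path=/opt/ssl/client.truststore.jks
connect.cassandra.ssl.truststore.password=changeit
```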

### User-Password or LDAP authentication

When the database is configured with username/password or LDAP authentication, configure the following:

```
connect.cassandra.auth.provider=
connect.cassandra.auth.username=
connect.cassandra.auth.password=

# for SASL/GSSAPI authentication, which requires connect.cassandra.auth.provider
# to be set to GSSAPI
connect.cassandra.auth.gssapi.keytab=
connect.cassandra.auth.gssapi.principal=
connect.cassandra.auth.gssapi.service=
```
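For example, a minimal sketch for plain username/password authentication (the credentials below are placeholders):

```
connect.cassandra.auth.provider=PLAIN
connect.cassandra.auth.username=connect_user
connect.cassandra.auth.password=connect_password
```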

### Kerberos authentication

When the database cluster has Kerberos authentication enabled, configure the following:

```
connect.cassandra.auth.provider=GSSAPI
connect.cassandra.auth.gssapi.keytab=
connect.cassandra.auth.gssapi.principal=
connect.cassandra.auth.gssapi.service=
```
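For example, a sketch with a hypothetical keytab path and principal:

```
connect.cassandra.auth.provider=GSSAPI
connect.cassandra.auth.gssapi.keytab=/etc/security/keytabs/connect.keytab
connect.cassandra.auth.gssapi.principal=connect/worker1.example.com@EXAMPLE.COM
connect.cassandra.auth.gssapi.service=dse
```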

## KCQL support <a href="#kcql-support" id="kcql-support"></a>

{% hint style="success" %}
You can specify multiple KCQL statements separated by **`;`** to have a connector sink multiple topics. The connector properties **topics** or **topics.regex** are required to be set to a value that matches the KCQL statements.
{% endhint %}

KCQL (Kafka Connect Query Language) is a SQL-like syntax that provides a declarative way to configure connector behavior. It serves as the primary interface for defining how data flows from source topics to target Cassandra tables.

### Purpose and Functionality

KCQL statements define three core aspects of data pipeline configuration:

**Data Source and Target**: Specifies which Kafka topic serves as the data source and which Cassandra table receives the data.

**Field Mapping**: Controls how fields from the source topic map to columns in the target table, including transformations and selective field inclusion.

**Operational Parameters**: Configures advanced settings such as consistency levels, delete operations, time-to-live (TTL) settings, and timestamp handling.

### Key Capabilities

* **Flexible Field Mapping**: Map source fields to target columns with optional transformations
* **Consistency Control**: Set read and write consistency levels for Cassandra operations
* **Delete Operations**: Enable or disable delete functionality based on message content
* **TTL Management**: Configure automatic data expiration using time-to-live settings
* **Timestamp Handling**: Control how timestamps are processed and stored
* **Conditional Logic**: Apply filtering and conditional processing to incoming data

### Syntax Template

The basic KCQL statement follows this structure:

```sql
INSERT INTO <keyspace>.<your-cassandra-table>
SELECT <field projection, ...>
FROM <your-topic>
PROPERTIES(<a set of keys to control the connector behaviour>)
```

**INSERT INTO**: Specifies the target Cassandra keyspace and table where data will be written.

**SELECT**: Defines field projection and mapping from the source.

**FROM**: Identifies the source Kafka topic containing the data to be processed.

**PROPERTIES**: Optional clause for configuring connector behavior such as consistency levels, TTL settings, and operational parameters.

### Examples

```sql
INSERT INTO mykeyspace.types
SELECT 
 _key.bigint as bigintcol
 , _value.boolean as booleancol
 , _key.double as doublecol
 , _value.float as floatcol
 , _header.int as intcol
FROM myTopic

INSERT INTO mykeyspace.types
SELECT 
 _value.bigint as bigintcol
 , _value.double as doublecol
 , _value.ttlcol as message_internal_ttl 
 , _value.timestampcol as message_internal_timestamp
FROM myTopic
PROPERTIES('ttlTimeUnit'='MILLISECONDS', 'timestampTimeUnit'='MICROSECONDS')

INSERT INTO mykeyspace.CASE_SENSITIVE
SELECT
    `_key.'bigint field'` as 'bigint col', 
    `_key.'boolean-field'` as 'boolean-col', 
    `_value.'INT FIELD'` as 'INT COL', 
    `_value.'TEXT.FIELD'` as 'TEXT.COL'
FROM mytopic 


INSERT INTO mykeyspace.tableA 
SELECT 
    _key.my_pk as my_pk
    , _value.my_value as my_value 
FROM topicA
PROPERTIES(
 'query'='INSERT INTO mykeyspace.pk_value (my_pk, my_value) VALUES (:my_pk, :my_value)',
 'deletesEnabled' ='true'
)
```

## Table mapping <a href="#kcql-support" id="kcql-support"></a>

#### Kafka to Database Table Mapping

The connector enables mapping fields from Kafka records to database table columns. A single connector can handle multiple topics, where each topic can map to one or more tables. Kafka messages consist of a Key, Value, and Header.

**Generic Mapping Syntax**

Use the following syntax to map Kafka message fields to database columns:

```sql
INSERT INTO ${target}
SELECT _value/_key/_header.{field_path} AS ${table_column}
FROM mytopic
```

* **Prefix the field with:**
  * `_value`: For fields from the Kafka message's Value component.
  * `_key`: For fields from the Kafka message's Key component.
  * `_header`: For fields from the Kafka message's Header.

**Note:** If no prefix is provided, `_value` is assumed, mapping to the Kafka record's Value component.
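For example, the following two statements are equivalent (a sketch assuming a hypothetical topic `orders` with a `qty` field in the record value):

```sql
INSERT INTO mykeyspace.orders SELECT qty AS quantity FROM orders
INSERT INTO mykeyspace.orders SELECT _value.qty AS quantity FROM orders
```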

The record Key and Value can be mapped to specific columns like this, considering a table with row_key and content columns:

```sql
INSERT INTO ${target}
SELECT 
  _key as row_key
  , _value as content
FROM mytopic
```

To map fields or columns whose names contain whitespace or other special characters, quote them in the KCQL projection:

```sql
INSERT INTO ${target}
SELECT
  `_key.'bigint field'` as 'bigint col',
  `_key.'boolean-field'` as 'boolean-col',
  `_value.'INT FIELD'` as 'INT COL',
  `_value.'TEXT.FIELD'` as 'TEXT.COL'
FROM myTopic
```

### Fan-out

You can map a topic's data to multiple database tables using multiple KCQL statements in the same `connect.cassandra.kcql` configuration. For example:

```sql
INSERT INTO stocks_keyspace.stocks_by_symbol
SELECT _value.symbol AS ticker, 
       _value.ts AS ts, 
       _value.exchange AS exchange, 
       _value.value AS value 
FROM stocks;
INSERT INTO stocks_keyspace.stocks_by_exchange
SELECT _value.symbol AS ticker, 
       _value.ts AS ts, 
       _value.exchange AS exchange, 
       _value.value AS value 
FROM stocks;
```
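Wired into the connector configuration, this looks roughly as follows (a sketch assuming the source topic is named `stocks`; the two statements are joined with `;` in `connect.cassandra.kcql`):

```bash
topics=stocks
connect.cassandra.kcql=INSERT INTO stocks_keyspace.stocks_by_symbol SELECT _value.symbol AS ticker, _value.ts AS ts, _value.exchange AS exchange, _value.value AS value FROM stocks;INSERT INTO stocks_keyspace.stocks_by_exchange SELECT _value.symbol AS ticker, _value.ts AS ts, _value.exchange AS exchange, _value.value AS value FROM stocks
```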

### User defined types

The connector can map complex types from a Kafka record payload into a user-defined type (UDT) column in the database. The incoming data field names must match the database field names.

Consider the following Kafka record format:

<table><thead><tr><th>key</th><th>value</th></tr></thead><tbody><tr><td>APPLE</td><td><pre><code>{"symbol":"APPL",
"value":214.4,
"exchange":"NASDAQ",
"ts":"2025-07-23T14:56:10.009"}
</code></pre></td></tr></tbody></table>

and database definition:

```sql
CREATE TYPE stocks_keyspace.stocks_type (
    symbol text,
    ts timestamp,
    exchange text,
    value double
);

CREATE TABLE stocks_keyspace.stocks_table (
    name text PRIMARY KEY,
    stocks FROZEN<stocks_type>
);
```

The mapping configuration should be:

```sql
INSERT INTO stocks_keyspace.stocks_table
SELECT _key AS name, _value AS stocks
FROM stocks
```

### Using now() function

You can use the `now()` function, which returns a `TIMEUUID`, in the mapping. Here is how to use it in a KCQL projection:

```sql
INSERT INTO ${target}
SELECT
  `now()` as loaded_at
FROM myTopic
```

### Write Timestamp

To set the database write timestamp for a record, map a numeric field from the Kafka record payload to the `message_internal_timestamp` column. Use the following mapping syntax:

```sql
SELECT [_value/_key/_header].{path} AS message_internal_timestamp
FROM myTopic
--You may optionally specify the time unit by using:
PROPERTIES('timestampTimeUnit'='MICROSECONDS')
```

Examples:

```sql
SELECT _header.timestampcolumn AS message_internal_timestamp FROM myTopic
SELECT _value.timestampcol AS message_internal_timestamp FROM myTopic
SELECT _key.timestampcol AS message_internal_timestamp FROM myTopic
```

### Row-Level TTL

To define the time-to-live (TTL) for a database record, you can optionally map a field from the Kafka record payload. Use the following mapping:

```sql
SELECT [_value/_key/_header].{path} as message_internal_ttl
FROM myTopic
-- Optional: Specify the TTL unit
PROPERTIES('ttlTimeUnit'='SECONDS')
```

#### Examples:

```sql
SELECT _header.ttl as message_internal_ttl
FROM myTopic
PROPERTIES('ttlTimeUnit'='MINUTES')

SELECT _key.expiry as message_internal_ttl
FROM myTopic
PROPERTIES('ttlTimeUnit'='HOURS')

SELECT _value.ttl_duration as message_internal_ttl
FROM myTopic
PROPERTIES('ttlTimeUnit'='DAYS')
```

These examples demonstrate specifying TTL units such as 'MINUTES', 'HOURS', or 'DAYS', in addition to the default 'SECONDS'.

### Converting date and time for a topic

To configure date and time conversion properties for a topic, use the following KCQL statement:

```sql
INSERT INTO ... SELECT ... FROM myTopic PROPERTIES(
    'codec.locale' = 'en_US',
    'codec.timeZone' = 'UTC',
    'codec.timestamp' = 'CQL_TIMESTAMP',
    'codec.date' = 'ISO_LOCAL_DATE',
    'codec.time' = 'ISO_LOCAL_TIME',
    'codec.unit' = 'MILLISECONDS'
)
```

**Codec Parameter Descriptions:**

| Parameter         | Description                                                                                                                                                                                                                                                                            | Default          |
| ----------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ---------------- |
| `codec.timestamp` | Defines the pattern for converting strings to CQL timestamps. Options include specific date-time patterns like `yyyy-MM-dd HH:mm:ss` or pre-defined formats such as `ISO_ZONED_DATE_TIME` and `ISO_INSTANT`. The special formatter `CQL_TIMESTAMP` supports all CQL timestamp formats. | `CQL_TIMESTAMP`  |
| `codec.date`      | Specifies the pattern for converting strings to CQL dates. Options include date-time patterns like `yyyy-MM-dd` and formatters such as `ISO_LOCAL_DATE`.                                                                                                                               | `ISO_LOCAL_DATE` |
| `codec.time`      | Sets the pattern for converting strings to CQL time, using patterns like `HH:mm:ss` or formatters such as `ISO_LOCAL_TIME`.                                                                                                                                                            | `ISO_LOCAL_TIME` |
| `codec.unit`      | For digit-only inputs not parsed by `codec.timestamp`, this sets the time unit for conversion. Accepts all `TimeUnit` enum constants.                                                                                                                                                  | `MILLISECONDS`   |
| `codec.timeZone`  | Defines the time zone for conversions without an explicit time zone.                                                                                                                                                                                                                   | `UTC`            |
| `codec.locale`    | Specifies the locale for locale-sensitive conversions.                                                                                                                                                                                                                                 | `en_US`          |

### CQL Queries

#### Advanced CQL Query Configuration for Kafka Records

When a new Kafka record arrives, you have the option to run a custom CQL query. This feature is designed for advanced use cases; typically, the standard Kafka mapping suffices without needing a query. If you specify a query in the topic-to-table mapping, it will take precedence over the default action.

**Important:** Every bound variable used in the query must also appear as an alias in the mapping projection.

**Example**

{% code overflow="wrap" %}

```sql
INSERT INTO ${target}
SELECT 
    _value.bigint as some_name,
    _value.int as some_name_2
FROM myTopic
PROPERTIES('query'='INSERT INTO %s.types (bigintCol, intCol) VALUES (:some_name, :some_name_2)')
```

{% endcode %}

### Extra settings

| Parameter          | Description                                                                                                                                                                                                                                                                                                                                       | Default      |
| ------------------ | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------------ |
| `consistencyLevel` | <p>Query Consistency Level Options:</p><ul><li><strong>ALL</strong></li><li><strong>EACH_QUORUM</strong></li><li><strong>QUORUM</strong></li><li><strong>LOCAL_QUORUM</strong></li><li><strong>ONE</strong></li><li><strong>TWO</strong></li><li><strong>THREE</strong></li><li><strong>LOCAL_ONE</strong></li><li><strong>ANY</strong></li></ul> | `LOCAL_ONE`  |
| `ttl`              | Specify the number of seconds before data is automatically deleted from the database table. When set, all rows in the topic table will share this `TTL` value.                                                                                                                                                                                      | `-1`         |
| `nullToUnset`      | When handling nulls in Kafka, it's advisable to treat them as UNSET in the database. DataStax suggests sticking with the default setting to minimize the creation of unnecessary tombstones.                                                                                                                                                        | `true`       |
| `deletesEnabled`   | Enable this feature to treat records as deletes if, after mapping, only the primary key columns have non-null values. This prevents the insertion or update of nulls in regular columns.                                                                                                                                                            | `true`       |
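For example, these settings are applied through the KCQL `PROPERTIES` clause (a sketch; keyspace, table and topic names are illustrative):

```sql
INSERT INTO mykeyspace.orders
SELECT _value.id AS id, _value.qty AS qty
FROM orders
PROPERTIES(
    'consistencyLevel' = 'LOCAL_QUORUM',
    'ttl' = '86400',
    'nullToUnset' = 'true',
    'deletesEnabled' = 'false'
)
```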

## Java Driver settings <a href="#kcql-support" id="kcql-support"></a>

Any Cassandra client (DataStax Java driver) setting can be passed through the connector configuration by prefixing it with:

```
connect.cassandra.driver.{driver_setting}
```

Refer to the [DataStax Java driver documentation](https://docs.datastax.com/en/developer/java-driver-dse/2.3/manual/core/configuration/) for the available settings.
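For example, to set the driver's request consistency through the connector configuration:

```
connect.cassandra.driver.basic.request.consistency=ALL
```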

## Configuration Reference <a href="#storage-to-output-matrix" id="storage-to-output-matrix"></a>

<table data-full-width="true"><thead><tr><th width="402.68359375">Setting</th><th width="452.5703125">Description</th><th width="105.62109375">Default Value</th><th>Type</th></tr></thead><tbody><tr><td><code>connect.cassandra.contact.points</code></td><td>A comma-separated list of host names or IP addresses</td><td><code>localhost</code></td><td>String</td></tr><tr><td><code>connect.cassandra.port</code></td><td>Cassandra native port.</td><td><code>9042</code></td><td>String</td></tr><tr><td><code>connect.cassandra.max.concurrent.requests</code></td><td>Maximum number of requests to send to database at the same time.</td><td><code>100</code></td><td>String</td></tr><tr><td><code>connect.cassandra.connection.pool.size</code></td><td>The number of connections to maintain in the connection pool.</td><td><code>2</code></td><td>String</td></tr><tr><td><code>connect.cassandra.compression</code></td><td>Compression algorithm to use for the connection. Defaults to LZ4.</td><td><code>LZ4</code></td><td>String</td></tr><tr><td><code>connect.cassandra.query.timeout.ms</code></td><td>The Cassandra driver query timeout in milliseconds.</td><td><code>20000</code></td><td>Int</td></tr><tr><td><code>connect.cassandra.max.batch.size</code></td><td>Number of records to include in a write request to the database table.</td><td><code>64</code></td><td>Int</td></tr><tr><td><code>connect.cassandra.load.balancing.local.dc</code></td><td>The case-sensitive datacenter name for the driver to use for load balancing.</td><td><em>(no default)</em></td><td>String</td></tr><tr><td><code>connect.cassandra.auth.provider</code></td><td>Authentication provider</td><td><code>None</code></td><td>String</td></tr><tr><td><code>connect.cassandra.auth.username</code></td><td>Username for PLAIN (username/password) provider authentication</td><td><code>""</code></td><td>String</td></tr><tr><td><code>connect.cassandra.auth.password</code></td><td>Password for PLAIN (username/password) provider authentication</td><td><code>""</code></td><td>String</td></tr><tr><td><code>connect.cassandra.auth.gssapi.keytab</code></td><td>Kerberos keytab file for GSSAPI provider authentication</td><td><code>""</code></td><td>String</td></tr><tr><td><code>connect.cassandra.auth.gssapi.principal</code></td><td>Kerberos principal for GSSAPI provider authentication</td><td><code>""</code></td><td>String</td></tr><tr><td><code>connect.cassandra.auth.gssapi.service</code></td><td>SASL service name to use for GSSAPI provider authentication</td><td><code>dse</code></td><td>String</td></tr><tr><td><code>connect.cassandra.ssl.enabled</code></td><td>Secure Cassandra driver connection via SSL.</td><td><code>false</code></td><td>String</td></tr><tr><td><code>connect.cassandra.ssl.provider</code></td><td>The SSL provider to use for the connection. Available values are None, JDK or OpenSSL. 
Defaults to None.</td><td><code>None</code></td><td>String</td></tr><tr><td><code>connect.cassandra.ssl.truststore.path</code></td><td>Path to the client Trust Store.</td><td><em>(no default)</em></td><td>String</td></tr><tr><td><code>connect.cassandra.ssl.truststore.password</code></td><td>Password for the client Trust Store.</td><td><em>(no default)</em></td><td>String</td></tr><tr><td><code>connect.cassandra.ssl.keystore.path</code></td><td>Path to the client Key Store.</td><td><em>(no default)</em></td><td>String</td></tr><tr><td><code>connect.cassandra.ssl.keystore.password</code></td><td>Password for the client Key Store</td><td><em>(no default)</em></td><td>String</td></tr><tr><td><code>connect.cassandra.ssl.cipher.suites</code></td><td>The SSL cipher suites to use for the connection.</td><td><em>(no default)</em></td><td>String</td></tr><tr><td><code>connect.cassandra.ssl.hostname.verification</code></td><td>Enable hostname verification for the connection.</td><td><code>true</code></td><td>String</td></tr><tr><td><code>connect.cassandra.ssl.openssl.key.cert.chain</code></td><td>Enable OpenSSL key certificate chain for the connection.</td><td><em>(no default)</em></td><td>String</td></tr><tr><td><code>connect.cassandra.ssl.openssl.private.key</code></td><td>Enable OpenSSL private key for the connection.</td><td><em>(no default)</em></td><td>String</td></tr><tr><td><code>connect.cassandra.ignore.errors.mode</code></td><td>Can be one of 'none', 'all' or 'driver'</td><td><code>none</code></td><td>String</td></tr><tr><td><code>connect.cassandra.retry.interval</code></td><td>The time in milliseconds between retries.</td><td><code>60000</code></td><td>String</td></tr><tr><td><code>connect.cassandra.error.policy</code></td><td>Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP - the error is swallowed; THROW - the error is allowed to propagate; RETRY - The exception causes the Connect framework to retry the message. The number of retries is set by connect.cassandra.max.retries. All errors will be logged automatically, even if the code swallows them.</td><td><code>THROW</code></td><td>String</td></tr><tr><td><code>connect.cassandra.max.retries</code></td><td>The maximum number of times to try the write again.</td><td><code>20</code></td><td>Int</td></tr><tr><td><code>connect.cassandra.kcql</code></td><td>KCQL expression describing field selection and routes.</td><td><em>(no default)</em></td><td>String</td></tr><tr><td><code>connect.cassandra.progress.enabled</code></td><td>Enables the output for how many records have been processed</td><td><code>false</code></td><td>Boolean</td></tr><tr><td><p></p><pre><code>connect.cassandra.driver.*
</code></pre></td><td><p>Tweaks the Cassandra driver settings. <br></p><pre data-overflow="wrap"><code>connect.cassandra.driver.basic.request.consistency=ALL
</code></pre><p>Refer to the <a href="https://docs.datastax.com/en/developer/java-driver-dse/2.3/manual/core/configuration/">DataStax Java driver documentation</a></p></td><td></td><td></td></tr></tbody></table>

## Error policies <a href="#error-polices" id="error-polices"></a>

The connector supports [Error policies](https://docs.lenses.io/latest/connectors/tutorials/using-error-policies).
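
For example, to retry failed writes instead of failing the task (a sketch using the settings from the Configuration Reference above):

```
connect.cassandra.error.policy=RETRY
connect.cassandra.max.retries=20
connect.cassandra.retry.interval=60000
```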
