# Deprecated Cassandra

{% hint style="warning" %}
The connector converts the value of Kafka messages to JSON and uses the Cassandra JSON insert feature to write records.
{% endhint %}

{% hint style="warning" %}
As of Stream-Reactor version 10, the sink will be deprecated. No other changes or fixes will be provided. Follow the Cassandra link to see the new sink implementation details.
{% endhint %}

## Connector Class

```
io.lenses.streamreactor.connect.cassandra.sink.CassandraSinkConnector
```

## Example

{% hint style="success" %}
For more examples see the [tutorials](https://docs.lenses.io/latest/connectors/tutorials).
{% endhint %}

{% code fullWidth="false" %}

```bash
name=cassandra-sink
connector.class=io.lenses.streamreactor.connect.cassandra.sink.CassandraSinkConnector
tasks.max=1
topics=orders
connect.cassandra.kcql=INSERT INTO orders SELECT * FROM orders
connect.cassandra.port=9042
connect.cassandra.key.space=demo
connect.cassandra.contact.points=cassandra
```

{% endcode %}

## KCQL support <a href="#kcql-support" id="kcql-support"></a>

{% hint style="success" %}
You can specify multiple KCQL statements separated by\*\*`;`\*\* to have a connector sink multiple topics. The connector properties **topics** or **topics.regex** are required to be set to a value that matches the KCQL statements.
{% endhint %}

The following KCQL is supported:

```sql
INSERT INTO <your-cassandra-table>
SELECT FIELD,...
FROM <your-table>
[TTL=Time to live]
```

Examples:

```sql
-- Insert mode, select all fields from topicA and
-- write to tableA
INSERT INTO tableA SELECT * FROM topicA

-- Insert mode, select 3 fields and rename from topicB
-- and write to tableB
INSERT INTO tableB SELECT x AS a, y, c FROM topicB

-- Insert mode, select 3 fields and rename from topicB
-- and write to tableB with TTL
INSERT INTO tableB SELECT x, y FROM topicB TTL=100000
```

## Deletion in Cassandra <a href="#deletion-in-cassandra" id="deletion-in-cassandra"></a>

Compacted topics in Kafka retain the last message per key. Deletion in Kafka occurs by tombstoning. If compaction is enabled on the topic and a message is sent with a null payload, Kafka flags this record for deletion and is compacted/removed from the topic.

Deletion in Cassandra is supported based on fields in the **key** of messages with an empty/null payload. A Cassandra delete statement must be provided which specifies the Cassandra CQL delete statement and with parameters to bind field values from the key to, for example, with the delete statement of:

```sql
DELETE FROM orders WHERE id = ? and product = ?
```

If a message was received with an empty/null value and key fields key.id and key.product the final bound Cassandra statement would be:

```bash
# Message
# "{ "key": { "id" : 999, "product" : "DATAMOUNTAINEER" }, "value" : null }"
# DELETE FROM orders WHERE id = 999 and product = "DATAMOUNTAINEER"

# connect.cassandra.delete.enabled=true
# connect.cassandra.delete.statement=DELETE FROM orders WHERE id = ? and product = ?
# connect.cassandra.delete.struct_flds=id,product
```

Deletion will **only** occur if a message with an empty payload is received from Kafka.

{% hint style="info" %}
Ensure your ordinal position of the **`connect.cassandra.delete.struct_flds`** matches the binding order in the Cassandra delete statement!
{% endhint %}

## Kafka payload support <a href="#kafka-payload-support" id="kafka-payload-support"></a>

This sink supports the following Kafka payloads:

* Schema.Struct and Struct (Avro)
* Schema.Struct and JSON
* No Schema and JSON

## Error policies <a href="#error-polices" id="error-polices"></a>

The connector supports [Error policies](https://docs.lenses.io/latest/connectors/tutorials/using-error-policies).

## Option Reference <a href="#storage-to-output-matrix" id="storage-to-output-matrix"></a>

<table data-full-width="true"><thead><tr><th width="364">Name</th><th width="435">Description</th><th width="122.5">Type</th><th>Default Value</th></tr></thead><tbody><tr><td>connect.cassandra.contact.points</td><td>Initial contact point host for Cassandra including port.</td><td>string</td><td>localhost</td></tr><tr><td>connect.cassandra.port</td><td>Cassandra native port.</td><td>int</td><td>9042</td></tr><tr><td>connect.cassandra.key.space</td><td>Keyspace to write to.</td><td>string</td><td></td></tr><tr><td>connect.cassandra.username</td><td>Username to connect to Cassandra with.</td><td>string</td><td></td></tr><tr><td>connect.cassandra.password</td><td>Password for the username to connect to Cassandra with.</td><td>password</td><td></td></tr><tr><td>connect.cassandra.ssl.enabled</td><td>Secure Cassandra driver connection via SSL.</td><td>boolean</td><td>false</td></tr><tr><td>connect.cassandra.trust.store.path</td><td>Path to the client Trust Store.</td><td>string</td><td></td></tr><tr><td>connect.cassandra.trust.store.password</td><td>Password for the client Trust Store.</td><td>password</td><td></td></tr><tr><td>connect.cassandra.trust.store.type</td><td>Type of the Trust Store, defaults to JKS</td><td>string</td><td>JKS</td></tr><tr><td>connect.cassandra.key.store.type</td><td>Type of the Key Store, defauts to JKS</td><td>string</td><td>JKS</td></tr><tr><td>connect.cassandra.ssl.client.cert.auth</td><td>Enable client certification authentication by Cassandra. Requires KeyStore options to be set.</td><td>boolean</td><td>false</td></tr><tr><td>connect.cassandra.key.store.path</td><td>Path to the client Key Store.</td><td>string</td><td></td></tr><tr><td>connect.cassandra.key.store.password</td><td>Password for the client Key Store</td><td>password</td><td></td></tr><tr><td>connect.cassandra.consistency.level</td><td>Consistency refers to how up-to-date and synchronized a row of Cassandra data is on all of its replicas. Cassandra offers tunable consistency. For any given read or write operation, the client application decides how consistent the requested data must be.</td><td>string</td><td></td></tr><tr><td>connect.cassandra.fetch.size</td><td>The number of records the Cassandra driver will return at once.</td><td>int</td><td>5000</td></tr><tr><td>connect.cassandra.load.balancing.policy</td><td>Cassandra Load balancing policy. ROUND_ROBIN, TOKEN_AWARE, LATENCY_AWARE or DC_AWARE_ROUND_ROBIN. TOKEN_AWARE and LATENCY_AWARE use DC_AWARE_ROUND_ROBIN</td><td>string</td><td>TOKEN_AWARE</td></tr><tr><td>connect.cassandra.error.policy</td><td>Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP - the error is swallowed THROW - the error is allowed to propagate. RETRY - The exception causes the Connect framework to retry the message. The number of retries is set by connect.cassandra.max.retries. All errors will be logged automatically, even if the code swallows them.</td><td>string</td><td>THROW</td></tr><tr><td>connect.cassandra.max.retries</td><td>The maximum number of times to try the write again.</td><td>int</td><td>20</td></tr><tr><td>connect.cassandra.retry.interval</td><td>The time in milliseconds between retries.</td><td>int</td><td>60000</td></tr><tr><td>connect.cassandra.threadpool.size</td><td>The sink inserts all the data concurrently. To fail fast in case of an error, the sink has its own thread pool. Set the value to zero and the threadpool will default to 4* NO_OF_CPUs. Set a value greater than 0 and that would be the size of this threadpool.</td><td>int</td><td>0</td></tr><tr><td>connect.cassandra.delete.struct_flds</td><td>Fields in the key struct data type used in there delete statement. Comma-separated in the order they are found in connect.cassandra.delete.statement. Keep default value to use the record key as a primitive type.</td><td>list</td><td>[]</td></tr><tr><td>connect.cassandra.delete.statement</td><td>Delete statement for cassandra</td><td>string</td><td></td></tr><tr><td>connect.cassandra.kcql</td><td>KCQL expression describing field selection and routes.</td><td>string</td><td></td></tr><tr><td>connect.cassandra.default.value</td><td>By default a column omitted from the <strong>JSON</strong> map will be set to <strong>NULL</strong>. Alternatively, if set <strong>UNSET</strong>, pre-existing value will be preserved.</td><td>string</td><td></td></tr><tr><td>connect.cassandra.delete.enabled</td><td>Enables row deletion from cassandra</td><td>boolean</td><td>false</td></tr><tr><td>connect.progress.enabled</td><td>Enables the output for how many records have been processed</td><td>boolean</td><td>false</td></tr></tbody></table>


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://docs.lenses.io/latest/connectors/kafka-connectors/sinks/cassandra.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
