# Azure CosmosDB

{% hint style="warning" %}
Version 10.0.0 introduces breaking changes: the connector has been renamed from DocumentDB, uses the official CosmosDB SDK, and supports new bulk and key strategies.
{% endhint %}

{% hint style="success" %}
A Kafka Connect sink connector for writing records from Kafka to Azure CosmosDB using the SQL API.
{% endhint %}

## Connector Class

```
io.lenses.streamreactor.connect.azure.cosmosdb.sink.CosmosDbSinkConnector
```

## Example

{% hint style="success" %}
For more examples see the [tutorials](https://docs.lenses.io/latest/connectors/tutorials).
{% endhint %}

{% code fullWidth="true" %}

```properties
name=cosmosdb
connector.class=io.lenses.streamreactor.connect.azure.cosmosdb.sink.CosmosDbSinkConnector
tasks.max=1
topics=orders-string
connect.cosmosdb.kcql=INSERT INTO orders SELECT * FROM orders-string
connect.cosmosdb.db=dm
connect.cosmosdb.endpoint=[YOUR_AZURE_ENDPOINT]
connect.cosmosdb.db.create=true
connect.cosmosdb.master.key=[YOUR_MASTER_KEY]
connect.cosmosdb.batch.size=10
```

{% endcode %}

## KCQL support <a href="#kcql-support" id="kcql-support"></a>

{% hint style="success" %}
You can specify multiple KCQL statements, separated by `;`, to have the connector sink multiple topics. The connector property **topics** or **topics.regex** must be set to a value that matches the topics used in the KCQL statements.
{% endhint %}

The following KCQL is supported:

```sql
INSERT | UPSERT
INTO <your-collection>
SELECT FIELD, ...
FROM kafka_topic
[PK FIELDS,...]
```

Examples:

```sql
-- INSERT mode: select all fields from topicA
-- and write them to collectionA
INSERT INTO collectionA SELECT * FROM topicA

-- UPSERT mode: select 3 fields from topicB, renaming two of them,
-- and write them to collectionB,
-- using the field id from the topic as the primary key
UPSERT INTO collectionB SELECT x AS a, y, z AS c FROM topicB PK id
```
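
As noted above, several KCQL statements can be combined in one connector by separating them with `;`. A minimal sketch, assuming two hypothetical topics `topicA` and `topicB` and target collections `collectionA` and `collectionB` (illustrative names only):

```properties
# Both topics must be listed so that they match the KCQL statements
topics=topicA,topicB
# Two statements separated by ';', one per topic
connect.cosmosdb.kcql=INSERT INTO collectionA SELECT * FROM topicA;INSERT INTO collectionB SELECT x AS a, y FROM topicB
```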

### Insert Mode <a href="#insert-mode" id="insert-mode"></a>

Insert is the default write mode of the sink. It inserts messages from Kafka topics into CosmosDB.

### Upsert Mode <a href="#upsert-mode" id="upsert-mode"></a>

The sink supports CosmosDB upsert functionality, which replaces the existing document if a match is found on the primary key.

This mode works with Kafka's at-least-once delivery semantics because ordering is guaranteed within a partition. If the same record is delivered twice to the sink, the write is idempotent: the existing document is overwritten with identical values.

If a record arrives with the same primary key (the field or group of fields named in the `PK` clause) but different values, the existing document in the target collection is updated with the new values.

Because records are delivered in the order they were written to each partition, redelivery after a failure or restart produces the same result.
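
For illustration, the example configuration at the top of this page could be switched to upsert mode by changing only the KCQL statement. This is a sketch that assumes the records in `orders-string` carry an `id` field that can act as the primary key:

```properties
topics=orders-string
# UPSERT replaces the document whose primary key matches; PK names the key field (assumed to be "id")
connect.cosmosdb.kcql=UPSERT INTO orders SELECT * FROM orders-string PK id
```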

## Bulk Mode

Bulk mode enables efficient batching of writes to CosmosDB, reducing API calls and improving throughput. Enable it with:

```
connect.cosmosdb.bulk.enabled=true
```

When enabled, you can control batching behavior using the KCQL `PROPERTIES` clause (`flush.size`, `flush.count`, `flush.interval`). If disabled, records are written individually.
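
A sketch of a bulk-mode configuration that combines the connector property with a KCQL `PROPERTIES` clause. The topic name, collection name, and threshold values are illustrative assumptions, not recommendations:

```properties
# Turn on batched writes
connect.cosmosdb.bulk.enabled=true
# Set flush.count and flush.interval for this statement (values chosen for illustration)
connect.cosmosdb.kcql=INSERT INTO myCollection SELECT * FROM myTopic PROPERTIES('flush.count'=5000, 'flush.interval'=30)
```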

## Custom Key Strategy

You can control how the connector populates the `id` field in CosmosDB documents using:

* `connect.cosmosdb.key.source` (Key, Metadata, KeyPath, ValuePath)
* `connect.cosmosdb.key.path` (field path, used with KeyPath or ValuePath)

This allows you to use the Kafka record key, metadata, or a specific field from the key or value as the document ID.
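
As an example of the options above, the following sketch takes the document `id` from a field of the record value. The field name `orderId` is a hypothetical placeholder for whichever field your records carry:

```properties
# Read the document id from the Kafka record value
connect.cosmosdb.key.source=ValuePath
# Hypothetical field name; replace with a field present in your records
connect.cosmosdb.key.path=orderId
```

To use Kafka metadata instead, set `connect.cosmosdb.key.source=Metadata`. `connect.cosmosdb.key.path` is only described as applying to `KeyPath` and `ValuePath`.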

## Error Handling

Configure error handling and retry behavior with:

* `connect.cosmosdb.error.policy` (NOOP, THROW, RETRY)
* `connect.cosmosdb.max.retries` (max retry attempts)
* `connect.cosmosdb.retry.interval` (milliseconds between retries)
* `connect.cosmosdb.error.threshold` (number of errors tolerated before failing)

These settings control how the connector responds to errors during writes.
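
A sketch of a retrying configuration using the options above. The values are illustrative and are not tuned recommendations:

```properties
# Retry the write when an error occurs
connect.cosmosdb.error.policy=RETRY
# Illustrative values: at most 5 retries, 10 seconds (10000 ms) apart
connect.cosmosdb.max.retries=5
connect.cosmosdb.retry.interval=10000
```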

## Throughput

You can set the manual throughput (RU/s) for new CosmosDB collections with:

```
connect.cosmosdb.collection.throughput=400
```

The default is 400 RU/s, which is the minimum allowed by Azure Cosmos DB and is cost-effective for most workloads.

## Proxy

If you need to connect via a proxy, specify the proxy details with:

```
connect.cosmosdb.proxy=<proxy-uri>
```

## Progress Reporting

Enable progress reporting to log how many records have been processed:

```
connect.progress.enabled=true
```

## Option Reference <a href="#storage-to-output-matrix" id="storage-to-output-matrix"></a>

| Name                                        | Description                                                                                                                                                                                                                                 | Type     | Default Value |
| ------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | -------- | ------------- |
| **Authentication & Connection**             |                                                                                                                                                                                                                                             |          |               |
| connect.cosmosdb.endpoint                   | The Azure CosmosDB end point.                                                                                                                                                                                                               | string   |               |
| connect.cosmosdb.master.key                 | The connection master key.                                                                                                                                                                                                                  | password |               |
| connect.cosmosdb.proxy                      | Specifies the connection proxy details.                                                                                                                                                                                                     | string   |               |
|                                             |                                                                                                                                                                                                                                             |          |               |
| **Database & Throughput**                   |                                                                                                                                                                                                                                             |          |               |
| connect.cosmosdb.db                         | The Azure CosmosDB target database.                                                                                                                                                                                                         | string   |               |
| connect.cosmosdb.db.create                  | If set to true, the database is created if it does not exist. If left at the default (false) and the database is missing, an exception is raised.                                                                                           | boolean  | false         |
| connect.cosmosdb.collection.throughput      | The manual throughput to provision for new Cosmos DB collections (RU/s). The default is 400 RU/s, which is the minimum allowed by Azure Cosmos DB and is cost-effective for most workloads.                                                 | int      | 400           |
|                                             |                                                                                                                                                                                                                                             |          |               |
| **Key & Partitioning**                      |                                                                                                                                                                                                                                             |          |               |
| connect.cosmosdb.key.source                 | The source of the key. There are 4 possible values: Key, Metadata, KeyPath or ValuePath.                                                                                                                                                    | string   | Key           |
| connect.cosmosdb.key.path                   | When used with key.source configurations of `KeyPath` or `ValuePath`, this is the path to the field in the object that will be used as the key. Defaults to 'id'.                                                                           | string   | id            |
|                                             |                                                                                                                                                                                                                                             |          |               |
| **Write & Consistency**                     |                                                                                                                                                                                                                                             |          |               |
| connect.cosmosdb.consistency.level          | Determines the write visibility. There are four possible values: Strong, BoundedStaleness, Session or Eventual.                                                                                                                             | string   | Session       |
| connect.cosmosdb.bulk.enabled               | Enable bulk mode to reduce chatter.                                                                                                                                                                                                         | boolean  | false         |
| connect.cosmosdb.kcql                       | KCQL expression describing field selection and data routing to the target CosmosDb.                                                                                                                                                         | string   |               |
|                                             |                                                                                                                                                                                                                                             |          |               |
| **Error Handling & Retries**                |                                                                                                                                                                                                                                             |          |               |
| connect.cosmosdb.error.policy               | Specifies the action to be taken if an error occurs while inserting the data. Options: NOOP (swallow error), THROW (propagate error), RETRY (retry message). The number of retries is set by `connect.cosmosdb.max.retries`.               | string   | THROW         |
| connect.cosmosdb.max.retries                | The maximum number of times to try the write again.                                                                                                                                                                                         | int      | 20            |
| connect.cosmosdb.retry.interval             | The time in milliseconds between retries.                                                                                                                                                                                                   | int      | 60000         |
| connect.cosmosdb.error.threshold            | The number of errors to tolerate before failing the sink.                                                                                                                                                                                   | int      | 5             |
|                                             |                                                                                                                                                                                                                                             |          |               |
| **Queue & Performance**                     |                                                                                                                                                                                                                                             |          |               |
| connect.cosmosdb.flush.count.enable         | Flush on count can be disabled by setting this property to 'false'.                                                                                                                                                                         | boolean  | true          |
| connect.cosmosdb.upload.sync.period         | The time in milliseconds to wait before sending the request.                                                                                                                                                                                | int      | 100           |
| connect.cosmosdb.executor.threads           | The number of threads to use for processing the records.                                                                                                                                                                                    | int      | 1             |
| connect.cosmosdb.max.queue.size             | The maximum number of records to queue per topic before blocking. If the queue limit is reached the connector will throw RetriableException and the connector settings to handle retries will be used.                                      | int      | 1000000       |
| connect.cosmosdb.max.queue.offer.timeout.ms | The maximum time in milliseconds to wait for the queue to accept a record. If the queue does not accept the record within this time, the connector will throw RetriableException and the connector settings to handle retries will be used. | int      | 120000        |
|                                             |                                                                                                                                                                                                                                             |          |               |
| **Monitoring & Progress**                   |                                                                                                                                                                                                                                             |          |               |
| connect.progress.enabled                    | Enables the output for how many records have been processed.                                                                                                                                                                                | boolean  | false         |

## KCQL Properties

> **Note:** The following KCQL `PROPERTIES` options are only available when `connect.cosmosdb.bulk.enabled=true`. If bulk mode is disabled, records are written individually.

The CosmosDB Sink Connector supports the following KCQL `PROPERTIES` options to control flushing behavior:

| Property       | Description                                                  | Type | Default Value       |
| -------------- | ------------------------------------------------------------ | ---- | ------------------- |
| flush.size     | Specifies the size (in bytes) for the flush operation.       | Long | (connector default) |
| flush.count    | Specifies the number of records for the flush operation.     | Int  | (connector default) |
| flush.interval | Specifies the interval (in seconds) for the flush operation. | Long | (connector default) |

**Flush Options Explained:**

* **Flush by Count**: Triggers a flush after a specified number of records have been written.
* **Flush by Size**: Initiates a flush once a predetermined size (in bytes) has been reached.
* **Flush by Interval**: Enforces a flush after a defined time interval (in seconds), acting as a fail-safe so that buffered records are still written when the count and size conditions are not met.

You can use these properties in your KCQL statement's `PROPERTIES` clause, for example:

```sql
INSERT INTO myCollection SELECT * FROM myTopic PROPERTIES('flush.size'=1000000, 'flush.interval'=30, 'flush.count'=5000)
```

## Kafka payload support <a href="#kafka-payload-support" id="kafka-payload-support"></a>

This sink supports the following Kafka payloads:

* Schema.Struct and Struct (Avro)
* Schema.Struct and JSON
* No Schema and JSON

## Error policies <a href="#error-polices" id="error-polices"></a>

The connector supports [Error policies](https://docs.lenses.io/latest/connectors/tutorials/using-error-policies).

## Endpoint and Master Key Configuration

To connect to your Azure CosmosDB instance, the connector requires two essential configuration properties:

* `connect.cosmosdb.endpoint`: This specifies the URI of your CosmosDB account. It should point to the connection endpoint provided in the Azure CosmosDB dashboard.
* `connect.cosmosdb.master.key`: This is the authentication key used to access the database. Note: Azure often refers to this as the primary key, which can be confusing. The `master.key` in the connector configuration corresponds to Azure's primary key value.

Both properties are mandatory and must be set to establish a connection with CosmosDB.
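
A minimal sketch of the two settings. The endpoint follows the usual `https://<account>.documents.azure.com:443/` shape of a Cosmos DB account URI. `my-account` is a hypothetical account name, and the key placeholder matches the one used in the example at the top of this page:

```properties
# URI of the CosmosDB account (hypothetical account name)
connect.cosmosdb.endpoint=https://my-account.documents.azure.com:443/
# Azure's primary key value for the account
connect.cosmosdb.master.key=[YOUR_MASTER_KEY]
```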

## Key Population Strategy

The connector offers flexible options for populating the id field of documents in CosmosDB. The behavior is controlled by two configurations:

`connect.cosmosdb.key.source`

* Description: Defines the strategy used to extract or generate the document ID.
* Valid Values:
  * Key (default): Use the Kafka record key.
  * Metadata: Use Kafka metadata (topic/partition/offset).
  * KeyPath: Extract from a field within the Kafka key.
  * ValuePath: Extract from a field within the Kafka value.
* Display Name: Key strategy

`connect.cosmosdb.key.path`

* Description: Specifies the field path to extract the key from, when using KeyPath or ValuePath.
* Default: id
* Example: If using `ValuePath` with `connect.cosmosdb.key.path=id`, the connector will use `value.id` as the document ID.
