Azure CosmosDB
This page describes the usage of the Stream Reactor Azure CosmosDB Sink Connector.
A Kafka Connect sink connector for writing records from Kafka to Azure CosmosDB using the SQL API.
Connector Class
Example
For more examples see the tutorials.
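As a starting point, a minimal sink configuration might look like the sketch below. It uses only options listed in the Option Reference on this page. The connector name, topic, collection, database name, and bracketed placeholders are illustrative, and the connector.class value is an assumption that should be checked against the Connector Class for your Stream Reactor release.

```properties
# Hypothetical connector name
name=cosmosdb-sink
# Assumption: the class name differs between Stream Reactor releases; verify before use
connector.class=io.lenses.streamreactor.connect.azure.documentdb.sink.DocumentDbSinkConnector
tasks.max=1
# Must match the topic referenced in the KCQL statement
topics=orders
# Insert every field of each record from topic "orders" into collection "orders"
connect.documentdb.kcql=INSERT INTO orders SELECT * FROM orders
# Hypothetical database name; created on start-up because db.create is true
connect.documentdb.db=mydb
connect.documentdb.db.create=true
# Placeholders: replace with your own account endpoint and master key
connect.documentdb.endpoint=[YOUR_AZURE_ENDPOINT]
connect.documentdb.master.key=[YOUR_MASTER_KEY]
```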
KCQL support
You can specify multiple KCQL statements separated by ; to have a connector sink multiple topics. The connector properties topics or topics.regex are required to be set to a value that matches the KCQL statements.
The following KCQL is supported:
Examples:
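To illustrate routing several topics in one connector, the fragment below sketches two KCQL statements separated by a semicolon. The topic and collection names are hypothetical, and the INSERT INTO ... SELECT ... FROM form is assumed from the Insert Mode description rather than taken from a grammar on this page.

```properties
# Both topics named in the KCQL statements must also appear in "topics"
topics=orders,payments
# Two statements separated by ";": one per topic, each writing to its own collection
connect.documentdb.kcql=INSERT INTO orders SELECT * FROM orders;INSERT INTO payments SELECT * FROM payments
```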
Insert Mode
Insert is the default write mode of the sink. It inserts messages from Kafka topics into DocumentDB.
Upsert Mode
The sink supports DocumentDB upsert functionality, which replaces the existing row if a match is found on the primary keys.
This mode works with at-least-once delivery semantics on Kafka because order is guaranteed within partitions. If the same record is delivered twice to the sink, the write is idempotent: the existing record is updated with the values of the second delivery, which are identical.
If records are delivered with the same field or group of fields that are used as the primary key on the target table, but with different values, the existing record in the target table is updated.
Since records are delivered in the order they were written per partition, the write is idempotent on failure or restart. Redelivery produces the same result.
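A hedged sketch of an upsert configuration follows. It assumes that KCQL's PK clause names the field used as the primary key and that the records carry a field called id. The topic and collection names are made up for illustration.

```properties
topics=orders
# UPSERT replaces the existing document when the PK field matches
# (assumption: the PK clause selects the primary-key field, here "id")
connect.documentdb.kcql=UPSERT INTO orders SELECT * FROM orders PK id
```

Under these assumptions, redelivering a record with the same id overwrites the stored document with identical values, which is the idempotent behaviour described above.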
Kafka payload support
This sink supports the following Kafka payloads:
Schema.Struct and Struct (Avro)
Schema.Struct and JSON
No Schema and JSON
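The payload type is normally chosen through the standard Kafka Connect converter settings rather than through connector options. The fragment below is a sketch for the "No Schema and JSON" case. The converter classes and the schemas.enable flag are standard Kafka Connect properties, not options documented on this page, and the choice of a string key converter is an assumption.

```properties
# No Schema and JSON: plain JSON values without a schema/payload envelope
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
# Assumption: keys are plain strings
key.converter=org.apache.kafka.connect.storage.StringConverter
```

For the "Schema.Struct and JSON" case, setting value.converter.schemas.enable=true makes the JsonConverter expect each message to carry the schema and payload envelope.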
Error policies
The connector supports Error policies.
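As an illustration, a retry policy could be configured with the fragment below. The option names come from the Option Reference on this page. The values for max.retries and retry.interval are the documented defaults, and only the policy itself is changed from its default of THROW.

```properties
# Retry failed writes instead of failing the task immediately
connect.documentdb.error.policy=RETRY
# Maximum number of retry attempts (documented default)
connect.documentdb.max.retries=20
# Wait between attempts, in milliseconds (documented default: 60 seconds)
connect.documentdb.retry.interval=60000
```

With these values, and assuming retries are evenly spaced, a persistently failing write would be retried for roughly 20 minutes (20 attempts at 60-second intervals) before the error surfaces.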
Option Reference
| Name | Description | Type | Default Value |
|---|---|---|---|
| connect.documentdb.endpoint | The Azure DocumentDb endpoint. | string | |
| connect.documentdb.master.key | The connection master key. | password | |
| connect.documentdb.consistency.level | Determines the write visibility. There are four possible values: Strong, BoundedStaleness, Session or Eventual. | string | Session |
| connect.documentdb.db | The Azure DocumentDb target database. | string | |
| connect.documentdb.db.create | If set to true, the connector creates the database if it doesn't exist. If left at the default (false), an exception is raised when the database is missing. | boolean | false |
| connect.documentdb.proxy | Specifies the connection proxy details. | string | |
| connect.documentdb.error.policy | Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP (the error is swallowed), THROW (the error is allowed to propagate), and RETRY (the exception causes the Connect framework to retry the message; see connect.documentdb.max.retries and connect.documentdb.retry.interval). The error is logged automatically. | string | THROW |
| connect.documentdb.max.retries | The maximum number of times to try the write again. | int | 20 |
| connect.documentdb.retry.interval | The time in milliseconds between retries. | int | 60000 |
| connect.documentdb.kcql | KCQL expression describing field selection and data routing to the target DocumentDb. | string | |
| connect.progress.enabled | Enables logging of how many records have been processed. | boolean | false |