2024 © Lenses.io Ltd. Apache, Apache Kafka, Kafka and associated open source project names are trademarks of the Apache Software Foundation.

Azure CosmosDB

This page describes the usage of the Stream Reactor Azure CosmosDB Sink Connector.


A Kafka Connect sink connector for writing records from Kafka to Azure CosmosDB using the SQL API.

Connector Class

io.lenses.streamreactor.connect.azure.documentdb.sink.DocumentDbSinkConnector

Example

For more examples, see the tutorials.

name=cosmosdb
connector.class=io.lenses.streamreactor.connect.azure.documentdb.sink.DocumentDbSinkConnector
tasks.max=1
topics=orders-string
connect.documentdb.kcql=INSERT INTO orders SELECT * FROM orders-string
connect.documentdb.db=dm
connect.documentdb.endpoint=[YOUR_AZURE_ENDPOINT]
connect.documentdb.db.create=true
connect.documentdb.master.key=[YOUR_MASTER_KEY]
connect.documentdb.batch.size=10
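
The same configuration can also be submitted to a Kafka Connect worker through its REST interface as a JSON payload. This is a sketch, not connector documentation: the connector name and the bracketed placeholders are the same illustrative values as above, and the worker URL is whatever your deployment uses.

```json
{
  "name": "cosmosdb",
  "config": {
    "connector.class": "io.lenses.streamreactor.connect.azure.documentdb.sink.DocumentDbSinkConnector",
    "tasks.max": "1",
    "topics": "orders-string",
    "connect.documentdb.kcql": "INSERT INTO orders SELECT * FROM orders-string",
    "connect.documentdb.db": "dm",
    "connect.documentdb.endpoint": "[YOUR_AZURE_ENDPOINT]",
    "connect.documentdb.db.create": "true",
    "connect.documentdb.master.key": "[YOUR_MASTER_KEY]",
    "connect.documentdb.batch.size": "10"
  }
}
```

POST this document to the worker's /connectors endpoint (port 8083 by default) to create the connector.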

KCQL support

You can specify multiple KCQL statements, separated by ;, to have a single connector sink multiple topics. The connector properties topics or topics.regex must be set to values that match the KCQL statements.

The following KCQL is supported:

INSERT | UPSERT
INTO <your-collection>
SELECT FIELD, ...
FROM kafka_topic
[PK FIELDS,...]

Examples:

-- Insert mode, select all fields from topicA
-- and write to collectionA
INSERT INTO collectionA SELECT * FROM topicA

-- UPSERT mode, select 3 fields and
-- rename from topicB and write to tableB
-- with primary key as the field id from the topic
UPSERT INTO tableB SELECT x AS a, y, z AS c FROM topicB PK id
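
Multiple statements can be combined in a single connect.documentdb.kcql value, separated by ;. A sketch with illustrative topic and collection names:

```
INSERT INTO ordersCollection SELECT * FROM orders;
UPSERT INTO customersCollection SELECT name, address FROM customers PK customerId
```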

Insert Mode

Insert is the default write mode of the sink. It inserts messages from Kafka topics into DocumentDB.

Upsert Mode

The sink supports DocumentDB upsert functionality, which replaces the existing document if a match is found on the primary keys.

This mode works with at-least-once delivery semantics on Kafka, since ordering is guaranteed within a partition. If the same record is delivered twice, the write is idempotent: the existing document is simply overwritten with identical values.

If records arrive with the same primary-key field (or group of fields) but different values, the existing document in the target collection is updated.

Because records are delivered in the order they were written within each partition, the write is idempotent on failure or restart: redelivery produces the same result.
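
The upsert semantics described above can be sketched with a small simulation. This is a toy model of a store keyed on the primary key, not connector code:

```python
# Toy model of upsert-by-primary-key semantics: the store is keyed on the
# PK field, so redelivering the same record (or delivering a newer version
# of it) leaves the store in the same state as a single delivery would.

def upsert(store: dict, record: dict, pk: str) -> None:
    # Replace the existing document if a match is found on the primary key.
    store[record[pk]] = record

store = {}
upsert(store, {"id": 1, "qty": 10}, pk="id")
upsert(store, {"id": 1, "qty": 10}, pk="id")  # redelivery: idempotent
upsert(store, {"id": 1, "qty": 12}, pk="id")  # newer value: updates in place

print(len(store), store[1]["qty"])  # one document, latest value: 1 12
```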

Kafka payload support

This sink supports the following Kafka payloads:

  • Schema.Struct and Struct (Avro)

  • Schema.Struct and JSON

  • No Schema and JSON
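
Which of these variants the sink receives is determined by the converters configured on the connector or worker. A minimal sketch for schemaless JSON, using the standard Kafka Connect converters:

```properties
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
```

For Avro payloads (Schema.Struct and Struct), an Avro converter backed by a schema registry would be configured instead.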

Error policies

The connector supports configurable error policies that control how write failures are handled: NOOP swallows the error, THROW propagates it and fails the task, and RETRY lets the Connect framework redeliver the message, up to connect.documentdb.max.retries times with connect.documentdb.retry.interval milliseconds between attempts. See the option reference below.

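For example, a retrying configuration might look as follows (the retry count and interval here are illustrative, not recommendations):

```properties
connect.documentdb.error.policy=RETRY
connect.documentdb.max.retries=5
connect.documentdb.retry.interval=30000
```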
Option Reference

| Name | Description | Type | Default |
| --- | --- | --- | --- |
| connect.documentdb.endpoint | The Azure DocumentDB endpoint. | string | |
| connect.documentdb.master.key | The connection master key. | password | |
| connect.documentdb.consistency.level | Determines the write visibility. There are four possible values: Strong, BoundedStaleness, Session or Eventual. | string | Session |
| connect.documentdb.db | The Azure DocumentDB target database. | string | |
| connect.documentdb.db.create | If set to true, the database is created if it does not exist. If left at the default (false), an exception is raised instead. | boolean | false |
| connect.documentdb.proxy | Specifies the connection proxy details. | string | |
| connect.documentdb.error.policy | The action to take if an error occurs while inserting data. There are three options: NOOP (the error is swallowed), THROW (the error is allowed to propagate), and RETRY (the Connect framework retries the message, up to connect.documentdb.max.retries times). The error is logged automatically. | string | THROW |
| connect.documentdb.max.retries | The maximum number of times to retry the write. | int | 20 |
| connect.documentdb.retry.interval | The time in milliseconds between retries. | int | 60000 |
| connect.documentdb.kcql | KCQL expression describing field selection and data routing to the target DocumentDB. | string | |
| connect.progress.enabled | Enables the output of how many records have been processed. | boolean | false |
