Azure CosmosDB

A Kafka Connect sink connector for writing records from Kafka to Azure CosmosDB using the SQL API.

KCQL support 

The following KCQL is supported:

INTO <your-collection>
FROM kafka_topic 


-- Insert mode, select all fields from topicA 
-- and write to tableA
INSERT INTO collectionA SELECT * FROM topicA

-- UPSERT mode, select 3 fields and 
-- rename from topicB and write to tableB 
-- with primary key as the field id from the topic
UPSERT INTO tableB SELECT x AS a, y, z AS c FROM topicB PK id


Insert Mode 

Insert is the default write mode of the sink. It inserts messages from Kafka topics into DocumentDB.

A failure to insert a record in DocumentDB may occur due to integrity constraints or other exceptions such as casting issues. Kafka currently provides at least once delivery semantics. Therefore, this mode may produce errors if unique constraints have been implemented on the target tables. If the error policy has been set to NOOP then the Sink will discard the error and continue to process, however, it currently makes no attempt to distinguish violation of integrity constraints from other exceptions such as casting issues.

Upsert Mode 

The Sink supports DocumentDB upsert functionality which replaces the existing row if a match is found on the primary keys.

This mode works nicely with at least once delivery semantics on Kafka as order is a guaranteed within partitions. If the same record is delivered twice to the sink, it results in an idempotent write. The existing record will be updated with the values of the second which are the same.

If records are delivered with the same field or group of fields that are used as the primary key on the target table, but different values, the existing record in the target table will be updated.

Since records are delivered in the order they were written per partition the write is idempotent on failure or restart. Redelivery produces the same result.

Kafka payload support 

This sink supports the following Kafka payloads:

  • Schema.Struct and Struct (Avro)
  • Schema.Struct and JSON
  • No Schema and JSON

See connect payloads for more information.

Error polices 

The connector supports Error polices.


Launch the stack 

  1. Copy the docker-compose file.
  2. Bring up the stack.
export CONNECTOR=documentdb
docker-compose up -d fastdata

Start the connector 

If you are using Lenses, login into Lenses and navigate to the connectors page, select CosmosDB as the sink and paste the following:

connect.documentdb.kcql=INSERT INTO orders SELECT * FROM orders-string

To start the connector without using Lenses, log into the fastdatadev container:

docker exec -ti fastdata /bin/bash

and create a file containing the properties above.

Create the connector, with the connect-cli:

connect-cli create cosmosdb <

Wait a for the connector to start and check its running:

connect-cli status cosmosdb

Inserting test data 

In the to fastdata container start the kafka producer shell:

kafka-avro-console-producer \
 --broker-list localhost:9092 --topic orders-string \
 --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"string"},{"name":"created","type":"string"},{"name":"product","type":"string"},{"name":"price","type":"string"}, {"name":"qty", "type":"string"}]}'

the console is now waiting for your input, enter the following:

{"id": "1", "created": "2016-05-06 13:53:00", "product": "OP-DAX-P-20150201-95.7", "price": "94.2", "qty":"100"}

Check for data in CosmosDB 

In the Azure portal, select the dm database in the Data explorer, select query and run.


Clean up 

Bring down the stack:

docker-compose down


NameDescriptionTypeDefault Value
connect.documentdb.endpointThe Azure DocumentDb end point.string
connect.documentdb.master.keyThe connection master keypassword
connect.documentdb.consistency.levelDetermines the write visibility. There are four possible values: Strong,BoundedStaleness,Session or EventualstringSession
connect.documentdb.dbThe Azure DocumentDb target database.string
connect.documentdb.db.createIf set to true it will create the database if it doesn’t exist. If this is set to default(false) an exception will be raised.booleanfalse
connect.documentdb.proxySpecifies the connection proxy details.string
connect.documentdb.error.policySpecifies the action to be taken if an error occurs while inserting the data There are two available options: NOOP - the error is swallowed THROW - the error is allowed to propagate. RETRY - The exception causes the Connect framework to retry the message. The number of retries is based on The error will be logged automaticallystringTHROW
connect.documentdb.max.retriesThe maximum number of times to try the write again.int20
connect.documentdb.retry.intervalThe time in milliseconds between retries.int60000
connect.documentdb.kcqlKCQL expression describing field selection and data routing to the target DocumentDb.string
connect.progress.enabledEnables the output for how many records have been processedbooleanfalse