4.3
Azure CosmosDB
A Kafka Connect sink connector for writing records from Kafka to Azure CosmosDB using the SQL API.
KCQL support
The following KCQL is supported:
INSERT | UPSERT
INTO <your-collection>
SELECT FIELD, ...
FROM kafka_topic
[PK FIELDS,...]
Examples:
-- Insert mode, select all fields from topicA
-- and write to tableA
INSERT INTO collectionA SELECT * FROM topicA
-- UPSERT mode, select 3 fields and
-- rename from topicB and write to tableB
-- with primary key as the field id from the topic
UPSERT INTO tableB SELECT x AS a, y, z AS c FROM topicB PK id
Concepts
Insert Mode
Insert is the default write mode of the sink. It inserts messages from Kafka topics into DocumentDB.
A failure to insert a record in DocumentDB may occur due to integrity constraints or other exceptions such as casting issues. Kafka currently provides at least once delivery semantics. Therefore, this mode may produce errors if unique constraints have been implemented on the target tables. If the error policy has been set to NOOP then the Sink will discard the error and continue to process, however, it currently makes no attempt to distinguish violation of integrity constraints from other exceptions such as casting issues.
Upsert Mode
The Sink supports DocumentDB upsert functionality which replaces the existing row if a match is found on the primary keys.
This mode works nicely with at least once delivery semantics on Kafka as order is a guaranteed within partitions. If the same record is delivered twice to the sink, it results in an idempotent write. The existing record will be updated with the values of the second which are the same.
If records are delivered with the same field or group of fields that are used as the primary key on the target table, but different values, the existing record in the target table will be updated.
Since records are delivered in the order they were written per partition the write is idempotent on failure or restart. Redelivery produces the same result.
Kafka payload support
This sink supports the following Kafka payloads:
- Schema.Struct and Struct (Avro)
- Schema.Struct and JSON
- No Schema and JSON
See connect payloads for more information.
Error polices
The connector supports Error polices .
Quickstart
Launch the stack
- Copy the docker-compose file.
- Bring up the stack.
export CONNECTOR=documentdb
docker-compose up -d fastdata
Start the connector
If you are using Lenses, login into Lenses and navigate to the connectors page , select CosmosDB as the sink and paste the following:
name=cosmosdb
connector.class=com.datamountaineer.streamreactor.connect.azure.documentdb.sink.DocumentDbSinkConnector
tasks.max=1
topics=orders-string
connect.documentdb.kcql=INSERT INTO orders SELECT * FROM orders-string
connect.documentdb.db=dm
connect.documentdb.endpoint=[YOUR_AZURE_ENDPOINT]
connect.documentdb.db.create=true
connect.documentdb.master.key=[YOUR_MASTER_KEY]
connect.documentdb.batch.size=10
To start the connector without using Lenses, log into the fastdatadev container:
docker exec -ti fastdata /bin/bash
and create a connector.properties file containing the properties above.
Create the connector, with the connect-cli :
connect-cli create cosmosdb < connector.properties
Wait a for the connector to start and check its running:
connect-cli status cosmosdb
Inserting test data
In the to fastdata container start the kafka producer shell:
kafka-avro-console-producer \
--broker-list localhost:9092 --topic orders-string \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"string"},{"name":"created","type":"string"},{"name":"product","type":"string"},{"name":"price","type":"string"}, {"name":"qty", "type":"string"}]}'
the console is now waiting for your input, enter the following:
{"id": "1", "created": "2016-05-06 13:53:00", "product": "OP-DAX-P-20150201-95.7", "price": "94.2", "qty":"100"}
Check for data in CosmosDB
In the Azure portal, select the dm database in the Data explorer, select query and run.
SELECT * FROM c
Clean up
Bring down the stack:
docker-compose down
Options
Name | Description | Type | Default Value |
---|---|---|---|
connect.documentdb.endpoint | The Azure DocumentDb end point. | string | |
connect.documentdb.master.key | The connection master key | password | |
connect.documentdb.consistency.level | Determines the write visibility. There are four possible values: Strong,BoundedStaleness,Session or Eventual | string | Session |
connect.documentdb.db | The Azure DocumentDb target database. | string | |
connect.documentdb.db.create | If set to true it will create the database if it doesn’t exist. If this is set to default(false) an exception will be raised. | boolean | false |
connect.documentdb.proxy | Specifies the connection proxy details. | string | |
connect.documentdb.error.policy | Specifies the action to be taken if an error occurs while inserting the data There are two available options: NOOP - the error is swallowed THROW - the error is allowed to propagate. RETRY - The exception causes the Connect framework to retry the message. The number of retries is based on The error will be logged automatically | string | THROW |
connect.documentdb.max.retries | The maximum number of times to try the write again. | int | 20 |
connect.documentdb.retry.interval | The time in milliseconds between retries. | int | 60000 |
connect.documentdb.kcql | KCQL expression describing field selection and data routing to the target DocumentDb. | string | |
connect.progress.enabled | Enables the output for how many records have been processed | boolean | false |