MongoDB

Kafka Connect sink connector for writing data from Kafka to MongoDB.

KCQL support 

The following KCQL is supported:

INSERT | UPSERT 
INTO <collection_name>
SELECT FIELD, ... 
FROM <kafka-topic>
BATCH = 100

Examples:

--  Select all fields from topic fx_prices and insert into the fx collection
INSERT INTO fx SELECT * FROM fx_prices

--  Select all fields from topic fx_prices and upsert into the fx collection.
--  The assumption is there will be a ticker field in the incoming JSON:
UPSERT INTO fx SELECT * FROM fx_prices PK ticker

Concepts 

Insert Mode 

Insert is the default write mode of the sink.

Kafka can currently provide exactly-once delivery semantics; however, to ensure no errors are produced if unique constraints have been implemented on the target collections, the sink can run in UPSERT mode. If the error policy has been set to NOOP, the sink will discard the error and continue to process; however, it currently makes no attempt to distinguish violations of integrity constraints from other exceptions such as casting issues.

Upsert Mode 

The connector supports MongoDB upserts, which replace the existing document if a match is found on the primary keys. If records are delivered with the same field or group of fields that are used as the primary key on the target collection, but with different values, the existing document in the target collection will be updated.
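Conceptually, the upsert behaviour can be sketched in Python. This is an illustrative simplification using an in-memory list of documents, not the connector's implementation; the field names are invented for the example:

```python
# Sketch of upsert semantics: the record's primary-key fields select the
# existing document to replace; if no document matches, the record is inserted.

def upsert(collection, record, pk_fields):
    """collection: list of dicts; record: dict; pk_fields: PK field names."""
    key = tuple(record[f] for f in pk_fields)
    for i, doc in enumerate(collection):
        if tuple(doc[f] for f in pk_fields) == key:
            collection[i] = record          # match on PK -> replace document
            return "updated"
    collection.append(record)               # no match -> insert new document
    return "inserted"

fx = []
upsert(fx, {"ticker": "EURUSD", "price": 1.09}, ["ticker"])
upsert(fx, {"ticker": "EURUSD", "price": 1.10}, ["ticker"])  # same PK, new value
```

After both calls, the collection still holds a single document for `EURUSD`, now carrying the later price.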

Batching 

The BATCH clause controls the batching of writes to MongoDB.
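In effect, records are grouped into fixed-size chunks before each write. A minimal sketch in Python (the batch size of 100 matches the KCQL example above; the record values are illustrative):

```python
def batches(records, batch_size=100):
    """Yield successive chunks of at most batch_size records."""
    for i in range(0, len(records), batch_size):
        yield records[i:i + batch_size]

records = list(range(250))
sizes = [len(b) for b in batches(records, 100)]  # three writes: 100, 100, 50
```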

TLS/SSL 

TLS/SSL is supported by setting ?ssl=true in the connect.mongo.connection option. The MongoDB driver will then attempt to load the truststore and keystore using the JVM system properties.

You need to set JVM system properties to ensure that the client is able to validate the SSL certificate presented by the server:

  • javax.net.ssl.trustStore: the path to a trust store containing the certificate of the signing authority
  • javax.net.ssl.trustStorePassword: the password to access this trust store
  • javax.net.ssl.keyStore: the path to a key store containing the client’s SSL certificates
  • javax.net.ssl.keyStorePassword: the password to access this key store
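For example, these system properties can be passed to the Connect worker's JVM via the KAFKA_OPTS environment variable before starting it. The paths and passwords below are placeholders:

```shell
# Placeholder store locations and passwords -- substitute your own.
export KAFKA_OPTS="-Djavax.net.ssl.trustStore=/var/private/ssl/truststore.jks \
  -Djavax.net.ssl.trustStorePassword=truststore-password \
  -Djavax.net.ssl.keyStore=/var/private/ssl/keystore.jks \
  -Djavax.net.ssl.keyStorePassword=keystore-password"
```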

Authentication Mechanism 

All authentication mechanisms are supported: X.509, LDAP (PLAIN), Kerberos (GSSAPI), MONGODB-CR and SCRAM-SHA-1. The default as of MongoDB version 3.0 is SCRAM-SHA-1. To set the authentication mechanism, set authMechanism in the connect.mongo.connection option.

The mechanism can be set either in the connection string, although this requires the password to appear in plain text, or via the connect.mongo.auth.mechanism option.

If connect.mongo.username is set, it overrides the username/password in the connection string, and connect.mongo.auth.mechanism takes precedence.

e.g.

# default of scram
mongodb://host1/?authSource=db1
# scram explicit
mongodb://host1/?authSource=db1&authMechanism=SCRAM-SHA-1
# mongo-cr
mongodb://host1/?authSource=db1&authMechanism=MONGODB-CR
# x.509
mongodb://host1/?authSource=db1&authMechanism=MONGODB-X509
# kerberos
mongodb://host1/?authSource=db1&authMechanism=GSSAPI
# ldap
mongodb://host1/?authSource=db1&authMechanism=PLAIN

JSON Field dates 

A comma-separated list of fields can be converted to ISODate on MongoDB insertion. This applies to JSON topics only. Field values may be an integral epoch time or an ISO 8601 datetime string with an offset (an offset or ‘Z’ is required). If a string value does not parse as ISO 8601, it is written as a string instead.

Subdocument fields can be referred to as in the following examples:

  • topLevelFieldName
  • topLevelSubDocument.FieldName
  • topLevelParent.subDocument.subDocument2.FieldName

If a field is converted to ISODate and that same field is named as a PK, then the PK field is also written as an ISODate.

This is controlled via the connect.mongo.json_datetime_fields option.
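The conversion rules above can be sketched as follows. This is a simplification in Python, not the connector's code; in particular, the epoch value is assumed to be in seconds here:

```python
from datetime import datetime, timezone

def to_mongo_datetime(value):
    """Apply the sink's date rules: epoch ints become datetimes, ISO 8601
    strings with an offset become datetimes, anything else stays a string."""
    if isinstance(value, int):                       # integral epoch (seconds assumed)
        return datetime.fromtimestamp(value, tz=timezone.utc)
    try:
        parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
        if parsed.tzinfo is None:                    # an offset (or 'Z') is required
            return value
        return parsed
    except ValueError:
        return value                                 # not ISO 8601 -> keep as string

to_mongo_datetime(1462542780)              # -> datetime (UTC)
to_mongo_datetime("2016-05-06T13:53:00Z")  # -> datetime (UTC)
to_mongo_datetime("not-a-date")            # -> unchanged string
```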

Kafka payload support 

This sink supports the following Kafka payloads:

  • Schema.Struct and Struct (Avro)
  • Schema.Struct and JSON
  • No Schema and JSON

See connect payloads for more information.

Error policies 

The connector supports error policies.
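As an illustration of the three policies (NOOP, THROW, RETRY), here is a simplified sketch, not the connector's implementation; the retry count and interval mirror the connect.mongo.max.retries and connect.mongo.retry.interval options:

```python
import time

def write_with_policy(write, record, policy="THROW",
                      max_retries=20, interval_ms=60000):
    """Apply an error policy around a write attempt."""
    attempts = 0
    while True:
        try:
            return write(record)
        except Exception:
            if policy == "NOOP":
                return None                 # swallow the error, keep processing
            if policy == "THROW" or attempts >= max_retries:
                raise                       # propagate to the Connect framework
            attempts += 1                   # RETRY: wait, then try again
            time.sleep(interval_ms / 1000.0)

calls = {"n": 0}
def flaky(record):
    calls["n"] += 1
    if calls["n"] < 3:
        raise IOError("transient write failure")
    return record

write_with_policy(flaky, {"id": 1}, policy="RETRY", interval_ms=0)
```

With RETRY, the transient failure above is retried until the write succeeds on the third attempt.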

Quickstart 

Launch the stack 


  1. Copy the docker-compose file.
  2. Bring up the stack.
export CONNECTOR=mongo
docker-compose up -d mongo

Prepare the target system 

Log in to the MongoDB container and start the mongo shell:

docker exec -ti mongo mongo

Create a database by inserting a dummy record:

use connect
db.dummy.insert({"name":"Kafka Rulz!"})

Start the connector 

If you are using Lenses, log in to Lenses and navigate to the connectors page, select MongoDB as the sink and paste the following:

name=mongo
connector.class=com.datamountaineer.streamreactor.connect.mongodb.sink.MongoSinkConnector
tasks.max=1
topics=orders
connect.mongo.kcql=INSERT INTO orders SELECT * FROM orders
connect.mongo.db=connect
connect.mongo.connection=mongodb://mongo:27017

To start the connector without using Lenses, log in to the fastdata container:


docker exec -ti fastdata /bin/bash

and create a connector.properties file containing the properties above.

Create the connector with the connect-cli:

connect-cli create mongo < connector.properties

Wait for the connector to start and check it's running:

connect-cli status mongo

Inserting test data 

In the fastdata container, start the Kafka Avro console producer:


kafka-avro-console-producer \
 --broker-list localhost:9092 --topic orders \
 --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"},{"name":"created","type":"string"},{"name":"product","type":"string"},{"name":"price","type":"double"}, {"name":"qty", "type":"int"}]}'

The console is now waiting for your input; enter the following:


{"id": 1, "created": "2016-05-06 13:53:00", "product": "OP-DAX-P-20150201-95.7", "price": 94.2, "qty":100}

Check for data in Mongo 

In the Mongo shell, switch to the connect database and query the orders collection:

use connect
db.orders.find();

Clean up 

Bring down the stack:

docker-compose down

Options 

Name | Description | Type | Default Value
ssl.cipher.suites | A list of cipher suites. This is a named combination of authentication, encryption, MAC and key exchange algorithm used to negotiate the security settings for a network connection using the TLS or SSL network protocol. By default all the available cipher suites are supported. | list |
ssl.enabled.protocols | The list of protocols enabled for SSL connections. | list | [TLSv1.2, TLSv1.1, TLSv1]
ssl.keystore.password | The store password for the key store file. This is optional for the client and only needed if ssl.keystore.location is configured. | password |
ssl.key.password | The password of the private key in the key store file. This is optional for the client. | password |
ssl.keystore.type | The file format of the key store file. This is optional for the client. | string | JKS
ssl.truststore.location | The location of the trust store file. | string |
ssl.endpoint.identification.algorithm | The endpoint identification algorithm to validate the server hostname using the server certificate. | string | https
ssl.protocol | The SSL protocol used to generate the SSLContext. The default setting is TLS, which is fine for most cases. Allowed values in recent JVMs are TLS, TLSv1.1 and TLSv1.2. SSL, SSLv2 and SSLv3 may be supported in older JVMs, but their usage is discouraged due to known security vulnerabilities. | string | TLS
ssl.trustmanager.algorithm | The algorithm used by the trust manager factory for SSL connections. The default value is the trust manager factory algorithm configured for the Java Virtual Machine. | string | PKIX
ssl.secure.random.implementation | The SecureRandom PRNG implementation to use for SSL cryptography operations. | string |
ssl.truststore.type | The file format of the trust store file. | string | JKS
ssl.keymanager.algorithm | The algorithm used by the key manager factory for SSL connections. The default value is the key manager factory algorithm configured for the Java Virtual Machine. | string | SunX509
ssl.provider | The name of the security provider used for SSL connections. The default value is the default security provider of the JVM. | string |
ssl.keystore.location | The location of the key store file. This is optional for the client and can be used for two-way authentication for the client. | string |
ssl.truststore.password | The password for the trust store file. If a password is not set, access to the truststore is still available, but integrity checking is disabled. | password |
connect.mongo.connection | The MongoDB connection string in the format mongodb://[username:password@]host1[:port1][,host2[:port2],…[,hostN[:portN]]][/[database][?options]]. | string |
connect.mongo.db | The MongoDB target database. | string |
connect.mongo.username | The username to use when authenticating. | string |
connect.mongo.password | The password to use when authenticating. | password |
connect.mongo.auth.mechanism | The authentication mechanism to use. | string | SCRAM-SHA-1
connect.mongo.error.policy | Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP - the error is swallowed; THROW - the error is allowed to propagate; RETRY - the exception causes the Connect framework to retry the message, with the number of retries based on connect.mongo.max.retries. All errors will be logged automatically. | string | THROW
connect.mongo.max.retries | The maximum number of times to try the write again. | int | 20
connect.mongo.retry.interval | The time in milliseconds between retries. | int | 60000
connect.mongo.kcql | KCQL expression describing field selection and data routing to the target MongoDB collection. | string |
connect.mongo.json_datetime_fields | List of fields that should be converted to ISODate on MongoDB insertion (comma-separated field names). For JSON topics only. Field values may be an integral epoch time or an ISO 8601 datetime string with an offset (an offset or ‘Z’ is required). If a string does not parse as ISO 8601, it is written as a string instead. Subdocument fields can be referred to as in the following examples: “topLevelFieldName”, “topLevelSubDocument.FieldName”, “topLevelParent.subDocument.subDocument2.FieldName”. If a field is converted to ISODate and that same field is named as a PK, then the PK field is also written as an ISODate. | list | []
connect.progress.enabled | Enables the output for how many records have been processed. | boolean | false