Kafka Connect sink connector for writing data from Kafka to MongoDB.
The following KCQL is supported:
INSERT | UPSERT INTO <collection_name> SELECT FIELD, ... FROM <kafka-topic> BATCH = 100
Examples:
-- Select all fields from topic fx_prices and insert into the fx collection
INSERT INTO fx SELECT * FROM fx_prices

-- Select all fields from topic fx_prices and upsert into the fx collection.
-- The assumption is there will be a ticker field in the incoming json:
UPSERT INTO fx SELECT * FROM fx_prices PK ticker
Insert is the default write mode of the sink.
Kafka can currently provide exactly-once delivery semantics; however, to ensure no errors are produced if unique constraints have been implemented on the target collections, the sink can run in UPSERT mode. If the error policy has been set to NOOP, the sink will discard the error and continue to process; however, it currently makes no attempt to distinguish violations of integrity constraints from other exceptions such as casting issues.
The connector supports upserts, which replace the existing document if a match is found on the primary keys. If records are delivered with the same field or group of fields that are used as the primary key on the target collection, but with different values, the existing document in the target collection will be updated.
The BATCH clause controls the batching of writes to MongoDB.
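For example, a minimal sketch combining an upsert key with batching, reusing the collection and field names from the examples above and following the clause ordering of the syntax shown earlier:

UPSERT INTO fx SELECT * FROM fx_prices PK ticker BATCH = 100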
TLS/SSL is supported by setting ?ssl=true in the connect.mongo.connection option. The MongoDB driver will then attempt to load the truststore and keystore using the JVM system properties.
You need to set JVM system properties to ensure that the client is able to validate the SSL certificate presented by the server:
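For example, a minimal sketch passing the standard javax.net.ssl properties to the Connect worker JVM via KAFKA_OPTS; the paths and passwords below are placeholders for your own stores:

# Hypothetical paths and passwords; adjust for your environment.
export KAFKA_OPTS="-Djavax.net.ssl.trustStore=/var/private/ssl/client.truststore.jks \
  -Djavax.net.ssl.trustStorePassword=truststore-password \
  -Djavax.net.ssl.keyStore=/var/private/ssl/client.keystore.jks \
  -Djavax.net.ssl.keyStorePassword=keystore-password"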
All authentication methods are supported: X.509, LDAP Plain, Kerberos (GSSAPI), MongoDB-CR and SCRAM-SHA-1. The default as of MongoDB version 3.0 is SCRAM-SHA-1. To set the authentication mechanism, set authMechanism in the connect.mongo.connection option.
The mechanism can be set either in the connection string, which requires the password to appear in plain text in the connection string, or via the connect.mongo.auth.mechanism option.
If the username is set, it overrides the username/password set in the connection string, and connect.mongo.auth.mechanism takes precedence.
e.g.
# default of scram
mongodb://host1/?authSource=db1

# scram explicit
mongodb://host1/?authSource=db1&authMechanism=SCRAM-SHA-1

# mongo-cr
mongodb://host1/?authSource=db1&authMechanism=MONGODB-CR

# x.509
mongodb://host1/?authSource=db1&authMechanism=MONGODB-X509

# kerberos
mongodb://host1/?authSource=db1&authMechanism=GSSAPI

# ldap
mongodb://host1/?authSource=db1&authMechanism=PLAIN
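As a minimal sketch of the properties-based alternative, using the option names mentioned above with a placeholder connection string:

connect.mongo.connection=mongodb://host1/?authSource=db1
connect.mongo.auth.mechanism=SCRAM-SHA-1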
List of fields that should be converted to ISODate on MongoDB insertion (comma-separated field names). For JSON topics only. Field values may be an integral epoch time or an ISO8601 datetime string with an offset (offset or ‘Z’ required). If the string does not parse as ISO8601, it will be written as a string instead.
Subdocument fields can be referred to as in the following examples:
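For instance, a minimal sketch using dot notation to reach nested fields (the field names below are hypothetical):

topLevelFieldName
topLevel.secondLevelFieldName
topLevel.secondLevel.thirdLevelFieldName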
If a field is converted to ISODate and that same field is named as a PK, then the PK field is also written as an ISODate.
This is controlled via the connect.mongo.json_datetime_fields option.
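For example, a minimal sketch of the option (the field names created and payment.timestamp are hypothetical):

connect.mongo.json_datetime_fields=created,payment.timestamp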
This sink supports the following Kafka payloads:
See connect payloads for more information.
The connector supports Error Policies.
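For example, a minimal sketch of setting the NOOP policy mentioned above; the property name is an assumption following the usual stream-reactor naming convention, so verify it against the connector's configuration reference:

# Assumed option name (not shown on this page).
connect.mongo.error.policy=NOOP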
export CONNECTOR=mongo
docker-compose up -d mongo
Log in to the MongoDB container and start the mongo shell:
docker exec -ti mongo mongo
Create a database by inserting a dummy record:
use connect
db.dummy.insert({"name":"Kafka Rulz!"})
If you are using Lenses, log in to Lenses and navigate to the connectors page, select MongoDB as the sink and paste the following:
name=mongo
connector.class=io.lenses.streamreactor.connect.mongodb.sink.MongoSinkConnector
tasks.max=1
topics=orders
connect.mongo.kcql=INSERT INTO orders SELECT * FROM orders
connect.mongo.db=connect
connect.mongo.connection=mongodb://mongo:27017
To start the connector using the command line, log into the lenses-box container:
docker exec -ti lenses-box /bin/bash
and create a connector.properties file containing the properties above.
Create the connector, with the connect-cli:
connect-cli create mongo < connector.properties
Wait for the connector to start and check that it is running:
connect-cli status mongo
In the lenses-box container, start the Kafka Avro console producer:
kafka-avro-console-producer \
  --broker-list localhost:9092 --topic orders \
  --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"},{"name":"created","type":"string"},{"name":"product","type":"string"},{"name":"price","type":"double"},{"name":"qty","type":"int"}]}'
The console is now waiting for your input; enter the following:
{ "id": 1, "created": "2016-05-06 13:53:00", "product": "OP-DAX-P-20150201-95.7", "price": 94.2, "qty": 100 }
In the mongo shell, check that the record has been written:
db.orders.find();
Bring down the stack:
docker-compose down