This page describes the usage of the Stream Reactor MongoDB Sink Connector.
For more examples see the tutorials.
You can specify multiple KCQL statements separated by a semicolon (;) to have a connector sink multiple topics. The connector property topics or topics.regex must be set to a value that matches the KCQL statements.
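As an illustration, the following Python sketch assembles a sink configuration from several (collection, topic) pairs. It joins one KCQL statement per pair with semicolons and derives a topics value that lists exactly the topics used in the KCQL. The helper name build_kcql_config and the sample collection and topic names are hypothetical; the INSERT INTO ... SELECT * FROM ... statements follow the usual KCQL form but should be checked against the supported grammar.

```python
from typing import Dict, List, Tuple

def build_kcql_config(routes: List[Tuple[str, str]]) -> Dict[str, str]:
    """Build connector properties for a list of (collection, topic) routes.

    Hypothetical helper: one KCQL statement per route, joined with ';',
    plus a 'topics' value that matches the topics referenced in the KCQL.
    """
    statements = [f"INSERT INTO {coll} SELECT * FROM {topic}" for coll, topic in routes]
    # Preserve order while dropping duplicate topics.
    topics = list(dict.fromkeys(topic for _, topic in routes))
    return {
        "connect.mongo.kcql": ";".join(statements),
        "topics": ",".join(topics),
    }

config = build_kcql_config([("orders", "orders_topic"), ("customers", "customers_topic")])
print(config["connect.mongo.kcql"])
print(config["topics"])
```

Deriving topics from the same list that produces the KCQL keeps the two properties from drifting apart when routes are added or removed.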
The following KCQL is supported:
Examples:
Insert is the default write mode of the sink.
The connector supports upserts, which replace the existing document if a match is found on the primary key fields. If records are delivered with the same values in the field or group of fields used as the primary key on the target collection, but different values elsewhere, the existing document in the target collection is updated.
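The difference between insert and upsert can be pictured with a small in-memory model: in upsert mode documents are keyed by the values of the primary key fields, so a record with an existing key replaces the stored document instead of adding a second one. This Python sketch only models the semantics, with a dict and a list standing in for MongoDB collections; it is not the connector's implementation. In KCQL the key fields are usually given with a PK clause (for example UPSERT INTO orders SELECT * FROM orders_topic PK order_id; verify this against the supported grammar).

```python
from typing import Any, Dict, List, Tuple

Key = Tuple[Any, ...]

def upsert(collection: Dict[Key, dict], record: dict, pk_fields: List[str]) -> None:
    """Insert the record, or replace the stored document with the same PK values."""
    key = tuple(record[field] for field in pk_fields)
    collection[key] = dict(record)  # overwrites any existing document with this key

def insert(collection: List[dict], record: dict) -> None:
    """Plain insert model: every record becomes a new document."""
    collection.append(dict(record))

upserted: Dict[Key, dict] = {}
inserted: List[dict] = []
for rec in [{"order_id": 1, "status": "new"}, {"order_id": 1, "status": "shipped"}]:
    upsert(upserted, rec, ["order_id"])
    insert(inserted, rec)

print(len(upserted), upserted[(1,)]["status"])  # 1 shipped
print(len(inserted))                            # 2
```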
The BATCH clause controls the batching of writes to MongoDB.
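A rough picture of what a batch size means: records are grouped into chunks of at most N, and each chunk is sent to MongoDB in one write. The chunking helper below is a hypothetical Python sketch; the connector's actual flushing logic may differ.

```python
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")

def batches(records: Iterable[T], size: int) -> Iterator[List[T]]:
    """Yield lists of at most `size` records, preserving their order."""
    if size <= 0:
        raise ValueError("batch size must be positive")
    chunk: List[T] = []
    for record in records:
        chunk.append(record)
        if len(chunk) == size:
            yield chunk
            chunk = []
    if chunk:  # final, partially filled batch
        yield chunk

sizes = [len(b) for b in batches(range(250), 100)]
print(sizes)  # [100, 100, 50]
```

With 250 records and a batch size of 100, this model issues three writes instead of 250, which is the trade-off the BATCH clause tunes: fewer round trips against larger individual writes.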
TLS/SSL is supported by setting ?ssl=true in the connect.mongo.connection option. The MongoDB driver will then attempt to load the truststore and keystore using the JVM system properties.
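If the connection string is assembled programmatically, ssl=true has to be merged with any options already present, and the connection string format requires a / between the host list and the ?options part when no database is given. A minimal Python sketch, assuming a standard mongodb:// URI; the with_ssl helper is hypothetical.

```python
def with_ssl(uri: str) -> str:
    """Return the MongoDB URI with ssl=true set in its options (hypothetical helper)."""
    if "?" in uri:
        base, query = uri.split("?", 1)
        # Drop any existing ssl=... option so ssl=true appears exactly once.
        options = [opt for opt in query.split("&") if opt and not opt.lower().startswith("ssl=")]
        options.append("ssl=true")
        return base + "?" + "&".join(options)
    # No options yet: ensure a '/' follows the host list before adding '?'.
    _, _, rest = uri.partition("://")
    if "/" not in rest:
        uri += "/"
    return uri + "?ssl=true"

print(with_ssl("mongodb://localhost:27017"))
print(with_ssl("mongodb://h1/db?authSource=admin&ssl=false"))
```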
You need to set JVM system properties to ensure that the client is able to validate the SSL certificate presented by the server:
javax.net.ssl.trustStore: the path to a trust store containing the certificate of the signing authority
javax.net.ssl.trustStorePassword: the password to access this trust store
javax.net.ssl.keyStore: the path to a key store containing the client’s SSL certificates
javax.net.ssl.keyStorePassword: the password to access this key store
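One way to pass these properties to the Connect worker JVM is as -D flags, for example through the KAFKA_OPTS environment variable read by the Kafka start scripts. That is an assumption about your deployment; adapt it to however your workers are launched. The Python sketch below only renders the flags, and the paths and passwords are placeholders.

```python
import shlex
from typing import Dict

def jvm_ssl_flags(props: Dict[str, str]) -> str:
    """Render javax.net.ssl.* system properties as space-separated -D flags."""
    for name, value in props.items():
        # Assumption: KAFKA_OPTS is word-split unquoted by the start scripts,
        # so a value containing whitespace would break into separate arguments.
        if any(ch.isspace() for ch in value):
            raise ValueError(f"{name}: whitespace is not safe in an unquoted -D flag")
    return " ".join(f"-D{name}={value}" for name, value in props.items())

flags = jvm_ssl_flags({
    "javax.net.ssl.trustStore": "/etc/kafka/secrets/truststore.jks",  # placeholder path
    "javax.net.ssl.trustStorePassword": "changeit",                   # placeholder
    "javax.net.ssl.keyStore": "/etc/kafka/secrets/keystore.jks",      # placeholder path
    "javax.net.ssl.keyStorePassword": "changeit",                     # placeholder
})
print(f"export KAFKA_OPTS={shlex.quote(flags)}")
```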
All authentication methods are supported: X.509, LDAP Plain, Kerberos (GSSAPI), MongoDB-CR and SCRAM-SHA-1. As of MongoDB version 3.0, the default is SCRAM-SHA-1. To set the authentication mechanism, set authMechanism in the connect.mongo.connection option.
The mechanism can be set either in the connection string, which requires the password to appear in plain text in the connection string, or via the connect.mongo.auth.mechanism option.
If connect.mongo.username is set, it overrides the username and password set in the connection string, and connect.mongo.auth.mechanism takes precedence.
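The two ways of supplying credentials can be contrasted in a short Python sketch. The first configuration embeds the mechanism and the plain-text password in connect.mongo.connection; the second keeps the connection string credential-free and uses the dedicated options. The effective_credentials function is a hypothetical model of the precedence rule described above, not the connector's own code, and the hosts, user names and passwords are placeholders.

```python
from typing import Dict, Optional, Tuple
from urllib.parse import parse_qs, unquote, urlsplit

DEFAULT_MECHANISM = "SCRAM-SHA-1"

# Option 1: mechanism and credentials in the connection string (password in plain text).
uri_config = {
    "connect.mongo.connection": "mongodb://alice:secret@mongo1:27017/?authMechanism=SCRAM-SHA-1",
    "connect.mongo.db": "orders",
}

# Option 2: credentials supplied through the dedicated connector options.
option_config = {
    "connect.mongo.connection": "mongodb://mongo1:27017",
    "connect.mongo.db": "orders",
    "connect.mongo.username": "bob",
    "connect.mongo.password": "s3cret",
    "connect.mongo.auth.mechanism": "SCRAM-SHA-1",
}

def effective_credentials(config: Dict[str, str]) -> Tuple[Optional[str], Optional[str], str]:
    """Model of the precedence rule: connect.mongo.username overrides the URI credentials."""
    if config.get("connect.mongo.username"):
        return (
            config["connect.mongo.username"],
            config.get("connect.mongo.password"),
            config.get("connect.mongo.auth.mechanism", DEFAULT_MECHANISM),
        )
    parts = urlsplit(config["connect.mongo.connection"])
    mechanism = parse_qs(parts.query).get("authMechanism", [DEFAULT_MECHANISM])[0]
    user = unquote(parts.username) if parts.username else None
    password = unquote(parts.password) if parts.password else None
    return user, password, mechanism

print(effective_credentials(uri_config))
print(effective_credentials(option_config))
```

Keeping the password out of the connection string also keeps it out of anything that logs or displays that string.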
You can provide a list of fields (comma-separated field names) that should be converted to ISODate on MongoDB insertion; this applies to JSON topics only. Field values may be an epoch time or an ISO8601 datetime string with an offset (offset or ‘Z’ required). If the string does not parse as an ISO8601 datetime, it is written as a string instead.
Subdocument fields can be referred to using dot notation, as in the following examples:
topLevelFieldName
topLevelSubDocument.FieldName
topLevelParent.subDocument.subDocument2.FieldName
If a field is converted to ISODate and that same field is named as a PK, then the PK field is also written as an ISODate.
This is controlled via the connect.mongo.json_datetime_fields option.
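The conversion rules above can be modelled in Python as follows. This is a sketch of the documented behaviour, not the connector's code: it treats integral values as epoch milliseconds (an assumption, since the unit is not stated here), requires an offset or ‘Z’ on strings, leaves unparseable strings untouched, and walks dotted paths into subdocuments. The resulting timezone-aware datetime is what a MongoDB driver would typically store as an ISODate. The function names are hypothetical.

```python
from datetime import datetime, timezone
from typing import Any, Dict, List

def to_iso_date(value: Any) -> Any:
    """Convert epoch millis or an offset-bearing ISO8601 string; otherwise return value unchanged."""
    if isinstance(value, int) and not isinstance(value, bool):
        # Assumption: integral values are epoch milliseconds.
        return datetime.fromtimestamp(value / 1000, tz=timezone.utc)
    if isinstance(value, str):
        # datetime.fromisoformat only accepts a trailing 'Z' from Python 3.11 on.
        text = value[:-1] + "+00:00" if value.endswith("Z") else value
        try:
            parsed = datetime.fromisoformat(text)
        except ValueError:
            return value  # not ISO8601: kept as a string
        return parsed if parsed.tzinfo is not None else value  # offset required
    return value

def convert_datetime_fields(doc: Dict[str, Any], fields: List[str]) -> Dict[str, Any]:
    """Convert each dotted field path (e.g. 'a.b.c') in place, if present."""
    for path in fields:
        *parents, leaf = path.split(".")
        node: Any = doc
        for key in parents:
            node = node.get(key) if isinstance(node, dict) else None
        if isinstance(node, dict) and leaf in node:
            node[leaf] = to_iso_date(node[leaf])
    return doc

record = {
    "created": 1609459200000,
    "meta": {"updated": "2021-01-01T12:00:00+02:00"},
    "raw": "not a date",
}
convert_datetime_fields(record, ["created", "meta.updated", "raw", "missing.field"])
print(record)
```

Paths that do not exist in a record, such as missing.field here, are simply skipped.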
This sink supports the following Kafka payloads:
Schema.Struct and Struct (Avro)
Schema.Struct and JSON
No Schema and JSON
The connector supports error policies, configured with connect.mongo.error.policy.
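The three error policies (NOOP, THROW and RETRY) can be summarised with a small Python model. It is a simplification: with RETRY the connector hands the exception back to the Kafka Connect framework, which redelivers the records, whereas here the loop retries in place. The function and parameter names are hypothetical; the defaults mirror connect.mongo.max.retries (20) and connect.mongo.retry.interval (60000 ms) from the configuration table.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")

def write_with_policy(
    write: Callable[[], T],
    policy: str = "THROW",
    max_retries: int = 20,
    retry_interval_ms: int = 60000,
    sleep: Callable[[float], None] = time.sleep,
) -> Optional[T]:
    """Run `write` and handle failures according to a NOOP/THROW/RETRY policy."""
    attempts = 0
    while True:
        try:
            return write()
        except Exception as err:
            print(f"write failed ({policy}): {err}")  # the error is logged in every mode
            if policy == "NOOP":
                return None  # swallow the error and move on
            if policy == "RETRY" and attempts < max_retries:
                attempts += 1
                sleep(retry_interval_ms / 1000)
                continue
            raise  # THROW, or RETRY with retries exhausted

calls = {"n": 0}

def flaky() -> str:
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient")
    return "ok"

waits = []
result = write_with_policy(flaky, policy="RETRY", sleep=waits.append)
print(result, calls["n"], waits)  # ok 3 [60.0, 60.0]
```

In this model, a persistently failing write under the default settings would be retried for up to 20 × 60 s = 20 minutes before the error is finally propagated.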
Name | Description | Type | Default Value |
---|---|---|---|
ssl.cipher.suites | A list of cipher suites. A cipher suite is a named combination of authentication, encryption, MAC and key exchange algorithms used to negotiate the security settings for a network connection using the TLS or SSL network protocol. By default, all available cipher suites are supported. | list | |
ssl.enabled.protocols | The list of protocols enabled for SSL connections. | list | [TLSv1.2, TLSv1.1, TLSv1] |
ssl.keystore.password | The store password for the key store file. This is optional for the client and only needed if ssl.keystore.location is configured. | password | |
ssl.key.password | The password of the private key in the key store file. This is optional for the client. | password | |
ssl.keystore.type | The file format of the key store file. This is optional for the client. | string | JKS |
ssl.truststore.location | The location of the trust store file. | string | |
ssl.endpoint.identification.algorithm | The endpoint identification algorithm used to validate the server hostname using the server certificate. | string | https |
ssl.protocol | The SSL protocol used to generate the SSLContext. The default, TLS, is fine for most cases. Allowed values in recent JVMs are TLS, TLSv1.1 and TLSv1.2. SSL, SSLv2 and SSLv3 may be supported in older JVMs, but their use is discouraged due to known security vulnerabilities. | string | TLS |
ssl.trustmanager.algorithm | The algorithm used by the trust manager factory for SSL connections. The default value is the trust manager factory algorithm configured for the Java Virtual Machine. | string | PKIX |
ssl.secure.random.implementation | The SecureRandom PRNG implementation to use for SSL cryptography operations. | string | |
ssl.truststore.type | The file format of the trust store file. | string | JKS |
ssl.keymanager.algorithm | The algorithm used by the key manager factory for SSL connections. The default value is the key manager factory algorithm configured for the Java Virtual Machine. | string | SunX509 |
ssl.provider | The name of the security provider used for SSL connections. The default value is the default security provider of the JVM. | string | |
ssl.keystore.location | The location of the key store file. This is optional for the client and can be used for two-way authentication of the client. | string | |
ssl.truststore.password | The password for the trust store file. If a password is not set, access to the truststore is still available, but integrity checking is disabled. | password | |
connect.mongo.connection | The MongoDB connection string in the format mongodb://[username:password@]host1[:port1][,host2[:port2],…[,hostN[:portN]]][/[database][?options]]. | string | |
connect.mongo.db | The MongoDB target database. | string | |
connect.mongo.username | The username to use when authenticating. | string | |
connect.mongo.password | The password to use when authenticating. | password | |
connect.mongo.auth.mechanism | The authentication mechanism to use. | string | SCRAM-SHA-1 |
connect.mongo.error.policy | Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP - the error is swallowed; THROW - the error is allowed to propagate; RETRY - the exception causes the Connect framework to retry the message. The number of retries is controlled by connect.mongo.max.retries. The error will be logged automatically. | string | THROW |
connect.mongo.max.retries | The maximum number of times to try the write again. | int | 20 |
connect.mongo.retry.interval | The time in milliseconds between retries. | int | 60000 |
connect.mongo.kcql | KCQL expression describing field selection and data routing to the target MongoDB collection. | string | |
connect.mongo.json_datetime_fields | List of fields that should be converted to ISODate on MongoDB insertion (comma-separated field names), for JSON topics only. Field values may be an integral epoch time or an ISO8601 datetime string with an offset (offset or ‘Z’ required). If the string does not parse as ISO8601, it will be written as a string instead. Subdocument fields can be referred to as in the following examples: “topLevelFieldName”, “topLevelSubDocument.FieldName”, “topLevelParent.subDocument.subDocument2.FieldName” (etc.). If a field is converted to ISODate and that same field is named as a PK, then the PK field is also written as an ISODate. | list | [] |
connect.progress.enabled | Enables output of how many records have been processed. | boolean | false |
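When generating configurations, it can help to make the documented defaults explicit. The following Python sketch merges user-supplied properties over the connector-specific defaults listed in the table and checks that a few connection settings are present. Which keys to require is an assumption made for this example, not a rule stated by the connector, and the helper name effective_config is hypothetical.

```python
from typing import Dict, Iterable

# Defaults copied from the configuration table (connector-specific keys only).
DEFAULTS: Dict[str, str] = {
    "connect.mongo.auth.mechanism": "SCRAM-SHA-1",
    "connect.mongo.error.policy": "THROW",
    "connect.mongo.max.retries": "20",
    "connect.mongo.retry.interval": "60000",
    "connect.progress.enabled": "false",
}

# Assumption for this sketch: treat these keys as mandatory.
REQUIRED = ("connect.mongo.connection", "connect.mongo.db", "connect.mongo.kcql")

def effective_config(user: Dict[str, str], required: Iterable[str] = REQUIRED) -> Dict[str, str]:
    """Return user properties layered over the defaults, after a presence check."""
    missing = [key for key in required if not user.get(key)]
    if missing:
        raise ValueError(f"missing required properties: {', '.join(missing)}")
    return {**DEFAULTS, **user}

cfg = effective_config({
    "connect.mongo.connection": "mongodb://mongo1:27017",
    "connect.mongo.db": "orders",
    "connect.mongo.kcql": "INSERT INTO orders SELECT * FROM orders_topic",
    "connect.mongo.error.policy": "RETRY",
})
for key in sorted(cfg):
    print(f"{key}={cfg[key]}")
```

Printing the merged result in key=value form gives a properties-style listing that shows exactly which values the sink would run with.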