Elasticsearch
This page describes the usage of the Stream Reactor Elasticsearch Sink Connector.
Connector Class
Elasticsearch 6
Elasticsearch 7
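The connector class to set in your configuration depends on the Elasticsearch version; in Stream Reactor these are typically (verify against your release):

```properties
# Elasticsearch 6
connector.class=com.datamountaineer.streamreactor.connect.elastic6.ElasticSinkConnector

# Elasticsearch 7
connector.class=com.datamountaineer.streamreactor.connect.elastic7.ElasticSinkConnector
```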
Example
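A minimal sketch of a sink configuration, assuming a hypothetical topic `orders` and a local single-node cluster (adjust the hosts, port and KCQL to your setup):

```properties
name=elastic-sink
connector.class=com.datamountaineer.streamreactor.connect.elastic7.ElasticSinkConnector
tasks.max=1
topics=orders
connect.elastic.protocol=http
connect.elastic.hosts=localhost
connect.elastic.port=9200
connect.elastic.kcql=INSERT INTO orders-index SELECT * FROM orders
```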
For more examples see the tutorials.
KCQL support
You can specify multiple KCQL statements separated by `;` to have a single connector sink multiple topics. The connector properties `topics` or `topics.regex` must be set to a value that matches the KCQL statements.
The following KCQL is supported:
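The general shape is sketched below; bracketed clauses are optional, and the exact grammar may vary by connector version:

```
INSERT | UPSERT
INTO <index_name>
SELECT <fields>
FROM <kafka_topic>
[PK <field1> [, <field2>, ...]]
[WITHDOCTYPE=<document_type>]
[WITHINDEXSUFFIX=<suffix>]
[PROPERTIES('<key>'='<value>', ...)]
```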
Examples:
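Two illustrative statements, assuming hypothetical topics `topicA` and `topicB` and hypothetical field names; note the `;` separating the statements:

```
INSERT INTO index_a SELECT * FROM topicA;
UPSERT INTO index_b SELECT id, name, created_at FROM topicB PK id
```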
Kafka Tombstone Handling
It is possible to configure how the connector handles a null value payload (known as a Kafka tombstone). Use the `behavior.on.null.values` property in your KCQL with one of the possible values:

- `IGNORE`: ignores tombstones entirely
- `FAIL`: throws an exception when a tombstone is received
- `DELETE`: deletes the document with the specified id
Example:
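Assuming a hypothetical topic `topicA` with an `id` field as the primary key, the following statement deletes the matching document when a tombstone arrives:

```
INSERT INTO index_a SELECT * FROM topicA PK id PROPERTIES('behavior.on.null.values'='DELETE')
```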
Primary Keys
The PK keyword can be used to specify the fields which will be used for the key value in Elastic. The field values will be concatenated and separated by a `-` (configurable via `connect.elastic.pk.separator`). If no fields are set, the topic name, partition and message offset are used.
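A sketch with a hypothetical topic `orders` and fields `customerId` and `orderId`; a record with values 42 and 1001 would get the document id `42-1001`:

```
INSERT INTO orders-index SELECT * FROM orders PK customerId, orderId
```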
Insert and Upsert modes
INSERT writes new records to Elastic, replacing existing records with the same ID set by the PK (Primary Key) keyword. UPSERT replaces an existing record if a matching record is found, or inserts a new one if none is found.
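The two modes side by side, using the same hypothetical topic and key field as above:

```
INSERT INTO orders-index SELECT * FROM orders PK orderId;
UPSERT INTO orders-index SELECT * FROM orders PK orderId
```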
Document Type
WITHDOCTYPE allows you to associate a document type with the inserted document.
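For example, to tag inserted documents with a hypothetical document type `order`:

```
INSERT INTO orders-index SELECT * FROM orders WITHDOCTYPE=order
```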
Index Suffix
WITHINDEXSUFFIX allows you to specify a suffix for your index name; date format patterns are supported.
Example:
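A sketch that appends a daily date suffix, yielding index names such as `orders-index_2024-01-15` (topic and index names illustrative):

```
INSERT INTO orders-index SELECT * FROM orders WITHINDEXSUFFIX=_{YYYY-MM-dd}
```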
Index Names
Static Index Names
To use a static index name, define the target index in the KCQL statement without any prefixes:
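```
INSERT INTO index_name SELECT * FROM topicA
```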
This will consistently create an index named `index_name` for any messages consumed from `topicA`.
Extracting Index Names from Headers, Keys, and Values
Headers
To extract an index name from a message header, use the `_header` prefix followed by the header name:
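```
INSERT INTO _header.gate SELECT * FROM topicA
```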
This statement extracts the value from the `gate` header field and uses it as the index name.
For headers with names that include dots, enclose the entire target in backticks (`` ` ``) and each field-name segment in single quotes (`'`):
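```
INSERT INTO `_header.'prefix.abc.suffix'` SELECT * FROM topicA
```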
In this case, the value of the header named `prefix.abc.suffix` is used to form the index name.
Keys
To use the full value of the message key as the index name, use the `_key` prefix:
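```
INSERT INTO _key SELECT * FROM topicA
```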
For example, if the message key is `"freddie"`, the resulting index name will be `freddie`.
Values
To extract an index name from a field within the message value, use the `_value` prefix followed by the field name:
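```
INSERT INTO _value.name SELECT * FROM topicA
```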
This example uses the value of the `name` field from the message's value. If the field contains `"jason"`, the index name will be `jason`.
Nested Fields in Values
To access nested fields within a value, specify the full path using dot notation:
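```
INSERT INTO _value.name.firstName SELECT * FROM topicA
```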
If the `firstName` field is nested within the `name` structure, its value (e.g., `"hans"`) will be used as the index name.
Fields with Dots in Their Names
For field names that include dots, enclose the entire target in backticks (`` ` ``) and each field-name segment in single quotes (`'`):
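A sketch, assuming a hypothetical top-level field named `first.name` that contains a nested `firstName` field:

```
INSERT INTO `_value.'first.name'.firstName` SELECT * FROM topicA
```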
If the value structure contains:
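```json
{
  "first.name": {
    "firstName": "hans"
  }
}
```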
The extracted index name will be `hans`.
Auto Index Creation
The Sink will automatically create missing indexes at startup.
Please note that this feature is not compatible with index names extracted from message headers/keys/values.
Options Reference
Name | Description | Type | Default Value |
---|---|---|---|
connect.elastic.protocol | URL protocol (http, https). | string | http |
connect.elastic.hosts | List of hostnames for the Elasticsearch cluster nodes, not including protocol or port. | string | localhost |
connect.elastic.port | Port on which the Elasticsearch node listens. | string | 9300 |
connect.elastic.tableprefix | Table prefix (optional). | string | |
connect.elastic.cluster.name | Name of the Elasticsearch cluster, used in local mode for setting the connection. | string | elasticsearch |
connect.elastic.write.timeout | The time to wait in milliseconds. Default is 5 minutes. | int | 300000 |
connect.elastic.batch.size | How many records to process at one time. As records are pulled from Kafka, batches can reach 100k+ records, which is not feasible to send to Elasticsearch at once. | int | 4000 |
connect.elastic.use.http.username | Username if HTTP Basic Auth is required. Default is null. | string | |
connect.elastic.use.http.password | Password if HTTP Basic Auth is required. Default is null. | string | |
connect.elastic.error.policy | Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP (the error is swallowed), THROW (the error is allowed to propagate) and RETRY (the exception causes the Connect framework to retry the message; the number of retries is based on connect.elastic.max.retries). The error will be logged automatically. | string | THROW |
connect.elastic.max.retries | The maximum number of times to try the write again. | int | 20 |
connect.elastic.retry.interval | The time in milliseconds between retries. | int | 60000 |
connect.elastic.kcql | KCQL expression describing field selection and routes. | string | |
connect.elastic.pk.separator | Separator used when there is more than one field in the PK. | string | - |
connect.progress.enabled | Enables output of how many records have been processed. | boolean | false |
KCQL Properties
Name | Description | Type | Default Value |
---|---|---|---|
behavior.on.null.values | Specifies behavior on Kafka tombstones: IGNORE, DELETE or FAIL. | String | IGNORE |
SSL Configuration Properties
Property Name | Description |
---|---|
ssl.truststore.location | Path to the truststore file containing the trusted CA certificates for verifying broker certificates. |
ssl.truststore.password | Password for the truststore file to protect its integrity. |
ssl.truststore.type | Type of the truststore (e.g., JKS, PKCS12). |
ssl.keystore.location | Path to the keystore file containing the client's private key and certificate chain for client authentication. |
ssl.keystore.password | Password for the keystore to protect the private key. |
ssl.keystore.type | Type of the keystore (e.g., JKS, PKCS12). |
ssl.protocol | The SSL protocol used for secure connections (e.g., TLSv1.2, TLSv1.3). |
ssl.trustmanager.algorithm | Algorithm used by the TrustManager to manage certificates. Default value is the trust manager factory algorithm configured for the Java Virtual Machine. |
ssl.keymanager.algorithm | Algorithm used by the KeyManager to manage certificates. Default value is the key manager factory algorithm configured for the Java Virtual Machine. |
SSL Configuration
Enabling SSL connections between Kafka Connect and Elasticsearch ensures that the communication between these services is secure, protecting sensitive data from being intercepted or tampered with. SSL (or TLS) encrypts data in transit, verifying the identity of both parties and ensuring data integrity.
While newer versions of Elasticsearch have SSL enabled by default for internal communication, client connections, such as those from Kafka Connect, still need to be configured for SSL to establish a secure connection. By setting up SSL in Kafka Connect, you ensure:
- Data encryption: Prevents unauthorized access to data being transferred.
- Authentication: Confirms that Kafka Connect and Elasticsearch are communicating with trusted entities.
- Compliance: Meets security standards for regulatory requirements (such as GDPR or HIPAA).
Configuration Example
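A sketch of the SSL-related settings, assuming standard Kafka-style property names and hypothetical paths and passwords:

```properties
ssl.truststore.location=/var/private/ssl/client.truststore.jks
ssl.truststore.password=truststore-password
ssl.truststore.type=JKS
ssl.keystore.location=/var/private/ssl/client.keystore.jks
ssl.keystore.password=keystore-password
ssl.keystore.type=JKS
ssl.protocol=TLSv1.3
```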
Terminology:
- Truststore: Holds certificates to check if the node's certificate is valid.
- Keystore: Contains your client's private key and certificate to prove your identity to the node.
- SSL Protocol: Use TLSv1.2 or TLSv1.3 for up-to-date security.
- Password Security: Protect passwords by encrypting them or using secure methods like environment variables or secret managers.