
Elasticsearch

This page describes the usage of the Stream Reactor Elasticsearch Sink Connector.

Connector Class

Elasticsearch 6:

io.lenses.streamreactor.connect.elastic6.ElasticSinkConnector

Elasticsearch 7:

io.lenses.streamreactor.connect.elastic7.ElasticSinkConnector

Example

name=elastic
connector.class=io.lenses.streamreactor.connect.elastic7.ElasticSinkConnector
tasks.max=1
topics=orders
connect.elastic.protocol=http
connect.elastic.hosts=elastic
connect.elastic.port=9200
connect.elastic.cluster.name=elasticsearch
connect.elastic.kcql=INSERT INTO orders SELECT * FROM orders
connect.progress.enabled=true

For more examples see the tutorials.

KCQL support

You can specify multiple KCQL statements separated by ; to have a connector sink multiple topics. The connector properties topics or topics.regex are required to be set to a value that matches the KCQL statements.
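For example, a single connector instance could sink two topics into two indexes (a sketch; the topic and index names are hypothetical):

topics=topicA,topicB
connect.elastic.kcql=INSERT INTO indexA SELECT * FROM topicA; INSERT INTO indexB SELECT * FROM topicB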

The following KCQL is supported:

INSERT | UPSERT
INTO <elastic_index>
SELECT FIELD, ...
FROM kafka_topic
[PK FIELD,...]
[WITHDOCTYPE=<your_document_type>]
[WITHINDEXSUFFIX=<your_suffix>]

Examples:

-- Insert mode, select all fields from topicA and write to indexA
INSERT INTO indexA SELECT * FROM topicA

-- Insert mode, select 3 fields and rename from topicB
-- and write to indexB
INSERT INTO indexB SELECT x AS a, y, zc FROM topicB PK y

-- Upsert mode, select 2 fields from topicC and write to indexC
UPSERT INTO indexC SELECT id, string_field FROM topicC PK id

Kafka Tombstone Handling

It is possible to configure how the connector handles a null value payload (a Kafka tombstone). Use the behavior.on.null.values property in your KCQL with one of the following values:

  • IGNORE (ignores tombstones entirely)

  • FAIL (throws an exception when a tombstone is received)

  • DELETE (deletes the document with the specified id)

Example:

INSERT INTO indexA SELECT * FROM topicA PROPERTIES ('behavior.on.null.values'='IGNORE')

Primary Keys

The PK keyword allows you to specify fields that will be used to generate the key value in Elasticsearch. The values of the selected fields are concatenated and separated by a hyphen (-), the default separator, which is configurable via connect.elastic.pk.separator.

If no fields are defined, the connector defaults to using the topic name, partition, and message offset to construct the key.
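As a sketch, assuming a topic whose records carry id and type fields, the following statement builds the document key from both, so a record with id=42 and type=order would be keyed 42-order:

INSERT INTO indexA SELECT * FROM topicA PK id, type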

Field Prefixes

When defining fields, specific prefixes can be used to determine where the data should be extracted from, as shown in the example after this list:

  • _key Prefix Specifies that the value should be extracted from the message key.

    • If a path is provided after _key, it identifies the location within the key where the field value resides.

    • If no path is provided, the entire message key is used as the value.

  • _value Prefix Specifies that the value should be extracted from the message value.

    • The remainder of the path identifies the specific location within the message value to extract the field.

  • _header Prefix Specifies that the value should be extracted from the message header.

    • The remainder of the path indicates the name of the header to be used for the field value.
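A minimal sketch combining the three prefixes (the field and header names are hypothetical):

INSERT INTO indexA SELECT * FROM topicA PK _key.customerId, _value.order.id, _header.region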

Insert and Upsert modes

INSERT writes new records to Elastic, replacing existing records with the same ID set by the PK (Primary Key) keyword. UPSERT updates existing records if a matching record is found, or inserts a new one if none is found.
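For instance, reusing the hypothetical names from the examples above:

-- Writes the document, replacing any existing document with the same id
INSERT INTO indexC SELECT id, string_field FROM topicC PK id

-- Updates the document if it exists, otherwise inserts it
UPSERT INTO indexC SELECT id, string_field FROM topicC PK id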

Document Type

WITHDOCTYPE allows you to associate a document type with the inserted documents.
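A sketch (the document type name here is hypothetical):

INSERT INTO indexA SELECT * FROM topicA WITHDOCTYPE=_doc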

Index Suffix

WITHINDEXSUFFIX allows you to append a suffix to your index name; date format patterns enclosed in curly braces are supported.

Example:

WITHINDEXSUFFIX=_suffix_{YYYY-MM-dd}
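In a full statement this might look like the following sketch; a record written on, say, 2024-01-15 would then land in indexA_suffix_2024-01-15:

INSERT INTO indexA SELECT * FROM topicA WITHINDEXSUFFIX=_suffix_{YYYY-MM-dd}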


Index Names

Static Index Names

To use a static index name, define the target index in the KCQL statement without any prefixes:

INSERT INTO index_name SELECT * FROM topicA

This will consistently create an index named index_name for any messages consumed from topicA.

Extracting Index Names from Headers, Keys, and Values

Headers

To extract an index name from a message header, use the _header prefix followed by the header name:

INSERT INTO _header.gate SELECT * FROM topicA

This statement extracts the value from the gate header field and uses it as the index name.

For headers with names that include dots, enclose the entire target in backticks (`) and each segment that consists of a field name in single quotes ('):

INSERT INTO `_header.'prefix.abc.suffix'` SELECT * FROM topicA

In this case, the value of the header named prefix.abc.suffix is used to form the index name.

Keys

To use the full value of the message key as the index name, use the _key prefix:

INSERT INTO _key SELECT * FROM topicA

For example, if the message key is "freddie", the resulting index name will be freddie.

Values

To extract an index name from a field within the message value, use the _value prefix followed by the field name:

INSERT INTO _value.name SELECT * FROM topicA

This example uses the value of the name field from the message's value. If the field contains "jason", the index name will be jason.

Nested Fields in Values

To access nested fields within a value, specify the full path using dot notation:

INSERT INTO _value.name.firstName SELECT * FROM topicA

If the firstName field is nested within the name structure, its value (e.g., "hans") will be used as the index name.

Fields with Dots in Their Names

For field names that include dots, enclose the entire target in backticks (`) and each segment that consists of a field name in single quotes ('):

INSERT INTO `_value.'customer.name'.'first.name'` SELECT * FROM topicA

If the value structure contains:

{
  "customer.name": {
    "first.name": "hans"
  }
}

The extracted index name will be hans.

Auto Index Creation

The Sink will automatically create missing indexes at startup.

Please note that this feature is not compatible with index names extracted from message headers/keys/values.

Options Reference

  • connect.elastic.protocol: URL protocol (http, https). Type: string. Default: http.

  • connect.elastic.hosts: List of hostnames for Elasticsearch cluster nodes, not including protocol or port. Type: string. Default: localhost.

  • connect.elastic.port: Port on which the Elasticsearch node listens. Type: string. Default: 9300.

  • connect.elastic.tableprefix: Table prefix (optional). Type: string.

  • connect.elastic.cluster.name: Name of the Elasticsearch cluster, used in local mode for setting the connection. Type: string. Default: elasticsearch.

  • connect.elastic.write.timeout: The time to wait in milliseconds. Type: int. Default: 300000 (5 minutes).

  • connect.elastic.batch.size: How many records to process at one time. As records are pulled from Kafka it can be 100k+, which will not be feasible to throw at Elasticsearch at once. Type: int. Default: 4000.

  • connect.elastic.use.http.username: Username if HTTP Basic Auth is required. Type: string. Default: null.

  • connect.elastic.use.http.password: Password if HTTP Basic Auth is required. Type: string. Default: null.

  • connect.elastic.error.policy: Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP (the error is swallowed), THROW (the error is allowed to propagate) and RETRY (the exception causes the Connect framework to retry the message; the number of retries is set by connect.elastic.max.retries). The error will be logged automatically. Type: string. Default: THROW.

  • connect.elastic.max.retries: The maximum number of times to try the write again. Type: int. Default: 20.

  • connect.elastic.retry.interval: The time in milliseconds between retries. Type: int. Default: 60000.

  • connect.elastic.kcql: KCQL expression describing field selection and routes. Type: string.

  • connect.elastic.pk.separator: Separator used when there is more than one field in the PK. Type: string. Default: -.

  • connect.progress.enabled: Enables the output for how many records have been processed. Type: boolean. Default: false.

KCQL Properties

  • behavior.on.null.values: Specifies behavior on Kafka tombstones: IGNORE, DELETE or FAIL. Type: string. Default: IGNORE.
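For illustration, the retry-related options might be combined like this (a sketch; the values are hypothetical):

connect.elastic.error.policy=RETRY
connect.elastic.max.retries=5
connect.elastic.retry.interval=30000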

SSL Configuration Properties

  • ssl.truststore.location: Path to the truststore file containing the trusted CA certificates for verifying broker certificates.

  • ssl.truststore.password: Password for the truststore file to protect its integrity.

  • ssl.truststore.type: Type of the truststore (e.g., JKS, PKCS12). Default is JKS.

  • ssl.keystore.location: Path to the keystore file containing the client's private key and certificate chain for client authentication.

  • ssl.keystore.password: Password for the keystore to protect the private key.

  • ssl.keystore.type: Type of the keystore (e.g., JKS, PKCS12). Default is JKS.

  • ssl.protocol: The SSL protocol used for secure connections (e.g., TLSv1.2, TLSv1.3). Default is TLS.

  • ssl.trustmanager.algorithm: Algorithm used by the TrustManager to manage certificates. Default value is the trust manager factory algorithm configured for the Java Virtual Machine.

  • ssl.keymanager.algorithm: Algorithm used by the KeyManager to manage certificates. Default value is the key manager factory algorithm configured for the Java Virtual Machine.

SSL Configuration

Enabling SSL connections between Kafka Connect and Elasticsearch ensures that the communication between these services is secure, protecting sensitive data from being intercepted or tampered with. SSL (or TLS) encrypts data in transit, verifying the identity of both parties and ensuring data integrity.

While newer versions of Elasticsearch have SSL enabled by default for internal communication, it’s still necessary to configure SSL for client connections, such as those from Kafka Connect. Even if Elasticsearch has SSL enabled by default, Kafka Connect still needs these configurations to establish a secure connection. By setting up SSL in Kafka Connect, you ensure:

  • Data encryption: Prevents unauthorized access to data being transferred.

  • Authentication: Confirms that Kafka Connect and Elasticsearch are communicating with trusted entities.

  • Compliance: Meets security standards for regulatory requirements (such as GDPR or HIPAA).

Configuration Example

Terminology:

  • Truststore: Holds certificates to check if the node’s certificate is valid.

  • Keystore: Contains your client’s private key and certificate to prove your identity to the node.

  • SSL Protocol: Use TLSv1.2 or TLSv1.3 for up-to-date security.

  • Password Security: Protect passwords by encrypting them or using secure methods like environment variables or secret managers.

ssl.truststore.location=/path/to/truststore.jks
ssl.truststore.password=your_truststore_password
ssl.truststore.type=JKS  # Can also be PKCS12 if applicable

ssl.keystore.location=/path/to/keystore.jks
ssl.keystore.password=your_keystore_password
ssl.keystore.type=JKS  # Can also be PKCS12 if applicable

ssl.protocol=TLSv1.2  # Or TLSv1.3 for stronger security

ssl.trustmanager.algorithm=PKIX  # Default algorithm for managing certificates
ssl.keymanager.algorithm=PKIX  # Default algorithm for managing certificates
