
OpenSearch

This page describes the usage of the Stream Reactor OpenSearch Sink Connector.

If you are connecting to Elasticsearch (6 or 7) rather than OpenSearch, see the Elasticsearch sink page.

Connector Class

io.lenses.streamreactor.connect.opensearch.OpenSearchSinkConnector

Example

name=opensearch
connector.class=io.lenses.streamreactor.connect.opensearch.OpenSearchSinkConnector
tasks.max=1
topics=orders
connect.opensearch.protocol=https
connect.opensearch.hosts=opensearch-host
connect.opensearch.port=9200
connect.opensearch.use.http.username=admin
connect.opensearch.use.http.password=secret
connect.opensearch.kcql=INSERT INTO orders SELECT * FROM orders
connect.progress.enabled=true

KCQL support

The following KCQL is supported:

Examples:

Kafka Tombstone Handling

You can configure how the connector handles records with a null value payload (Kafka tombstones). Set the behavior.on.null.values property in your KCQL to one of the following values:

  • IGNORE (ignores tombstones entirely)

  • FAIL (throws an exception when a tombstone is received)

  • DELETE (deletes the document with the matching ID)
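The three options can be pictured as a small decision function applied to each record. This is a minimal Python illustration, not the connector's code: NullValueBehavior, TombstoneError and tombstone_action are hypothetical names, and the returned dictionaries only mimic the shape of OpenSearch bulk action metadata.

```python
from enum import Enum
from typing import Optional


class NullValueBehavior(Enum):
    """The values accepted by the behavior.on.null.values KCQL property."""
    IGNORE = "IGNORE"
    FAIL = "FAIL"
    DELETE = "DELETE"


class TombstoneError(Exception):
    """Hypothetical error raised when a tombstone arrives under FAIL."""


def tombstone_action(doc_id: str, value: Optional[dict],
                     behavior: NullValueBehavior) -> Optional[dict]:
    """Return a bulk-style action for one record, or None to skip it.

    Hypothetical helper for illustration; not part of the connector.
    """
    if value is not None:
        # Regular record: written as usual (simplified to an index action).
        return {"index": {"_id": doc_id}}
    if behavior is NullValueBehavior.IGNORE:
        return None  # the tombstone is dropped
    if behavior is NullValueBehavior.FAIL:
        raise TombstoneError(f"tombstone received for document {doc_id}")
    # DELETE: remove the document whose ID matches this record.
    return {"delete": {"_id": doc_id}}
```

Turning the KCQL property string into the enum is then a lookup such as NullValueBehavior("DELETE"); per the KCQL Properties table, IGNORE is the default.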

Example:

Primary Keys

The PK keyword specifies the fields used to generate the document ID in OpenSearch. The values of the selected fields are concatenated, separated by a hyphen (-) by default; the separator can be changed with connect.opensearch.pk.separator.

If no fields are defined, the connector defaults to using the topic name, partition, and message offset to construct the key.
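A minimal sketch of the ID rule above, assuming a flat record value held in a Python dict. document_id is a hypothetical helper; the hyphen used in the topic/partition/offset fallback is an assumption, since this page only says those three values are used.

```python
from typing import Any, Mapping, Sequence


def document_id(topic: str, partition: int, offset: int,
                value: Mapping[str, Any], pk_fields: Sequence[str],
                separator: str = "-") -> str:
    """Build a document ID following the PK rules described above (sketch)."""
    if not pk_fields:
        # No PK: fall back to topic, partition and offset.
        # Assumption: these are joined with a hyphen as well.
        return f"{topic}-{partition}-{offset}"
    # Concatenate the PK field values in the order they are listed,
    # separated by connect.opensearch.pk.separator (default "-").
    return separator.join(str(value[field]) for field in pk_fields)
```

For example, a record value {"id": 7, "region": "eu"} with PK id, region yields the ID 7-eu, so a later record with the same values overwrites the same document.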

Field Prefixes

When defining fields, specific prefixes can be used to determine where the data should be extracted from:

  • _key prefix: specifies that the value should be extracted from the message key.

    • If a path is provided after _key, it identifies the location within the key where the field value resides.

    • If no path is provided, the entire message key is used as the value.

  • _value prefix: specifies that the value should be extracted from the message value.

    • The remainder of the path identifies the specific location within the message value to extract the field.

  • _header prefix: specifies that the value should be extracted from the message header.

    • The remainder of the path indicates the name of the header to be used for the field value.
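The prefix rules can be illustrated with a small lookup function over a record whose key, value and headers are modelled as plain Python objects. extract_field and _walk are hypothetical names, and treating a path without any prefix as a value path is an assumption this page does not state.

```python
from typing import Any, Mapping


def _walk(data: Any, dotted: str) -> Any:
    """Follow a dot-separated path through nested mappings."""
    for segment in dotted.split("."):
        data = data[segment]
    return data


def extract_field(path: str, key: Any, value: Mapping[str, Any],
                  headers: Mapping[str, Any]) -> Any:
    """Resolve a prefixed field path against one record (hypothetical helper)."""
    source, _, rest = path.partition(".")
    if source == "_key":
        # No path after _key: the entire message key is used.
        return key if not rest else _walk(key, rest)
    if source == "_value":
        return _walk(value, rest)
    if source == "_header":
        # The whole remainder is the header name, even if it contains dots.
        return headers[rest]
    # Assumption: an unprefixed path refers to the message value.
    return _walk(value, path)
```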

Insert and Upsert modes

INSERT writes new records to OpenSearch, replacing existing records with the same ID set by the PK (Primary Key) keyword. UPSERT replaces existing records if a matching record is found, or inserts a new one if none is found.
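One plausible way to picture the two modes is in terms of OpenSearch Bulk API actions. This sketch assumes INSERT maps to the bulk index action (the stored document is fully replaced) and UPSERT to an update action with doc_as_upsert (the supplied fields are applied to an existing document, or a new one is created); this page does not confirm which requests the connector actually sends, and bulk_lines is a hypothetical helper.

```python
import json
from typing import Any, Mapping


def bulk_lines(mode: str, index: str, doc_id: str,
               doc: Mapping[str, Any]) -> str:
    """Render one Bulk API entry for INSERT or UPSERT (illustrative mapping)."""
    meta = {"_index": index, "_id": doc_id}
    if mode == "INSERT":
        # Assumed mapping: index action, replacing any document with this ID.
        action, body = {"index": meta}, dict(doc)
    elif mode == "UPSERT":
        # Assumed mapping: update action; doc_as_upsert creates it if absent.
        action, body = {"update": meta}, {"doc": dict(doc), "doc_as_upsert": True}
    else:
        raise ValueError(f"unsupported mode: {mode}")
    # Bulk request bodies are newline-delimited JSON ending in a newline.
    return json.dumps(action) + "\n" + json.dumps(body) + "\n"
```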

Index Suffix

WITHINDEXSUFFIX lets you append a suffix to the index name. The suffix can include a date format.
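A sketch of how a date pattern in a suffix could be rendered, assuming the pattern is written inside curly braces (for example _suffix_{yyyy-MM-dd}) using Java-style tokens. The brace syntax, the supported tokens and index_with_suffix are assumptions for illustration; YYYY is treated here as the calendar year, although in Java date patterns it denotes the week-based year.

```python
import re
from datetime import datetime

# Assumed Java-style tokens mapped to strftime directives; "%" is escaped.
_TOKENS = {"yyyy": "%Y", "YYYY": "%Y", "MM": "%m", "dd": "%d",
           "HH": "%H", "mm": "%M", "ss": "%S", "%": "%%"}
_TOKEN_RE = re.compile("|".join(map(re.escape, _TOKENS)))


def _to_strftime(pattern: str) -> str:
    return _TOKEN_RE.sub(lambda m: _TOKENS[m.group(0)], pattern)


def index_with_suffix(index: str, suffix: str, when: datetime) -> str:
    """Append a suffix, formatting every {pattern} with the given time."""
    rendered = re.sub(r"\{([^}]*)\}",
                      lambda m: when.strftime(_to_strftime(m.group(1))),
                      suffix)
    return index + rendered
```

With a daily pattern, records written on different days land in different indexes (orders_suffix_2024-03-05, orders_suffix_2024-03-06, ...), which is the usual reason to add a date suffix.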

Example:


Index Names

Static Index Names

To use a static index name, define the target index in the KCQL statement without any prefixes:

This will consistently create an index named index_name for any messages consumed from topicA.

Extracting Index Names from Headers, Keys, and Values

Headers

To extract an index name from a message header, use the _header prefix followed by the header name:

This statement extracts the value from the gate header field and uses it as the index name.

For headers with names that include dots, enclose the entire target in backticks (`) and each segment that is a field name in single quotes ('):

In this case, the value of the header named prefix.abc.suffix is used to form the index name.
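The quoting rule can be made concrete with a small tokenizer that splits a target on dots, except inside single-quoted segments. parse_target is a hypothetical helper, not the connector's parser, and it assumes quotes cannot be escaped inside a segment.

```python
from typing import List


def parse_target(target: str) -> List[str]:
    """Split a KCQL index target into path segments (illustrative only)."""
    if target.startswith("`") and target.endswith("`"):
        target = target[1:-1]  # drop the enclosing backticks
    segments: List[str] = []
    current: List[str] = []
    quoted = False
    for ch in target:
        if ch == "'":
            quoted = not quoted  # enter or leave a quoted segment
        elif ch == "." and not quoted:
            segments.append("".join(current))  # an unquoted dot ends a segment
            current = []
        else:
            current.append(ch)
    if quoted:
        raise ValueError(f"unterminated quote in {target!r}")
    segments.append("".join(current))
    return segments
```

Applied to the header example, the dots inside the quotes are preserved, so the header name stays prefix.abc.suffix instead of being split into three parts.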

Keys

To use the full value of the message key as the index name, use the _key prefix:

For example, if the message key is "freddie", the resulting index name will be freddie.

Values

To extract an index name from a field within the message value, use the _value prefix followed by the field name:

This example uses the value of the name field from the message's value. If the field contains "jason", the index name will be jason.

Nested Fields in Values

To access nested fields within a value, specify the full path using dot notation:

If the firstName field is nested within the name structure, its value (e.g., "hans") will be used as the index name.

Fields with Dots in Their Names

For field names that include dots, enclose the entire target in backticks (`) and each segment that is a field name in single quotes ('):

If the value structure contains:

The extracted index name will be hans.
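Once a target has been split into segments, resolving the index name is a walk through the key, the value or the headers. The sketch below takes already-split segments (for instance ["_value", "customer.name", "first.name"], where both field names are hypothetical) and is an illustration only; resolve_index is not a connector API, and returning an unprefixed target verbatim mirrors the static-name case.

```python
from typing import Any, Mapping, Sequence


def resolve_index(segments: Sequence[str], key: Any,
                  value: Mapping[str, Any],
                  headers: Mapping[str, Any]) -> str:
    """Turn already-parsed target segments into an index name (sketch)."""
    source, path = segments[0], list(segments[1:])
    if source == "_header":
        # The single segment after _header is the header name.
        return str(headers[path[0]])
    if source == "_key":
        data = key
    elif source == "_value":
        data = value
    else:
        # No prefix: a static index name, used as written.
        return ".".join(segments)
    for segment in path:  # descend one nesting level per segment
        data = data[segment]
    return str(data)
```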

Auto Index Creation

The Sink will automatically create missing indexes at startup.

Please note that this feature is not compatible with index names extracted from message headers/keys/values.
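One way to see why the two do not mix: at startup only static targets name a known index, whereas header, key and value targets resolve per record. indexes_to_create is a hypothetical helper that illustrates this split; it is an inference from the rule above, not the connector's code.

```python
from typing import Iterable, List

# Targets starting with these prefixes are only known once records arrive.
DYNAMIC_PREFIXES = ("_key", "_value", "_header")


def indexes_to_create(targets: Iterable[str]) -> List[str]:
    """Return the KCQL targets whose index names are known at startup."""
    static = []
    for target in targets:
        head = target.strip("`").split(".", 1)[0]
        if head not in DYNAMIC_PREFIXES:
            static.append(target)
    return static
```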

Strict Bulk Item Errors

By default the OpenSearch connector routes any per-item bulk failure through the configured ErrorPolicy. This is stricter than the legacy Elasticsearch 7 behaviour (which silently dropped item-level errors) and surfaces mapping conflicts and version conflicts as visible failures.

Set connect.opensearch.bulk.strict.item.errors=false to restore ES7-compatible tolerant behaviour where only HTTP-transport errors are surfaced.
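The strict/tolerant distinction can be sketched against the OpenSearch Bulk API response, whose top-level errors flag and per-item results report failures even when the HTTP request itself succeeds. item_failures is a hypothetical helper, and the fields it collects are a simplification of what an ErrorPolicy would actually receive.

```python
from typing import Any, Dict, List


def item_failures(response: Dict[str, Any], strict: bool) -> List[Dict[str, Any]]:
    """Collect per-item bulk errors that strict mode would route to the ErrorPolicy."""
    if not strict or not response.get("errors"):
        # Tolerant mode (strict=false) ignores item-level errors entirely.
        return []
    failures = []
    for item in response.get("items", []):
        # Each item has a single key: the action (index, create, update, delete).
        action, result = next(iter(item.items()))
        if "error" in result:
            failures.append({"action": action,
                             "id": result.get("_id"),
                             "status": result.get("status"),
                             "type": result["error"].get("type")})
    return failures
```

A version conflict, for example, comes back as an item with status 409 inside an otherwise successful response; in strict mode it surfaces as a failure, in tolerant mode it is silently dropped.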

AWS SigV4 Authentication

For AWS-managed OpenSearch Service and OpenSearch Serverless, the connector can sign requests with AWS SigV4 instead of (or in addition to) HTTP Basic auth.

Enable SigV4 signing with:

Use aws.signing.service=aoss for OpenSearch Serverless.

Credentials provider

  • DEFAULT (recommended): uses the standard AWS SDK credential chain (IAM role on EC2/EKS, environment variables, ~/.aws/credentials). Credentials are refreshed automatically.

  • STATIC: reads aws.access.key.id and aws.secret.access.key (plus an optional aws.session.token) from the connector config. Keys are not rotated, so this is suitable for development or air-gapped environments only.
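The SigV4 rules listed in the Options Reference can be summarised as a small validator over the connector properties. This is a hypothetical check written for illustration, not the connector's own validation; sigv4_problems is an invented name and the region used in the usage note is only a placeholder.

```python
from typing import Dict, List

P = "connect.opensearch."


def sigv4_problems(cfg: Dict[str, str]) -> List[str]:
    """Check the SigV4-related rules stated on this page; return any problems."""
    if cfg.get(P + "aws.signing.enabled", "false").lower() != "true":
        return []  # signing disabled: the aws.* rules do not apply
    problems = []
    if not cfg.get(P + "aws.region"):
        problems.append("aws.region is required when aws.signing.enabled=true")
    service = cfg.get(P + "aws.signing.service", "es")
    if service not in ("es", "aoss"):
        problems.append(f"aws.signing.service must be es or aoss, got {service}")
    provider = cfg.get(P + "aws.credentials.provider", "DEFAULT")
    if provider == "STATIC":
        # Inline keys must both be present for the STATIC provider.
        for key in ("aws.access.key.id", "aws.secret.access.key"):
            if not cfg.get(P + key):
                problems.append(f"{key} is required when provider is STATIC")
    elif provider != "DEFAULT":
        problems.append(f"unknown credentials provider {provider}")
    return problems
```

For instance, aws.signing.enabled=true with aws.region=eu-west-1 and aws.signing.service=aoss passes, while enabling signing with the STATIC provider but no region and no keys reports three problems.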

JWT Authentication

For clusters secured with bearer-token authentication, configure one of the two mutually-exclusive modes:

Static token (development / short-lived environments):

The token is read once at startup. It does not auto-refresh. The task will fail with HTTP 401 when the token expires.

File-based token (recommended for production):

The connector re-reads the file at the configured interval. Rotate the file contents with your IdP tooling and tune the interval to match the IdP's rotation cadence.

Restricting file access (recommended in multi-tenant Connect deployments):

When set, any path that resolves outside the base directory is rejected at read time, preventing a connector config from being used to read arbitrary worker-local files.
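A minimal sketch of the file-based mode together with the base-directory restriction, assuming Python 3.9+ for Path.is_relative_to. FileTokenSource is hypothetical: it caches the token, re-reads the file once the refresh interval has elapsed, and resolves symlinks and .. components before the containment check so that a path cannot escape the base directory.

```python
import time
from pathlib import Path
from typing import Callable, Optional


class FileTokenSource:
    """Re-reads a bearer token file at most once per refresh interval (sketch)."""

    def __init__(self, token_file: str, refresh_interval_ms: int = 60000,
                 base_dir: Optional[str] = None,
                 clock: Callable[[], float] = time.monotonic):
        if refresh_interval_ms <= 0:
            raise ValueError("refresh interval must be > 0")
        self._file = Path(token_file)
        self._base = Path(base_dir).resolve() if base_dir else None
        self._interval = refresh_interval_ms / 1000.0
        self._clock = clock
        self._token: Optional[str] = None
        self._read_at = 0.0

    def _checked_path(self) -> Path:
        # Resolve symlinks and ".." first so the check cannot be bypassed.
        resolved = self._file.resolve()
        if self._base is not None and not resolved.is_relative_to(self._base):
            raise PermissionError(f"{resolved} is outside {self._base}")
        return resolved

    def token(self) -> str:
        now = self._clock()
        if self._token is None or now - self._read_at >= self._interval:
            # The path is checked at read time, on every refresh.
            self._token = self._checked_path().read_text().strip()
            self._read_at = now
        return self._token

    def authorization_header(self) -> str:
        return f"Bearer {self.token()}"
```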

Connection Pool Tuning

The REST/HC5 transport path exposes two connection-pool knobs. These settings are ignored on the SigV4 path (the AWS SDK manages its own connection pool).

  • connect.opensearch.max.connections.per.route (default 5): maximum HTTP connections per OpenSearch node. Increase for high-throughput deployments.

  • connect.opensearch.max.connections.total (default 25): maximum total connections across all nodes. Set to at least max.connections.per.route × number of nodes.
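The sizing rule is simple arithmetic; a tiny helper makes it explicit. Both function names are hypothetical, and the defaults mirror the values of 5 and 25 given for these two properties.

```python
def min_total_connections(per_route: int, node_count: int) -> int:
    """Smallest max.connections.total that lets every node reach its per-route cap."""
    return per_route * node_count


def pool_is_sized(node_count: int, per_route: int = 5, total: int = 25) -> bool:
    """True if the total cap does not starve any node (defaults: 5 and 25)."""
    return total >= min_total_connections(per_route, node_count)
```

With the defaults, the total cap covers at most five nodes (5 × 5 = 25); a six-node cluster at 5 connections per route needs a total of at least 30.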

Cleartext Auth Guard

By default the connector rejects protocol=http when HTTP Basic auth or JWT bearer authentication is configured, because credentials would be transmitted in cleartext and could be intercepted by an on-path attacker.

Set connect.opensearch.security.allow.insecure.auth=true only when TLS termination is handled by a trusted local sidecar or service mesh. A WARN-level message is emitted at startup when this override is active.
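The guard can be sketched as a startup check over the connector properties. check_cleartext_auth is a hypothetical helper; it treats a non-empty username, JWT token or token file as "auth configured", which is an assumption about how the connector detects credentials, and uses Python's logging module as a stand-in for the connector's WARN message.

```python
import logging
from typing import Dict

P = "connect.opensearch."
log = logging.getLogger("opensearch-sink-sketch")


def check_cleartext_auth(cfg: Dict[str, str]) -> bool:
    """Raise if credentials would travel over plain HTTP without the override.

    Returns True when the insecure override is in effect, False otherwise.
    """
    # Assumption: any of these non-empty properties means credentials are set.
    uses_auth = any(cfg.get(P + k) for k in (
        "use.http.username", "security.jwt.token", "security.jwt.token.file"))
    protocol = cfg.get(P + "protocol", "https").lower()
    allow = cfg.get(P + "security.allow.insecure.auth", "false").lower() == "true"
    if protocol != "http" or not uses_auth:
        return False  # nothing sensitive would be sent in cleartext
    if not allow:
        raise ValueError("Basic/JWT auth over protocol=http is rejected")
    log.warning("credentials will be sent over cleartext HTTP")
    return True
```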

Options Reference

  • connect.opensearch.protocol (string, default: https): URL protocol (http or https).

  • connect.opensearch.hosts (string, default: localhost): list of hostnames for the OpenSearch cluster nodes, not including protocol or port.

  • connect.opensearch.port (int, default: 9200): port on which the OpenSearch nodes listen.

  • connect.opensearch.kcql (string): KCQL expression describing field selection and routes.

  • connect.opensearch.write.timeout (int, default: 300000): time to wait, in milliseconds, for a bulk request to complete (300000 ms = 5 minutes). Note: unlike Elasticsearch 6/7, OpenSearch interprets this value in milliseconds.

  • connect.opensearch.batch.size (int, default: 4000): number of records to process at one time.

  • connect.opensearch.use.http.username (string): username for HTTP Basic auth. Leave empty to disable Basic auth.

  • connect.opensearch.use.http.password (password): password for HTTP Basic auth. Leave empty to disable Basic auth.

  • connect.opensearch.error.policy (string, default: THROW): action on error: NOOP, THROW or RETRY.

  • connect.opensearch.max.retries (int, default: 20): maximum number of retry attempts on write failure.

  • connect.opensearch.retry.interval (int, default: 60000): time in milliseconds between retries.

  • connect.opensearch.pk.separator (string, default: -): separator used when more than one field is in PK.

  • connect.opensearch.bulk.strict.item.errors (boolean, default: true): when true, per-item bulk failures go through the ErrorPolicy. When false, only HTTP-transport errors are surfaced, matching ES7 tolerant behaviour. WARNING: setting false neutralises per-item retry knobs.

  • connect.opensearch.max.connections.per.route (int, default: 5): maximum HTTP connections per OpenSearch node (HC5 REST transport only; ignored on the SigV4 path).

  • connect.opensearch.max.connections.total (int, default: 25): maximum total HTTP connections across all nodes (HC5 REST transport only).

  • connect.opensearch.security.allow.insecure.auth (boolean, default: false): allow HTTP Basic / JWT auth over protocol=http. Set to true only when TLS is terminated by a trusted local sidecar. A WARN is emitted at startup.

  • connect.opensearch.security.jwt.token (password): static JWT bearer token. Does NOT auto-refresh. Mutually exclusive with jwt.token.file.

  • connect.opensearch.security.jwt.token.file (string): path to a file containing the JWT bearer token, re-read at the configured refresh interval. Mutually exclusive with jwt.token.

  • connect.opensearch.security.jwt.token.refresh.interval.ms (long, default: 60000): how often, in milliseconds, to re-read the JWT token file. Must be > 0.

  • connect.opensearch.security.jwt.token.base.dir (string): restricts JWT token file reads to this directory and its subdirectories. Strongly recommended in multi-tenant deployments.

  • connect.opensearch.aws.signing.enabled (boolean, default: false): enable AWS SigV4 request signing. When true, switches from RestClientTransport to AwsSdk2Transport.

  • connect.opensearch.aws.region (string): AWS region. Required when aws.signing.enabled=true.

  • connect.opensearch.aws.signing.service (string, default: es): AWS service name for SigV4. Must be one of {es, aoss}: es for Managed OpenSearch, aoss for OpenSearch Serverless.

  • connect.opensearch.aws.credentials.provider (string, default: DEFAULT): DEFAULT (SDK credential chain, recommended for production) or STATIC (inline keys, development only).

  • connect.opensearch.aws.access.key.id (string): AWS access key ID. Required when aws.credentials.provider=STATIC.

  • connect.opensearch.aws.secret.access.key (password): AWS secret access key. Required when aws.credentials.provider=STATIC.

  • connect.opensearch.aws.session.token (password): AWS session token for STS / assumed-role workflows. Does NOT auto-refresh.

  • connect.progress.enabled (boolean, default: false): enables output of how many records have been processed.

KCQL Properties

  • behavior.on.null.values (String, default: IGNORE): specifies the behavior on Kafka tombstones: IGNORE, DELETE or FAIL.

SSL Configuration Properties

  • ssl.truststore.location: path to the truststore file containing the trusted CA certificates used to verify the OpenSearch node certificates.

  • ssl.truststore.password: password protecting the integrity of the truststore file.

  • ssl.truststore.type: type of the truststore (e.g., JKS, PKCS12). Default is JKS.

  • ssl.keystore.location: path to the keystore file containing the client's private key and certificate chain for client authentication.

  • ssl.keystore.password: password for the keystore, protecting the private key.

  • ssl.keystore.type: type of the keystore (e.g., JKS, PKCS12). Default is JKS.

  • ssl.protocol: protocol used for secure connections (e.g., TLSv1.2, TLSv1.3). Default is TLS.

  • ssl.trustmanager.algorithm: algorithm used by the TrustManager to validate certificates. Default is the trust manager factory algorithm configured for the Java Virtual Machine.

  • ssl.keymanager.algorithm: algorithm used by the KeyManager to manage keys and certificates. Default is the key manager factory algorithm configured for the Java Virtual Machine.

SSL Configuration

Enabling SSL connections between Kafka Connect and OpenSearch ensures that the communication between these services is secure, protecting sensitive data from being intercepted or tampered with.

Configuration Example

Terminology

  • Truststore: Holds certificates to check if the node's certificate is valid.

  • Keystore: Contains your client's private key and certificate to prove your identity to the node.

  • SSL Protocol: Use TLSv1.2 or TLSv1.3 for up-to-date security.

  • Password Security: Protect passwords by encrypting them or using secure methods like environment variables or secret managers.
