OpenSearch
This page describes the usage of the Stream Reactor OpenSearch Sink Connector.
If you are connecting to Elasticsearch (6 or 7) rather than OpenSearch, see the Elasticsearch sink page.
Connector Class
io.lenses.streamreactor.connect.opensearch.OpenSearchSinkConnector
Example
For more examples see the tutorials.
name=opensearch
connector.class=io.lenses.streamreactor.connect.opensearch.OpenSearchSinkConnector
tasks.max=1
topics=orders
connect.opensearch.protocol=https
connect.opensearch.hosts=opensearch-host
connect.opensearch.port=9200
connect.opensearch.use.http.username=admin
connect.opensearch.use.http.password=secret
connect.opensearch.kcql=INSERT INTO orders SELECT * FROM orders
connect.progress.enabled=true
KCQL support
You can specify multiple KCQL statements, separated by ;, to have a single connector sink multiple topics. The connector property topics or topics.regex must be set to a value that matches the topics named in the KCQL statements.
The following KCQL is supported:
WITHDOCTYPE is silently ignored on OpenSearch — mapping types were removed in OpenSearch 1.0.
connect.opensearch.tableprefix is not supported and is rejected at startup with a descriptive error. Use the KCQL INTO target to control the index name.
connect.opensearch.cluster.name is not supported and is rejected at startup — it is an Elasticsearch-only concept.
Examples:
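As a minimal sketch, the statements below reuse the INSERT INTO ... SELECT form from the connector example at the top of this page. The payments topic and index are hypothetical names introduced only to illustrate the ; separator.

```properties
# Copy every field from the orders topic into the orders index.
topics=orders
connect.opensearch.kcql=INSERT INTO orders SELECT * FROM orders

# Alternative (hypothetical second topic): two statements separated by ;
# The topics property must then cover both source topics.
# topics=orders,payments
# connect.opensearch.kcql=INSERT INTO orders SELECT * FROM orders;INSERT INTO payments SELECT * FROM payments
```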
Kafka Tombstone Handling
You can configure how the connector handles a record with a null value payload (a Kafka tombstone). Set the behavior.on.null.values property in your KCQL to one of the following values:
IGNORE (ignores tombstones entirely)
FAIL (throws an exception if a tombstone is encountered)
DELETE (deletes the document with the specified id)
Example:
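A sketch of how the property might be attached to a statement, assuming the KCQL PROPERTIES clause used by other Stream Reactor sinks applies here unchanged. The topic and index names are placeholders.

```properties
# Fail the task when a tombstone (null value) arrives on the orders topic.
# Assumption: KCQL properties are passed as PROPERTIES('name'='value').
connect.opensearch.kcql=INSERT INTO orders SELECT * FROM orders PROPERTIES('behavior.on.null.values'='FAIL')
```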
Primary Keys
The PK keyword allows you to specify fields that will be used to generate the key value in OpenSearch. The values of the selected fields are concatenated and separated by a hyphen (-).
If no fields are defined, the connector defaults to using the topic name, partition, and message offset to construct the key.
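To make the concatenation rule concrete, here is a small sketch with two hypothetical value fields, customer_id and order_id. Neither field name comes from the connector itself.

```properties
# Hypothetical fields: for a record with customer_id=42 and order_id=1001,
# the text above implies a document id of 42-1001 (values joined by a hyphen).
connect.opensearch.kcql=INSERT INTO orders SELECT * FROM orders PK customer_id,order_id
```

Dropping the PK clause would fall back to the topic, partition, and offset based key described above.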
Field Prefixes
When defining fields, specific prefixes can be used to determine where the data should be extracted from:
_key Prefix: Specifies that the value should be extracted from the message key. If a path is provided after _key, it identifies the location within the key where the field value resides. If no path is provided, the entire message key is used as the value.
_value Prefix: Specifies that the value should be extracted from the message value. The remainder of the path identifies the specific location within the message value to extract the field.
_header Prefix: Specifies that the value should be extracted from the message header. The remainder of the path indicates the name of the header to be used for the field value.
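The prefixes can be combined in a single PK clause. The following is only a sketch: customerId and order.id are hypothetical paths, and the comma-separated form is assumed to match the unprefixed PK syntax.

```properties
# Hypothetical paths: build the document id from the customerId field of the
# message key and the nested order.id field of the message value.
connect.opensearch.kcql=INSERT INTO orders SELECT * FROM orders PK _key.customerId,_value.order.id
```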
Insert and Upsert modes
INSERT writes new records to OpenSearch, replacing existing records with the same ID set by the PK (Primary Key) keyword. UPSERT replaces existing records if a matching record is found, or inserts a new one if none is found.
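A short sketch of the two modes side by side, assuming UPSERT takes the place of INSERT at the start of the statement. The id field is a hypothetical primary key.

```properties
# Insert mode: a record with an existing id replaces the stored document.
connect.opensearch.kcql=INSERT INTO orders SELECT * FROM orders PK id

# Upsert mode (alternative): update the matching document, or create it if absent.
# connect.opensearch.kcql=UPSERT INTO orders SELECT * FROM orders PK id
```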
Index Suffix
WITHINDEXSUFFIX lets you append a suffix to the index name. The suffix can include a date format pattern.
Example:
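A possible form, assuming the suffix syntax matches the one used by the Elasticsearch sink, where a date pattern is written inside braces. The literal _suffix_ text and the pattern are illustrative values.

```properties
# Assumption: the part in braces is resolved as a date pattern when the index name is built.
connect.opensearch.kcql=INSERT INTO orders SELECT * FROM orders WITHINDEXSUFFIX=_suffix_{YYYY-MM-dd}
```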
Index Names
Static Index Names
To use a static index name, define the target index in the KCQL statement without any prefixes:
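A minimal sketch of such a statement, using the index_name and topicA names referred to in the surrounding text:

```properties
# Static target: every record from topicA is written to index_name.
connect.opensearch.kcql=INSERT INTO index_name SELECT * FROM topicA
```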
This will consistently create an index named index_name for any messages consumed from topicA.
Extracting Index Names from Headers, Keys, and Values
Headers
To extract an index name from a message header, use the _header prefix followed by the header name:
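One way this could look, assuming the prefixed path is written directly as the INTO target. The header name gate is the one discussed here, while topicA is a placeholder topic.

```properties
# The index name is read from the "gate" header of each record.
connect.opensearch.kcql=INSERT INTO _header.gate SELECT * FROM topicA
```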
This statement extracts the value from the gate header field and uses it as the index name.
For headers with names that include dots, enclose the entire target in backticks (`) and wrap each segment that is a single field name in single quotes ('):
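Sketched under the same assumption about the INTO target, with the dotted header name prefix.abc.suffix quoted as one segment and topicA as a placeholder topic:

```properties
# Backticks wrap the whole target; single quotes keep the dotted header name together.
connect.opensearch.kcql=INSERT INTO `_header.'prefix.abc.suffix'` SELECT * FROM topicA
```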
In this case, the value of the header named prefix.abc.suffix is used to form the index name.
Keys
To use the full value of the message key as the index name, use the _key prefix:
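A sketch of the key-based form, assuming _key with no path is accepted as the INTO target. The topic name is a placeholder.

```properties
# The whole message key becomes the index name.
connect.opensearch.kcql=INSERT INTO _key SELECT * FROM topicA
```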
For example, if the message key is "freddie", the resulting index name will be freddie.
Values
To extract an index name from a field within the message value, use the _value prefix followed by the field name:
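For the name field mentioned here, the statement would presumably take this shape, with topicA as a placeholder topic:

```properties
# The index name is read from the top-level "name" field of the message value.
connect.opensearch.kcql=INSERT INTO _value.name SELECT * FROM topicA
```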
This example uses the value of the name field from the message's value. If the field contains "jason", the index name will be jason.
Nested Fields in Values
To access nested fields within a value, specify the full path using dot notation:
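Assuming the dot-separated path follows the _value prefix directly, a nested lookup for name.firstName might be written as:

```properties
# Walks value -> name -> firstName and uses the result as the index name.
connect.opensearch.kcql=INSERT INTO _value.name.firstName SELECT * FROM topicA
```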
If the firstName field is nested within the name structure, its value (e.g., "hans") will be used as the index name.
Fields with Dots in Their Names
For field names that include dots, enclose the entire target in backticks (`) and wrap each segment that is a single field name in single quotes ('):
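As an illustration only, the sketch below uses two hypothetical dotted field names, customer.name and first.name. They are not taken from this page and stand in for whatever fields your records contain.

```properties
# Hypothetical fields: the value holds a "customer.name" object that contains
# a "first.name" field. Each quoted segment is treated as one field name.
connect.opensearch.kcql=INSERT INTO `_value.'customer.name'.'first.name'` SELECT * FROM topicA
```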
If the value structure contains:
The extracted index name will be hans.
Auto Index Creation
The Sink will automatically create missing indexes at startup.
Please note that this feature is not compatible with index names extracted from message headers/keys/values.
Strict Bulk Item Errors
By default the OpenSearch connector routes any per-item bulk failure through the configured ErrorPolicy. This is stricter than the legacy Elasticsearch 7 behaviour (which silently dropped item-level errors) and surfaces mapping conflicts and version conflicts as visible failures.
Set connect.opensearch.bulk.strict.item.errors=false to restore ES7-compatible tolerant behaviour where only HTTP-transport errors are surfaced.
Setting bulk.strict.item.errors=false neutralises every other retry knob for per-item failures. Use with care.
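A sketch of the default strict mode combined with a retrying error policy. The retry count and interval are illustrative values, not recommendations.

```properties
# Strict mode (default): per-item bulk failures are handed to the error policy.
connect.opensearch.bulk.strict.item.errors=true
connect.opensearch.error.policy=RETRY
# Illustrative values: retry up to 5 times, 10 seconds apart.
connect.opensearch.max.retries=5
connect.opensearch.retry.interval=10000
```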
AWS SigV4 Authentication
For AWS-managed OpenSearch Service and OpenSearch Serverless, the connector can sign requests with AWS SigV4 instead of (or in addition to) HTTP Basic auth.
Enable SigV4 signing with:
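A minimal sketch built from the property names in the Options Reference. The region is a placeholder value.

```properties
connect.opensearch.aws.signing.enabled=true
# Placeholder region; required once signing is enabled.
connect.opensearch.aws.region=eu-west-1
# es targets managed OpenSearch Service domains.
connect.opensearch.aws.signing.service=es
connect.opensearch.aws.credentials.provider=DEFAULT
```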
Use aws.signing.service=aoss for OpenSearch Serverless.
Credentials provider
| Provider | Description |
| --- | --- |
| DEFAULT | (Recommended) Uses the standard AWS SDK credential chain: IAM role on EC2/EKS, environment variables, ~/.aws/credentials. Credentials are refreshed automatically. |
| STATIC | Reads aws.access.key.id + aws.secret.access.key (+ optional aws.session.token) from the connector config. Keys are not rotated. Suitable for development or air-gapped environments only. |
A static session token configured via connect.opensearch.aws.session.token does not auto-refresh. The connector will fail when the token expires. Use credentials.provider=DEFAULT for production deployments.
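For completeness, a development-only sketch of the STATIC provider. All credential values are placeholders, and the region is an assumption.

```properties
connect.opensearch.aws.signing.enabled=true
connect.opensearch.aws.region=eu-west-1
connect.opensearch.aws.credentials.provider=STATIC
# Placeholder credentials; these are not rotated by the connector.
connect.opensearch.aws.access.key.id=<access-key-id>
connect.opensearch.aws.secret.access.key=<secret-access-key>
# Optional, and it does not auto-refresh.
# connect.opensearch.aws.session.token=<session-token>
```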
JWT Authentication
For clusters secured with bearer-token authentication, configure one of the two mutually-exclusive modes:
Static token (development / short-lived environments):
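A sketch using the property name from the Options Reference, with a placeholder token value:

```properties
# Placeholder token; read once at startup.
connect.opensearch.security.jwt.token=<jwt-token>
```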
The token is read once at startup. It does not auto-refresh. The task will fail with HTTP 401 when the token expires.
File-based token (recommended for production):
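A sketch of the file-based mode. The file path and the 30 second interval are illustrative assumptions.

```properties
# Hypothetical path to a token file kept up to date by your IdP tooling.
connect.opensearch.security.jwt.token.file=/var/run/secrets/opensearch/token
# Re-read the file every 30 seconds (the default is 60000 ms).
connect.opensearch.security.jwt.token.refresh.interval.ms=30000
```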
The connector re-reads the file at the configured interval. Rotate the file contents with your IdP tooling and tune the interval to match the IdP's rotation cadence.
Restricting file access (recommended in multi-tenant Connect deployments):
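A sketch that pairs the base directory with a token file inside it. Both paths are hypothetical.

```properties
# Token files may only be read from this directory and its subdirectories.
connect.opensearch.security.jwt.token.base.dir=/var/run/secrets/opensearch
connect.opensearch.security.jwt.token.file=/var/run/secrets/opensearch/token
```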
When set, any path that resolves outside the base directory is rejected at read time, preventing a connector config from being used to read arbitrary worker-local files.
Connection Pool Tuning
The REST/HC5 transport path exposes two connection-pool knobs. These settings are ignored on the SigV4 path (the AWS SDK manages its own connection pool).
| Property | Default | Description |
| --- | --- | --- |
| connect.opensearch.max.connections.per.route | 5 | Maximum HTTP connections per OpenSearch node. Increase for high-throughput deployments. |
| connect.opensearch.max.connections.total | 25 | Maximum total connections across all nodes. Set to at least max.connections.per.route × number of nodes. |
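As a worked sizing example, assume a hypothetical three-node cluster and 10 connections per node. The total should then be at least 10 × 3 = 30.

```properties
# Hypothetical 3-node cluster, 10 connections per node.
connect.opensearch.max.connections.per.route=10
# At least per.route x nodes = 10 x 3 = 30.
connect.opensearch.max.connections.total=30
```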
Cleartext Auth Guard
By default the connector rejects protocol=http when HTTP Basic auth or JWT bearer authentication is configured, because credentials would be transmitted in cleartext and could be intercepted by an on-path attacker.
Set connect.opensearch.security.allow.insecure.auth=true only when TLS termination is handled by a trusted local sidecar or service mesh. A WARN-level message is emitted at startup when this override is active.
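A sketch of the override for a deployment where a local sidecar terminates TLS. The credentials are the placeholder values from the example at the top of this page.

```properties
# Plain HTTP to a trusted local sidecar that handles TLS.
connect.opensearch.protocol=http
connect.opensearch.use.http.username=admin
connect.opensearch.use.http.password=secret
# Without this line the connector rejects the configuration at startup.
connect.opensearch.security.allow.insecure.auth=true
```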
Options Reference
| Name | Description | Type | Default Value |
| --- | --- | --- | --- |
| connect.opensearch.protocol | URL protocol (http, https) | string | https |
| connect.opensearch.hosts | List of hostnames for the OpenSearch cluster nodes, not including protocol or port. | string | localhost |
| connect.opensearch.port | Port on which the OpenSearch node listens. | int | 9200 |
| connect.opensearch.kcql | KCQL expression describing field selection and routes. | string | |
| connect.opensearch.write.timeout | The time to wait in milliseconds for a bulk request to complete. Default 300000 = 5 minutes. Note: unlike Elasticsearch 6/7, OpenSearch interprets this value in milliseconds. | int | 300000 |
| connect.opensearch.batch.size | How many records to process at one time. | int | 4000 |
| connect.opensearch.use.http.username | Username for HTTP Basic Auth. Leave empty to disable Basic Auth. | string | |
| connect.opensearch.use.http.password | Password for HTTP Basic Auth. Leave empty to disable Basic Auth. | password | |
| connect.opensearch.error.policy | Action on error: NOOP, THROW, or RETRY. | string | THROW |
| connect.opensearch.max.retries | Maximum number of retry attempts on write failure. | int | 20 |
| connect.opensearch.retry.interval | Time in milliseconds between retries. | int | 60000 |
| connect.opensearch.pk.separator | Separator used when more than one field is in PK. | string | - |
| connect.opensearch.bulk.strict.item.errors | When true (default), per-item bulk failures go through ErrorPolicy. When false, only HTTP-transport errors are surfaced, matching ES7 tolerant behaviour. WARNING: setting false neutralises per-item retry knobs. | boolean | true |
| connect.opensearch.max.connections.per.route | Maximum HTTP connections per OpenSearch node (HC5 REST transport only; ignored on SigV4 path). | int | 5 |
| connect.opensearch.max.connections.total | Maximum total HTTP connections across all nodes (HC5 REST transport only). | int | 25 |
| connect.opensearch.security.allow.insecure.auth | Allow HTTP Basic / JWT auth over protocol=http. Set true only when TLS is terminated by a trusted local sidecar. A WARN is emitted at startup. | boolean | false |
| connect.opensearch.security.jwt.token | Static JWT bearer token. Does NOT auto-refresh. Mutually exclusive with jwt.token.file. | password | |
| connect.opensearch.security.jwt.token.file | Path to a file containing the JWT bearer token. Re-read at the configured refresh interval. Mutually exclusive with jwt.token. | string | |
| connect.opensearch.security.jwt.token.refresh.interval.ms | How often (in ms) to re-read the JWT token file. Must be > 0. | long | 60000 |
| connect.opensearch.security.jwt.token.base.dir | Restricts JWT token file reads to this directory and its subdirectories. Strongly recommended in multi-tenant deployments. | string | |
| connect.opensearch.aws.signing.enabled | Enable AWS SigV4 request signing. When true, switches from RestClientTransport to AwsSdk2Transport. | boolean | false |
| connect.opensearch.aws.region | AWS region. Required when aws.signing.enabled=true. | string | |
| connect.opensearch.aws.signing.service | AWS service name for SigV4. Must be one of {es, aoss}. Use es for Managed OpenSearch, aoss for OpenSearch Serverless. | string | es |
| connect.opensearch.aws.credentials.provider | Credentials provider: DEFAULT (SDK credential chain, recommended for production) or STATIC (inline keys, dev only). | string | DEFAULT |
| connect.opensearch.aws.access.key.id | AWS access key ID. Required when aws.credentials.provider=STATIC. | string | |
| connect.opensearch.aws.secret.access.key | AWS secret access key. Required when aws.credentials.provider=STATIC. | password | |
| connect.opensearch.aws.session.token | AWS session token for STS / assumed-role workflows. Does NOT auto-refresh. | password | |
| connect.progress.enabled | Enables the output for how many records have been processed. | boolean | false |
KCQL Properties
| Name | Description | Type | Default Value |
| --- | --- | --- | --- |
| behavior.on.null.values | Specifies behavior on Kafka tombstones: IGNORE, DELETE or FAIL | String | IGNORE |
SSL Configuration Properties
| Property Name | Description |
| --- | --- |
| ssl.truststore.location | Path to the truststore file containing the trusted CA certificates for verifying the OpenSearch node's certificate. |
| ssl.truststore.password | Password for the truststore file to protect its integrity. |
| ssl.truststore.type | Type of the truststore (e.g., JKS, PKCS12). Default is JKS. |
| ssl.keystore.location | Path to the keystore file containing the client's private key and certificate chain for client authentication. |
| ssl.keystore.password | Password for the keystore to protect the private key. |
| ssl.keystore.type | Type of the keystore (e.g., JKS, PKCS12). Default is JKS. |
| ssl.protocol | The SSL protocol used for secure connections (e.g., TLSv1.2, TLSv1.3). Default is TLS. |
| ssl.trustmanager.algorithm | Algorithm used by the TrustManager to manage certificates. Default value is the trust manager factory algorithm configured for the Java Virtual Machine. |
| ssl.keymanager.algorithm | Algorithm used by the KeyManager to manage certificates. Default value is the key manager factory algorithm configured for the Java Virtual Machine. |
SSL Configuration
Enabling SSL connections between Kafka Connect and OpenSearch ensures that the communication between these services is secure, protecting sensitive data from being intercepted or tampered with.
Configuration Example
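A sketch that uses the property names exactly as listed in the SSL Configuration Properties table. All paths and passwords are placeholders, and whether a given deployment needs additional settings is not covered here.

```properties
connect.opensearch.protocol=https

# Truststore: CA certificates used to verify the OpenSearch node.
ssl.truststore.location=/path/to/truststore.jks
ssl.truststore.password=<truststore-password>
ssl.truststore.type=JKS

# Keystore: only needed when the cluster requires client certificates.
ssl.keystore.location=/path/to/keystore.jks
ssl.keystore.password=<keystore-password>
ssl.keystore.type=JKS

# TLSv1.3 is the other option named in the table.
ssl.protocol=TLSv1.2
```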
Terminology
Truststore: Holds certificates to check if the node's certificate is valid.
Keystore: Contains your client's private key and certificate to prove your identity to the node.
SSL Protocol: Use TLSv1.2 or TLSv1.3 for up-to-date security.
Password Security: Protect passwords by encrypting them or using secure methods like environment variables or secret managers.

