# OpenSearch

{% hint style="info" %}
If you are connecting to Elasticsearch (6 or 7) rather than OpenSearch, see the [Elasticsearch sink](/latest/connectors/kafka-connectors/sinks/elasticsearch.md) page.
{% endhint %}

## Connector Class

```properties
io.lenses.streamreactor.connect.opensearch.OpenSearchSinkConnector
```

## Example

{% hint style="success" %}
For more examples see the [tutorials](/latest/connectors/tutorials.md).
{% endhint %}

```properties
name=opensearch
connector.class=io.lenses.streamreactor.connect.opensearch.OpenSearchSinkConnector
tasks.max=1
topics=orders
connect.opensearch.protocol=https
connect.opensearch.hosts=opensearch-host
connect.opensearch.port=9200
connect.opensearch.use.http.username=admin
connect.opensearch.use.http.password=secret
connect.opensearch.kcql=INSERT INTO orders SELECT * FROM orders
connect.progress.enabled=true
```

## KCQL support <a href="#kcql-support" id="kcql-support"></a>

{% hint style="success" %}
You can specify multiple KCQL statements separated by **`;`** to have a connector sink multiple topics. The connector properties **topics** or **topics.regex** are required to be set to a value that matches the KCQL statements.
{% endhint %}
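
As a minimal sketch of a multi-topic setup, the fragment below routes two topics to two indexes with two KCQL statements. The topic and index names are hypothetical; the point is that `topics` lists every topic referenced in a KCQL `FROM` clause.

```properties
# Hypothetical topic and index names, for illustration only
topics=orders,payments
connect.opensearch.kcql=INSERT INTO orders_idx SELECT * FROM orders;INSERT INTO payments_idx SELECT * FROM payments
```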

The following KCQL is supported:

```sql
INSERT | UPSERT
INTO <opensearch_index>
SELECT FIELD, ...
FROM kafka_topic
[PK FIELD,...]
[WITHINDEXSUFFIX=<your_suffix>]
```

{% hint style="warning" %}
`WITHDOCTYPE` is silently ignored on OpenSearch — mapping types were removed in OpenSearch 1.0.

`connect.opensearch.tableprefix` is not supported and is rejected at startup with a descriptive error. Use the KCQL `INTO` target to control the index name.

`connect.opensearch.cluster.name` is not supported and is rejected at startup — it is an Elasticsearch-only concept.
{% endhint %}

Examples:

```sql
-- Insert mode, select all fields from topicA and write to indexA
INSERT INTO indexA SELECT * FROM topicA

-- Insert mode, select 3 fields from topicB (renaming x to a)
-- and write to indexB, using y as the primary key
INSERT INTO indexB SELECT x AS a, y, zc FROM topicB PK y

-- UPSERT
UPSERT INTO indexC SELECT id, string_field FROM topicC PK id
```

### Kafka Tombstone Handling

You can configure how the connector handles records with a null value payload (*Kafka tombstones*). Set the `behavior.on.null.values` property in your KCQL to one of the following values:

* `IGNORE` (ignores tombstones entirely; this is the default)
* `FAIL` (throws an exception when a tombstone is encountered)
* `DELETE` (deletes the document with the specified ID)

Example:

```sql
INSERT INTO indexA SELECT * FROM topicA PROPERTIES ('behavior.on.null.values'='IGNORE')
```
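
A sketch of the `DELETE` behaviour expressed as a connector property follows. A tombstone has no value, so this example takes the document ID from the message key with `PK _key`. Both that choice and the assumption that `PK` and `PROPERTIES` can be combined in this order should be verified against your connector version.

```properties
# Assumption: document IDs come from the message key, so a tombstone
# for that key identifies the document to delete
connect.opensearch.kcql=INSERT INTO indexA SELECT * FROM topicA PK _key PROPERTIES ('behavior.on.null.values'='DELETE')
```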

## Primary Keys <a href="#primary-keys" id="primary-keys"></a>

The **PK** keyword allows you to specify fields that will be used to generate the document ID in OpenSearch. The values of the selected fields are concatenated and separated by a hyphen (`-`) by default; the separator can be changed with `connect.opensearch.pk.separator`.

If no fields are defined, the connector defaults to using the **topic name**, **partition**, and **message offset** to construct the key.

**Field Prefixes**

When defining fields, specific prefixes can be used to determine where the data should be extracted from:

* **`_key` Prefix**\
  Specifies that the value should be extracted from the **message key**.
  * If a path is provided after `_key`, it identifies the location within the key where the field value resides.
  * If no path is provided, the entire message key is used as the value.
* **`_value` Prefix**\
  Specifies that the value should be extracted from the **message value**.
  * The remainder of the path identifies the specific location within the message value to extract the field.
* **`_header` Prefix**\
  Specifies that the value should be extracted from the **message header**.
  * The remainder of the path indicates the name of the header to be used for the field value.
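
To illustrate the prefixes, the sketch below builds the document ID from one field of the message key and one field of the message value. The field names `customerId` and `orderId` are hypothetical.

```properties
# Hypothetical fields: customerId in the message key, orderId in the message value
connect.opensearch.kcql=INSERT INTO orders SELECT * FROM orders PK _key.customerId, _value.orderId
# Optional: override the default hyphen separator
connect.opensearch.pk.separator=_
```

With a key containing `customerId=c42` and a value containing `orderId=1001`, the resulting ID would be expected to be `c42_1001`.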

## Insert and Upsert modes

**INSERT** writes new records to OpenSearch, replacing existing records with the same ID set by the **PK (Primary Key)** keyword. **UPSERT** replaces existing records if a matching record is found, or inserts a new one if none is found.

## Index Suffix <a href="#index-suffix" id="index-suffix"></a>

`WITHINDEXSUFFIX` appends a suffix to the target index name. The suffix can include a date format pattern enclosed in curly braces.

Example:

```sql
WITHINDEXSUFFIX=_suffix_{YYYY-MM-dd}
```
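
For context, here is a sketch of the suffix inside a full KCQL statement. The topic and index names are hypothetical, and the position of `WITHINDEXSUFFIX` at the end of the statement follows the syntax summary in the KCQL support section.

```properties
# Writes to an index named "orders" followed by the date-formatted suffix
connect.opensearch.kcql=INSERT INTO orders SELECT * FROM orders WITHINDEXSUFFIX=_{YYYY-MM-dd}
```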

***

## Index Names

### Static Index Names

To use a static index name, define the target index in the KCQL statement without any prefixes:

```sql
INSERT INTO index_name SELECT * FROM topicA
```

This will consistently create an index named `index_name` for any messages consumed from `topicA`.

### Extracting Index Names from Headers, Keys, and Values

#### Headers

To extract an index name from a message header, use the `_header` prefix followed by the header name:

```sql
INSERT INTO _header.gate SELECT * FROM topicA
```

This statement extracts the value from the `gate` header field and uses it as the index name.

For headers with names that include dots, enclose the entire target in backticks (`` ` ``) and wrap each name that contains dots in single quotes (`'`):

```sql
INSERT INTO `_header.'prefix.abc.suffix'` SELECT * FROM topicA
```

In this case, the value of the header named `prefix.abc.suffix` is used to form the index name.

#### Keys

To use the full value of the message key as the index name, use the `_key` prefix:

```sql
INSERT INTO _key SELECT * FROM topicA
```

For example, if the message key is `"freddie"`, the resulting index name will be `freddie`.

#### Values

To extract an index name from a field within the message value, use the `_value` prefix followed by the field name:

```sql
INSERT INTO _value.name SELECT * FROM topicA
```

This example uses the value of the `name` field from the message's value. If the field contains `"jason"`, the index name will be `jason`.

**Nested Fields in Values**

To access nested fields within a value, specify the full path using dot notation:

```sql
INSERT INTO _value.name.firstName SELECT * FROM topicA
```

If the `firstName` field is nested within the `name` structure, its value (e.g., `"hans"`) will be used as the index name.

**Fields with Dots in Their Names**

For field names that include dots, enclose the entire target in backticks (`` ` ``) and wrap each field name that contains dots in single quotes (`'`):

```sql
INSERT INTO `_value.'customer.name'.'first.name'` SELECT * FROM topicA
```

If the value structure contains:

```json
{
  "customer.name": {
    "first.name": "hans"
  }
}
```

The extracted index name will be `hans`.

## Auto Index Creation <a href="#auto-index-creation" id="auto-index-creation"></a>

The Sink will automatically create missing indexes at startup.

Please note that this feature is not compatible with index names extracted from message headers/keys/values.

## Strict Bulk Item Errors <a href="#strict-bulk-item-errors" id="strict-bulk-item-errors"></a>

By default the OpenSearch connector routes any per-item bulk failure through the configured `ErrorPolicy`. This is stricter than the legacy Elasticsearch 7 behaviour (which silently dropped item-level errors) and surfaces mapping conflicts and version conflicts as visible failures.

Set `connect.opensearch.bulk.strict.item.errors=false` to restore ES7-compatible tolerant behaviour where only HTTP-transport errors are surfaced.

{% hint style="warning" %}
Setting `bulk.strict.item.errors=false` neutralises every other retry knob for per-item failures. Use with care.
{% endhint %}
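
As a sketch, the fragment below keeps strict item errors enabled and routes failures through a retrying error policy. The retry count and interval are illustrative values, not recommendations.

```properties
# Keep the default: per-item bulk failures go through the error policy
connect.opensearch.bulk.strict.item.errors=true
# Retry failed writes up to 5 times, waiting 10 seconds between attempts
connect.opensearch.error.policy=RETRY
connect.opensearch.max.retries=5
connect.opensearch.retry.interval=10000
```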

## AWS SigV4 Authentication <a href="#aws-sigv4" id="aws-sigv4"></a>

For AWS-managed OpenSearch Service and OpenSearch Serverless, the connector can sign requests with AWS SigV4 instead of (or in addition to) HTTP Basic auth.

Enable SigV4 signing with:

```properties
connect.opensearch.aws.signing.enabled=true
connect.opensearch.aws.region=us-east-1
connect.opensearch.aws.signing.service=es
```

Use `aws.signing.service=aoss` for OpenSearch Serverless.

**Credentials provider**

| Value     | Description                                                                                                                                                                                         |
| --------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `DEFAULT` | (Recommended) Uses the standard AWS SDK credential chain — IAM role on EC2/EKS, environment variables, `~/.aws/credentials`. Credentials are refreshed automatically.                               |
| `STATIC`  | Reads `aws.access.key.id` + `aws.secret.access.key` (+ optional `aws.session.token`) from the connector config. Keys are **not** rotated. Suitable for development or air-gapped environments only. |

```properties
connect.opensearch.aws.credentials.provider=STATIC
connect.opensearch.aws.access.key.id=AKIAIOSFODNN7EXAMPLE
connect.opensearch.aws.secret.access.key=wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
```

{% hint style="warning" %}
A static session token configured via `connect.opensearch.aws.session.token` does **not** auto-refresh. The connector will fail when the token expires. Use `credentials.provider=DEFAULT` for production deployments.
{% endhint %}
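
For comparison, here is a sketch of a production-style configuration for OpenSearch Serverless that relies on the default credential chain. The host value is a placeholder, and port 443 is an assumption about the endpoint.

```properties
connect.opensearch.protocol=https
# Placeholder: replace with your collection endpoint hostname
connect.opensearch.hosts=your-collection-endpoint
# Assumption: the AWS endpoint is served over HTTPS on port 443
connect.opensearch.port=443
connect.opensearch.aws.signing.enabled=true
connect.opensearch.aws.region=us-east-1
connect.opensearch.aws.signing.service=aoss
# No inline keys: credentials come from the SDK chain and refresh automatically
connect.opensearch.aws.credentials.provider=DEFAULT
```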

## JWT Authentication <a href="#jwt-auth" id="jwt-auth"></a>

For clusters secured with bearer-token authentication, configure one of the two mutually-exclusive modes:

**Static token** (development / short-lived environments):

```properties
connect.opensearch.security.jwt.token=eyJhbGci...
```

The token is read once at startup. It does **not** auto-refresh. The task will fail with HTTP 401 when the token expires.

**File-based token** (recommended for production):

```properties
connect.opensearch.security.jwt.token.file=/run/secrets/opensearch-jwt
connect.opensearch.security.jwt.token.refresh.interval.ms=60000
```

The connector re-reads the file at the configured interval. Rotate the file contents with your IdP tooling and tune the interval to match the IdP's rotation cadence.

**Restricting file access** (recommended in multi-tenant Connect deployments):

```properties
connect.opensearch.security.jwt.token.base.dir=/run/secrets
```

When set, any path that resolves outside the base directory is rejected at read time, preventing a connector config from being used to read arbitrary worker-local files.
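
Putting the file-based options together, a minimal sketch might look like the following. The 30-second refresh interval is an illustrative value; the token path is chosen so that it resolves inside the configured base directory.

```properties
connect.opensearch.protocol=https
# Token file lives under the base directory below, so the read is allowed
connect.opensearch.security.jwt.token.file=/run/secrets/opensearch-jwt
# Illustrative value: re-read the token file every 30 seconds
connect.opensearch.security.jwt.token.refresh.interval.ms=30000
connect.opensearch.security.jwt.token.base.dir=/run/secrets
```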

## Connection Pool Tuning <a href="#connection-pool" id="connection-pool"></a>

The REST/HC5 transport path exposes two connection-pool knobs. These settings are **ignored** on the SigV4 path (the AWS SDK manages its own connection pool).

| Property                                       | Default | Description                                                                                                |
| ---------------------------------------------- | ------- | ---------------------------------------------------------------------------------------------------------- |
| `connect.opensearch.max.connections.per.route` | 5       | Maximum HTTP connections per OpenSearch node. Increase for high-throughput deployments.                    |
| `connect.opensearch.max.connections.total`     | 25      | Maximum total connections across all nodes. Set to at least `max.connections.per.route × number of nodes`. |
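
As a worked example of the sizing rule in the table, assume a hypothetical three-node cluster and a per-route limit of 10. The total should then be at least 10 × 3 = 30:

```properties
# Assumption: three OpenSearch nodes, 10 connections allowed per node
connect.opensearch.max.connections.per.route=10
# 10 per route x 3 nodes = 30
connect.opensearch.max.connections.total=30
```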

## Cleartext Auth Guard <a href="#cleartext-auth-guard" id="cleartext-auth-guard"></a>

By default the connector rejects `protocol=http` when HTTP Basic auth or JWT bearer authentication is configured, because credentials would be transmitted in cleartext and could be intercepted by an on-path attacker.

Set `connect.opensearch.security.allow.insecure.auth=true` **only** when TLS termination is handled by a trusted local sidecar or service mesh. A `WARN`-level message is emitted at startup when this override is active.
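
A sketch of the override for a sidecar-terminated setup is shown below. It assumes a trusted local proxy listening on `localhost:9200` that handles TLS to the cluster; the credentials are placeholders.

```properties
# Assumption: a trusted local sidecar on localhost terminates TLS
connect.opensearch.protocol=http
connect.opensearch.hosts=localhost
connect.opensearch.port=9200
connect.opensearch.use.http.username=admin
connect.opensearch.use.http.password=secret
# Without this override the connector rejects Basic auth over http
connect.opensearch.security.allow.insecure.auth=true
```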

## Options Reference <a href="#options" id="options"></a>

<table data-full-width="true"><thead><tr><th width="400">Name</th><th width="440">Description</th><th width="90">Type</th><th>Default Value</th></tr></thead><tbody><tr><td>connect.opensearch.protocol</td><td>URL protocol (http, https)</td><td>string</td><td>https</td></tr><tr><td>connect.opensearch.hosts</td><td>List of hostnames for the OpenSearch cluster nodes, not including protocol or port.</td><td>string</td><td>localhost</td></tr><tr><td>connect.opensearch.port</td><td>Port on which the OpenSearch node listens.</td><td>int</td><td>9200</td></tr><tr><td>connect.opensearch.kcql</td><td>KCQL expression describing field selection and routes.</td><td>string</td><td></td></tr><tr><td>connect.opensearch.write.timeout</td><td>The time to wait in <strong>milliseconds</strong> for a bulk request to complete. Default 300000 = 5 minutes. Note: unlike Elasticsearch 6/7, OpenSearch interprets this value in milliseconds.</td><td>int</td><td>300000</td></tr><tr><td>connect.opensearch.batch.size</td><td>How many records to process at one time.</td><td>int</td><td>4000</td></tr><tr><td>connect.opensearch.use.http.username</td><td>Username for HTTP Basic Auth. Leave empty to disable Basic Auth.</td><td>string</td><td></td></tr><tr><td>connect.opensearch.use.http.password</td><td>Password for HTTP Basic Auth. 
Leave empty to disable Basic Auth.</td><td>password</td><td></td></tr><tr><td>connect.opensearch.error.policy</td><td>Action on error: NOOP, THROW, or RETRY.</td><td>string</td><td>THROW</td></tr><tr><td>connect.opensearch.max.retries</td><td>Maximum number of retry attempts on write failure.</td><td>int</td><td>20</td></tr><tr><td>connect.opensearch.retry.interval</td><td>Time in milliseconds between retries.</td><td>int</td><td>60000</td></tr><tr><td>connect.opensearch.pk.separator</td><td>Separator used when more than one field is in PK.</td><td>string</td><td>-</td></tr><tr><td>connect.opensearch.bulk.strict.item.errors</td><td>When true (default), per-item bulk failures go through ErrorPolicy. When false, only HTTP-transport errors are surfaced, matching ES7 tolerant behaviour. WARNING: setting false neutralises per-item retry knobs.</td><td>boolean</td><td>true</td></tr><tr><td>connect.opensearch.max.connections.per.route</td><td>Maximum HTTP connections per OpenSearch node (HC5 REST transport only; ignored on SigV4 path).</td><td>int</td><td>5</td></tr><tr><td>connect.opensearch.max.connections.total</td><td>Maximum total HTTP connections across all nodes (HC5 REST transport only).</td><td>int</td><td>25</td></tr><tr><td>connect.opensearch.security.allow.insecure.auth</td><td>Allow HTTP Basic / JWT auth over protocol=http. Set true only when TLS is terminated by a trusted local sidecar. A WARN is emitted at startup.</td><td>boolean</td><td>false</td></tr><tr><td>connect.opensearch.security.jwt.token</td><td>Static JWT bearer token. Does NOT auto-refresh. Mutually exclusive with jwt.token.file.</td><td>password</td><td></td></tr><tr><td>connect.opensearch.security.jwt.token.file</td><td>Path to a file containing the JWT bearer token. Re-read at the configured refresh interval. 
Mutually exclusive with jwt.token.</td><td>string</td><td></td></tr><tr><td>connect.opensearch.security.jwt.token.refresh.interval.ms</td><td>How often (in ms) to re-read the JWT token file. Must be > 0.</td><td>long</td><td>60000</td></tr><tr><td>connect.opensearch.security.jwt.token.base.dir</td><td>Restricts JWT token file reads to this directory and its subdirectories. Strongly recommended in multi-tenant deployments.</td><td>string</td><td></td></tr><tr><td>connect.opensearch.aws.signing.enabled</td><td>Enable AWS SigV4 request signing. When true, switches from RestClientTransport to AwsSdk2Transport.</td><td>boolean</td><td>false</td></tr><tr><td>connect.opensearch.aws.region</td><td>AWS region. Required when aws.signing.enabled=true.</td><td>string</td><td></td></tr><tr><td>connect.opensearch.aws.signing.service</td><td>AWS service name for SigV4. Must be one of {es, aoss}. Use es for Managed OpenSearch, aoss for OpenSearch Serverless.</td><td>string</td><td>es</td></tr><tr><td>connect.opensearch.aws.credentials.provider</td><td>Credentials provider: DEFAULT (SDK credential chain, recommended for production) or STATIC (inline keys, dev only).</td><td>string</td><td>DEFAULT</td></tr><tr><td>connect.opensearch.aws.access.key.id</td><td>AWS access key ID. Required when aws.credentials.provider=STATIC.</td><td>string</td><td></td></tr><tr><td>connect.opensearch.aws.secret.access.key</td><td>AWS secret access key. Required when aws.credentials.provider=STATIC.</td><td>password</td><td></td></tr><tr><td>connect.opensearch.aws.session.token</td><td>AWS session token for STS / assumed-role workflows. Does NOT auto-refresh.</td><td>password</td><td></td></tr><tr><td>connect.progress.enabled</td><td>Enables the output for how many records have been processed.</td><td>boolean</td><td>false</td></tr></tbody></table>

### KCQL Properties

<table data-full-width="true"><thead><tr><th width="167">Name</th><th width="463">Description</th><th width="136">Type</th><th>Default Value</th></tr></thead><tbody><tr><td>behavior.on.null.values</td><td>Specifies behavior on Kafka tombstones: <code>IGNORE</code>, <code>DELETE</code> or <code>FAIL</code></td><td>String</td><td>IGNORE</td></tr></tbody></table>

### SSL Configuration Properties

| **Property Name**            | **Description**                                                                                                                                        |
| ---------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------ |
| `ssl.truststore.location`    | Path to the truststore file containing the trusted CA certificates for verifying OpenSearch node certificates.                                         |
| `ssl.truststore.password`    | Password for the truststore file to protect its integrity.                                                                                             |
| `ssl.truststore.type`        | Type of the truststore (e.g., `JKS`, `PKCS12`). Default is `JKS`.                                                                                      |
| `ssl.keystore.location`      | Path to the keystore file containing the client's private key and certificate chain for client authentication.                                         |
| `ssl.keystore.password`      | Password for the keystore to protect the private key.                                                                                                  |
| `ssl.keystore.type`          | Type of the keystore (e.g., `JKS`, `PKCS12`). Default is `JKS`.                                                                                        |
| `ssl.protocol`               | The SSL protocol used for secure connections (e.g., `TLSv1.2`, `TLSv1.3`). Default is `TLS`.                                                           |
| `ssl.trustmanager.algorithm` | Algorithm used by the TrustManager to manage certificates. Default value is the trust manager factory algorithm configured for the Java Virtual Machine. |
| `ssl.keymanager.algorithm`   | Algorithm used by the KeyManager to manage certificates. Default value is the key manager factory algorithm configured for the Java Virtual Machine.   |

## SSL Configuration

Enabling SSL connections between Kafka Connect and OpenSearch ensures that the communication between these services is secure, protecting sensitive data from being intercepted or tampered with.

### Configuration Example

```properties
# Comments must be on their own line: in a properties file, text after
# a value (including "#") becomes part of the value.

# Truststore type can also be PKCS12, if applicable
ssl.truststore.location=/path/to/truststore.jks
ssl.truststore.password=your_truststore_password
ssl.truststore.type=JKS

# Keystore type can also be PKCS12, if applicable
ssl.keystore.location=/path/to/keystore.jks
ssl.keystore.password=your_keystore_password
ssl.keystore.type=JKS

# Or TLSv1.3 for stronger security
ssl.protocol=TLSv1.2

# Algorithms used to manage certificates
ssl.trustmanager.algorithm=PKIX
ssl.keymanager.algorithm=PKIX
```
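
When the cluster does not require client certificates, a smaller configuration may be enough. The sketch below assumes that the keystore properties can be omitted in that case, which is typical for TLS clients but is not confirmed on this page. Paths and passwords are placeholders.

```properties
connect.opensearch.protocol=https
# Truststore only: used to verify the OpenSearch node certificate
ssl.truststore.location=/path/to/truststore.p12
ssl.truststore.password=your_truststore_password
ssl.truststore.type=PKCS12
ssl.protocol=TLSv1.3
```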

### Terminology

* **Truststore**: Holds certificates to check if the node's certificate is valid.
* **Keystore**: Contains your client's private key and certificate to prove your identity to the node.
* **SSL Protocol**: Use TLSv1.2 or TLSv1.3 for up-to-date security.
* **Password Security**: Protect passwords by encrypting them or using secure methods like environment variables or secret managers.

