# Elasticsearch

## Connector Class

### **Elasticsearch 6**

```properties
io.lenses.streamreactor.connect.elastic6.ElasticSinkConnector
```

### **Elasticsearch 7**

```properties
io.lenses.streamreactor.connect.elastic7.ElasticSinkConnector
```

## Example

{% hint style="success" %}
For more examples see the [tutorials](https://docs.lenses.io/latest/connectors/tutorials).
{% endhint %}

```properties
name=elastic
connector.class=io.lenses.streamreactor.connect.elastic7.ElasticSinkConnector
tasks.max=1
topics=orders
connect.elastic.protocol=http
connect.elastic.hosts=elastic
connect.elastic.port=9200
connect.elastic.cluster.name=elasticsearch
connect.elastic.kcql=INSERT INTO orders SELECT * FROM orders
connect.progress.enabled=true
```

## KCQL support <a href="#kcql-support" id="kcql-support"></a>

{% hint style="success" %}
You can specify multiple KCQL statements separated by **`;`** to have a connector sink multiple topics. The connector properties **topics** or **topics.regex** are required to be set to a value that matches the KCQL statements.
{% endhint %}

The following KCQL is supported:

```sql
INSERT | UPSERT
INTO <elastic_index>
SELECT FIELD, ...
FROM kafka_topic
[PK FIELD,...]
[WITHDOCTYPE=<your_document_type>]
[WITHINDEXSUFFIX=<your_suffix>]
```
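
As the hint above notes, multiple KCQL statements can be separated by `;` to sink several topics with a single connector. A minimal sketch (the topic and index names are illustrative):

```properties
topics=orders,customers
connect.elastic.kcql=INSERT INTO orders_index SELECT * FROM orders;UPSERT INTO customers_index SELECT id, name FROM customers PK id
```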

Examples:

```sql
-- Insert mode, select all fields from topicA and write to indexA
INSERT INTO indexA SELECT * FROM topicA

-- Insert mode, select 3 fields and rename from topicB
-- and write to indexB
INSERT INTO indexB SELECT x AS a, y, zc FROM topicB PK y

-- UPSERT
UPSERT INTO indexC SELECT id, string_field FROM topicC PK id
```

#### Kafka Tombstone Handling

It is possible to configure how the connector handles a null value payload (a *Kafka tombstone*). Use the `behavior.on.null.values` property in your KCQL statement with one of the following values:

* `IGNORE` (ignores tombstones entirely)
* `FAIL` (throws an exception when a tombstone is received)
* `DELETE` (deletes the document with the specified id)

Example:

```sql
INSERT INTO indexA SELECT * FROM topicA PROPERTIES ('behavior.on.null.values'='IGNORE')
```

## Primary Keys <a href="#primary-keys" id="primary-keys"></a>

The **PK** keyword allows you to specify fields that will be used to generate the key value in Elasticsearch. The values of the selected fields are concatenated and separated by a hyphen (`-`).

If no fields are defined, the connector defaults to using the **topic name**, **partition**, and **message offset** to construct the key.

**Field Prefixes**

When defining fields, specific prefixes can be used to determine where the data should be extracted from:

* **`_key` Prefix**\
  Specifies that the value should be extracted from the **message key**.
  * If a path is provided after `_key`, it identifies the location within the key where the field value resides.
  * If no path is provided, the entire message key is used as the value.
* **`_value` Prefix**\
  Specifies that the value should be extracted from the **message value**.
  * The remainder of the path identifies the specific location within the message value to extract the field.
* **`_header` Prefix**\
  Specifies that the value should be extracted from the **message header**.
  * The remainder of the path indicates the name of the header to be used for the field value.
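
Combining these prefixes, a hypothetical statement (the field and header names are illustrative) could look like:

```sql
-- Build the document id from the key's id field, the value's
-- region field, and the tenant header
UPSERT INTO indexA SELECT * FROM topicA PK _key.id, _value.region, _header.tenant
```

With the default `-` separator, a record with key id `42`, value region `eu`, and `tenant` header `acme` would receive the document id `42-eu-acme`.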

## Insert and Upsert modes

**INSERT** writes new records to Elasticsearch, replacing any existing record with the same ID set by the **PK (Primary Key)** keyword. **UPSERT** updates an existing record if a matching record is found, or inserts a new one if none is found.

## Document Type <a href="#document-type" id="document-type"></a>

`WITHDOCTYPE` allows you to associate a document type with the inserted document.
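
For example, to set the document type explicitly (the type name here is illustrative):

```sql
INSERT INTO indexA SELECT * FROM topicA WITHDOCTYPE=_doc
```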

## Index Suffix <a href="#index-suffix" id="index-suffix"></a>

`WITHINDEXSUFFIX` allows you to specify a suffix for your index name; date formats are supported.

Example:

```bash
WITHINDEXSUFFIX=_suffix_{YYYY-MM-dd}
```

***

## Index Names

### Static Index Names

To use a static index name, define the target index in the KCQL statement without any prefixes:

```sql
INSERT INTO index_name SELECT * FROM topicA
```

This will consistently create an index named `index_name` for any messages consumed from `topicA`.

### Extracting Index Names from Headers, Keys, and Values

#### Headers

To extract an index name from a message header, use the `_header` prefix followed by the header name:

```sql
INSERT INTO _header.gate SELECT * FROM topicA
```

This statement extracts the value from the `gate` header field and uses it as the index name.

For headers with names that include dots, enclose the entire target in backticks (`` ` ``) and each field-name segment in single quotes (`'`):

```sql
INSERT INTO `_header.'prefix.abc.suffix'` SELECT * FROM topicA
```

In this case, the value of the header named `prefix.abc.suffix` is used to form the index name.

#### Keys

To use the full value of the message key as the index name, use the `_key` prefix:

```sql
INSERT INTO _key SELECT * FROM topicA
```

For example, if the message key is `"freddie"`, the resulting index name will be `freddie`.

#### Values

To extract an index name from a field within the message value, use the `_value` prefix followed by the field name:

```sql
INSERT INTO _value.name SELECT * FROM topicA
```

This example uses the value of the `name` field from the message's value. If the field contains `"jason"`, the index name will be `jason`.

**Nested Fields in Values**

To access nested fields within a value, specify the full path using dot notation:

```sql
INSERT INTO _value.name.firstName SELECT * FROM topicA
```

If the `firstName` field is nested within the `name` structure, its value (e.g., `"hans"`) will be used as the index name.

**Fields with Dots in Their Names**

For field names that include dots, enclose the entire target in backticks (`` ` ``) and each field-name segment in single quotes (`'`):

```sql
INSERT INTO `_value.'customer.name'.'first.name'` SELECT * FROM topicA
```

If the value structure contains:

```json
{
  "customer.name": {
    "first.name": "hans"
  }
}
```

The extracted index name will be `hans`.

## Auto Index Creation <a href="#auto-index-creation" id="auto-index-creation"></a>

The Sink will automatically create missing indexes at startup.

Please note that this feature is not compatible with index names extracted from message headers/keys/values.

## Options Reference <a href="#options" id="options"></a>

<table data-full-width="true"><thead><tr><th width="343">Name</th><th width="438">Description</th><th width="109">Type</th><th>Default Value</th></tr></thead><tbody><tr><td>connect.elastic.protocol</td><td>URL protocol (http, https)</td><td>string</td><td>http</td></tr><tr><td>connect.elastic.hosts</td><td>List of hostnames for the Elasticsearch cluster nodes, not including protocol or port.</td><td>string</td><td>localhost</td></tr><tr><td>connect.elastic.port</td><td>Port on which the Elasticsearch node listens</td><td>string</td><td>9300</td></tr><tr><td>connect.elastic.tableprefix</td><td>Table prefix (optional)</td><td>string</td><td></td></tr><tr><td>connect.elastic.cluster.name</td><td>Name of the Elasticsearch cluster, used in local mode for setting the connection</td><td>string</td><td>elasticsearch</td></tr><tr><td>connect.elastic.write.timeout</td><td>The time to wait in milliseconds. Default is 5 minutes.</td><td>int</td><td>300000</td></tr><tr><td>connect.elastic.batch.size</td><td>The number of records to process at one time. Records pulled from Kafka can number 100k+, which is not feasible to send to Elasticsearch at once</td><td>int</td><td>4000</td></tr><tr><td>connect.elastic.use.http.username</td><td>Username if HTTP Basic Auth is required. Default is null.</td><td>string</td><td></td></tr><tr><td>connect.elastic.use.http.password</td><td>Password if HTTP Basic Auth is required. Default is null.</td><td>string</td><td></td></tr><tr><td>connect.elastic.error.policy</td><td>Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP - the error is swallowed; THROW - the error is allowed to propagate; RETRY - the exception causes the Connect framework to retry the message. The number of retries is based on connect.elastic.max.retries. The error will be logged automatically</td><td>string</td><td>THROW</td></tr><tr><td>connect.elastic.max.retries</td><td>The maximum number of times to try the write again.</td><td>int</td><td>20</td></tr><tr><td>connect.elastic.retry.interval</td><td>The time in milliseconds between retries.</td><td>int</td><td>60000</td></tr><tr><td>connect.elastic.kcql</td><td>KCQL expression describing field selection and routes.</td><td>string</td><td></td></tr><tr><td>connect.elastic.pk.separator</td><td>Separator used when there is more than one field in the PK</td><td>string</td><td>-</td></tr><tr><td>connect.progress.enabled</td><td>Enables the output for how many records have been processed</td><td>boolean</td><td>false</td></tr></tbody></table>
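
To use the RETRY error policy described above, the related retry settings might be combined as follows (the values shown are illustrative):

```properties
connect.elastic.error.policy=RETRY
connect.elastic.max.retries=10
connect.elastic.retry.interval=30000
```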

### KCQL Properties

<table data-full-width="true"><thead><tr><th width="167">Name</th><th width="463">Description</th><th width="136">Type</th><th>Default Value</th></tr></thead><tbody><tr><td>behavior.on.null.values</td><td>Specifies behavior on Kafka tombstones: <code>IGNORE</code> , <code>DELETE</code> or <code>FAIL</code></td><td>String</td><td>IGNORE</td></tr></tbody></table>

### SSL Configuration Properties

| **Property Name**            | **Description**                                                                                                                                        |
| ---------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------ |
| `ssl.truststore.location`    | Path to the truststore file containing the trusted CA certificates for verifying broker certificates.                                                  |
| `ssl.truststore.password`    | Password for the truststore file to protect its integrity.                                                                                             |
| `ssl.truststore.type`        | Type of the truststore (e.g., `JKS`, `PKCS12`). Default is `JKS`.                                                                                      |
| `ssl.keystore.location`      | Path to the keystore file containing the client’s private key and certificate chain for client authentication.                                         |
| `ssl.keystore.password`      | Password for the keystore to protect the private key.                                                                                                  |
| `ssl.keystore.type`          | Type of the keystore (e.g., `JKS`, `PKCS12`). Default is `JKS`.                                                                                        |
| `ssl.protocol`               | The SSL protocol used for secure connections (e.g., `TLSv1.2`, `TLSv1.3`). Default is `TLS`.                                                           |
| `ssl.trustmanager.algorithm` | Algorithm used by the TrustManager to manage certificates. Default value is the key manager factory algorithm configured for the Java Virtual Machine. |
| `ssl.keymanager.algorithm`   | Algorithm used by the KeyManager to manage certificates. Default value is the key manager factory algorithm configured for the Java Virtual Machine.   |

## SSL Configuration

Enabling SSL connections between Kafka Connect and Elasticsearch ensures that the communication between these services is secure, protecting sensitive data from being intercepted or tampered with. SSL (or TLS) encrypts data in transit, verifying the identity of both parties and ensuring data integrity.

While newer versions of Elasticsearch have SSL enabled by default for internal communication, it’s still necessary to configure SSL for client connections, such as those from Kafka Connect. Even if Elasticsearch has SSL enabled by default, Kafka Connect still needs these configurations to establish a secure connection. By setting up SSL in Kafka Connect, you ensure:

* **Data encryption**: Prevents unauthorized access to data being transferred.
* **Authentication**: Confirms that Kafka Connect and Elasticsearch are communicating with trusted entities.
* **Compliance**: Meets security standards for regulatory requirements (such as GDPR or HIPAA).

### Configuration Example

```properties
ssl.truststore.location=/path/to/truststore.jks
ssl.truststore.password=your_truststore_password
ssl.truststore.type=JKS  # Can also be PKCS12 if applicable

ssl.keystore.location=/path/to/keystore.jks
ssl.keystore.password=your_keystore_password
ssl.keystore.type=JKS  # Can also be PKCS12 if applicable

ssl.protocol=TLSv1.2  # Or TLSv1.3 for stronger security

ssl.trustmanager.algorithm=PKIX  # Default algorithm for managing certificates
ssl.keymanager.algorithm=PKIX  # Default algorithm for managing certificates
```

### Terminology

* **Truststore**: Holds certificates to check if the node’s certificate is valid.
* **Keystore**: Contains your client’s private key and certificate to prove your identity to the node.
* **SSL Protocol**: Use TLSv1.2 or TLSv1.3 for up-to-date security.
* **Password Security**: Protect passwords by encrypting them or using secure methods like environment variables or secret managers.
