# HTTP

A Kafka Connect sink connector for writing records from Kafka to HTTP endpoints.

## Features

* Support for JSON, Avro, String and Protobuf messages via Kafka Connect (in conjunction with converters for Schema Registry based data storage).
* URL, header and content templating give you full control of the HTTP request.
* Configurable batching of messages, including the option to combine several messages into a single request and to select which data is sent with it.

## Connector Class

```
io.lenses.streamreactor.connect.http.sink.HttpSinkConnector
```

## Example

{% hint style="success" %}
For more examples see the [tutorials](https://docs.lenses.io/latest/connectors/tutorials).
{% endhint %}

{% code fullWidth="true" %}

```properties
name=lenseshttp
connector.class=io.lenses.streamreactor.connect.http.sink.HttpSinkConnector
tasks.max=1
topics=topicToRead
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
connect.http.authentication.type=none
connect.http.method=POST
connect.http.endpoint=http://endpoint.local/receive
connect.http.request.content="My Static Content Template"
connect.http.batch.count=1
```

{% endcode %}

## Content Template

The Lenses HTTP sink comes with multiple options for content templating of the HTTP request.

### Static Templating

If you do not wish any part of the key, value, headers or other data to form a part of the message, you can use static templating:

```properties
connect.http.request.content="My Static Content Template"
```

### Single Message Templating

When each Kafka message produces exactly one HTTP request, you can use the simpler single message templating.

In the content property of your configuration, define template substitutions as in the following example.

(Note that the XML is only an example; your template can consist of any text format that can be submitted in an HTTP request.)

```properties
connect.http.request.content="<product><id>{{value.name}}</id></product>"
```
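As a further illustration, a plain-text body can mix static text with fields taken from the record. This is only a sketch: the `orderNo` and `employeeId` fields are hypothetical and assume the record value is structured (for example JSON or Avro) and actually contains them.

```properties
# Hypothetical value fields: orderNo and employeeId are assumptions about the record schema
connect.http.request.content="Order {{value.orderNo}} was placed by employee {{value.employeeId}}"
```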

### Multiple Message Templating

To collapse multiple messages into a single HTTP request, use the multiple message template. This is applied automatically if the template has a `messages` tag. See the example below:

```handlebars
  <messages>
    {{#message}}
        <message>
          <topic>{{topic}}</topic>
          <employee>{{value.employeeId}}</employee>
          <order>{{value.orderNo}}</order>
          <groupDomain>{{value.groupDomain}}</groupDomain>
        </message>
    {{/message}}
  </messages>
```

Again, this is an XML example, but your message body can consist of anything, including plain text, JSON or YAML.

Your connector configuration will look like this:

```properties
connect.http.request.content="<messages>{{#message}}<message><topic>{{topic}}</topic><employee>{{value.employeeId}}</employee><order>{{value.orderNo}}</order><groupDomain>{{value.groupDomain}}</groupDomain></message>{{/message}}</messages>"
```

The final result will be HTTP requests with bodies like this:

```xml
  <messages>
    <message>
      <topic>myTopic</topic>
       <employee>Abcd1234</employee>
       <order>10</order>
       <groupDomain>myExampleGroup.uk</groupDomain>
    </message>
    <message>
       <topic>myTopic</topic>
       <employee>Efgh5678</employee>
       <order>11</order>
       <groupDomain>myExampleGroup.uk</groupDomain>
    </message>
  </messages>
```
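A body containing two messages, like the one above, would result from a batch of two records. The sketch below pairs the template with `connect.http.batch.count`; the value `2` is chosen only to match the example output, and it assumes that neither the size nor the interval threshold is reached first.

```properties
# Multiple message template from the example above
connect.http.request.content="<messages>{{#message}}<message><topic>{{topic}}</topic><employee>{{value.employeeId}}</employee><order>{{value.orderNo}}</order><groupDomain>{{value.groupDomain}}</groupDomain></message>{{/message}}</messages>"
# Illustrative value only: flush once two records have been batched
connect.http.batch.count=2
```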

### Available Keys

When using single and multiple message templating, the following keys are available:

| Field     | Usage Example             |
| --------- | ------------------------- |
| Header    | {{header.correlation-id}} |
| Value     | {{value}}                 |
|           | {{value.product.id}}      |
| Key       | {{key}}                   |
|           | {{key.customer.number}}   |
| Topic     | {{topic}}                 |
| Partition | {{partition}}             |
| Offset    | {{offset}}                |
| Timestamp | {{timestamp}}             |
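
Several of these keys can be combined in one template, for example to send a log-style line for each record. The snippet below is a sketch; it assumes the records carry a `correlation-id` header, as in the usage example in the table.

```properties
# Record coordinates, timestamp, one header and the value in a single line of text
connect.http.request.content="{{topic}}-{{partition}}-{{offset}} at {{timestamp}} (correlation: {{header.correlation-id}}): {{value}}"
```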

## URL Templating

The URL must include the protocol (e.g. `http://lenses.io`). Template variables can be used.

The URL is also a [Content Template](#content-template), so it can contain substitutions from the message key, value, headers and so on. If you are batching multiple Kafka messages into a single request, the first message is used for the substitution of the URL.
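
For instance, a field from the record value can be placed in the URL path. This is a sketch only: the host is the placeholder used elsewhere on this page, and the `product.id` field is an assumption about the record schema.

```properties
connect.http.method=PUT
# Hypothetical field: assumes the record value contains product.id
connect.http.endpoint="https://my-endpoint.example.com/products/{{value.product.id}}"
```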

## Authentication Options

Currently, the HTTP sink supports no authentication, BASIC HTTP authentication and OAuth2 authentication.

### No Authentication (Default)

By default, no authentication is set. This can also be set explicitly by providing a configuration like this:

```properties
connect.http.authentication.type=none
```

### BASIC HTTP Authentication

BASIC auth can be configured by providing a configuration like this:

```properties
connect.http.authentication.type=basic
connect.http.authentication.basic.username=user
connect.http.authentication.basic.password=password
```

### OAuth2 Authentication

OAuth2 authentication can be configured by providing a configuration like this:

```properties
connect.http.authentication.type=oauth2
connect.http.authentication.oauth2.token.url=http://myoauth2.local/getToken
connect.http.authentication.oauth2.client.id=clientId
connect.http.authentication.oauth2.client.secret=client-secret
connect.http.authentication.oauth2.token.property=access_token
connect.http.authentication.oauth2.client.scope=any
connect.http.authentication.oauth2.client.headers=header:value
```

## Headers List

To customise the headers sent with your HTTP request, you can supply a Headers List.

Each header key and value is also a [Content Template](#content-template), so it can contain substitutions from the message key, value, headers and so on. If you are batching multiple Kafka messages into a single request, the first message is used for the substitution of the headers.

Example:

```properties
connect.http.request.headers="Content-Type","text/plain","X-User","{{header.kafkauser}}","Product","{{value.product.id}}"
```

## SSL Configuration

Enabling SSL connections between Kafka Connect and the HTTP endpoint ensures that communication between these services is secure, protecting sensitive data from being intercepted or tampered with. SSL (or TLS) encrypts data in transit, verifies the identity of both parties and ensures data integrity. See the [SSL Configuration Properties](#ssl-configuration-properties) section to set it up.

## Batch Configuration

The connector offers three distinct flush options for data management:

* Flush by Count - triggers a flush after a specified number of records has been added to the batch.
* Flush by Size - triggers a flush once the batch reaches a predetermined size (in bytes).
* Flush by Interval - enforces a flush after a defined time interval (in seconds).

Note that the interval flush is a continuous process that acts as a fail-safe mechanism, ensuring that batches are flushed periodically even if the other flush options are not configured or have not reached their thresholds.

Consider a scenario where the flush size is set to 10MB and only 9.8MB of data has been batched, with no new Kafka messages arriving for 6 hours. To prevent undue delays, the interval flush guarantees that the batch is flushed after the specified time interval has elapsed. This ensures that data is sent in a timely manner even when the other flush conditions are not met.

The flush options are configured using the `batchCount`, `batchSize` and `timeInterval` properties. The settings are optional and, if not specified, the defaults are:

| Field        | Default                 |
| ------------ | ----------------------- |
| batchCount   | 50\_000 records         |
| batchSize    | 500000000 (500MB)       |
| timeInterval | 3\_600 seconds (1 hour) |

```properties
connect.http.batch.count=50000
connect.http.batch.size=500000000
connect.http.time.interval=3600
```
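
As an illustration, lower thresholds can be used when smaller, more frequent requests are preferred. The values below are arbitrary, and the sketch assumes the interval is expressed in seconds, as in the defaults table above.

```properties
# Flush after 100 records...
connect.http.batch.count=100
# ...or once the batch reaches 1,000,000 bytes...
connect.http.batch.size=1000000
# ...or after 30 seconds, whichever threshold is reached first
connect.http.time.interval=30
```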

## Configuration Examples

Some configuration examples follow on how to apply this connector to different message types.

These include converters, which are required to instruct Kafka Connect on how to read the source content.

### Static string template

In this case the converters are irrelevant as we are not using the message content to populate our message template.

```properties
connector.class=io.lenses.streamreactor.connect.http.sink.HttpSinkConnector
topics=mytopic
tasks.max=1
connect.http.method=POST
connect.http.endpoint="https://my-endpoint.example.com"
connect.http.request.content="My Static Content Template"
connect.http.batch.count=1
```

### Dynamic string template

The HTTP request body contains the value of the message, which is retained as a string value via the StringConverter.

```properties
connector.class=io.lenses.streamreactor.connect.http.sink.HttpSinkConnector
topics=mytopic
tasks.max=1
connect.http.method=POST
connect.http.endpoint="https://my-endpoint.example.com"
connect.http.request.content="{{value}}"
connect.http.batch.count=1
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
```

### Dynamic string template containing json message fields

Specific fields from the JSON message are substituted into the HTTP request body alongside some static content.

```properties
connector.class=io.lenses.streamreactor.connect.http.sink.HttpSinkConnector
topics=mytopic
tasks.max=1
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
connect.http.method=POST
connect.http.endpoint="https://my-endpoint.example.com"
connect.http.request.content="product: {{value.product}}"
connect.http.batch.size=1
value.converter.schemas.enable=false
```

### Dynamic string template containing whole json message

The entirety of the message value is substituted into a placeholder in the message body. The message is treated as a string via the StringConverter.

```properties
connector.class=io.lenses.streamreactor.connect.http.sink.HttpSinkConnector
topics=mytopic
tasks.max=1
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
connect.http.method=POST
connect.http.endpoint="https://my-endpoint.example.com"
connect.http.request.content="whole product message: {{value}}"
connect.http.time.interval=5
```

### Dynamic string template containing avro message fields

Fields from the Avro message are substituted into the message body in the following example:

```properties
connector.class=io.lenses.streamreactor.connect.http.sink.HttpSinkConnector
topics=mytopic
tasks.max=1
connect.http.method=POST
connect.http.endpoint="https://my-endpoint.example.com"
connect.http.request.content="product: {{value.product}}"
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schemas.enable=true
value.converter.schema.registry.url=http://schema-registry:8081
```

## Error/Success Reporter

Starting from version 8.1, as a pilot release, the connector offers a **Reporter** feature which, if enabled, writes success and error processing reports to a specified Kafka topic. Reports have no key; details about the status can be found in the message headers and value.

To enable this functionality, enable one of the properties below (or both for full reporting):

```properties
connect.reporting.error.config.enabled=true
connect.reporting.success.config.enabled=true
```

These settings configure the Kafka producer for success and error reports. Full configuration options are available in the [Success Reporter Properties](#success-reporter-properties) and [Error Reporter Properties](#error-reporter-properties) sections. Three examples follow:

1. [local/plain configuration](#plain-error-reporting)
2. [SASL configuration](#error-reporting-using-sasl)
3. [SSL configuration](#error-reporting-using-ssl)

### Plain Error Reporting

This is the most common scenario for on-premises Kafka clusters used only for monitoring.

```properties
connect.reporting.error.config.enabled=true
connect.reporting.error.config.bootstrap.servers=localhost:9094
connect.reporting.error.config.topic=http-monitoring
```

### Error Reporting using SASL

Using SASL provides a secure and standardized method for authenticating connections to an external Kafka cluster. It is especially valuable when connecting to clusters that require secure communication, as it supports mechanisms like SCRAM, GSSAPI (Kerberos), and OAuth, ensuring that only authorized clients can access the cluster. Additionally, SASL can help safeguard credentials during transmission, reducing the risk of unauthorized access.

```properties
connect.reporting.error.config.enabled=true
connect.reporting.error.config.bootstrap.servers=my-kafka-cluster.com:9093
connect.reporting.error.config.security.protocol=SASL_SSL
connect.reporting.error.config.sasl.mechanism=PLAIN
connect.reporting.error.config.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="MYUSER" password="MYPASSWORD";
```

### Error Reporting using SSL

Using SSL ensures secure communication between clients and the Kafka cluster by encrypting data in transit. This prevents unauthorized parties from intercepting or tampering with sensitive information. SSL also supports mutual authentication, allowing both the client and server to verify each other’s identities, which enhances trust and security in the connection.

```properties
connect.reporting.error.config.enabled=true
connect.reporting.error.config.bootstrap.servers=SSL://my-ssl-protected-cluster:9094
connect.reporting.error.config.security.protocol=SSL
connect.reporting.error.config.ssl.keystore.location=/path/to/my/keystore.p12
connect.reporting.error.config.ssl.keystore.type=PKCS12
connect.reporting.error.config.ssl.truststore.location=/path/to/my/truststore.p12
connect.reporting.error.config.ssl.truststore.password=************
connect.reporting.error.config.ssl.truststore.type=PKCS12
connect.reporting.error.config.topic=http-error-topic
```

## Options

## Configuration parameters

This sink connector supports the following options as part of its configuration:

| Field                                   | Type           | Required | Values (Default)                                                                                                                                                                                          |
| --------------------------------------- | -------------- | -------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `connect.http.method`                   | HttpMethod     | Yes      | POST, PUT, PATCH                                                                                                                                                                                          |
| `connect.http.endpoint`                 | String         | Yes      | [URL Template](#url-templating)                                                                                                                                                                           |
| `connect.http.request.content`          | String         | Yes      | [Content Template](#content-template)                                                                                                                                                                     |
| `connect.http.authentication.type`      | Authentication | No       | [Authentication Options](#authentication-options) (none)                                                                                                                                                  |
| `connect.http.request.headers`          | List\[String]  | No       | [Headers List](#headers-list)                                                                                                                                                                             |
| `connect.http.batch.count`              | Int            | No       | The number of records to batch before sending the request, see [Batch Configuration](#batch-configuration)                                                                                                |
| `connect.http.batch.size`               | Int            | No       | The size of the batch in bytes before sending the request, see [Batch Configuration](#batch-configuration)                                                                                                |
| `connect.http.time.interval`            | Int            | No       | The time interval in seconds to wait before sending the request, see [Batch Configuration](#batch-configuration)                                                                                          |
| `connect.http.upload.sync.period`       | Int            | No       | Upload Sync Period (100) - polling time period for uploads in milliseconds                                                                                                                                |
| `connect.http.error.threshold`          | Int            | No       | The number of errors to tolerate before failing the sink (5)                                                                                                                                              |
| `connect.http.retry.mode`               | String         | No       | The HTTP retry mode. It can be one of: Fixed or Exponential (default)                                                                                                                                     |
| `connect.http.retries.on.status.codes`  | List\[String]  | No       | The status codes to retry on (default codes are: 408,429,500,502,503,504)                                                                                                                                 |
| `connect.http.retries.max.retries`      | Int            | No       | The maximum number of retries to attempt (default is 5)                                                                                                                                                   |
| `connect.http.retry.fixed.interval.ms`  | Int            | No       | The set duration to wait before retrying HTTP requests. The default is 10000 (10 seconds)                                                                                                                 |
| `connect.http.retries.max.timeout.ms`   | Int            | No       | The maximum time in milliseconds to retry a request when Exponential retry is set. Backoff is used to increase the time between retries, up to this maximum (30000)                                       |
| `connect.http.connection.timeout.ms`    | Int            | No       | The HTTP connection timeout in milliseconds (10000)                                                                                                                                                       |
| `connect.http.max.queue.size`             | Int            | No       | For each processed topic, the connector maintains an internal queue. This value specifies the maximum number of entries allowed in the queue before the enqueue operation blocks. The default is 100,000. |
| `connect.http.max.queue.offer.timeout.ms` | Int            | No       | The maximum time window, specified in milliseconds, to wait for the internal queue to accept new records. The d                                                                                           |
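
The retry-related options in the table could be combined as in the sketch below. The values are arbitrary illustrations, and the snippet assumes that `Fixed` is accepted exactly as written in the table and that the status codes are given as a comma-separated list, like the defaults shown.

```properties
# Retry with a constant 5 second wait, at most 3 times, on throttling or unavailability
connect.http.retry.mode=Fixed
connect.http.retry.fixed.interval.ms=5000
connect.http.retries.max.retries=3
connect.http.retries.on.status.codes=429,503
# Values below repeat the documented defaults
connect.http.connection.timeout.ms=10000
connect.http.error.threshold=5
```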

## SSL Configuration Properties

| **Property Name**            | **Description**                                                                                                                                        |
| ---------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------ |
| `ssl.truststore.location`    | Path to the truststore file containing the trusted CA certificates for verifying broker certificates.                                                  |
| `ssl.truststore.password`    | Password for the truststore file to protect its integrity.                                                                                             |
| `ssl.truststore.type`        | Type of the truststore (e.g., `JKS`, `PKCS12`). Default is `JKS`.                                                                                      |
| `ssl.keystore.location`      | Path to the keystore file containing the client’s private key and certificate chain for client authentication.                                         |
| `ssl.keystore.password`      | Password for the keystore to protect the private key.                                                                                                  |
| `ssl.keystore.type`          | Type of the keystore (e.g., `JKS`, `PKCS12`). Default is `JKS`.                                                                                        |
| `ssl.protocol`               | The SSL protocol used for secure connections (e.g., `TLSv1.2`, `TLSv1.3`). Default is `TLSv1.3`.                                                       |
| `ssl.keymanager.algorithm`   | Algorithm used by the KeyManager to manage certificates. Default value is the key manager factory algorithm configured for the Java Virtual Machine.   |
| `ssl.trustmanager.algorithm` | Algorithm used by the TrustManager to manage certificates. Default value is the trust manager factory algorithm configured for the Java Virtual Machine. |
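
A minimal truststore-only setup for an HTTPS endpoint might look like the sketch below. It assumes the properties are set on the connector with exactly the names listed in the table; the path and password are placeholders.

```properties
# Placeholder path and password: replace with your own truststore details
ssl.truststore.location=/path/to/my/truststore.p12
ssl.truststore.password=<truststore-password>
ssl.truststore.type=PKCS12
ssl.protocol=TLSv1.3
```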

## Error Reporter Properties

| **Property Name**                                  | **Description**                                                                                                                 |
| -------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------- |
| `connect.reporting.error.config.enabled`           | Specifies whether the reporter is enabled. `false` by default.                                                                  |
| `connect.reporting.error.config.bootstrap.servers` | A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. Required if reporter is enabled. |
| `connect.reporting.error.config.topic`             | Specifies the topic for Reporter to write to.                                                                                   |
| `connect.reporting.error.config.location`          | SASL Mechanism used when connecting.                                                                                            |
| `connect.reporting.error.config.sasl.jaas.config`  | JAAS login context parameters for SASL connections in the format used by JAAS configuration files.                              |
| `connect.reporting.error.config.sasl.mechanism`    | SASL mechanism used for client connections. This may be any mechanism for which a security provider is available.               |

{% hint style="success" %}
The error reporter can also be configured with SSL Properties. See the section [SSL Configuration Properties](#ssl-configuration-properties). In this case all properties should be prefixed with `connect.reporting.error.config` to ensure they apply to the error reporter.
{% endhint %}

## Success Reporter Properties

| **Property Name**                                    | **Description**                                                                                                                 |
| ---------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------- |
| `connect.reporting.success.config.enabled`           | Specifies whether the reporter is enabled. `false` by default.                                                                  |
| `connect.reporting.success.config.bootstrap.servers` | A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. Required if reporter is enabled. |
| `connect.reporting.success.config.topic`             | Specifies the topic for Reporter to write to.                                                                                   |
| `connect.reporting.success.config.location`          | SASL Mechanism used when connecting.                                                                                            |
| `connect.reporting.success.config.sasl.jaas.config`  | JAAS login context parameters for SASL connections in the format used by JAAS configuration files.                              |
| `connect.reporting.success.config.sasl.mechanism`    | SASL mechanism used for client connections. This may be any mechanism for which a security provider is available.               |

{% hint style="success" %}
The success reporter can also be configured with SSL Properties. See the section [SSL Configuration Properties](#ssl-configuration-properties). In this case all properties should be prefixed with `connect.reporting.success.config` to ensure they apply to the success reporter.
{% endhint %}
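
By analogy with the plain error reporting example, a plain success reporter configuration might look like the following sketch. The topic name `http-success-topic` is hypothetical, and the bootstrap server is the local placeholder used earlier on this page.

```properties
connect.reporting.success.config.enabled=true
connect.reporting.success.config.bootstrap.servers=localhost:9094
# Hypothetical topic name
connect.reporting.success.config.topic=http-success-topic
```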
