HTTP


A Kafka Connect sink connector for writing records from Kafka to HTTP endpoints.

Features 

  • Support for JSON/Avro/String/Protobuf messages via Kafka Connect (in conjunction with converters for Schema Registry-based data storage).
  • URL, header and content templating abilities give you full control of the HTTP request.
  • Configurable batching of messages, including the ability to combine multiple Kafka messages into a single HTTP request and to select which data each request includes.
  • JSON configuration.

Configuration 

The connector has a single configuration property over and above those provided by Kafka Connect. All connector-specific settings are collapsed into a JSON value held in this property.

Connector Property     Explanation
connect.http.config    JSON configuration string
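
For example, in a connector properties file the JSON configuration is supplied inline as the value of this property (a minimal sketch; the endpoint URL is illustrative):

connector.class=io.lenses.streamreactor.connect.http.sink.HttpSinkConnector
topics=mytopic
tasks.max=1
connect.http.config={"method":"Post","endpoint":"https://my-endpoint.example.com","content":"{{value}}"}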

Options 

This sink connector supports the following options as part of its JSON configuration:

Field             Type                   Required  Values (Default)
method            HttpMethod             Yes       POST, PUT, PATCH
endpoint          String                 Yes       URL Template
content           String                 Yes       Content Template
authentication    Authentication         No        Authentication Options
headers           Seq[(String, String)]  No        Headers List
ssl               SSLConfig              No        SSL Configuration
batch             BatchConfig            No        Batch Configuration
errorThreshold    Int                    No        Error Threshold (5): number of consecutive errors to tolerate before the connector fails
uploadSyncPeriod  Int                    No        Upload Sync Period (5): polling time period for uploads
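
To illustrate how these options fit together, the sketch below combines most of them in a single configuration (the endpoint, credentials, header and batch values are illustrative only):

{
  "method":"Post",
  "endpoint":"https://my-endpoint.example.com",
  "content":"{{value}}",
  "authentication":{"username":"user","password":"pass","type":"BasicAuthentication"},
  "headers":[["Content-Type","application/json"]],
  "batch":{"batchCount":100},
  "errorThreshold":5,
  "uploadSyncPeriod":5
}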

Configuration Examples 

Minimal Configuration 

The examples in this document show the JSON content of the connect.http.config property spread over multiple lines for readability; in the actual connector configuration it is supplied as a single JSON string.

{
  "method":"Put",
  "endpoint":"http://myaddress.example.com",
  "content":"Your HTTP Message Body Goes Here!"
}

Content Template 

The Lenses HTTP sink comes with multiple options for content templating of the HTTP request.

Static Templating 

If you do not want any part of the key, value, headers or other data to form part of the message body, you can use static templating:

{
  "method":"Put",
  "endpoint":"http://myaddress.example.com",
  "content":"Your HTTP Message Body Goes Here!"
}

Single Message Templating 

When you are confident you will be generating a single HTTP request per Kafka message, you can use the simpler single-message templating.

In the content property of the JSON configuration you can define template substitutions, as in the following example (note that XML is used here only for illustration; your template can consist of any text format that can be submitted in an HTTP request):

{
  "method":"Put",
  "endpoint":"http://myaddress.example.com",
  "content":"<product><id>{{value.name}}</id></product>"
}

Multiple Message Templating 

To collapse multiple Kafka messages into a single HTTP request you can use the multiple-message template. This mode is enabled automatically when the template contains a {{#message}} block, as in the example below:

  <messages>
    {{#message}}
        <message>
          <topic>{{topic}}</topic>
          <employee>{{value.employeeId}}</employee>
          <order>{{value.orderNo}}</order>
          <groupDomain>{{value.groupDomain}}</groupDomain>
        </message>
    {{/message}}
  </messages>

Again, this is an XML example, but your message body can consist of any format, including plain text, JSON or YAML.

In your connector configuration JSON string this will look as follows:

{
  "method":"Put",
  "endpoint":"http://myaddress.example.com",
  "content":"<messages>{{#message}}<message><topic>{{topic}}</topic><employee>{{value.employeeId}}</employee><order>{{value.orderNo}}</order><groupDomain>{{value.groupDomain}}</groupDomain></message>{{/message}}</messages>"
}

The final result will be HTTP requests with bodies like this:

  <messages>
    <message>
      <topic>myTopic</topic>
      <employee>Abcd1234</employee>
      <order>10</order>
      <groupDomain>myExampleGroup.uk</groupDomain>
    </message>
    <message>
      <topic>myTopic</topic>
      <employee>Efgh5678</employee>
      <order>11</order>
      <groupDomain>myExampleGroup.uk</groupDomain>
    </message>
  </messages>

Available Keys 

When using single and multiple message templating, the following fields are available:

Field      Usage Example
Header     {{header.correlation-id}}
Value      {{value}}, {{value.product.id}}
Key        {{key}}, {{key.customer.number}}
Topic      {{topic}}
Partition  {{partition}}
Offset     {{offset}}
Timestamp  {{timestamp}}
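
For example, a content template that combines several of these fields into a JSON request body might look like the following (the kafkauser header and the customer and product fields are hypothetical):

{"source":"{{topic}}/{{partition}}/{{offset}}","user":"{{header.kafkauser}}","customer":"{{key.customer.number}}","product":"{{value.product.id}}","at":"{{timestamp}}"}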

URL Template 

The URL, including the protocol (e.g. http://lenses.io). Template variables can be used.

The URL is also a Content Template, so it can contain substitutions from the message key/value/headers etc. If you are batching multiple Kafka messages into a single request, the first message in the batch is used for the URL substitutions.
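
For example, a templated endpoint can route each request using a field of the message value (the product.id field is hypothetical):

{
  "method":"Put",
  "endpoint":"http://myaddress.example.com/products/{{value.product.id}}",
  "content":"{{value}}"
}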

Authentication Options 

Currently, the HTTP Sink supports either no authentication or BASIC HTTP authentication.

No Authentication (Default) 

By default, no authentication is set.

BASIC HTTP Authentication 

BASIC auth can be configured by providing a configuration like this:

{
  "method":"Put",
  "endpoint":"http://myaddress.example.com",
  "content":"My content template",
  "authentication":{"username":"user","password":"pass","type":"BasicAuthentication"}
}
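
With this configuration each request carries the standard HTTP Basic Authorization header, containing the Base64 encoding of username:password; for the credentials above the header would be:

Authorization: Basic dXNlcjpwYXNz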

Headers List 

To customise the headers sent with your HTTP request you can supply a Headers List.

Each header key and value is also a Content Template, so it can contain substitutions from the message key/value/headers etc. If you are batching multiple Kafka messages into a single request, the first message in the batch is used for the header substitutions.

Example:

{
  "method":"Put",
  "endpoint":"http://myaddress.example.com",
  "content":"My content template",
  "headers":[["Content-Type","text/plain"], ["X-User","{{header.kafkauser}}"], ["Product", "{{value.product.id}}"]]
}

SSL Configuration 

The SSL settings are supplied as a JSON object of the following shape:

{
  "trustStorePath": "/path/to/truststore",
  "trustStorePass": "truststorePassword",
  "keyStorePath": "/path/to/keystore",
  "keyStorePass": "keystorePassword",
  "useClientCert": true,
  "keyStoreType": "PKCS12",
  "trustStoreType": "PKCS12"
}

Example:

{
  "method":"Put",
  "endpoint":"http://myaddress.example.com",
  "content":"My content template",
  "headers":[["Content-Type","text/plain"], ["X-User","{{header.kafkauser}}"], ["Product", "{{value.product.id}}"]],
  "ssl": {"trustStorePath":"/path/to/truststore","trustStorePass":"truststorePassword","keyStorePath":"/path/to/keystore","keyStorePass":"keystorePassword","useClientCert":true,"keyStoreType":"PKCS12","trustStoreType":"PKCS12"}

}

Notes 

  • The trust store and key store files must be made available on the worker nodes of your Kafka Connect cluster.
  • If you do not specify the key store or trust store type, they default to JKS.
  • useClientCert defaults to false.
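
As a sketch of how such a store might be produced, the JDK's keytool utility can import a CA certificate into a PKCS12 trust store (the alias, certificate file and password are illustrative):

keytool -importcert -noprompt \
  -alias my-ca \
  -file ca.pem \
  -keystore /path/to/truststore \
  -storetype PKCS12 \
  -storepass truststorePassword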

Batch Configuration 

The connector offers three distinct flush options that control when a batch of records is sent:

  • Flush by Count - triggers a flush after a specified number of records have been accumulated in the batch.
  • Flush by Size - initiates a flush once the accumulated batch reaches a predetermined size (in bytes).
  • Flush by Interval - enforces a flush after a defined time interval (in seconds).

It’s worth noting that the interval flush is a continuous process that acts as a fail-safe mechanism, ensuring that batches are periodically flushed even if the other flush options are not configured or haven’t reached their thresholds.

Consider a scenario where the flush size is set to 10MB, only 9.8MB of data has been accumulated, and no new Kafka messages arrive for an extended period of 6 hours. To prevent undue delays, the interval flush guarantees that the batch is flushed (and the HTTP request sent) once the specified time interval has elapsed, as in the sketch below. This ensures the timely delivery of data even when the other flush conditions are not met.
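
A batch configuration matching that scenario might look like the following (the 10MB threshold is expressed in bytes; both values are illustrative):

{"batchSize":10000000,"timeInterval":3600}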

The flush options are configured using the batchCount, batchSize and timeInterval fields in the Batch Configuration object. The settings are optional and if not specified the defaults are:

Field         Default
batchCount    50,000 records
batchSize     500,000,000 bytes (500MB)
timeInterval  3,600 seconds (1 hour)

Expressed as a JSON batch object, these defaults are equivalent to:

{"batchCount":50000,"batchSize":500000000,"timeInterval":3600}

Configuration Examples 

The following configuration examples show how to apply this connector to different message types.

These include converters, which are required to instruct Kafka Connect on how to read the source content.

Static string template 

In this case the converters are irrelevant as we are not using the message content to populate our message template.

connector.class=io.lenses.streamreactor.connect.http.sink.HttpSinkConnector
topics=mytopic
tasks.max=1
connect.http.config={"method":"Post","endpoint":"https://my-endpoint.example.com","content":"My Static Content Template","batch":{"batchCount":1}}

Dynamic string template 

The HTTP request body contains the value of the message, which is retained as a string value via the StringConverter.

connector.class=io.lenses.streamreactor.connect.http.sink.HttpSinkConnector
topics=mytopic
tasks.max=1
connect.http.config={"method":"Post","endpoint":"https://my-endpoint.example.com","content":"{{value}}","batch":{"batchCount":1}}
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

Dynamic string template containing json message fields 

Specific fields from the JSON message are substituted into the HTTP request body alongside some static content.

connector.class=io.lenses.streamreactor.connect.http.sink.HttpSinkConnector
topics=mytopic
tasks.max=1
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
connect.http.config={"method":"Post","endpoint":"https://my-endpoint.example.com","content":"product: {{value.product}},"batch":{"batchSize":1}}
value.converter.schemas.enable=false
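
For example, given a hypothetical record whose JSON value is {"product":"myProduct"}, the resulting HTTP request body would be:

product: myProduct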

Dynamic string template containing whole json message 

The entirety of the message value is substituted into a placeholder in the message body. The message is treated as a string via the StringConverter.

connector.class=io.lenses.streamreactor.connect.http.sink.HttpSinkConnector
topics=mytopic
tasks.max=1
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
connect.http.config={"method":"Post","endpoint":"https://my-endpoint.example.com","content":"whole product message: {{value}}","batch":{"timeInterval":5}}

Dynamic string template containing avro message fields 

Fields from the Avro message are substituted into the message body, as in the following example:

connector.class=io.lenses.streamreactor.connect.http.sink.HttpSinkConnector
topics=mytopic
tasks.max=1
connect.http.config={"method":"Post","endpoint":"https://my-endpoint.example.com","content":"product: {{value.product}}"}
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schemas.enable=true
value.converter.schema.registry.url=http://schema-registry:8081
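
For {{value.product}} to resolve, the Avro value schema must contain a product field; a minimal sketch of such a schema (the record name is illustrative) would be:

{
  "type": "record",
  "name": "Product",
  "fields": [
    { "name": "product", "type": "string" }
  ]
}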