Azure Data Lake Gen2

This page describes the usage of the Stream Reactor Azure Datalake Gen 2 Sink Connector.

This Kafka Connect sink connector facilitates the seamless transfer of records from Kafka to Azure Data Lake Buckets. It offers robust support for various data formats, including AVRO, Parquet, JSON, CSV, and Text, making it a versatile choice for data storage. Additionally, it ensures the reliability of data transfer with built-in support for exactly-once semantics.

Connector Class

io.lenses.streamreactor.connect.datalake.sink.DatalakeSinkConnector

Example

For more examples see the tutorials.

connector.class=io.lenses.streamreactor.connect.datalake.sink.DatalakeSinkConnector
connect.datalake.kcql=insert into lensesio:demo select * from demo PARTITIONBY _value.metadata_id, _value.customer_id, _header.ts, _header.wallclock STOREAS `JSON` PROPERTIES('flush.interval'=600, 'flush.size'=1000000, 'flush.count'=5000)
topics=demo
name=demo
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
transforms=insertFormattedTs,insertWallclock
transforms.insertFormattedTs.type=io.lenses.connect.smt.header.TimestampConverter
transforms.insertFormattedTs.header.name=ts
transforms.insertFormattedTs.field=timestamp
transforms.insertFormattedTs.target.type=string
transforms.insertFormattedTs.format.to.pattern=yyyy-MM-dd-HH
transforms.insertWallclock.type=io.lenses.connect.smt.header.InsertWallclock
transforms.insertWallclock.header.name=wallclock
transforms.insertWallclock.value.type=format
transforms.insertWallclock.format=yyyy-MM-dd-HH

KCQL Support

You can specify multiple KCQL statements separated by ; to have a connector sink multiple topics. The connector properties topics or topics.regex are required to be set to a value that matches the KCQL statements.

The connector uses KCQL to map topics to Datalake buckets and paths. The full KCQL syntax is:

INSERT INTO bucketAddress[:pathPrefix]
SELECT *
FROM kafka-topic
[[PARTITIONBY (partition[, partition] ...)] | NOPARTITION]
[STOREAS storage_format]
[PROPERTIES(
  'property.1'=x,
  'property.2'=x,
)]

Please note that you can employ escaping within KCQL for the INSERT INTO, SELECT * FROM, and PARTITIONBY clauses when necessary. For example, an incoming Kafka message stored as JSON can use fields containing .:

{
  ...
  "a.b": "value",
  ...
}

In this case, you can use the following KCQL statement:

INSERT INTO `container-name`:`prefix` SELECT * FROM `kafka-topic` PARTITIONBY `a.b`
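The clauses described above can also be assembled programmatically. The following Python sketch is illustrative only: build_kcql, quote and render_value are hypothetical helper names, not part of the connector. It assumes that names are escaped with backticks as in the example above and that partition expressions are passed through verbatim.

```python
def quote(name: str) -> str:
    """Wrap a name in backticks, as in the escaping example above."""
    return f"`{name}`"


def render_value(value) -> str:
    """Render booleans as lowercase true/false; leave other values as-is."""
    return str(value).lower() if isinstance(value, bool) else str(value)


def build_kcql(container, topic, prefix=None, partition_by=(), store_as=None, properties=None):
    """Assemble a single KCQL statement (hypothetical helper, not connector API)."""
    target = quote(container) + (f":{quote(prefix)}" if prefix else "")
    parts = [f"INSERT INTO {target}", "SELECT *", f"FROM {quote(topic)}"]
    if partition_by:
        # Partition expressions are used verbatim; quote() dotted field names yourself.
        parts.append("PARTITIONBY " + ", ".join(partition_by))
    if store_as:
        parts.append(f"STOREAS {quote(store_as)}")
    if properties:
        rendered = ", ".join(f"'{k}'={render_value(v)}" for k, v in properties.items())
        parts.append(f"PROPERTIES({rendered})")
    return " ".join(parts)


statement = build_kcql("container-name", "kafka-topic", prefix="prefix",
                       partition_by=[quote("a.b")])
print(statement)
# Several statements can be joined with ';' so one connector sinks multiple topics.
print(";".join([statement, build_kcql("testcontainer", "topicA")]))
```

The first call reproduces the escaped statement shown above; the joined string is what would be assigned to connect.datalake.kcql.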

Target Bucket and Path

The target bucket and path are specified in the INSERT INTO clause. The path is optional and if not specified, the connector will write to the root of the bucket and append the topic name to the path.

Here are a few examples:

INSERT INTO testcontainer:pathToWriteTo SELECT * FROM topicA;
INSERT INTO testcontainer SELECT * FROM topicA;
INSERT INTO testcontainer:path/To/Write/To SELECT * FROM topicA PARTITIONBY fieldA;

SQL Projection

Currently, the connector does not offer support for SQL projection; consequently, anything other than a SELECT * query is disregarded. The connector will faithfully write all fields from Kafka exactly as they are.

Source Topic

The source topic is defined within the FROM clause. To avoid runtime errors, it’s crucial to configure either the topics or topics.regex property in the connector and ensure proper mapping to the KCQL statements.

Set the FROM clause to *. This will auto map the topic as a partition.

KCQL Properties

The PROPERTIES clause is optional and adds a layer of configuration to the connector. It enhances versatility by permitting the application of multiple configurations (delimited by ‘,’). The following properties are supported:

| Name | Description | Type | Available Values | Default Value |
|---|---|---|---|---|
| padding.type | Specifies the type of padding to be applied. | LeftPad, RightPad, NoOp | LeftPad, RightPad, NoOp | LeftPad |
| padding.char | Defines the character used for padding. | Char | | ‘0’ |
| padding.length.partition | Sets the padding length for the partition. | Int | | 0 |
| padding.length.offset | Sets the padding length for the offset. | Int | | 12 |
| partition.include.keys | Specifies whether partition keys are included. | Boolean | | false; with custom partitioning: true |
| store.envelope | Indicates whether to store the entire Kafka message. | Boolean | | |
| store.envelope.fields.key | Indicates whether to store the envelope’s key. | Boolean | | |
| store.envelope.fields.headers | Indicates whether to store the envelope’s headers. | Boolean | | |
| store.envelope.fields.value | Indicates whether to store the envelope’s value. | Boolean | | |
| store.envelope.fields.metadata | Indicates whether to store the envelope’s metadata. | Boolean | | |
| flush.size | Specifies the size (in bytes) for the flush operation. | Long | | 500000000 (500MB) |
| flush.count | Specifies the number of records for the flush operation. | Int | | 50000 |
| flush.interval | Specifies the interval (in seconds) for the flush operation. | Long | | 3600 (1 hour) |
| key.suffix | When specified, appends the given value to the resulting object key before the "extension" (avro, json, etc) is added. | String | | <empty> |

The sink connector optimizes performance by padding the output files, a practice that proves beneficial when using the Datalake Source connector to restore data. This file padding ensures that files are ordered lexicographically, allowing the Datalake Source connector to skip the need for reading, sorting, and processing all files, thereby enhancing efficiency.
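To see why padding matters for lexicographic ordering, consider the following Python sketch. It is an illustration, not the connector's implementation: the pad function is a hypothetical helper, and the exact meaning of LeftPad and RightPad (padding characters added on the left or right) is an assumption. The default offset padding length of 12 and the padding character '0' come from the table above.

```python
def pad(value: int, length: int, char: str = "0", padding_type: str = "LeftPad") -> str:
    """Pad a number to a fixed width (hypothetical helper mirroring padding.* properties)."""
    text = str(value)
    if padding_type == "LeftPad":
        return text.rjust(length, char)
    if padding_type == "RightPad":
        return text.ljust(length, char)
    return text  # NoOp: leave the value untouched


offsets = [9, 10, 100, 2]

# Without padding, string ordering does not match numeric ordering.
unpadded = sorted(str(offset) for offset in offsets)

# With left padding to 12 characters, string ordering matches numeric ordering.
padded = sorted(pad(offset, 12) for offset in offsets)

print(unpadded)
print(padded)
```

Because the padded names sort in offset order, a reader can list files and process them in sequence without loading and sorting them first.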

Partitioning & File names

The object key serves as the filename used to store data in Datalake. There are two options for configuring the object key:

  • Default: The object key is automatically generated by the connector and follows the Kafka topic-partition structure. The format is $container/[$prefix]/$topic/$partition/offset.extension. The extension is determined by the chosen storage format.

  • Custom: The object key is driven by the PARTITIONBY clause. The format is either $container/[$prefix]/$topic/customKey1=customValue1/customKey2=customValue2/topic(partition_offset).extension (naming style mimicking Hive-like data partitioning) or $container/[$prefix]/customValue/topic(partition_offset).ext. The extension is determined by the selected storage format.

The Connector automatically adds the topic name to the partition. There is no need to add it to the partition clause. If you want to explicitly add the topic or partition you can do so by using _topic and _partition.

The partition clause works on Header, Key and Values fields of the Kafka message.

Custom keys and values can be extracted from the Kafka message key, message value, or message headers, as long as the headers are of types that can be converted to strings. There is no fixed limit to the number of elements that can form the object key, but you should be aware of Azure Datalake key length restrictions.
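The two key layouts can be sketched in Python as follows. This is a simplified illustration of the formats quoted above, not the connector's code: both function names are hypothetical, padding of partition and offset is left out for brevity, and only the Hive-like custom layout (key=value segments) is shown.

```python
def default_object_key(container, topic, partition, offset, extension, prefix=None):
    """Default layout: $container/[$prefix]/$topic/$partition/offset.extension"""
    parts = [container] + ([prefix] if prefix else [])
    parts += [topic, str(partition), f"{offset}.{extension}"]
    return "/".join(parts)


def hive_object_key(container, topic, partition, offset, extension, partition_values, prefix=None):
    """Custom layout: $container/[$prefix]/$topic/key1=value1/.../topic(partition_offset).extension"""
    parts = [container] + ([prefix] if prefix else [])
    parts.append(topic)
    # One key=value path segment per PARTITIONBY element, in declaration order.
    parts += [f"{key}={value}" for key, value in partition_values.items()]
    parts.append(f"{topic}({partition}_{offset}).{extension}")
    return "/".join(parts)


print(default_object_key("lensesio", "demo", 3, 100, "json", prefix="demo"))
print(hive_object_key("lensesio", "demo", 3, 100, "json",
                      {"customer_id": "42", "ts": "2024-01-01-10"}))
```

The sample container, topic and partition values are made up for the example.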

To extract fields from the message values, simply use the field names in the PARTITIONBY clause. For example:

PARTITIONBY fieldA, fieldB

However, note that the message fields must be of primitive types (e.g., string, int, long) to be used for partitioning.

You can also use the entire message key as long as it can be coerced into a primitive type:

PARTITIONBY _key

In cases where the Kafka message Key is not a primitive but a complex object, you can use individual fields within the message Key to create the Datalake object key name:

PARTITIONBY _key.fieldA, _key.fieldB

Kafka message headers can also be used in the Datalake object key definition, provided the header values are of primitive types easily convertible to strings:

PARTITIONBY _header.<header_key1>[, _header.<header_key2>]

Customizing the object key can leverage various components of the Kafka message. For example:

PARTITIONBY fieldA, _key.fieldB, _header.fieldC

This flexibility allows you to tailor the object key to your specific needs, extracting meaningful information from Kafka messages to structure Datalake object keys effectively.

To enable Athena-like partitioning, use the following syntax:

INSERT INTO $container[:$prefix]
SELECT * FROM $topic
PARTITIONBY fieldA, _key.fieldB, _header.fieldC
STOREAS `AVRO`
PROPERTIES (
    'partition.include.keys'=true,
)

Rolling Windows

Storing data in Azure Datalake and partitioning it by time is a common practice in data management. For instance, you may want to organize your Datalake data in hourly intervals. This partitioning can be seamlessly achieved using the PARTITIONBY clause in combination with specifying the relevant time field. However, it’s worth noting that the time field typically doesn’t adjust automatically.

To address this, we offer a Kafka Connect Single Message Transformer (SMT) designed to streamline this process. You can find the transformer plugin and documentation here.

Let’s consider an example where you need the object key to include the wallclock time (the time when the message was processed) and create an hourly window based on a field called timestamp. Here’s the connector configuration to achieve this:

connector.class=io.lenses.streamreactor.connect.datalake.sink.DatalakeSinkConnector
connect.datalake.kcql=insert into lensesio:demo select * from demo PARTITIONBY _value.metadata_id, _value.customer_id, _header.ts, _header.wallclock STOREAS `JSON` PROPERTIES('flush.interval'=30, 'flush.size'=1000000, 'flush.count'=5000)
topics=demo
name=demo
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
transforms=insertFormattedTs,insertWallclock
transforms.insertFormattedTs.type=io.lenses.connect.smt.header.TimestampConverter
transforms.insertFormattedTs.header.name=ts
transforms.insertFormattedTs.field=timestamp
transforms.insertFormattedTs.target.type=string
transforms.insertFormattedTs.format.to.pattern=yyyy-MM-dd-HH
transforms.insertWallclock.type=io.lenses.connect.smt.header.InsertWallclock
transforms.insertWallclock.header.name=wallclock
transforms.insertWallclock.value.type=format
transforms.insertWallclock.format=yyyy-MM-dd-HH

In this example, the incoming Kafka message’s Value content includes a field called timestamp, represented as a long value indicating the epoch time in milliseconds. The TimestampConverter SMT converts this into a string value according to the format specified in the format.to.pattern property and stores it in the ts header. Additionally, the InsertWallclock SMT adds the current wallclock time as the wallclock header, in the format you specify in the format property.

The PARTITIONBY clause then uses both the ts header and the wallclock header to build the object key, giving you precise control over data partitioning.
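As a rough illustration of what an hourly window looks like, the Python sketch below converts an epoch-milliseconds value into a yyyy-MM-dd-HH string. It only mimics the formatting step; it is not the SMT itself. The function name hourly_window is hypothetical, and the use of UTC is an assumption, since the time zone applied by the SMT is not stated here.

```python
from datetime import datetime, timezone


def hourly_window(epoch_millis: int) -> str:
    """Format epoch milliseconds as yyyy-MM-dd-HH (UTC assumed for this sketch)."""
    moment = datetime.fromtimestamp(epoch_millis / 1000, tz=timezone.utc)
    return moment.strftime("%Y-%m-%d-%H")


one_hour = 3_600_000
# Two records 59 minutes apart within the same hour share one window value...
print(hourly_window(one_hour), hourly_window(one_hour + 59 * 60 * 1000))
# ...while a record one hour later falls into the next window.
print(hourly_window(2 * one_hour))
```

All records whose timestamps map to the same string end up under the same partition path, which is what produces the hourly roll.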

Data Storage Format

While the STOREAS clause is optional, it plays a pivotal role in determining the storage format within Azure Datalake. It’s crucial to understand that this format is entirely independent of the data format stored in Kafka. The connector maintains its neutrality towards the storage format at the topic level and relies on the key.converter and value.converter settings to interpret the data.

Supported storage formats encompass:

  • AVRO

  • Parquet

  • JSON

  • CSV (including headers)

  • Text

  • BYTES

Opting for BYTES ensures that each record is stored in its own separate file. This feature proves particularly valuable for scenarios involving the storage of images or other binary data in Datalake. For cases where you prefer to consolidate multiple records into a single binary file, AVRO or Parquet are the recommended choices.

By default, the connector exclusively stores the Kafka message value. However, you can expand storage to encompass the entire message, including the key, headers, and metadata, by setting the store.envelope property to true. This property is a boolean switch and defaults to false. The envelope is not supported with a custom partition strategy.

When the envelope is enabled, the data structure follows this format:

{
  "key": <the message Key, which can be a primitive or a complex object>,
  "value": <the message Value, which can be a primitive or a complex object>,
  "headers": {
    "header1": "value1",
    "header2": "value2"
  },
  "metadata": {
    "offset": 0,
    "partition": 0,
    "timestamp": 0,
    "topic": "topic"
  }
}

Utilizing the envelope is particularly advantageous in scenarios such as backup and restore or replication, where comprehensive storage of the entire message in Datalake is desired.
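The envelope structure can be sketched in Python as below. This is an illustration of the documented layout only: build_envelope is a hypothetical helper, the include_* flags loosely mirror the store.envelope.fields.* properties, and defaulting them all to True is an assumption made for the example.

```python
import json


def build_envelope(key, value, headers, topic, partition, offset, timestamp,
                   include_key=True, include_value=True,
                   include_headers=True, include_metadata=True):
    """Assemble the envelope layout shown above (hypothetical helper)."""
    envelope = {}
    if include_key:
        envelope["key"] = key
    if include_value:
        envelope["value"] = value
    if include_headers:
        envelope["headers"] = dict(headers)
    if include_metadata:
        envelope["metadata"] = {
            "offset": offset,
            "partition": partition,
            "timestamp": timestamp,
            "topic": topic,
        }
    return envelope


record = build_envelope(
    key="car-1",
    value={"speed": 88},
    headers={"header1": "value1"},
    topic="car_speed_events",
    partition=0,
    offset=0,
    timestamp=0,
)
print(json.dumps(record, indent=2))
```

The key, value and header contents here are sample data; only the top-level field names and the metadata fields are taken from the format above.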

Examples

Storing the message Value Avro data as Parquet in Datalake:

...
connect.datalake.kcql=INSERT INTO lensesioazure:car_speed SELECT * FROM car_speed_events STOREAS `PARQUET` 
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
key.converter=org.apache.kafka.connect.storage.StringConverter
...

The converter also facilitates seamless JSON to AVRO/Parquet conversion, eliminating the need for an additional processing step before the data is stored in Datalake.

...
connect.datalake.kcql=INSERT INTO lensesioazure:car_speed SELECT * FROM car_speed_events STOREAS `PARQUET` 
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
...

Enabling the full message stored as JSON in Datalake:

  ...
  connect.datalake.kcql=INSERT INTO lensesioazure:car_speed SELECT * FROM car_speed_events STOREAS `JSON` PROPERTIES('store.envelope'=true)
  value.converter=org.apache.kafka.connect.json.JsonConverter
  key.converter=org.apache.kafka.connect.storage.StringConverter
  ...

Enabling the full message stored as AVRO in Datalake:

...
connect.datalake.kcql=INSERT INTO lensesioazure:car_speed SELECT * FROM car_speed_events STOREAS `AVRO` PROPERTIES('store.envelope'=true)
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
key.converter=org.apache.kafka.connect.storage.StringConverter
...

If the restore (see the Datalake Source documentation) happens on the same cluster, then the most performant way is to use the ByteArrayConverter for both Key and Value and store as AVRO or Parquet:

...
connect.datalake.kcql=INSERT INTO lensesioazure:car_speed SELECT * FROM car_speed_events STOREAS `AVRO` PROPERTIES('store.envelope'=true)
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
...

Flush Options

The connector offers three distinct flush options for data management:

  • Flush by Count - triggers a file flush after a specified number of records have been written to it.

  • Flush by Size - initiates a file flush once a predetermined size (in bytes) has been attained.

  • Flush by Interval - enforces a file flush after a defined time interval (in seconds).

It’s worth noting that the interval flush is a continuous process that acts as a fail-safe mechanism, ensuring that files are periodically flushed, even if the other flush options are not configured or haven’t reached their thresholds.

Consider a scenario where the flush size is set to 10MB, and only 9.8MB of data has been written to the file, with no new Kafka messages arriving for an extended period of 6 hours. To prevent undue delays, the interval flush guarantees that the file is flushed after the specified time interval has elapsed. This ensures the timely management of data even in situations where other flush conditions are not met.

The flush options are configured using the flush.count, flush.size, and flush.interval KCQL Properties (see KCQL Properties section). The settings are optional and if not specified the defaults are:

  • flush.count = 50000

  • flush.size = 500000000 (500MB)

  • flush.interval = 3600 (1 hour)

A connector instance can simultaneously operate on multiple topic partitions. When one partition triggers a flush, it will initiate a flush operation for all of them, even if the other partitions are not yet ready to flush.
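The interplay of the three thresholds can be sketched in Python as follows. This is a simplified model, not the connector's logic: FlushPolicy is a hypothetical class, the defaults are the documented ones, and treating each threshold as "greater than or equal" is an assumption.

```python
from dataclasses import dataclass


@dataclass
class FlushPolicy:
    """Hypothetical model of flush.count, flush.size and flush.interval."""
    count: int = 50_000
    size_bytes: int = 500_000_000
    interval_seconds: int = 3600

    def should_flush(self, records: int, bytes_written: int, seconds_since_last_flush: float) -> bool:
        # Any single threshold being reached is enough to trigger a flush.
        return (
            records >= self.count
            or bytes_written >= self.size_bytes
            or seconds_since_last_flush >= self.interval_seconds
        )


# The scenario described above: a 10MB size threshold with 9.8MB written.
policy = FlushPolicy(size_bytes=10_000_000)
print(policy.should_flush(records=100, bytes_written=9_800_000, seconds_since_last_flush=60))
print(policy.should_flush(records=100, bytes_written=9_800_000, seconds_since_last_flush=6 * 3600))
```

In the second call neither the count nor the size threshold is met, so the interval acts as the fail-safe that closes the file.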

When connect.datalake.latest.schema.optimization.enabled is set to true, it reduces unnecessary data flushes when writing to Avro or Parquet formats. Specifically, it leverages schema compatibility to avoid flushing data when messages with older but backward-compatible schemas are encountered. Consider the following sequence of messages and their associated schemas:

message1 -> schema1
message2 -> schema1  
  (No flush needed – same schema)

message3 -> schema2  
  (Flush occurs – new schema introduced)

message4 -> schema2  
  (No flush needed – same schema)

message5 -> schema1  
  Without optimization: would trigger a flush  
  With optimization: no flush – schema1 is backward-compatible with schema2

message6 -> schema2  
message7 -> schema2  
  (No flush needed – same schema, it would happen based on the flush thresholds)
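The sequence above can be replayed with a small Python simulation. It is a deliberately simplified model: schemas are represented as integer versions, and a lower version is assumed to be an older, backward-compatible schema, whereas the connector works with real Avro or Parquet schemas. The function name count_schema_flushes is hypothetical, and only schema-triggered flushes are counted.

```python
def count_schema_flushes(schema_versions, optimization_enabled: bool) -> int:
    """Count flushes caused by schema changes in a stream of records (simplified model)."""
    flushes = 0
    current = None
    for version in schema_versions:
        if current is None:
            current = version  # first record: nothing to flush yet
        elif version == current:
            continue  # same schema: no flush needed
        elif optimization_enabled and version < current:
            continue  # older, compatible schema: keep writing with the latest schema
        else:
            flushes += 1  # schema changed: flush, then continue with the new schema
            current = version
    return flushes


# message1..message7 from the sequence above, as schema versions.
sequence = [1, 1, 2, 2, 1, 2, 2]
print(count_schema_flushes(sequence, optimization_enabled=False))
print(count_schema_flushes(sequence, optimization_enabled=True))
```

Without the optimization, message5 and message6 each force an extra flush because the schema keeps switching; with it, only the introduction of schema2 does.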

Flushing By Interval

The next flush time is calculated based on the time the previous flush completed (the last modified time of the file written to Data Lake). Therefore, by design, the sink connector’s behaviour will have a slight drift based on the time it takes to flush records and whether records are present or not. If Kafka Connect makes no calls to put records, the logic for flushing won't be executed. This ensures a more consistent number of records per file.
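The drift can be made concrete with a short Python sketch. It is a toy model under stated assumptions: the function name flush_completion_times is hypothetical, times are plain seconds, and records are assumed to arrive continuously so that each flush starts exactly when it becomes due.

```python
def flush_completion_times(start: float, interval_seconds: float, flush_durations):
    """Return when each flush completes, scheduling each from the previous completion."""
    completed = start
    times = []
    for duration in flush_durations:
        due = completed + interval_seconds  # next flush is due one interval after the last completion
        completed = due + duration          # the flush itself takes some time
        times.append(completed)
    return times


# Three flushes of 5 seconds each with a one-hour interval.
print(flush_completion_times(start=0, interval_seconds=3600, flush_durations=[5, 5, 5]))
```

In this model the third flush completes 15 seconds later than a fixed three-hour schedule would suggest, which is the slight drift described above.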

Compression

AVRO and Parquet offer the capability to compress files as they are written. The Datalake Sink connector provides advanced users with the flexibility to configure compression options.

Here are the available options for connect.datalake.compression.codec, along with indications of their support by Avro, Parquet and JSON writers:

Compression
Avro Support
Avro (requires Level)
Parquet Support
JSON

UNCOMPRESSED

✅

✅

✅

SNAPPY

✅

✅

GZIP

✅

✅

LZ0

✅

LZ4

✅

BROTLI

✅

BZIP2

✅

ZSTD

✅

⚙️

✅

DEFLATE

✅

⚙️

XZ

✅

⚙️

Please note that not all compression libraries are bundled with the Datalake connector. Therefore, you may need to manually add certain libraries to the classpath to ensure they function correctly.

Authentication

The connector offers three distinct authentication modes:

  • Default: This mode relies on the default Azure authentication chain, simplifying the authentication process.

  • Connection String: This mode enables simpler configuration by relying on the connection string to authenticate with Azure.

  • Credentials: In this mode, explicit configuration of the Azure account name and account key is required for authentication.

When selecting the “Credentials” mode, it is essential to provide the necessary account name and account key properties. Alternatively, if you prefer not to configure these properties explicitly, the connector will follow the credentials retrieval order as described here.

Here’s an example configuration for the “Credentials” mode:

...
connect.datalake.azure.auth.mode=Credentials
connect.datalake.azure.account.name=$AZURE_ACCOUNT_NAME
connect.datalake.azure.account.key=$AZURE_ACCOUNT_KEY
...

And here is an example configuration using the “Connection String” mode:

...
connect.datalake.azure.auth.mode=ConnectionString
connect.datalake.azure.connection.string=$AZURE_CONNECTION_STRING
...

For enhanced security and flexibility when using either the “Credentials” or “Connection String” modes, it is highly advisable to utilize Connect Secret Providers.

Error policies

The connector supports Error policies.

Indexes Directory

The connector uses the concept of index files that it writes to in order to store information about the latest offsets for Kafka topics and partitions as they are being processed. This allows the connector to quickly resume from the correct position when restarting and provides flexibility in naming the index files.

By default, the root directory for these index files is named .indexes for all connectors. However, each connector will create and store its index files within its own subdirectory inside this .indexes directory.

You can configure the root directory for these index files using the property connect.datalake.indexes.name. This property specifies the path from the root of the data lake filesystem. Note that even if you configure this property, the connector will still create a subdirectory within the specified root directory.

Examples

| Index Name (connect.datalake.indexes.name) | Resulting Indexes Directory Structure | Description |
|---|---|---|
| .indexes (default) | .indexes/<connector_name>/ | The default setup, where each connector uses its own subdirectory within .indexes. |
| custom-indexes | custom-indexes/<connector_name>/ | Custom root directory custom-indexes, with a subdirectory for each connector. |
| indexes/datalake-connector-logs | indexes/datalake-connector-logs/<connector_name>/ | Uses a custom subdirectory datalake-connector-logs within indexes, with a subdirectory for each connector. |
| logs/indexes | logs/indexes/<connector_name>/ | Indexes are stored under logs/indexes, with a subdirectory for each connector. |
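The mapping from the configured root to the resulting directory can be expressed in a few lines of Python. This is only a sketch of the naming rule shown in the examples: indexes_directory is a hypothetical helper, and stripping leading or trailing slashes from the configured value is an assumption.

```python
def indexes_directory(connector_name: str, indexes_name: str = ".indexes") -> str:
    """Build the per-connector index directory from the configured root (hypothetical helper)."""
    root = indexes_name.strip("/")
    return f"{root}/{connector_name}/"


for root in (".indexes", "custom-indexes", "indexes/datalake-connector-logs", "logs/indexes"):
    print(indexes_directory("demo", root))
```

Here demo stands in for the connector name, matching the name property used in the earlier configuration examples.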

Option Reference

| Name | Description | Type | Available Values | Default Value |
|---|---|---|---|---|
| connect.datalake.azure.auth.mode | Specifies the Azure authentication mode for connecting to Datalake. | string | “Credentials”, “ConnectionString” or “Default” | “Default” |
| connect.datalake.azure.account.key | The Azure Account Key used for authentication. | string | | (Empty) |
| connect.datalake.azure.account.name | The Azure Account Name used for authentication. | string | | (Empty) |
| connect.datalake.pool.max.connections | Specifies the maximum number of connections allowed in the Azure Client’s HTTP connection pool when interacting with Datalake. | int | -1 (undefined) | 50 |
| connect.datalake.endpoint | Datalake endpoint URL. | string | | (Empty) |
| connect.datalake.error.policy | Defines the error handling policy when errors occur during data transfer to or from Datalake. | string | “NOOP”, “THROW”, “RETRY” | “THROW” |
| connect.datalake.max.retries | Sets the maximum number of retries the connector will attempt before reporting an error to the Connect Framework. | int | | 20 |
| connect.datalake.retry.interval | Specifies the interval (in milliseconds) between retry attempts by the connector. | int | | 60000 |
| connect.datalake.http.max.retries | Sets the maximum number of retries for the underlying HTTP client when interacting with Datalake. | long | | 5 |
| connect.datalake.http.retry.interval | Specifies the retry interval (in milliseconds) for the underlying HTTP client. An exponential backoff strategy is employed. | long | | 50 |
| connect.datalake.local.tmp.directory | Enables the use of a local folder as a staging area for data transfer operations. | string | | (Empty) |
| connect.datalake.kcql | A SQL-like configuration that defines the behavior of the connector. Refer to the KCQL Support section above for details. | string | | (Empty) |
| connect.datalake.compression.codec | Sets the Parquet compression codec to be used when writing data to Datalake. | string | “UNCOMPRESSED”, “SNAPPY”, “GZIP”, “LZ0”, “LZ4”, “BROTLI”, “BZIP2”, “ZSTD”, “DEFLATE”, “XZ” | “UNCOMPRESSED” |
| connect.datalake.compression.level | Sets the compression level when compression is enabled for data transfer to Datalake. | int | 1-9 | (Empty) |
| connect.datalake.seek.max.files | Specifies the maximum threshold for the number of files the connector uses to ensure exactly-once processing of data. | int | | 5 |
| connect.datalake.indexes.name | Configures the indexes root directory for this connector. | string | | ".indexes" |
| connect.datalake.exactly.once.enable | Set to 'false' to disable exactly-once semantics and use Kafka Connect’s native at-least-once offset management instead. | boolean | true, false | true |
| connect.datalake.schema.change.detector | Configures how the file rolls over upon receiving a record with a schema different from the accumulated ones. Schema change detection can be default (object equality), version (version field comparison), or compatibility (Avro compatibility checking). | string | default, version, compatibility | default |
| connect.datalake.skip.null.values | Skip records with null values (a.k.a. tombstone records). | boolean | true, false | false |
| connect.datalake.latest.schema.optimization.enabled | When set to true, reduces unnecessary data flushes when writing to Avro or Parquet formats. Specifically, it leverages schema compatibility to avoid flushing data when messages with older but backward-compatible schemas are encountered. | boolean | true, false | false |

Azure Event Hubs

This page describes the usage of the Stream Reactor Azure Event Hubs Sink Connector.

Coming soon!


Cassandra

This page describes the usage of the Stream Reactor Cassandra Sink Connector.

The connector converts the value of Kafka messages to JSON and uses the Cassandra JSON insert feature to write records.

Connector Class

Example

KCQL support

You can specify multiple KCQL statements separated by ; to have a connector sink multiple topics. The connector properties topics or topics.regex are required to be set to a value that matches the KCQL statements.

The following KCQL is supported:

Examples:

Deletion in Cassandra

Compacted topics in Kafka retain the last message per key. Deletion in Kafka occurs by tombstoning. If compaction is enabled on the topic and a message is sent with a null payload, Kafka flags the record for deletion and it is compacted/removed from the topic.

Deletion in Cassandra is supported based on fields in the key of messages with an empty/null payload. You must provide a Cassandra CQL delete statement with parameters to which field values from the key are bound. For example, with the delete statement of:

If a message was received with an empty/null value and key fields key.id and key.product the final bound Cassandra statement would be:

Deletion will only occur if a message with an empty payload is received from Kafka.

Ensure the ordinal position of the fields in connect.cassandra.delete.struct_flds matches the binding order in the Cassandra delete statement!

Kafka payload support

This sink supports the following Kafka payloads:

  • Schema.Struct and Struct (Avro)

  • Schema.Struct and JSON

  • No Schema and JSON

Error policies

Option Reference

Elasticsearch

This page describes the usage of the Stream Reactor Elasticsearch Sink Connector.

Connector Class

Elasticsearch 6

Elasticsearch 7

Example

KCQL support

You can specify multiple KCQL statements separated by ; to have a connector sink multiple topics. The connector properties topics or topics.regex are required to be set to a value that matches the KCQL statements.

The following KCQL is supported:

Examples:

Kafka Tombstone Handling

It is possible to configure how the Connector handles a null value payload (called Kafka tombstones). Please use the behavior.on.null.values property in your KCQL with one of the possible values:

  • IGNORE (ignores tombstones entirely)

  • FAIL (throws Exception if tombstone happens)

  • DELETE (deletes index with specified id)

Example:

Primary Keys

The PK keyword allows you to specify fields that will be used to generate the key value in Elasticsearch. The values of the selected fields are concatenated and separated by a hyphen (-).

If no fields are defined, the connector defaults to using the topic name, partition, and message offset to construct the key.
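The key-generation rule can be sketched in Python as follows. It is illustrative only: document_id is a hypothetical helper, records are modelled as plain dictionaries, PK fields are read from the message value, and using the same separator for the topic/partition/offset fallback is an assumption, since the fallback format is not spelled out here.

```python
def document_id(record: dict, pk_fields=(), separator: str = "-") -> str:
    """Derive a document key from PK fields, or fall back to topic/partition/offset."""
    if pk_fields:
        # Concatenate the selected field values, separated by a hyphen by default.
        return separator.join(str(record["value"][field]) for field in pk_fields)
    # Fallback when no PK fields are defined (separator assumed for this sketch).
    return separator.join(str(record[part]) for part in ("topic", "partition", "offset"))


record = {
    "topic": "topicA",
    "partition": 0,
    "offset": 42,
    "value": {"id": 7, "product": "widget"},
}
print(document_id(record, pk_fields=("id", "product")))
print(document_id(record))
```

The record contents are sample data chosen for the example.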

Field Prefixes

When defining fields, specific prefixes can be used to determine where the data should be extracted from:

  • _key Prefix Specifies that the value should be extracted from the message key.

    • If a path is provided after _key, it identifies the location within the key where the field value resides.

    • If no path is provided, the entire message key is used as the value.

  • _value Prefix Specifies that the value should be extracted from the message value.

    • The remainder of the path identifies the specific location within the message value to extract the field.

  • _header Prefix Specifies that the value should be extracted from the message header.

    • The remainder of the path indicates the name of the header to be used for the field value.
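The prefix rules above can be sketched as a small Python resolver. This is a simplified illustration rather than the connector's implementation: resolve_field and walk are hypothetical names, records are modelled as plain dictionaries, and the header value in the sample record is made up.

```python
def walk(node, dotted_path: str):
    """Follow a dot-separated path into nested dictionaries; an empty path returns the node."""
    if not dotted_path:
        return node
    for segment in dotted_path.split("."):
        node = node[segment]
    return node


def resolve_field(path: str, record: dict):
    """Resolve a _key / _value / _header prefixed path against a record (hypothetical helper)."""
    prefix, _, remainder = path.partition(".")
    if prefix == "_key":
        return walk(record["key"], remainder)   # no path: the entire key is used
    if prefix == "_value":
        return walk(record["value"], remainder)
    if prefix == "_header":
        return record["headers"][remainder]     # the remainder is the header name
    raise ValueError(f"Unsupported prefix in path: {path}")


record = {
    "key": "freddie",
    "value": {"name": {"firstName": "hans"}},
    "headers": {"gate": "b12"},
}
print(resolve_field("_key", record))
print(resolve_field("_value.name.firstName", record))
print(resolve_field("_header.gate", record))
```

Escaped names containing dots are not handled in this sketch.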

Insert and Upsert modes

INSERT writes new records to Elastic, replacing existing records with the same ID set by the PK (Primary Key) keyword. UPSERT replaces existing records if a matching record is found, or inserts a new one if none is found.

Document Type

WITHDOCTYPE allows you to associate a document type to the document inserted.

Index Suffix

WITHINDEXSUFFIX allows you to specify a suffix for your index name. Date formats are supported.

Example:


Index Names

Static Index Names

To use a static index name, define the target index in the KCQL statement without any prefixes:

This will consistently create an index named index_name for any messages consumed from topicA.

Extracting Index Names from Headers, Keys, and Values

Headers

To extract an index name from a message header, use the _header prefix followed by the header name:

This statement extracts the value from the gate header field and uses it as the index name.

For headers with names that include dots, enclose the entire target in backticks (`) and each segment which consists of a field name in single quotes ('):

In this case, the value of the header named prefix.abc.suffix is used to form the index name.

Keys

To use the full value of the message key as the index name, use the _key prefix:

For example, if the message key is "freddie", the resulting index name will be freddie.

Values

To extract an index name from a field within the message value, use the _value prefix followed by the field name:

This example uses the value of the name field from the message's value. If the field contains "jason", the index name will be jason.

Nested Fields in Values

To access nested fields within a value, specify the full path using dot notation:

If the firstName field is nested within the name structure, its value (e.g., "hans") will be used as the index name.

Fields with Dots in Their Names

For field names that include dots, enclose the entire target in backticks (`) and each segment which consists of a field name in single quotes ('):

If the value structure contains:

The extracted index name will be hans.

Auto Index Creation

The Sink will automatically create missing indexes at startup.

Please note that this feature is not compatible with index names extracted from message headers/keys/values.

Options Reference

Name
Description
Type
Default Value

connect.elastic.protocol

URL protocol (http, https)

string

http

connect.elastic.hosts

List of hostnames for the Elasticsearch cluster nodes, not including protocol or port.

string

localhost

connect.elastic.port

Port on which the Elasticsearch node listens.

string

9300

connect.elastic.tableprefix

Table prefix (optional)

string

connect.elastic.cluster.name

Name of the Elasticsearch cluster, used in local mode when setting up the connection.

string

elasticsearch

connect.elastic.write.timeout

The time to wait for a write, in milliseconds. The default is 5 minutes.

int

300000

connect.elastic.batch.size

Number of records to process at a time. A single pull from Kafka can return 100k+ records, which is not feasible to send to Elasticsearch at once.

int

4000

connect.elastic.use.http.username

Username, if HTTP Basic Auth is required. The default is null.

string

connect.elastic.use.http.password

Password, if HTTP Basic Auth is required. The default is null.

string

connect.elastic.error.policy

Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP - the error is swallowed. THROW - the error is allowed to propagate. RETRY - the exception causes the Connect framework to retry the message. The number of retries is based on connect.elastic.max.retries. The error will be logged automatically.

string

THROW

connect.elastic.max.retries

The maximum number of times to try the write again.

int

20

connect.elastic.retry.interval

The time in milliseconds between retries.

int

60000

connect.elastic.kcql

KCQL expression describing field selection and routes.

string

connect.elastic.pk.separator

Separator used when the primary key consists of more than one field.

string

-

connect.progress.enabled

Enables the output for how many records have been processed

boolean

false
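How connect.elastic.error.policy, connect.elastic.max.retries and connect.elastic.retry.interval interact can be sketched in Python. This is a simplified illustration: in the connector, RETRY hands the message back to the Connect framework, whereas this sketch uses a local loop. The function `write_with_policy` and its parameters are hypothetical names chosen for the example.

```python
import time


def write_with_policy(write, batch, policy="THROW", max_retries=20,
                      retry_interval_ms=60000, sleep=time.sleep):
    """Apply an error policy around a write call; return True if the write succeeded."""
    attempts = 0
    while True:
        try:
            write(batch)
            return True
        except Exception:
            if policy == "NOOP":
                return False              # swallow the error and move on
            if policy == "RETRY" and attempts < max_retries:
                attempts += 1
                sleep(retry_interval_ms / 1000.0)  # wait before the next attempt
                continue
            raise                         # THROW, or retries exhausted
```

With the defaults listed above (20 retries, 60000 ms apart), a persistently failing write would be retried for roughly 20 minutes before the error propagates.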

KCQL Properties

Name
Description
Type
Default Value

behavior.on.null.values

Specifies the behavior on Kafka tombstones (records with a null value): IGNORE, DELETE or FAIL.

String

IGNORE
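A minimal Python sketch of the three tombstone behaviours follows. It assumes that DELETE removes the document identified by the record key and that FAIL raises an error; the source only names the options, so treat these semantics and the `plan_write` helper as assumptions.

```python
def plan_write(key, value, behavior="IGNORE"):
    """Return the action for one record: ('index', key, value), ('delete', key) or None."""
    if value is not None:
        return ("index", key, value)      # a regular record is indexed as usual
    if behavior == "IGNORE":
        return None                       # skip the tombstone silently
    if behavior == "DELETE":
        return ("delete", key)            # assumed: delete the document with this key
    if behavior == "FAIL":
        raise ValueError("Tombstone received for key %r" % (key,))
    raise ValueError("Unknown behavior.on.null.values: %r" % (behavior,))
```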

SSL Configuration Properties

Property Name

Description

ssl.truststore.location

Path to the truststore file containing the trusted CA certificates for verifying broker certificates.

ssl.truststore.password

Password for the truststore file to protect its integrity.

ssl.truststore.type

Type of the truststore (e.g., JKS, PKCS12). Default is JKS.

ssl.keystore.location

Path to the keystore file containing the client’s private key and certificate chain for client authentication.

ssl.keystore.password

Password for the keystore to protect the private key.

ssl.keystore.type

Type of the keystore (e.g., JKS, PKCS12). Default is JKS.

ssl.protocol

The SSL protocol used for secure connections (e.g., TLSv1.2, TLSv1.3). Default is TLS.

ssl.trustmanager.algorithm

Algorithm used by the TrustManager to manage certificates. Default value is the trust manager factory algorithm configured for the Java Virtual Machine.

ssl.keymanager.algorithm

Algorithm used by the KeyManager to manage certificates. Default value is the key manager factory algorithm configured for the Java Virtual Machine.

SSL Configuration

Enabling SSL connections between Kafka Connect and Elasticsearch ensures that the communication between these services is secure, protecting sensitive data from being intercepted or tampered with. SSL (or TLS) encrypts data in transit, verifying the identity of both parties and ensuring data integrity.

Newer versions of Elasticsearch enable SSL by default for internal communication, but client connections, such as those from Kafka Connect, still have to be configured explicitly. Setting up SSL in Kafka Connect ensures:

  • Data encryption: Prevents unauthorized access to data being transferred.

  • Authentication: Confirms that Kafka Connect and Elasticsearch are communicating with trusted entities.

  • Compliance: Meets security standards for regulatory requirements (such as GDPR or HIPAA).

Configuration Example

# Truststore type can also be PKCS12 if applicable
ssl.truststore.location=/path/to/truststore.jks
ssl.truststore.password=your_truststore_password
ssl.truststore.type=JKS

# Keystore type can also be PKCS12 if applicable
ssl.keystore.location=/path/to/keystore.jks
ssl.keystore.password=your_keystore_password
ssl.keystore.type=JKS

# Or TLSv1.3 for stronger security
ssl.protocol=TLSv1.2

# Algorithms used by the trust manager and key manager factories
ssl.trustmanager.algorithm=PKIX
ssl.keymanager.algorithm=PKIX

Terminology:

  • Truststore: Holds certificates to check if the node’s certificate is valid.

  • Keystore: Contains your client’s private key and certificate to prove your identity to the node.

  • SSL Protocol: Use TLSv1.2 or TLSv1.3 for up-to-date security.

  • Password Security: Protect passwords by encrypting them or using secure methods like environment variables or secret managers.

HTTP

This page describes the usage of the Stream Reactor HTTP Sink Connector.

A Kafka Connect sink connector for writing records from Kafka to HTTP endpoints.

Features

  • Support for JSON/Avro/String/Protobuf messages via Kafka Connect (in conjunction with converters for Schema Registry based data storage).

  • URL, header and content templating give you full control of the HTTP request.

  • Configurable batching of messages, which can combine several messages into a single request and lets you select which data to send with your HTTP request.

Connector Class

Example

Content Template

The Lenses HTTP sink comes with multiple options for content templating of the HTTP request.

Static Templating

If you do not wish any part of the key, value, headers or other data to form a part of the message, you can use static templating:

Single Message Templating

When you are confident you will be generating a single HTTP request per Kafka message, then you can use the simpler templating.

In the content property of your configuration, you can define template substitutions like the following example:

(Please note that the XML is only an example. Your template can consist of any text format that can be submitted in an HTTP request.)
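To illustrate what a template substitution does, here is a minimal Python sketch. It assumes placeholders of the form `{{value.id}}`, where the dotted path selects a field of the record. The placeholder syntax, the `render` helper and the record layout are assumptions for illustration, not the connector's API.

```python
import re

# A placeholder is a dotted path wrapped in double braces, e.g. {{value.id}}.
_PLACEHOLDER = re.compile(r"\{\{\s*([\w.]+)\s*\}\}")


def render(template, record):
    """Replace each {{path}} placeholder with the matching field from `record`."""
    def lookup(match):
        node = record
        for part in match.group(1).split("."):   # walk the dotted path
            node = node[part]
        return str(node)
    return _PLACEHOLDER.sub(lookup, template)


record = {"key": "k1", "value": {"id": 7, "name": "widget"}}
print(render("<product><id>{{value.id}}</id><name>{{value.name}}</name></product>", record))
```

One rendered body per Kafka message corresponds to the single message case described above.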

Multiple Message Templating

To collapse multiple messages into a single HTTP request, you can use the multiple messaging template. This is automatic if the template has a messages tag. See the below example:

Again, this is an XML example, but your message body can consist of anything, including plain text, JSON or YAML.

Your connector configuration will look like this:

The final result will be HTTP requests with bodies like this:

Available Keys

When using simple and multiple message templating, the following are available:

URL Templating

The URL must include the protocol (e.g. http://lenses.io). Template variables can be used.

Authentication Options

Currently, the HTTP Sink supports no authentication, BASIC HTTP authentication, or OAuth2 authentication.

No Authentication (Default)

By default, no authentication is set. This can also be set explicitly by providing a configuration like this:

BASIC HTTP Authentication

BASIC auth can be configured by providing a configuration like this:
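Independently of the connector properties, BASIC authentication results in an Authorization header on every request. A minimal Python sketch of how that header is formed under RFC 7617 is given here; it is illustrative, not the connector's code, and `basic_auth_header` is a hypothetical helper name.

```python
import base64


def basic_auth_header(username, password):
    """Build the Authorization header for HTTP Basic authentication (RFC 7617)."""
    credentials = "{}:{}".format(username, password).encode("utf-8")
    token = base64.b64encode(credentials).decode("ascii")
    return {"Authorization": "Basic " + token}
```

Base64 is an encoding, not encryption, so BASIC authentication should only be used over HTTPS.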

OAuth2 Authentication

OAuth2 authentication can be configured by providing a configuration like this:

Headers List

To customise the headers sent with your HTTP request you can supply a Headers List.

Example:

SSL Configuration

Batch Configuration

The connector offers three distinct flush options for data management:

  • Flush by Count - triggers a file flush after a specified number of records have been written to it.

  • Flush by Size - initiates a file flush once a predetermined size (in bytes) has been attained.

  • Flush by Interval - enforces a file flush after a defined time interval (in seconds).

It's worth noting that the interval flush is a continuous process that acts as a fail-safe mechanism, ensuring that files are periodically flushed, even if the other flush options are not configured or haven't reached their thresholds.

Consider a scenario where the flush size is set to 10MB, and only 9.8MB of data has been written to the file, with no new Kafka messages arriving for an extended period of 6 hours. To prevent undue delays, the interval flush guarantees that the file is flushed after the specified time interval has elapsed. This ensures the timely management of data even in situations where other flush conditions are not met.
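The way the three flush conditions combine can be sketched in Python as follows. This is a simplified model and not the connector's code: the `BatchPolicy` class and its parameter names are hypothetical, and the clock is injectable so the interval condition can be exercised without waiting.

```python
import time


class BatchPolicy:
    """Decide when a batch should be flushed: by count, by size or by elapsed time."""

    def __init__(self, max_count=None, max_bytes=None, max_interval_s=None,
                 clock=time.monotonic):
        self.max_count = max_count
        self.max_bytes = max_bytes
        self.max_interval_s = max_interval_s
        self.clock = clock
        self.reset()

    def reset(self):
        """Start a new batch after a flush."""
        self.count = 0
        self.size = 0
        self.started = self.clock()

    def add(self, record_bytes):
        """Account for one record appended to the current batch."""
        self.count += 1
        self.size += len(record_bytes)

    def should_flush(self):
        """True as soon as any configured threshold has been reached."""
        if self.max_count is not None and self.count >= self.max_count:
            return True
        if self.max_bytes is not None and self.size >= self.max_bytes:
            return True
        if self.max_interval_s is not None:
            return self.clock() - self.started >= self.max_interval_s
        return False
```

In the 9.8MB scenario above, the size condition never fires, so only the interval check causes the pending data to be flushed.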

The flush options are configured using the batchCount, batchSize and timeInterval properties. The settings are optional and if not specified the defaults are:

Configuration Examples

Some configuration examples follow on how to apply this connector to different message types.

These include converters, which are required to instruct Kafka Connect on how to read the source content.

Static string template

In this case the converters are irrelevant as we are not using the message content to populate our message template.

Dynamic string template

The HTTP request body contains the value of the message, which is retained as a string value via the StringConverter.

Dynamic string template containing json message fields

Specific fields from the JSON message are substituted into the HTTP request body alongside some static content.

Dynamic string template containing whole json message

The entirety of the message value is substituted into a placeholder in the message body. The message is treated as a string via the StringConverter.

Dynamic string template containing avro message fields

Fields from the AVRO message are substituted into the message body in the following example:

Error/Success Reporter

Starting from version 8.1, as a pilot release, the connector offers a feature called Reporter which, if enabled, writes success and error processing reports to a specified Kafka topic. Reports have no key; details about the status are available in the message headers and value.

To enable this functionality, enable one of the properties below (or both for full reporting):

Plain Error Reporting

This is the most common scenario for on-premises Kafka clusters used only for monitoring.

Error Reporting using SASL

Using SASL provides a secure and standardized method for authenticating connections to an external Kafka cluster. It is especially valuable when connecting to clusters that require secure communication, as it supports mechanisms like SCRAM, GSSAPI (Kerberos), and OAuth, ensuring that only authorized clients can access the cluster. Additionally, SASL can help safeguard credentials during transmission, reducing the risk of unauthorized access.

Error Reporting using SSL

Using SSL ensures secure communication between clients and the Kafka cluster by encrypting data in transit. This prevents unauthorized parties from intercepting or tampering with sensitive information. SSL also supports mutual authentication, allowing both the client and server to verify each other’s identities, which enhances trust and security in the connection.

Options

Configuration parameters

This sink connector supports the following options as part of its configuration:

SSL Configuration Properties

Error Reporter Properties

Success Reporter Properties

JMS

This page describes the usage of the Stream Reactor JMS Sink Connector.

Connector Class

Example

KCQL support

You can specify multiple KCQL statements separated by ; to have a connector sink multiple topics. The connector properties topics or topics.regex are required to be set to a value that matches the KCQL statements.
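The requirement that topics matches the KCQL statements can be checked before deployment. Below is a rough Python sketch under simplifying assumptions: statements are split naively on `;`, the source topic is taken from the first FROM clause, and only a comma-separated topics list is handled, not topics.regex. The helper names and the sample KCQL are hypothetical.

```python
import re

# Captures the topic name that follows FROM, with optional backticks around it.
_SOURCE = re.compile(r"\bFROM\s+`?([\w.\-]+)`?", re.IGNORECASE)


def kcql_sources(kcql):
    """Split a KCQL string on ';' and return the source topic of each statement."""
    statements = [s.strip() for s in kcql.split(";") if s.strip()]
    return [_SOURCE.search(s).group(1) for s in statements]


def missing_topics(kcql, topics):
    """Return KCQL source topics that are absent from the `topics` property."""
    configured = {t.strip() for t in topics.split(",")}
    return [t for t in kcql_sources(kcql) if t not in configured]


kcql = ("INSERT INTO jms_orders SELECT * FROM orders WITHTYPE QUEUE; "
        "INSERT INTO jms_events SELECT * FROM events WITHTYPE TOPIC")
print(missing_topics(kcql, "orders"))
```

A non-empty result means at least one statement reads from a topic the connector is not subscribed to.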

The following KCQL is supported:

Examples:

JMS Topics and Queues

The sink can write to either topics or queues, specified by the WITHTYPE clause.

JMS Payload

When a message is sent to a JMS target it can be one of the following:

  • JSON - Send a TextMessage

  • AVRO - Send a BytesMessage

  • MAP - Send a MapMessage

  • OBJECT - Send an ObjectMessage

This is set by the WITHFORMAT keyword.
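The mapping from the WITHFORMAT value to the JMS message type listed above can be written as a simple lookup. This Python sketch only restates that list; the `jms_message_type` helper is a hypothetical name, and no default for an omitted WITHFORMAT is assumed.

```python
# WITHFORMAT value -> JMS message type, as listed above.
JMS_MESSAGE_TYPES = {
    "JSON": "TextMessage",
    "AVRO": "BytesMessage",
    "MAP": "MapMessage",
    "OBJECT": "ObjectMessage",
}


def jms_message_type(with_format):
    """Return the JMS message type used for a given WITHFORMAT value."""
    try:
        return JMS_MESSAGE_TYPES[with_format.upper()]
    except KeyError:
        raise ValueError("Unsupported WITHFORMAT value: %r" % (with_format,)) from None
```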

Kafka payload support

This sink supports the following Kafka payloads:

  • Schema.Struct and Struct (Avro)

  • Schema.Struct and JSON

  • No Schema and JSON

Error policies

Option Reference
