Lenses Kafka Connectors are a collection of open-source, Apache 2.0 licensed Kafka Connect connectors, maintained by Lenses.io to deliver open-source Kafka connectors to the community.
Learn
Learn how Kafka Connect works.
Connectors
Explore the available connectors in the Stream Reactor and how to configure them.
Secret Providers
Learn how to secure your secrets in Kafka Connect.
Single Message Transform
Learn how to use SMTs and configure SMTs to transform messages.
Support
Get support from the Community or Enterprise support from Lenses.io for the connectors.
Azure Event Hubs
This page describes the usage of the Stream Reactor Azure Event Hubs Sink Connector.
Coming soon!
Cloud Storage Examples
Examples for AWS S3, Azure Datalake and GCP Storage connectors.
Single Message Transforms
This page describes how to use SMTs in your Kafka Connect Clusters.
Loading ragged width files
Coming soon!
Loading XML from Cloud storage
Coming soon!
Using the MQTT Connector with RabbitMQ
Coming soon!
Using dead letter queues
Coming soon!
Release notes
This page describes the release notes for the Stream Reactor.
Install
This page describes installing the Lenses Kafka Connectors.
If you do not use the plugin.path option and instead add the connectors directly to the CLASSPATH, you may encounter dependency conflicts.
Download the release and unpack.
Within the unpacked directory you will find the following structure:
/opt/stream-reactor-x.x.x-x.x.x
├── bin
├── conf
├── libs
├── LICENSE
The libs directory contains all the Stream Reactor Connector jars. Edit your Connect worker properties to add the path to the directory containing the connectors, and restart your workers. Repeat this process for all the Connect workers in your cluster; the connectors must be available to all of them.
Example:
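A minimal sketch of the worker property, assuming the unpack location shown above (adjust the path to your actual release directory):

```properties
# Connect worker properties: point plugin.path at the directory
# containing the unpacked Stream Reactor connector jars
plugin.path=/opt/stream-reactor-x.x.x-x.x.x/libs
```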
Install
This page describes installing the Lenses Secret Providers for Kafka Connect.
Add the plugin to the worker classloader isolation via the plugin.path option:
For Azure do not use the classloader isolation (plugin.path) that Kafka Connect provides. The Azure SDK uses the default classloader and will not find the HTTP client.
Secret Rotation
To allow for secret rotation add config.action.reload to your Connect workers properties files.
This property accepts one of two options:
none - No action is taken when a connector fails (e.g. it can no longer connect to an external system)
restart - The worker will schedule a restart of the connector
Secrets will only be reloaded if the Connector restarts.
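For example, to enable restart-on-failure (and thus secret re-resolution), add the following to each worker's properties file; `config.action.reload` is a standard Kafka Connect worker setting:

```properties
# Restart a failed connector so that indirect secret references are re-resolved
config.action.reload=restart
```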
Connectors
This page describes the Stream Reactor connector plugins.
Install
Learn how to install Connectors.
Sources
Learn about the available source connectors.
Sinks
Learn about the available sink connectors.
Testing
Coming soon!
Utilities
Coming soon!
Tutorials
This page contains tutorials for common Kafka Connect use cases.
Kafka Connect uses the Kafka Producer API and the Kafka Consumer API to load data into Apache Kafka and output data from Kafka to another storage engine. A Connector is an instantiation of a connector plugin defined by a configuration file.
Kafka Connect Core Concepts
Kafka Connect is a plugin framework. It exposes APIs for developers to implement standard recipes (known as Connector Plugins) for exchanging data between Kafka and other data systems. It provides the runtime environment to execute the plugins in the form of Connect Clusters, made up of Workers.
The fundamental building blocks of Connect Clusters are the Workers. Workers manage the plumbing required to interact with Kafka, which serves to coordinate the workload. They also handle the management of connector plugins and the creation and supervision of their instances, known as connector instances.
Connect Clusters can be configured either as a standalone cluster (consisting of one Worker) or distributed (consisting of many Workers, ideally on different machines). The benefit of the distributed mode is fault tolerance and load balancing of workload across machines.
A Connector Plugin is a concrete implementation of the Plugin APIs exposed by Connect for a specific third-party system, for example, S3. If a Connector Plugin is extracting data from a system and writing to Kafka, it’s called a Source Plugin. If it ships data from Kafka to another system, it’s called a Sink Plugin.
The modular architecture of Connect, with Workers managing connector plugins and connector instances, abstracts away the complexities of data integration, enabling developers to concentrate on the core functionality of their applications.
Connectors Plugins
The Community has been developing Connectors to connect to all sorts of systems for years. Here at Lenses, we are the biggest contributor to open-source connectors via our Stream Reactor project.
The Connector Plugins are code, deployed as jar files, and added to the classpath of each Worker in the Connect cluster.
Why does it need to be on each Worker? The reason lies in the distributed nature of Connect and its workload distribution mechanism. The Connect framework evenly distributes tasks among multiple Workers using the Kafka consumer group protocol. This ensures load balancing and fault tolerance within the Connect cluster. Each Worker is responsible for executing a subset of the assigned tasks.
To enable the execution of assigned workloads, each Worker needs to have access to the required plugins locally. By having the plugins available on each Worker's classpath, the Connect framework can dynamically load and utilize them whenever necessary. This approach eliminates the need for plugins to be distributed separately or retrieved from a centralized location during runtime. But it does mean that when you install your Connect Cluster that you also need to ensure each plugin is installed.
Tasks vs Connectors
Each Connector Plugin has to implement two interfaces:
Connector (Source or Sink)
Task (Source or Sink)
The Connector interface is responsible for defining and configuring the instance of the Connector, for example, validating the configuration and then splitting that configuration up for the Connect APIs to distribute amongst the workers.
The actual work is done by the Task class. Connect sends out, via Kafka, a configuration, defined by the Connector class, to each worker. Each assigned Worker picks up the task (it's listening to the configuration topic) and creates an instance of the task.
Connector instances are created on the Worker you submit the creation request to, task instances can also be on the same Worker, but also on the other Workers. Connect distributes the tasks to Workers via Kafka; they have internal consumer groups listening to the system topics Connect uses. If you look into the default topic, connect-configs, you will see the split of the Connector configs for each task.
Converters
A Kafka message is made up of:
Headers
Key
Value
Headers are a map, while the key and value are stored as byte arrays in Kafka. Typically, these byte arrays represent data stored as JSON or AVRO, however, it could be anything.
To decouple from the format of data inside Kafka topics, Connect uses an internal format called Struct. Structs have fields, and fields have types defined in a Schema.
Converters translate the byte arrays, which could be AVRO or other formats, to Struct for Sink connectors and vice versa for Source connectors.
For example, suppose you use Avro as the data format in your cluster. The Avro converter lets you build connectors that interact only with Connect Structs; if you later decide to move to Protobuf, you don't have to reimplement your connector. This is where the converter comes in: it handles converting Structs to Avro for Source connectors, and converting bytes to Structs for Sink connectors that write to external systems, e.g. Cassandra. You can then swap in a different converter; you, as a developer, only deal with Structs.
The following converters are available with Apache Kafka:
org.apache.kafka.connect.converters.DoubleConverter: Serializes to and deserializes from DOUBLE values. When converting from bytes to Connect format, the converter returns an optional FLOAT64 schema.
org.apache.kafka.connect.converters.FloatConverter: Serializes to and deserializes from FLOAT values. When converting from bytes to Connect format, the converter returns an optional FLOAT32 schema.
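These converters are selected with the standard converter properties, either at the worker or the connector level. A sketch of a connector-level configuration (the class names are the ones listed above; `schemas.enable` applies to the JSON converter that also ships with Apache Kafka):

```properties
key.converter=org.apache.kafka.connect.converters.LongConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
```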
Confluent provides support for (Schema Registry required):
Kafka Connect supports an internal data type of Password. If a connector declares this type in its Config definition, the value will be masked in any logging; however, it will still be exposed in API calls to Kafka Connect Workers.
To solve this, Kafka Connect supports secret provider plugins. They allow for indirect references, resolved at the initialization of a Connector instance, to external secret providers.
Lenses.io provides Secret Providers for Azure, AWS, Hashicorp Vault and environment variables.
Single Message Transforms (SMTs)
SMTs are plugins that enable users to manipulate records, one at a time. For Source Connectors they manipulate records after the Task has handed them back to the Connect Framework and before they are written to Kafka. For Sink Connectors, they allow for manipulation of records before they are passed from the Connect Framework to the Sink Task.
Apache Kafka comes with several SMTs out of the box; see the Apache Kafka documentation for details.
Lenses.io also provides a number of SMTs, documented in this section.
Overview
This page describes an overview of the Lenses SMTs for Kafka Connect.
Lenses provides several SMTs designed for use with Stream Reactor connectors, you can also use them with other connectors or your own.
These SMTs are designed to be used with the Kafka Connect framework. The SMTs create record headers. The advantage of using headers is that they reduce the memory and CPU cycles required to change the payload. See for example the Kafka Connect TimestampConverter. Furthermore, they support Stream-Reactor S3 sink partitioners, for scenarios like:
Partitioning by the system clock (e.g. using the system clock as a partition key with a yyyy-MM-dd-HH format)
Partitioning by a rolling window (e.g. every 15 minutes, or one hour)
Partitioning by a custom timestamp (e.g. a timestamp field in the payload, record Key or Value)
Partitioning by a custom timestamp with a rolling window (e.g. a timestamp field in the payload, every 15 minutes, or one hour)
Installation
SMT installation on Lenses platform
Add the plugin to the worker classloader isolation via the plugin.path option:
SMT installation on MSK connect
For MSK Connect, you need to bundle your SMT with the connector you want to use and deploy them as one plugin.
To do so, download your connector to get the jar (unzip if needed) and download the SMT archive (unzip it). Put both jars in the same folder and zip them together.
This zip (containing both jars at the same level) must be uploaded as a plugin in MSK connect.
Downloads
GitHub Release downloads for Stream Reactor Connectors, Secret Providers and SMTs.
Contributing
This page describes how to contribute to the Stream Reactor.
The Stream Reactor is an open-source project and we welcome feedback on any issues, even more, we enjoy and encourage contributions, either fixes or new connectors! Don't be shy.
What can you contribute? Everything. Even if you aren't a JVM programmer, you can help with docs and tutorials; everything is open.
How to contribute?
If you have an issue raise it on the GitHub issue page and supply as much information as you can. If you have a fix, raise a Pull request and ask for a review.
Please make sure to:
Test your changes; we won't accept changes without unit and integration tests
Make sure it builds!
Rebase your branches if they become stale
The Stream Reactor is under an Apache 2.0 license so others can use your hard work for free without restriction.
Secret Providers
This page describes how to install secret provider plugins in your Kafka Connect Clusters.
Sources
This page details the configuration options for the Stream Reactor Kafka Connect source connectors.
Source connectors read data from external systems and write to Kafka.
Using Error Policies
This page describes how to use Error policies in Stream Reactor sink connectors.
In addition to the dead letter queues provided by Kafka Connect, the Stream Reactor sink connectors support Error Policies to handle failure scenarios.
The sinks have three error policies that determine how failed writes to the target database are handled. These error policies allow you to control the behaviour of the sink if it encounters an error when writing records to the target system. Since Kafka retains the records, subject to the configured retention policy of the topic, the sink can ignore the error, fail the connector or attempt redelivery.
Name
Description
Default Value
Source converters with incoming JSON or Avro
This page describes how to use converters with source systems sending JSON and Avro.
Source converters depend on the source system you are reading data from. The Connect SourceTask class requires you to supply a List of SourceRecords. Those records can have a schema but how the schema is translated from the source system to a Connect Struct depends on the connector.
We provide four converters out of the box but you can plug in your own. The WITHCONVERTER keyword supports this option. These can be used when source systems send records as JSON or AVRO, for example, MQTT or JMS.
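A sketch of how a converter is selected in KCQL via the WITHCONVERTER keyword; the fully qualified class name shown here is an assumption, so check the converter reference for your Stream Reactor release:

```sql
INSERT INTO kafka-topic
SELECT * FROM /mqtt/source/topic
WITHCONVERTER=`io.lenses.streamreactor.connect.converters.source.JsonSimpleConverter`
```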
Http Sink Templating
The page provides examples of HTTP Sink templating.
Static string template
In this case the converters are irrelevant as we are not using the message content to populate our message template.
Install
Learn how to install the Secret Providers.
AWS Secret Manager
Secure Kafka Connect secrets with AWS Secret Manager.
Azure KeyVault
Secure Kafka Connect secrets with Azure KeyVault.
Environment Variables
Secure Kafka Connect secrets with Environment Variable.
Hashicorp Vault
Secure Kafka Connect secrets with Hashicorp Vault.
AES256
Secure Kafka Connect secrets with AES256 encryption.
AWS S3
Load data from AWS S3 including restoring topics.
Azure Event Hubs
Load data from Azure Event Hubs into Kafka topics.
Azure Service Bus
Load data from Azure Service Bus into Kafka topics.
Cassandra
Load data from Cassandra into Kafka topics.
GCP PubSub
Load data from GCP PubSub into Kafka topics.
GCP Storage
Load data from GCP Storage including restoring topics.
FTP
Load data from files on FTP servers into Kafka topics.
JMS
Load data from JMS topics and queues into Kafka topics.
MQTT
Load data from MQTT into Kafka topics.
Stream Reactor
Secret Providers
Single Message Transforms
JSON Schema Tool
THROW
[connector-prefix].max.retries
The maximum number of times a message is retried. Only valid when the [connector-prefix].error.policy is set to RETRY
10
[connector-prefix].retry.interval
The interval, in milliseconds between retries, if the sink is using [connector-prefix].error.policy set to RETRY
60000
Throw
Any error in writing to the target system will be propagated up and processing is stopped. This is the default behaviour.
Noop
Any error in writing to the target database is ignored and processing continues.
Keep in mind
This can lead to missed errors if you don’t have adequate monitoring. Data is not lost, as it’s still in Kafka, subject to Kafka’s retention policy. The sink currently does not distinguish between integrity constraint violations and other exceptions thrown by any drivers or target systems.
Retry
Any error in writing to the target system causes the RetryIterable exception to be thrown. This causes the Kafka Connect framework to pause and replay the message. Offsets are not committed. For example, if the table is offline it will cause a write failure, and the message can be replayed. With the Retry policy, the issue can be fixed without stopping the sink.
[connector-prefix].error.policy
Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP, the error is swallowed; THROW, the error is allowed to propagate; and RETRY, the Kafka message is redelivered up to a maximum number of times specified by the [connector-prefix].max.retries option
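As a sketch, for a hypothetical connector whose option prefix is connect.cassandra (substitute the prefix your connector actually uses):

```properties
connect.cassandra.error.policy=RETRY
connect.cassandra.max.retries=10
connect.cassandra.retry.interval=60000
```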
Not all Connectors support the source converters. Check the option reference for your connector.
Before records are passed back to connect, they go through the converter if specified.
AvroConverter
The payload is an Avro message. In this case, you need to provide a path for the Avro schema file to be able to decode it.
JsonPassThroughConverter
The incoming payload is JSON; the resulting Kafka message value will be of type string, and the contents will be the incoming JSON.
JsonSimpleConverter
The payload is a JSON message. This converter will parse the JSON and create an Avro record for it which will be sent to Kafka.
JsonConverterWithSchemaEvolution
An experimental converter for translating JSON messages to Avro. The Avro schema evolves in a fully compatible way as new fields are added to the JSON payload.
BytesConverter
Dynamic string template
The HTTP request body contains the value of the message, which is retained as a string value via the StringConverter.
org.apache.kafka.connect.converters.IntegerConverter: Serializes to and deserializes from INTEGER values. When converting from bytes to Connect format, the converter returns an optional INT32 schema.
org.apache.kafka.connect.converters.LongConverter: Serializes to and deserializes from LONG values. When converting from bytes to Connect format, the converter returns an optional INT64 schema.
org.apache.kafka.connect.converters.ShortConverter: Serializes to and deserializes from SHORT values. When converting from bytes to Connect format, the converter returns an optional INT16 schema.
This page details the configuration options for the Stream Reactor Kafka Connect sink connectors.
Sink connectors read data from Kafka and write to an external system.
FAQ
Can the datalake sinks lose data?
Kafka topic retention policies determine how long a message is retained in a topic before it is deleted. If the retention period expires and the connector has not processed the messages, possibly due to not running or other issues, the unprocessed Kafka data will be deleted as per the retention policy. This can lead to significant data loss since the messages will no longer be available for the connector to sink to the target system.
Do the datalake sinks support exactly once semantics?
Yes, the datalake connectors natively support exactly-once guarantees.
How do I escape dots in field names in KCQL?
Field names in Kafka message headers or values may contain dots (.). To access these correctly, enclose the entire target in backticks (`) and each segment that consists of a field name in single quotes ('):
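For example, to select from topicA while targeting a value field whose segments contain dots:

```sql
INSERT INTO `_value.'customer.name'.'first.name'` SELECT * FROM topicA
```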
How do I escape other special characters in field names in KCQL?
For field names with spaces or special characters, use a similar escaping strategy:
Field name with a space: `_value.'full name'`
Field name with special characters: `_value.'$special_characters!'`
This ensures the connector correctly extracts the intended fields and avoids parsing errors.
InsertWallclockDateTimePart
A Kafka Connect Single Message Transform (SMT) that inserts the system clock year, month, day, minute, or seconds as a message header, with a value of type STRING.
Use InsertWallclockHeaders SMT if you want to use more than one date time part. This avoids multiple SMTs and is more efficient.
For example, if you want to partition the data by yyyy-MM-dd/HH, then you can use InsertWallclockHeaders, which inserts multiple headers: date, year, month, day, hour, minute, second.
Transform Type Class
Configuration
Name
Description
Type
Default
Valid Values
Importance
Example
To store the year, use the following configuration:
To store the month, use the following configuration:
To store the day, use the following configuration:
To store the hour, use the following configuration:
To store the hour, and apply a timezone, use the following configuration:
To store the minute, use the following configuration:
To store the second, use the following configuration:
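A sketch of one of these configurations, here storing the year. The transform class name and the property key for the date-time part are assumptions based on the Lenses SMT collection; verify them against your release:

```properties
transforms=insertYear
transforms.insertYear.type=io.lenses.connect.smt.header.InsertWallclockDateTimePart
transforms.insertYear.header.name=wallclock_year
transforms.insertYear.date.time.part=year
```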
AES256
Decodes values encoded with AES-256 to enable passing encrypted values to connectors.
Secrets will only be reloaded if the Connector restarts.
Add the plugin to the worker classloader isolation via the plugin.path option:
The provider takes an AES-256 encrypted value as the key and simply decrypts it to obtain the value (instead of, e.g., looking the value up somewhere).
The AES-256 encrypted value must be prefixed with a base64-encoded initialisation vector and a space character; the encrypted value itself is also base64 encoded. So to correctly encrypt value1, follow these steps:
encrypted-bytes = aes-256 encrypt value1
encrypted-base64 = base64 encode encrypted-bytes
value = base64 encode IV + " " + encrypted-base64
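The envelope assembly can be sketched in Python. This is a minimal illustration of the format only; the ciphertext here is a placeholder byte string, since the actual AES-256 encryption step depends on your crypto library:

```python
import base64
import os


def build_envelope(iv: bytes, ciphertext: bytes) -> str:
    """Assemble the value format the AES256 provider expects:
    base64(iv) + a single space + base64(ciphertext)."""
    return base64.b64encode(iv).decode() + " " + base64.b64encode(ciphertext).decode()


def parse_envelope(envelope: str) -> tuple[bytes, bytes]:
    """Split an envelope back into the IV and ciphertext bytes."""
    iv_b64, ct_b64 = envelope.split(" ", 1)
    return base64.b64decode(iv_b64), base64.b64decode(ct_b64)


# Placeholder bytes stand in for a real AES-256-encrypted payload.
iv = os.urandom(16)
ciphertext = b"\x01\x02\x03"  # would be AES-256(secret key, iv, "value1")

envelope = build_envelope(iv, ciphertext)
recovered_iv, recovered_ct = parse_envelope(envelope)
assert recovered_iv == iv and recovered_ct == ciphertext
```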
Configuring the plugin
The plugin needs to be configured with a secret key that will be used for decoding. The key is a string and must be 32 bytes long (UTF-8 encoded).
Name
Description
Default
Example worker properties file:
Usage
To use this provider in a connector, reference the keyvault containing the secret and the key name for the value of the connector property.
The indirect reference is in the form ${provider:path:key} where:
provider is the name of the provider in the worker property file set above
path specifies the encoding of the value: utf8, utf8_file, base64, base64_file
key is the AES-256 encrypted value to be decrypted by the plugin
For example, if hello, AES-256 encrypted using some key, equals xyxyxy, then if you configure a connector to use ${aes256::xyxyxy} for a parameter value, the value will be substituted with the string “hello”:
This would resolve at runtime to:
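A sketch, assuming a hypothetical connector property named connect.password:

```properties
# In the connector configuration:
connect.password=${aes256::xyxyxy}
# At runtime the provider substitutes the decrypted value:
# connect.password=hello
```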
The path in the key reference specifies the encoding used to pass the value. The provider supports the following encodings:
utf8: the value returned is the decrypted value of the encrypted value (key)
base64: the value returned is the base64-decoded decrypted value of the encrypted value (key)
utf8_file: utf-8 encoding of the value, which when decrypted is stored in a file
base64_file: base-64 encoding of the value, which when decrypted is stored in a file
If the value for the encoding is UTF8_FILE, the string contents are written to a file. The name of the file will be randomly generated. The file location is determined by the file.dir configuration option given to the provider via the Connect worker.properties file.
If the value for the encoding is BASE64_FILE, the string contents are base64 decoded and written to a file. The name of the file will be randomly generated. For example, if a connector needs a PEM file on disk, set the path to BASE64_FILE. The file location is determined by the file.dir configuration option given to the provider via the Connect worker.properties file.
If the key reference path is not set, or is set to an unknown value, utf8 encoding is used as the default.
For example, suppose we want to save hi there ! to a file, and its AES-256 encrypted content equals xyxyxy. If we configure a connector to use ${aes256:utf8_file:xyxyxy} for a parameter value, the provider will create a new file with a random name (abc-def-ghi) and store hi there ! in it. If the configured store directory is /store-root, the value will be substituted with the string /store-root/secrets/abc-def-ghi:
resolves to
Environment Provider
This page describes how to retrieve secrets from Environment variables for use in Kafka Connect.
Use Environment variables to hold secrets and use them in Kafka Connect.
Secrets will only be reloaded if the Connector restarts.
Configuration
Example Worker Properties:
Usage
To use this provider in a connector, reference the ENVSecretProvider environment variable providing the value of the connector property.
The indirect reference is in the form ${provider::key} where:
provider is the name of the provider in the worker property file set above
key is the name of the environment variable holding the secret.
For example, if we store two secrets as environment variables:
MY_ENV_VAR_USERNAME with the value lenses and
MY_ENV_VAR_PASSWORD with the value my-secret-password
we would set:
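A sketch of the connector properties, assuming the provider was registered under the name env in the worker properties:

```properties
# Connector configuration referencing the ENVSecretProvider
username=${env::MY_ENV_VAR_USERNAME}
password=${env::MY_ENV_VAR_PASSWORD}
# At runtime these resolve to "lenses" and "my-secret-password"
```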
This would resolve at runtime to:
Data encoding
This provider inspects the value of the environment variable to determine how to process it. The value can optionally carry metadata to support base64 decoding and writing values to files.
To provide metadata the following patterns are expected:
where value is the actual payload and metadata can be one of the following:
ENV-base64 - the provider will attempt to base64 decode the value string
ENV-mounted-base64 - the provider will attempt to base64 decode the value string and write to a file
ENV-mounted - the provider will write the value to a file
If no metadata is found, the value of the environment variable is returned as-is.
Secret Providers
This page contains the release notes for Connect Secret Providers.
2.3.0
Security: Write maven Descriptors on packaging to avoid incorrect dependencies being identified by security scanner tools. (Fixes CVE-2023-1370).
Security: Add dependency checking as part of the build process.
AES256 Provider:
Security: Change AES256 key to PASSWORD type to avoid logging secrets.
AWS Secrets Manager Provider:
New property : file.write
Writes secrets to a file on a path. Required for Java trust stores, key stores and certs that need to be loaded from file. For ease of use of the secret provider, this is disabled by default.
New property : secret.default.ttl
If no TTL is configured in AWS Secrets Manager, apply a default TTL (in milliseconds).
Azure Secret Provider:
Bugfix: Recompute TTL values on each get so the timestamp of reschedule shrinks until TTL is reached.
Bugfix: Fix so that UTF-8 encodings in Azure are correctly mapped to the UTF8 encoding in the secret provider.
Hashicorp Vault Provider:
Bugfix: Files will be written to the correct directory.
New property: app.role.path
Support vault approle custom mount path.
New property: kubernetes.auth.path
Support vault custom auth path (with default value to be auth/kubernetes).
Sink converters & different data formats
This page describes configuring sink converters for Kafka Connect.
You can configure the converters either at the Connect worker level or at the Connector instance level.
Schema.Struct and a Struct Payload
If you follow best practice while producing events, each message should carry its schema information. The best option is to send AVRO.
This requires the SchemaRegistry.
Schema.String and a JSON Payload
Sometimes the producer would find it easier to just send a message with Schema.String and a JSON string. In this case, your connector configuration should be set to value.converter=org.apache.kafka.connect.json.JsonConverter. This doesn’t require the SchemaRegistry.
No schema and a JSON Payload
Many existing systems publish JSON over Kafka, and bringing them in line with best practices is quite a challenge, hence we added this support. To enable it, you must change the converters in the connector configuration.
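For example, using the JSON converter that ships with Apache Kafka, with schemas disabled:

```properties
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
```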
InsertRecordTimestampHeaders
SMT that inserts date, year, month, day, hour, minute and second headers using the record timestamp. If the record timestamp is null, the SMT uses the current system time.
The headers inserted are of type STRING. By using this SMT, you can partition the data by yyyy-MM-dd/HH or yyyy/MM/dd/HH, for example, and only use one SMT.
The list of headers inserted are:
date
InsertRollingWallclock
SMT that inserts the system clock value as a message header, a value adapted to a specified time window boundary, for example every 15 minutes, or one hour.
The value inserted is stored as a STRING, and it holds either a string representation of the date and time epoch value, or a string representation of the date and time in the format specified.
Transform Type Class
InsertSourcePartitionOrOffsetValue
The InsertSourcePartitionOrOffsetValue transformation in Kafka Connect allows you to insert headers into SourceRecords based on partition or offset values.
This is useful for adding metadata to your data records before they are sent to destinations like AWS S3, Azure Datalake, or GCP Storage.
This SMT only works with source connectors.
Creating & managing a connector
This page describes managing a basic connector instance in your Connect cluster.
Creating your Kafka Connector Configuration
To deploy a connector into the Kafka Connect Cluster, you must follow the steps below:
You need to have the Jars in your Kafka Connect Cluster
Developing a connector
This section describes how to contribute a new connector to the Stream Reactor.
SBT, Java 11, and Scala 2.13 are required.
Setting up a new module
InsertWallclock
Inserts the system clock as a message header.
Use the InsertWallclockHeaders SMT if you want to use more than one date-time part. This avoids multiple SMTs and is more efficient.
For example, if you want to partition the data by yyyy-MM-dd/HH, then you can use InsertWallclockHeaders, which inserts multiple headers: date, year, month, day, hour, minute, second.
The Stream Reactor is built using SBT. Each connector is defined in a submodule in the root project.
Add the new directory called kafka-connect-[your-connector].
Under this add the standard path /src/main/
Package name
Use io.lenses.streamreactor.connector as the parent package. The following convention is used but each connector is different and can have more sub packages:
config - configuration and settings
sink - sink connectors, tasks and writers
source - source connectors, task and readers
Dependencies
Dependencies are declared in project/Dependencies.scala. Add the dependencies for your connector as a new field in the version object, along with the maven coordinates, for example:
Next, in the Dependencies trait, add a sequence to hold your dependencies:
Next, declare the submodule in Build.sbt.
Add the project to the subproject list:
Define the dependencies for your module. In this example, kafkaConnectAzureServiceBusDeps holds the dependencies defined earlier.
INSERT INTO `_value.'customer.name'.'first.name'` SELECT * FROM topicA
object version {
  // ...
  val azureServiceBusVersion = "7.14.7"
  // ...
}

lazy val azureServiceBus: ModuleID =
  "com.azure" % "azure-messaging-servicebus" % azureServiceBusVersion

val kafkaConnectAzureServiceBusDeps: Seq[ModuleID] = Seq(azureServiceBus)
lazy val subProjects: Seq[Project] = Seq(
`query-language`,
common,
`cloud-common`,
`aws-s3`,
`azure-documentdb`,
`azure-datalake`,
`azure-servicebus`,
`azure-storage`,
cassandra,
elastic6,
elastic7,
ftp,
`gcp-storage`,
influxdb,
jms,
mongodb,
mqtt,
redis,
)
-----
lazy val `azure-servicebus` = (project in file("kafka-connect-azure-servicebus"))
.dependsOn(common)
.settings(
settings ++
Seq(
name := "kafka-connect-azure-servicebus",
description := "Kafka Connect compatible connectors to move data between Kafka and popular data stores",
libraryDependencies ++= baseDeps ++ kafkaConnectAzureServiceBusDeps,
publish / skip := true,
packExcludeJars := Seq(
"scala-.*\\.jar",
"zookeeper-.*\\.jar",
),
),
)
.configureAssembly(true)
.configureTests(baseTestDeps)
.enablePlugins(PackPlugin)
year
month
day
hour
minute
second
All headers can be prefixed with a custom prefix. For example, if the prefix is wallclock_, then the headers will be:
wallclock_date
wallclock_year
wallclock_month
wallclock_day
wallclock_hour
wallclock_minute
wallclock_second
When used with the Lenses connectors for S3, GCS or Azure data lake, the headers can be used to partition the data. Considering the headers have been prefixed by _, here are a few KCQL examples:
Transform Type Class
Configuration
Name
Description
Type
Default
Importance
header.prefix.name
Optional header prefix.
String
Low
date.format
Optional Java date time formatter.
String
yyyy-MM-dd
Low
Example
To store the epoch value, use the following configuration:
To prefix the headers with wallclock_, use the following:
To change the date format, use the following:
To use the timezone Asia/Kolkata, use the following:
To facilitate S3, GCS, or Azure Data Lake partitioning using a Hive-like partition name format, such as date=yyyy-MM-dd / hour=HH, employ the following SMT configuration for a partition strategy.
and in the KCQL setting utilise the headers as partitioning keys:
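As an illustration, a sketch combining the SMT configuration with the sink's KCQL (the transform class is left as a placeholder and the alias is illustrative; the quoted literal in the pattern yields Hive-style values such as date=2024-01-31):

```properties
transforms=partitionHeaders
# Placeholder: use the fully-qualified class of the SMT described on this page
transforms.partitionHeaders.type=<SMT class>
transforms.partitionHeaders.header.prefix.name=_
# DateTimeFormatter literal quoting produces values like date=2024-01-31
transforms.partitionHeaders.date.format='date='yyyy-MM-dd
# Use the resulting header as a partitioning key in the sink's KCQL
connect.s3.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header._date
```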
Configuration
| Name | Description | Type | Default | Valid Values | Importance |
| --- | --- | --- | --- | --- | --- |
| header.name | The name of the header to insert the timestamp into. | String | | | High |
| value.type | Sets the header value inserted. It can be epoch or string. If string is used, then the 'format' setting is required. | String | format | epoch,format | High |
Example
To store the epoch value, use the following configuration:
To store a string representation of the date and time in the format yyyy-MM-dd HH:mm:ss.SSS, use the following:
To use the timezone Asia/Kolkata, use the following:
Configuration Properties
| Configuration Property | Description | Optionality | Default Value |
| --- | --- | --- | --- |
| offset.fields | Comma-separated list of fields to retrieve from the offset | Optional | Empty list |
| offset.prefix | Optional prefix for offset keys | Optional | "offset." |
| partition.fields | Comma-separated list of fields to retrieve from the partition | Required | |
Default Value: Specifies the default value assigned if no value is explicitly provided in the configuration.
These properties allow you to customize which fields from the offset and partition of a SourceRecord are added as headers, along with specifying optional prefixes for the header keys. Adjust these configurations based on your specific use case and data requirements.
Example Configuration
transforms: This property lists the transformations to be applied to the records.
transforms.InsertSourcePartitionOrOffsetValue.type: Specifies the class implementing the transformation (InsertSourcePartitionOrOffsetValue in this case).
transforms.InsertSourcePartitionOrOffsetValue.offset.fields: Defines the fields from the offset to be inserted as headers in the SourceRecord. Replace path,line,ts with the actual field names you want to extract from the offset.
transforms.InsertSourcePartitionOrOffsetValue.partition.fields: Defines the fields from the partition to be inserted as headers in the SourceRecord. Replace container,prefix with the actual field names you want to extract from the partition.
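Putting those properties together, a sketch of the configuration (the fully-qualified class name is an assumption for illustration; the field names are the placeholders from above):

```properties
transforms=InsertSourcePartitionOrOffsetValue
# Class package assumed for illustration; use the SMT's actual class
transforms.InsertSourcePartitionOrOffsetValue.type=io.lenses.connect.smt.header.InsertSourcePartitionOrOffsetValue
transforms.InsertSourcePartitionOrOffsetValue.offset.fields=path,line,ts
transforms.InsertSourcePartitionOrOffsetValue.partition.fields=container,prefix
```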
Example Usage with Cloud Connectors
AWS S3, Azure Datalake or GCP Storage
When using this transformation with AWS S3, you can configure your Kafka Connect connector as follows:
To customise the header key prefixes, you can also set the prefix values:
Replace path,line,ts and container,prefix with the actual field names you are interested in extracting from the partition or offset.
By using InsertSourcePartitionOrOffsetValue transformation, you can enrich your data records with additional metadata headers based on partition or offset values before they are delivered to your cloud storage destinations.
Using the Prefix Feature in InsertSourcePartitionOrOffsetValue Transformation
The prefix feature in InsertSourcePartitionOrOffsetValue allows you to prepend a consistent identifier to each header key added based on partition or offset values from SourceRecords.
Configure the transformation in your Kafka Connect connector properties:
offset.prefix: Specifies the prefix for headers derived from offset values. Default is "offset.".
partition.prefix: Specifies the prefix for headers derived from partition values. Default is "partition.".
By setting offset.prefix=offset. and partition.prefix=partition., headers added based on offset and partition fields will have keys prefixed accordingly in the SourceRecord headers.
This configuration ensures clarity and organization when inserting metadata headers into your Kafka records, distinguishing them based on their source (offset or partition). Adjust prefixes (offset.prefix and partition.prefix) as per your naming conventions or requirements.
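For example, a sketch of the prefix settings (property names as documented above):

```properties
transforms.InsertSourcePartitionOrOffsetValue.offset.prefix=offset.
transforms.InsertSourcePartitionOrOffsetValue.partition.prefix=partition.
# An offset field "path" then surfaces as the header key "offset.path",
# and a partition field "container" as "partition.container"
```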
Each connector has mandatory configurations that must be deployed and validated; other configurations are optional. Always read the connector documentation first.
You can deploy the connector using the Kafka Connect API, or let Lenses manage it for you.
Sample connector: the AWS S3 Sink Connector from Stream Reactor:
A simple flow of our connector reading from Kafka and writing into S3.
Let's drill into this connector configuration and what happens when it is deployed:
connector.class is the plugin we will use.
tasks.max is how many tasks will be executed in the Kafka Connect cluster. In this example we have 1 task; the topic has 9 partitions, so the connector's consumer group will have one instance consuming all 9 partitions. To scale, simply increase the number of tasks.
name is the name of the connector; it must be unique within each Kafka Connect cluster.
topics is the topic to be consumed; all its data will be written to AWS S3, as our example describes.
value.converter is the converter used to serialize and deserialize the message value. In this case, our value will be in JSON format.
key.converter is the converter used to serialize and deserialize the message key. In this case, our key will be in string format.
key.converter.schemas.enable tells Kafka Connect whether to include the key schema in the message. In our example it is false: we don't want to include the key schema.
value.converter.schemas.enable tells Kafka Connect whether to include the value schema in the message. In our example it is false: we don't want to include the value schema.
connect.s3.aws.auth.mode is the type of authentication we will use to connect to the AWS S3 bucket.
connect.s3.aws.access.key is the access key used to authenticate against the AWS S3 bucket.
connect.s3.aws.secret.key is the secret key used to authenticate against the AWS S3 bucket.
connect.s3.aws.region is the region in which the AWS S3 bucket is deployed.
connect.s3.kcql uses Kafka Connect Query Language (KCQL) to configure the bucket name, folder, storage format, and the frequency of adding new files to the bucket.
For deep configuration of the AWS S3 Sink connector, click here
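Putting the fields above together, a minimal sketch of the connector properties (bucket, credentials, region and topic are placeholders; flush and rotation options are omitted, see the S3 sink reference for those):

```properties
name=aws-s3-sink
connector.class=io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector
tasks.max=1
topics=payments
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
connect.s3.aws.auth.mode=Credentials
connect.s3.aws.access.key=$AWS_ACCESS_KEY
connect.s3.aws.secret.key=$AWS_SECRET_KEY
connect.s3.aws.region=eu-west-1
# Bucket, prefix and storage format; flush options omitted for brevity
connect.s3.kcql=INSERT INTO my-bucket:payments SELECT * FROM payments STOREAS `JSON`
```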
Managing your Connector
After deploying your Kafka connector into your Kafka Connect cluster, it will be managed by the cluster.
To better show how Kafka Connect manages your connectors, we will use the Lenses UI.
The image below is the Lenses connectors list:
In the following image, we delve into the Kafka connector details.
Consumer group: with Lenses, we can see which consumer group is reading and consuming the topic.
Connector tasks: we can see which tasks are open, their status, and how many records are going in and out of the topic.
Inserts the system clock as a message header, with a value of type STRING. The value can be either the epoch value or a string representation of the date and time in the specified format, for example yyyy-MM-dd HH:mm:ss.SSS.
Transform Type Class
Configuration
| Name | Description | Type | Default | Valid Values | Importance |
| --- | --- | --- | --- | --- | --- |
| header.name | The name of the header to insert the timestamp into. | String | | | High |
| value.type | Sets the header value inserted. It can be epoch or string. If string is used, then the 'format' setting is required. | String | format | epoch,format | High |
Example
To store the epoch value, use the following configuration:
To store a string representation of the date and time in the format yyyy-MM-dd HH:mm:ss.SSS, use the following:
To use the timezone Asia/Kolkata, use the following:
Examples for GCP Sink Kafka Connector time based partitioning.
Partitioning by Date and Time
This scenario partitions data by date and time, employing record timestamp headers to enable partitioning based on these time components.
Partitioning by Data Date and Hour
Data is partitioned by data date and hour, utilizing record timestamp headers for partitioning based on these time components.
Default Confluent Partitioning
The default Confluent partitioning scheme follows the structure <prefix>/<topic>/<encodedPartition>/<topic>+<kafkaPartition>+<startOffset>.<format>. This provides a default partitioning mechanism for Kafka topics.
Partitioning by Year, Month, and Day
Similar to the previous scenario, this partitions data by year, month, and day. It utilizes record timestamp headers for partitioning based on these time components.
Partitioning by Year, Month, Day, Hour, and Minute
Extending the previous scenarios, this one partitions data by year, month, day, hour, and minute, allowing for more granular time-based partitioning.
Partitioning by Year, Month, Day, and Hour
This scenario partitions data by year, month, day, and hour. It utilizes a transformation process to insert record timestamp headers, enabling partitioning based on these time components.
Partitioning by Date and Hour
This scenario partitions data by date and hour, using record timestamp headers for partitioning based on these time components.
Partitioning by Created At Timestamp
This scenario partitions data based on the created at timestamp, utilizing record timestamp headers for partitioning.
Partitioning by Raw Creation Date
Data is partitioned based on the raw creation date, employing record timestamp headers for this partitioning scheme.
Partitioning by Creation Timestamp
Data is partitioned based on the creation timestamp, utilizing record timestamp headers for this partitioning scheme.
Partitioning by Created At Date
This scenario partitions data by the created at date, employing record timestamp headers for partitioning.
Partitioning by Created At Date (Alternate Format)
Similar to the previous scenario, this partitions data by the created at date, utilizing record timestamp headers for partitioning.
Partitioning by Creation Date
Data is partitioned based on the creation date, employing record timestamp headers for this partitioning scheme.
Partitioning by Data Date
This scenario partitions data by the data date, utilizing record timestamp headers for partitioning.
Partitioning by Date and Hour
Data is partitioned based on the date and hour, employing record timestamp headers for this partitioning scheme.
Azure Event Hubs
This page describes the usage of the Stream Reactor Azure Event Hubs Source Connector.
A Kafka Connect source connector to read events from Azure Event Hubs and push them to Kafka.
In order to leverage the Kafka API in your Event Hubs, it has to be at least on the Standard pricing tier. More info here.
Connector Class
Example
For more examples see the .
The example below presents all the necessary parameters to configure the Event Hubs connector (and nothing optional, so feel free to tweak it to your needs):
KCQL support
You can specify multiple KCQL statements separated by ; to have the connector sink into multiple topics. However, you cannot route the same source to different topics; for this, use a separate connector instance.
The following KCQL is supported:
The selection of fields from the Event Hubs message is not supported.
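As a sketch (the connect.eventhubs.kcql property name is assumed here; verify it against the connector reference):

```properties
# Two statements separated by ';' read two hubs into two Kafka topics
connect.eventhubs.kcql=INSERT INTO kafka-topic-a SELECT * FROM event-hub-a;INSERT INTO kafka-topic-b SELECT * FROM event-hub-b
```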
Payload support
Currently, the Azure Event Hubs Connector supports raw bytes passthrough from the source Hub to the Kafka topic specified in the KCQL config.
Authentication
You can connect to Azure Event Hubs by passing specific JAAS parameters in the configuration.
Learn more about different methods of connecting to Event Hubs on . The only caveat is to add the connector-specific prefix, as in the example above. See for more info.
Fine-tuning the Kafka Connector
The Azure Event Hubs Connector utilizes the Apache Kafka API implemented by Event Hubs. This also allows fine-tuning for user-specific needs, because the connector passes all of the properties with a specific prefix directly to the consumer. The prefix is connect.eventhubs.connection.settings; when a user specifies a property with it, it is automatically passed to the consumer.
Suppose a user wants to fine-tune how many records come through the network at once. They specify the property below as part of the Azure Event Hubs Connector configuration before starting it.
This means the internal Kafka consumer will poll at most 100 records at a time (as max.poll.records is passed directly to it).
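For example, the property from the scenario above:

```properties
# Forwarded to the internal Kafka consumer as max.poll.records=100
connect.eventhubs.connection.settings.max.poll.records=100
```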
There are certain exceptions to this rule, as a couple of properties are used internally to ensure smooth consumption. Those exceptions are listed below:
A Kafka Connect Single Message Transform (SMT) that inserts date, year, month, day, hour, minute and second message headers using the system clock.
The headers inserted are of type STRING. By using this SMT, you can partition the data by yyyy-MM-dd/HH or yyyy/MM/dd/HH, for example, and only use one SMT.
The list of headers inserted are:
date
year
month
day
hour
minute
second
All headers can be prefixed with a custom prefix. For example, if the prefix is wallclock_, then the headers will be:
wallclock_date
wallclock_year
wallclock_month
wallclock_day
When used with the Lenses connectors for S3, GCS or Azure Data Lake, the headers can be used to partition the data. Considering the headers have been prefixed by _, here are a few KCQL examples:
Transform Type Class
Configuration
Name
Description
Type
Default
Importance
Example
To store the epoch value, use the following configuration:
To prefix the headers with wallclock_, use the following:
To change the date format, use the following:
To use the timezone Asia/Kolkata, use the following:
To facilitate S3, GCS, or Azure Data Lake partitioning using a Hive-like partition name format, such as date=yyyy-MM-dd / hour=HH, employ the following SMT configuration for a partition strategy.
and in the KCQL setting utilise the headers as partitioning keys:
IDE Support (JSON Schemas)
Add support for creating connector configuration in your IDE.
The Connect JSON Schema tool generates JSON Schemas from Kafka Connect connector, Single Message Transform and Secret Provider configurations. It helps validate and document connector configurations by providing a standardized schema format.
Many IDEs support JSON Schemas, allowing for IntelliSense and auto-completion. For VS Code you can find out more information here.
Generates JSON Schema from any Kafka Connect Connector, SMT or Secret Provider JAR
Includes all connector configuration fields with types and descriptions
Includes default Kafka Connect fields.
Beta - the schemas are still in beta, can change, and currently target the latest version of the Stream Reactor connectors.
The Generator Tool will generate JSON schemas from jars containing classes implementing:
Sink Connector
Source Connector
Single Message Transforms
Secret Providers
In addition, it can merge the individual connector schemas into a union schema.
Optionally, it can then include overrides for Connect converters (header, key & value) and for consumers and producers.
Usage
To trigger the default snippets at the root level, start typing:
Connectors: for connector examples
Overrides: for converters, consumer & producer
You can type further to get default snippets for different connector types, e.g. AWS S3.
VS Code Setup
Once you have JSON schema you can enable JSON Schema validation and auto completion in VS Code for both JSON and YAML files:
1. Install the required VS Code extensions:
"YAML" by Red Hat
"JSON Language Features" (built-in)
2. Associate the generated schema in your VS Code settings.
This setup will provide:
Schema validation for both JSON and YAML files
Auto completion for connector configurations
Hover documentation for fields
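As a sketch, assuming the generated schema has been saved locally as connector-schema.json and your configurations live under a connectors/ folder, the VS Code settings could look like:

```json
{
  "json.schemas": [
    {
      "fileMatch": ["connectors/*.json"],
      "url": "./connector-schema.json"
    }
  ],
  "yaml.schemas": {
    "./connector-schema.json": "connectors/*.yaml"
  }
}
```

The yaml.schemas key is provided by the Red Hat YAML extension; json.schemas is built into VS Code.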
Lenses Connectors Support
This page describes the enterprise support available for the Stream Reactor Kafka Connect connectors.
Lenses offers support for our connectors as a paid subscription. The list of connectors is always growing and is available here. Please reach out to sales for any inquiries.
SLA availability varies depending on connectors. The most recent connectors are considered "Next-Gen" and are available with a premium plus SLA to accommodate the most demanding requirements.
Support Level Available
Connector Name
Professional Support (Working hours)
Enterprise Support (24/7)
Premium Plus (15min SLA) for Next-Gen Connectors
Supported version and operability
please refer to
Stream Reactor version (Connector)
Support Status
Kafka Connect Version
Kafka version
GCP Source
Examples for GCP Source Kafka Connector.
Parquet Configuration, Using Avro Value Converter
This connector configuration is designed for ingesting data from , into Apache Kafka.
Connector Name: gcp-storageSourceConnectorParquet (This can be customized as needed)
Maximum Tasks: 1 (Number of tasks to execute in parallel)
KCQL Statement:
Inserts data from the specified cloud storage bucket into a Kafka topic.
Syntax: insert into $TOPIC_NAME select * from $BUCKET_NAME:$PREFIX_NAME STOREAS 'parquet'
$TOPIC_NAME: Name of the Kafka topic where data will be inserted.
$BUCKET_NAME: Name of the GCP Storage storage bucket.
$PREFIX_NAME: Prefix or directory within the bucket.
Value Converter:
AvroConverter (Assuming data is serialized in Avro format)
Schema Registry URL:
http://localhost:8089 (URL of the schema registry for Avro serialization)
Authentication Properties:
(These properties depend on the authentication mechanism used for accessing the cloud storage service. Replace placeholders with actual authentication properties for the specific cloud platform.)
This configuration serves as a template and can be customized according to the requirements and specifics of your data.
Envelope Storage
This configuration example is particularly useful when you need to restore data from GCP Storage into Apache Kafka while maintaining all data, including headers, key and value, for each record. The envelope structure encapsulates the actual data payload along with metadata into files on your source bucket, providing a way to manage and process data with additional context.
When to Use This Configuration:
Data Restoration with Envelope Structure: If you’re restoring data from GCP Storage into Kafka and want to preserve metadata, this configuration is suitable. Envelopes can include metadata like timestamps, data provenance, or other contextual information, which can be valuable for downstream processing.
Batch Processing: The configuration supports batch processing by specifying a batch size (BATCH=2000) and a limit on the number of records (LIMIT 10000). This is beneficial when dealing with large datasets, allowing for efficient processing in chunks.
This configuration ensures efficient and reliable data ingestion from cloud storage into Kafka while preserving the envelope structure and providing robust error handling mechanisms.
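A sketch of the KCQL for such a configuration (bucket, prefix and topic are placeholders; keyword spelling should be checked against the GCP Storage source reference):

```properties
connect.gcpstorage.kcql=INSERT INTO $TOPIC_NAME SELECT * FROM $BUCKET_NAME:$PREFIX_NAME BATCH=2000 STOREAS `AVRO` PROPERTIES ('store.envelope'=true) LIMIT 10000
```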
AVRO Configuration Envelope Storage
Similar to the above, this is another configuration for the envelope format.
When to Use This Configuration Variant:
Single Task Processing: Unlike the previous configuration which allowed for multiple tasks (tasks.max=4), this variant is configured for single-task processing (tasks.max=1). This setup may be preferable in scenarios where the workload is relatively lighter or where processing tasks in parallel is not necessary.
AVRO Format for Data Serialization: Data is serialized in the AVRO format (STOREAS 'AVRO'), leveraging the AvroConverter (value.converter=io.confluent.connect.avro.AvroConverter). This is suitable for environments where Avro is the preferred serialization format or where compatibility with Avro-based systems is required.
Single Message Transforms
This page contains the release notes for Single Message Transforms.
1.3.2
Adds support for multiple "from" patterns.
This converts the format.from.pattern field in the following SMTs:
InsertFieldTimestampHeaders
InsertRollingFieldTimestampHeaders
TimestampConverter
into a List (comma separated) so that these SMTs can support multiple (fallback) DateTimeFormatter patterns should multiple timestamps be in use.
Configuration Compatibility
Configurations are backwards-compatible with previous versions of the SMT; the exception is when commas are used in the format.from.pattern string. When updating such a configuration, enclose any pattern that contains commas in double quotes.
Old Configuration:
New Configuration
Multiple format.from.pattern can now be configured, each pattern containing a comma can be enclosed in double quotes:
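For example, with an illustrative transform alias ts:

```properties
# Old: a single pattern; no quoting needed
transforms.ts.format.from.pattern=yyyy-MM-dd'T'HH:mm:ss
# New: multiple fallback patterns, comma separated; a pattern that itself
# contains a comma is enclosed in double quotes
transforms.ts.format.from.pattern=yyyy-MM-dd'T'HH:mm:ss,"EEE, dd MMM yyyy HH:mm:ss",yyyy-MM-dd
```

Note the least granular pattern (yyyy-MM-dd) is listed last, per the ordering guidance.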
Configuration Order
When configuring format.from.pattern, the order matters; less granular formats should follow more specific ones to avoid data loss. For example, place yyyy-MM-dd after yyyy-MM-dd'T'HH:mm:ss to ensure detailed timestamp information isn't truncated.
1.3.1
Increase error information for debugging.
1.3.0
Adds support for adding metadata to kafka connect headers (for a source connector).
1.2.1
Workaround for the Connect runtime failing with an unexplained exception, where it appears the static fields of the parent class are not resolved properly.
1.2.0
Adds support for inserting time based headers using a Kafka message payload field.
1.1.2
Fix public visibility of rolling timestamp headers.
1.1.1
Don't make CTOR protected.
1.1.0
Introducing four new Single Message Transforms (SMTs) aimed at simplifying and streamlining the management of system or record timestamps, along with support for rolling windows. These SMTs are designed to significantly reduce the complexity associated with partitioning data in S3/Azure/GCS Sink based on time, offering a more efficient and intuitive approach to data organization. By leveraging these SMTs, users can seamlessly handle timestamp-based partitioning tasks, including optional rolling window functionality, paving the way for smoother data management workflows.
InsertRollingRecordTimestampHeaders
Inserts date, year, month, day, hour, minute and second headers using the record timestamp and a rolling time window configuration. If the record timestamp is null, the SMT uses the system time.
The headers inserted are of type STRING. By using this SMT, you can partition the data by yyyy-MM-dd/HH or yyyy/MM/dd/HH, for example, and only use one SMT.
The list of headers inserted are:
date
Azure KeyVault
This page describes how to retrieve secrets from Azure KeyVault for use in Kafka Connect.
Secure secrets in Azure KeyVault and use them in Kafka Connect.
Secrets will only be reloaded if the Connector restarts.
InfluxDB
This page describes the usage of the Stream Reactor InfluxDB Sink Connector.
This connector has been retired starting with version 11.0.0
AWS S3 Source Examples
Examples for AWS S3 Source Kafka Connector.
Parquet Configuration, Using Avro Value Converter
This connector configuration is designed for ingesting data from , into Apache Kafka.
Connector Name: aws-s3SourceConnectorParquet (This can be customized as needed)
AWS S3 Sink Time Based Partitioning
Examples for AWS S3 Sink Kafka Connector time based partitioning.
Partitioning by Date and Time
This scenario partitions data by date and time, employing record timestamp headers to enable partitioning based on these time components.
connect.s3.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header._date, _header._hour
connect.s3.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header._year, _header._month, _header._day, _header._hour
name=gcp-storageSourceConnectorParquet # this can be anything
connector.class=io.lenses.streamreactor.connect.gcp.storage.source.GCPStorageSourceConnector
tasks.max=1
connect.gcpstorage.kcql=insert into $TOPIC_NAME select * from $BUCKET_NAME:$PREFIX_NAME STOREAS `parquet`
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8089
connect.gcpstorage.gcp.auth.mode=Credentials
connect.gcpstorage.gcp.credentials=$GCP_CREDENTIALS
connect.gcpstorage.gcp.project.id=$GCP_PROJECT_ID
| Name | Description | Type | Default | Importance |
| --- | --- | --- | --- | --- |
| year.format | Optional Java date time formatter for the year component. | String | yyyy | Low |
| month.format | Optional Java date time formatter for the month component. | String | MM | Low |
| day.format | Optional Java date time formatter for the day component. | String | dd | Low |
| hour.format | Optional Java date time formatter for the hour component. | String | HH | Low |
| minute.format | Optional Java date time formatter for the minute component. | String | mm | Low |
| second.format | Optional Java date time formatter for the second component. | String | ss | Low |
| timezone | Optional. Sets the timezone. It can be any valid Java timezone. | String | UTC | Low |
| locale | Optional. Sets the locale. It can be any valid Java locale. | String | en | Low |
| Name | Description | Type | Default | Valid Values | Importance |
| --- | --- | --- | --- | --- | --- |
| value.type | Sets the header value inserted. It can be epoch or string. If string is used, then the 'format' setting is required. | String | format | epoch,format | High |
| format | Sets the format of the header value inserted if the type was set to string. It can be any valid Java date format. | String | | | High |
| rolling.window.type | Sets the window type. It can be fixed or rolling. | String | minutes | hours, minutes, seconds | High |
| rolling.window.size | Sets the window size. It can be any positive integer, and depending on the window.type it has an upper bound: 60 for seconds and minutes, and 24 for hours. | Int | 15 | | High |
| timezone | Sets the timezone. It can be any valid Java timezone. Overwrite it when value.type is set to format, otherwise it will raise an exception. | String | UTC | | High |
| Configuration Property | Description | Optionality | Default Value |
| --- | --- | --- | --- |
| partition.prefix | Optional prefix for partition keys | Optional | "partition." |

| Name | Description | Type | Default | Valid Values | Importance |
| --- | --- | --- | --- | --- | --- |
| value.type | Sets the header value inserted. It can be epoch or string. If string is used, then the 'format' setting is required. | String | format | epoch,format | High |
| format | Sets the format of the header value inserted if the type was set to string. It can be any valid Java date format. | String | | | High |
| timezone | Sets the timezone. It can be any valid Java timezone. Overwrite it when value.type is set to format, otherwise it will raise an exception. | String | UTC | | High |
| Name | Description | Type | Default | Importance |
| --- | --- | --- | --- | --- |
| year.format | Optional Java date time formatter for the year component. | String | yyyy | Low |
| month.format | Optional Java date time formatter for the month component. | String | MM | Low |
| day.format | Optional Java date time formatter for the day component. | String | dd | Low |
| hour.format | Optional Java date time formatter for the hour component. | String | HH | Low |
| minute.format | Optional Java date time formatter for the minute component. | String | mm | Low |
| second.format | Optional Java date time formatter for the second component. | String | ss | Low |
| timezone | Optional. Sets the timezone. It can be any valid Java timezone. | String | UTC | Low |
| locale | Optional. Sets the locale. It can be any valid Java locale. | String | en | Low |
| Name | Description | Type | Default | Importance |
| --- | --- | --- | --- | --- |
| header.prefix.name | Optional header prefix. | String | | Low |
| date.format | Optional Java date time formatter. | String | yyyy-MM-dd | Low |
| year.format | Optional Java date time formatter for the year component. | String | yyyy | Low |
Error Handling: Error handling is configured to throw exceptions (connect.gcpstorage.error.policy=THROW) in case of errors during data ingestion, ensuring that any issues are immediately surfaced for resolution.
Hierarchical Partition Extraction: The source partition extractor type is set to hierarchical (connect.gcpstorage.source.partition.extractor.type=hierarchical), which is suitable for extracting hierarchical partitioning structures from the source data location.
Continuous Partition Search: Continuous partition search is enabled (connect.partition.search.continuous=true), which helps in continuously searching for new partitions to process, ensuring that newly added data is ingested promptly.
Schema Registry Configuration: The configuration includes the URL of the schema registry (value.converter.schema.registry.url=http://localhost:8081), facilitating schema management and compatibility checks for Avro serialized data.
Hierarchical Partition Extraction: Similar to the previous configuration, hierarchical partition extraction is employed (connect.gcpstorage.source.partition.extractor.type=hierarchical), enabling extraction of partitioning structures from the source data location.
Maximum Tasks: 1 (Number of tasks to execute in parallel)
KCQL Statement:
Inserts data from the specified cloud storage bucket into a Kafka topic.
Syntax: insert into $TOPIC_NAME select * from $BUCKET_NAME:$PREFIX_NAME STOREAS 'parquet'
$TOPIC_NAME: Name of the Kafka topic where data will be inserted.
$BUCKET_NAME: Name of the AWS S3 storage bucket.
$PREFIX_NAME: Prefix or directory within the bucket.
Value Converter:
AvroConverter (Assuming data is serialized in Avro format)
Schema Registry URL:
http://localhost:8089 (URL of the schema registry for Avro serialization)
Authentication Properties:
(These properties depend on the authentication mechanism used for accessing the cloud storage service. Replace placeholders with actual authentication properties for the specific cloud platform.)
This configuration serves as a template and can be customized according to the requirements and specifics of your data.
Envelope Storage
This configuration example is particularly useful when you need to restore data from AWS S3 into Apache Kafka while maintaining all data, including headers, key and value, for each record. The envelope structure encapsulates the actual data payload along with metadata into files on your source bucket, providing a way to manage and process data with additional context.
When to Use This Configuration:
Data Restoration with Envelope Structure: If you’re restoring data from AWS S3 into Kafka and want to preserve metadata, this configuration is suitable. Envelopes can include metadata like timestamps, data provenance, or other contextual information, which can be valuable for downstream processing.
Batch Processing: The configuration supports batch processing by specifying a batch size (BATCH=2000) and a limit on the number of records (LIMIT 10000). This is beneficial when dealing with large datasets, allowing for efficient processing in chunks.
Error Handling: Error handling is configured to throw exceptions (connect.s3.error.policy=THROW) in case of errors during data ingestion, ensuring that any issues are immediately surfaced for resolution.
Hierarchical Partition Extraction: The source partition extractor type is set to hierarchical (connect.s3.source.partition.extractor.type=hierarchical), which is suitable for extracting hierarchical partitioning structures from the source data location.
Continuous Partition Search: Continuous partition search is enabled (connect.partition.search.continuous=true), which helps in continuously searching for new partitions to process, ensuring that newly added data is ingested promptly.
This configuration ensures efficient and reliable data ingestion from cloud storage into Kafka while preserving the envelope structure and providing robust error handling mechanisms.
AVRO Configuration Envelope Storage
Similar to the above, this is another configuration for the envelope format.
When to Use This Configuration Variant:
Single Task Processing: Unlike the previous configuration which allowed for multiple tasks (tasks.max=4), this variant is configured for single-task processing (tasks.max=1). This setup may be preferable in scenarios where the workload is relatively lighter or where processing tasks in parallel is not necessary.
AVRO Format for Data Serialization: Data is serialized in the AVRO format (STOREAS 'AVRO'), leveraging the AvroConverter (value.converter=io.confluent.connect.avro.AvroConverter). This is suitable for environments where Avro is the preferred serialization format or where compatibility with Avro-based systems is required.
Schema Registry Configuration: The configuration includes the URL of the schema registry (value.converter.schema.registry.url=http://localhost:8081), facilitating schema management and compatibility checks for Avro serialized data.
Hierarchical Partition Extraction: Similar to the previous configuration, hierarchical partition extraction is employed (connect.s3.source.partition.extractor.type=hierarchical), enabling extraction of partitioning structures from the source data location.
Partitioning by Data Date and Hour
Data is partitioned by data date and hour, utilizing record timestamp headers for partitioning based on these time components.
Default Confluent Partitioning
The default Confluent partitioning scheme follows the structure <prefix>/<topic>/<encodedPartition>/<topic>+<kafkaPartition>+<startOffset>.<format>. This provides a default partitioning mechanism for Kafka topics.
Partitioning by Year, Month, and Day
Similar to the previous scenario, this partitions data by year, month, and day. It utilizes record timestamp headers for partitioning based on these time components.
Partitioning by Year, Month, Day, Hour, and Minute
Extending the previous scenarios, this one partitions data by year, month, day, hour, and minute, allowing for more granular time-based partitioning.
Partitioning by Year, Month, Day, and Hour
This scenario partitions data by year, month, day, and hour. It utilizes a transformation process to insert record timestamp headers, enabling partitioning based on these time components.
Partitioning by Date and Hour
This scenario partitions data by date and hour, using record timestamp headers for partitioning based on these time components.
Partitioning by Created At Timestamp
This scenario partitions data based on the created at timestamp, utilizing record timestamp headers for partitioning.
Partitioning by Raw Creation Date
Data is partitioned based on the raw creation date, employing record timestamp headers for this partitioning scheme.
Partitioning by Creation Timestamp
Data is partitioned based on the creation timestamp, utilizing record timestamp headers for this partitioning scheme.
Partitioning by Created At Date
This scenario partitions data by the created at date, employing record timestamp headers for partitioning.
Partitioning by Created At Date (Alternate Format)
Similar to the previous scenario, this partitions data by the created at date, utilizing record timestamp headers for partitioning.
Partitioning by Creation Date
Data is partitioned based on the creation date, employing record timestamp headers for this partitioning scheme.
Partitioning by Data Date
This scenario partitions data by the data date, utilizing record timestamp headers for partitioning.
Partitioning by Date and Hour
Data is partitioned based on the date and hour, employing record timestamp headers for this partitioning scheme.
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="yyyy-MM-dd-HH"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date STOREAS X
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.year.format="'year='yyyy"
transforms.partition.month.format="'month='MM"
transforms.partition.day.format="'day='dd"
transforms.partition.hour.format="'hour='HH"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.year, _header.month, _header.day, _header.hour
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.year.format="'year='yyyy"
transforms.partition.month.format="'month='MM"
transforms.partition.day.format="'day='dd"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.year, _header.month, _header.day
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.year.format="'year='yyyy"
transforms.partition.month.format="'month='MM"
transforms.partition.day.format="'day='dd"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.year, _header.month, _header.day
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.year.format="yyyy"
transforms.partition.month.format="MM"
transforms.partition.day.format="dd"
transforms.partition.hour.format="HH"
transforms.partition.minute.format="mm"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.year, _header.month, _header.day, _header.hour, _header.minute
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'date='yyyy-MM-dd"
transforms.partition.hour.format="'time='HHmm"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date, _header.time
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'data_date='yyyy-MM-dd"
transforms.partition.hour.format="'hour='HH"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date, _header.hour
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'dt='yyyy-MM-dd"
transforms.partition.hour.format="'hour='HH"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date, _header.hour
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'raw_cre_dt='yyyy-MM-dd"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'creation-ts='yyyy-MM-dd"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'createdAt='yyyy-MM-dd"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'createdAt='yyyyMMddHH"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'created_at='yyyy-MM-dd"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'creation_ds='yyyy-MM-dd"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'data_date='yyyy-MM-dd"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'date_hour='yyyy-MM-dd-HH"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'data_date='yyyy-MM-dd-HH"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="yyyy-MM-dd-HH"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date STOREAS X
name=aws-s3SourceConnectorParquet # this can be anything
connector.class=io.lenses.streamreactor.connect.aws.s3.source.S3SourceConnector
tasks.max=1
connect.s3.kcql=insert into $TOPIC_NAME select * from $BUCKET_NAME:$PREFIX_NAME STOREAS `parquet`
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8089
connect.s3.aws.region=eu-west-2
connect.s3.aws.secret.key=SECRET_KEY
connect.s3.aws.access.key=ACCESS_KEY
connect.s3.aws.auth.mode=Credentials
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="yyyy-MM-dd-HH"
connect.s3.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date STOREAS X
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.year.format="'year='yyyy"
transforms.partition.month.format="'month='MM"
transforms.partition.day.format="'day='dd"
transforms.partition.hour.format="'hour='HH"
connect.s3.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.year, _header.month, _header.day, _header.hour
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.year.format="'year='yyyy"
transforms.partition.month.format="'month='MM"
transforms.partition.day.format="'day='dd"
connect.s3.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.year, _header.month, _header.day
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.year.format="'year='yyyy"
transforms.partition.month.format="'month='MM"
transforms.partition.day.format="'day='dd"
connect.s3.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.year, _header.month, _header.day
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.year.format="yyyy"
transforms.partition.month.format="MM"
transforms.partition.day.format="dd"
transforms.partition.hour.format="HH"
transforms.partition.minute.format="mm"
connect.s3.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.year, _header.month, _header.day, _header.hour, _header.minute
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'date='yyyy-MM-dd"
transforms.partition.hour.format="'time='HHmm"
connect.s3.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date, _header.time
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'data_date='yyyy-MM-dd"
transforms.partition.hour.format="'hour='HH"
connect.s3.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date, _header.hour
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'dt='yyyy-MM-dd"
transforms.partition.hour.format="'hour='HH"
connect.s3.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date, _header.hour
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'raw_cre_dt='yyyy-MM-dd"
connect.s3.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'creation-ts='yyyy-MM-dd"
connect.s3.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'createdAt='yyyy-MM-dd"
connect.s3.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'createdAt='yyyyMMddHH"
connect.s3.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'created_at='yyyy-MM-dd"
connect.s3.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'creation_ds='yyyy-MM-dd"
connect.s3.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'data_date='yyyy-MM-dd"
connect.s3.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'date_hour='yyyy-MM-dd-HH"
connect.s3.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'data_date='yyyy-MM-dd-HH"
connect.s3.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="yyyy-MM-dd-HH"
connect.s3.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date STOREAS X
Configure VS Code settings:
Open VS Code settings (Ctrl+,)
Add the following configuration to your settings.json:
Replace /path/to/your/schema.json with the actual path to your schema file.
For project-specific settings, create a .vscode/settings.json file in your project root:
All headers can be prefixed with a custom prefix. For example, if the prefix is wallclock_, then the headers will be:
wallclock_date
wallclock_year
wallclock_month
wallclock_day
wallclock_hour
wallclock_minute
wallclock_second
When used with the Lenses connectors for S3, GCS or Azure data lake, the headers can be used to partition the data. Considering the headers have been prefixed by _, here are a few KCQL examples:
Transform Type Class
Configuration
Name
Description
Type
Default
Importance
header.prefix.name
Optional header prefix.
String
Low
date.format
Optional Java date time formatter.
String
yyyy-MM-dd
Low
Example
To store the epoch value, use the following configuration:
To prefix the headers with wallclock_, use the following:
To change the date format, use the following:
To use the timezone Asia/Kolkata, use the following:
To facilitate S3, GCS, or Azure Data Lake partitioning using a Hive-like partition name format, such as date=yyyy-MM-dd / hour=HH, employ the following SMT configuration for a partition strategy.
and in the KCQL setting utilise the headers as partitioning keys:
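A sketch of the SMT plus KCQL pairing, shown for the S3 sink. The transform class name InsertWallclockHeaders and the hour.format option (assumed by analogy with date.format) are assumptions; the header prefix here is _:

```properties
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertWallclockHeaders
transforms.partition.header.prefix.name=_
# Hive-like partition names: literals are quoted inside the Java date pattern
transforms.partition.date.format="'date='yyyy-MM-dd"
transforms.partition.hour.format="'hour='HH"
connect.s3.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header._date, _header._hour
```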
Authentication
Two authentication methods are supported:
credentials. When using this configuration, the client-id, tenant-id and secret-id for an Azure service principal that has access to the key vaults must be provided
default. This method uses the default credential provider chain from Azure. The default credential first checks environment variables for configuration. If the environment configuration is incomplete, it will try to use managed identities.
Configuring the plugin
Name
Description
Default
azure.auth.method
Azure authentication method: 'credentials' to use the provided credentials, or 'default' for the standard Azure provider chain
credentials
azure.client.id
Azure client id for the service principal. Valid if auth.method is 'credentials'
azure.tenant.id
Azure tenant id for the service principal. Valid if auth.method is 'credentials'
Example worker properties file:
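A minimal sketch; the provider class name io.lenses.connect.secrets.providers.AzureSecretProvider, the provider alias azure, and the azure.secret.id key name are assumptions:

```properties
# Register the Azure secret provider under the alias "azure"
config.providers=azure
config.providers.azure.class=io.lenses.connect.secrets.providers.AzureSecretProvider
config.providers.azure.param.azure.auth.method=credentials
config.providers.azure.param.azure.client.id=your-client-id
config.providers.azure.param.azure.tenant.id=your-tenant-id
config.providers.azure.param.azure.secret.id=your-client-secret
# Directory used when secrets are written to files (UTF8_FILE/BASE64_FILE)
config.providers.azure.param.file.dir=/connector-files/azure
```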
Usage
To use this provider in a connector, reference the keyvault containing the secret and the key name for the value of the connector property.
The indirect reference is in the form ${provider:path:key} where:
provider is the name of the provider in the worker property file set above
path is the URL of the Azure KeyVault. DO NOT include the https:// protocol in the keyvault name as the Connect worker will not parse it correctly
key is the name of the secret key in the Azure KeyVault
For example, if we store two secrets:
my_username with the value lenses and
my_password with the value my-secret-password
in a Keyvault called my-azure-key-vault we would set:
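A sketch of the connector properties, assuming the provider was registered under the alias azure in the worker file (note the vault URL without the https:// protocol):

```properties
username=${azure:my-azure-key-vault.vault.azure.net:my_username}
password=${azure:my-azure-key-vault.vault.azure.net:my_password}
```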
This would resolve at runtime to:
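With the example secrets above, the resolved values would look like this:

```properties
username=lenses
password=my-secret-password
```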
Data encoding
The provider handles the following types:
utf_8
base64
The provider will look for a tag attached to the secret called file-encoding. The value for this tag can be:
UTF8
UTF8_FILE
BASE64
BASE64_FILE
UTF8 means the value returned is the string retrieved for the secret key. BASE64 means the value returned is the base64-decoded string retrieved for the secret key.
If the value for the tag is UTF8_FILE, the string contents are written to a file. The returned value from the connector configuration key will be the location of the file. The file location is determined by the file.dir configuration option, which is given to the provider via the Connect worker.properties file.
If the value for the tag is BASE64_FILE, the string contents are base64 decoded and written to a file. The returned value from the connector configuration key will be the location of the file. For example, if a connector needs a PEM file on disk, set the tag as BASE64_FILE. The file location is determined by the file.dir configuration option, which is given to the provider via the Connect worker.properties file.
If no tag is found the contents of the secret string are returned.
You can specify multiple KCQL statements separated by ; to have a connector sink multiple topics. The connector properties topics or topics.regex are required to be set to a value that matches the KCQL statements.
The following KCQL is supported:
Examples:
Tags
InfluxDB allows via the client API to provide a set of tags (key-value) to each point added. The current connector version allows you to provide them via the KCQL.
Only applicable to value fields. No support for nested fields, keys or topic metadata.
This page describes how to retrieve secrets from AWS Secret Manager for use in Kafka Connect.
Secure secrets in AWS Secret Manager and use them in Kafka Connect.
Secrets will only be reloaded if the Connector restarts.
Authentication
Two authentication methods are supported:
credentials. When using this configuration, the access-key and secret-key are used.
default. This method uses the default credential provider chain from AWS. The chain first checks environment variables for configuration; if the environment configuration is incomplete, it falls back to Java system properties, then the profile file, and finally managed identity.
Configuring the plugin
Name
Description
Default
Example Worker Properties
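A minimal sketch; the provider class name io.lenses.connect.secrets.providers.AWSSecretProvider and the provider alias aws are assumptions:

```properties
# Register the AWS Secret Manager provider under the alias "aws"
config.providers=aws
config.providers.aws.class=io.lenses.connect.secrets.providers.AWSSecretProvider
config.providers.aws.param.aws.auth.method=credentials
config.providers.aws.param.aws.access.key=your-access-key
config.providers.aws.param.aws.secret.key=your-secret-key
config.providers.aws.param.aws.region=eu-west-1
# Directory used when secrets are written to files (UTF8_FILE/BASE64_FILE)
config.providers.aws.param.file.dir=/connector-files/aws
```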
Usage
To use this provider in a connector, reference the SecretManager containing the secret and the key name for the value of the connector property.
The indirect reference is in the form ${provider:path:key} where:
provider is the name of the provider in the worker property file set above
path is the name of the secret
key is the name of the secret key in secret to retrieve. AWS can store multiple keys under a path.
For example, if we store two secrets as keys:
my_username_key with the value lenses and
my_password_key with the value my-secret-password
in a secret called my-aws-secret we would set:
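A sketch of the connector properties, assuming the provider was registered under the alias aws in the worker file:

```properties
username=${aws:my-aws-secret:my_username_key}
password=${aws:my-aws-secret:my_password_key}
```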
This would resolve at runtime to:
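With the example secrets above, the resolved values would look like this:

```properties
username=lenses
password=my-secret-password
```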
Data encoding
AWS SecretManager BinaryString (API only) is not supported. The secrets must be stored under the secret name in key-value pair format. The provider checks the SecretString API and expects a JSON string to be returned.
For example, for an RDS PostgreSQL secret, the following is returned by AWS Secret Manager:
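An illustrative (abridged) SecretString payload for such a secret; all field values are placeholders:

```json
{
  "username": "postgres",
  "password": "my-secret-password",
  "engine": "postgres",
  "host": "my-db.example.eu-west-1.rds.amazonaws.com",
  "port": 5432,
  "dbname": "mydb"
}
```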
The provider handles the following types:
utf_8
base64
The provider will look for keys prefixed with:
UTF8
UTF8_FILE
BASE64
BASE64_FILE
UTF8 means the value returned is the string retrieved for the secret key. BASE64 means the value returned is the base64-decoded string retrieved for the secret key.
If the key is prefixed with UTF8_FILE, the string contents are written to a file. The returned value from the connector configuration key will be the location of the file. The file location is determined by the file.dir configuration option, which is given to the provider via the Connect worker.properties file.
If the key is prefixed with BASE64_FILE, the string contents are base64 decoded and written to a file. The returned value from the connector configuration key will be the location of the file. For example, if a connector needs a PEM file on disk, set the prefix as BASE64_FILE. The file location is determined by the file.dir configuration option, which is given to the provider via the Connect worker.properties file.
If no prefix is found the contents of the secret string are returned.
InsertRollingWallclockHeaders
Kafka SMT that inserts date, year, month, day, hour, minute and second headers using the system timestamp and a rolling time window configuration.
The headers inserted are of type STRING. By using this SMT, you can partition the data by yyyy-MM-dd/HH or yyyy/MM/dd/HH, for example, and only use one SMT.
The list of headers inserted are:
date
year
month
day
hour
minute
second
All headers can be prefixed with a custom prefix. For example, if the prefix is wallclock_, then the headers will be:
wallclock_date
wallclock_year
wallclock_month
wallclock_day
When used with the Lenses connectors for S3, GCS or Azure data lake, the headers can be used to partition the data. Considering the headers have been prefixed by _, here are a few KCQL examples:
Transform Type Class
Configuration
Name
Description
Type
Default
Importance
Example
To store the epoch value, use the following configuration:
To prefix the headers with wallclock_, use the following:
To change the date format, use the following:
To use the timezone Asia/Kolkata, use the following:
To facilitate S3, GCS, or Azure Data Lake partitioning using a Hive-like partition name format, such as date=yyyy-MM-dd / hour=HH, employ the following SMT configuration for a partition strategy.
and in the KCQL setting utilise the headers as partitioning keys:
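A sketch of the SMT plus KCQL pairing, shown for the S3 sink. The rolling.window.type and rolling.window.size option names are assumptions (check the configuration table for the exact names), and the header prefix here is _:

```properties
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRollingWallclockHeaders
transforms.partition.header.prefix.name=_
# Assumed rolling-window options: one-hour windows
transforms.partition.rolling.window.type=hours
transforms.partition.rolling.window.size=1
# Hive-like partition names: literals are quoted inside the Java date pattern
transforms.partition.date.format="'date='yyyy-MM-dd"
connect.s3.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header._date, _header._hour
```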
Azure Service Bus
This page describes the usage of the Stream Reactor Azure Service Bus Sink Connector.
Stream Reactor Azure Service Bus Sink Connector is designed to translate Kafka records into your Azure Service Bus cluster. It leverages the Microsoft Azure API to transfer data to Service Bus seamlessly, safely carrying over both payloads and metadata (see Payload support). It supports both types of Service Bus: Queues and Topics. The Azure Service Bus Sink Connector provides an AT-LEAST-ONCE guarantee: the data is committed (marked as read) in the Kafka topic (for the assigned topic and partition) once the Connector verifies it was successfully delivered to the designated Service Bus queue or topic.
Connector Class
Full Config Example
For more examples see the .
The following example presents all the mandatory configuration properties for the Service Bus connector. Please note there are also optional parameters, listed in the Option Reference below. Feel free to tweak the configuration to your requirements.
KCQL support
You can specify multiple KCQL statements separated by ; to have the connector map between multiple topics.
The following KCQL is supported:
It allows you to map a Kafka topic named <your-kafka-topic> to a Service Bus named <your-service-bus> using the PROPERTIES specified (see the Option Reference section below for the necessary properties)
The selection of fields from the Service Bus message is not supported.
Authentication
You can connect to an Azure Service Bus by passing your connection string in configuration. The connection string can be found in the Shared access policies section of your Azure Portal.
Learn more about different methods of connecting to Service Bus on the .
QUEUE and TOPIC Mappings
The Azure Service Bus Connector connects to Service Bus via the Microsoft API. To configure your mappings smoothly, pay attention to the PROPERTIES part of your KCQL mappings. There are two cases here: writing to a Service Bus of type QUEUE and of type TOPIC. Please refer to the relevant sections below.
Writing to QUEUE ServiceBus
To write to a queue, there is an additional parameter that you need to pass with your KCQL mapping in the PROPERTIES part. This parameter is servicebus.type and it can take one of two values depending on the type of the Service Bus: QUEUE or TOPIC. For a queue, set it to QUEUE.
This is sufficient to enable you to create the mapping with your queue.
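A sketch of such a mapping; the connect.servicebus.kcql property name and the queue/topic names are assumptions:

```properties
connect.servicebus.kcql=INSERT INTO my-queue SELECT * FROM my-kafka-topic PROPERTIES('servicebus.type'='QUEUE')
```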
Writing to TOPIC ServiceBus
To write to a topic, there is an additional parameter that you need to pass with your KCQL mapping in the PROPERTIES part:
The parameter servicebus.type, which can take one of two values depending on the type of the Service Bus: QUEUE or TOPIC. For a topic, set it to TOPIC.
This is sufficient to enable you to create the mapping with your topic.
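A sketch of such a mapping; the connect.servicebus.kcql property name and the queue/topic names are assumptions:

```properties
connect.servicebus.kcql=INSERT INTO my-topic SELECT * FROM my-kafka-topic PROPERTIES('servicebus.type'='TOPIC')
```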
Disabling batching
If the Connector is expected to transfer big messages (one megabyte and more), Service Bus may not accept a batch of such payloads, failing the Connector Task. To remediate this, set the batch.enabled parameter to false. This sacrifices the ability to send messages in batches (possibly making transfers slower) but should enable the user to transfer them safely.
For most usages, we recommend omitting it (it is set to true by default).
Kafka payload support
This sink supports the following Kafka payloads:
String Schema Key and Binary payload (then MessageId in Service Bus is set with Kafka Key)
any other key (or keyless) and Binary payload (this causes Service Bus messages to not have specified MessageId)
No Schema and JSON
Null Payload Transfer
Azure Service Bus does not allow sending messages with null content (payload)
A null payload (sometimes referred to as a Kafka tombstone) is a well-known concept in the Kafka world. However, because of this Service Bus limitation, the connector cannot send messages with a null payload and has to drop them instead.
Please keep that in mind when using Service Bus and designing business logic around null payloads!
Option Reference
KCQL Properties
Please find below all the necessary KCQL properties:
Name
Description
Type
Default Value
Configuration parameters
Please find below all the relevant configuration parameters:
Name
Description
Type
Default Value
Backup & Restore
This page describes how to perform a backup and restore of data in your Kafka cluster.
The following external storage targets are supported for backing up to and restoring from:
AWS S3
Backup and restore are achieved using the standard connectors but enabling the Message Envelope to ensure the correct metadata is persisted.
Message Envelope
A Kafka message includes keys, values, headers, and metadata (topic, partition, offset, timestamp).
The connector wraps these messages in an "envelope", streamlining backup and restoration without relying on complex Kafka Connect transformations.
Here's how the envelope is structured:
In this format, all parts of the Kafka message are retained. This is beneficial for backup, restoration, and to run analytical queries.
The Source connector uses this format to rebuild the original Kafka message and send it to the specified topic.
Storage Formats
Anything can be stored in S3, and the connector does its best to support the major formats, offering support for:
AVRO
Parquet
JSON
This format is decoupled from the format in Kafka. The translation from Kafka to Connect happens via the key.converter and value.converter connector properties.
Object partitioning in S3
Partitioning is crucial for organizing data and improving query speeds. In S3, this is achieved using the Object Key. By default, the connector reflects the structure of the Kafka topic it is sending to. For instance, a three-partition topic would use this configuration:
This would result in:
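For example (bucket, prefix, and topic names are hypothetical; enabling the envelope via PROPERTIES('store.envelope'=true) and the exact offset padding are assumptions):

```properties
connect.s3.kcql=INSERT INTO my-bucket:backups SELECT * FROM my_topic STOREAS `AVRO` PROPERTIES('store.envelope'=true)
# Resulting object keys, one "directory" per Kafka partition:
#   backups/my_topic/0/my_topic+0+0000000000.avro
#   backups/my_topic/1/my_topic+1+0000000000.avro
#   backups/my_topic/2/my_topic+2+0000000000.avro
```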
The connector allows for customised partitioning, which has its perks:
Better performance in subsequent data queries due to organized partitions.
Easy data management through time intervals, like year or month.
Keeping sensitive data in distinct partitions for tighter access controls.
For instance, for a "sales" Kafka topic with transaction messages, the KCQL can partition data by transaction year, product type, and customer region.
The Kafka Connect S3 Sink Connector will create custom object keys in your S3 bucket that incorporate the customer ID, transaction year, product category, and customer region, resulting in a coarser partitioning strategy. For instance, an object key might look like this:
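A sketch of such a key together with the KCQL that would produce this coarser layout; the field names (customer_id, transaction_year, product_category, customer_region) and the exact key shape are assumptions:

```properties
connect.s3.kcql=INSERT INTO my-bucket:sales SELECT * FROM sales PARTITIONBY customer_id, transaction_year, product_category, customer_region STOREAS `JSON`
# An illustrative object key: only the partition values appear in the path
#   sales/123/2024/electronics/emea/sales+2+0000000100.json
```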
To achieve more structured object key naming, similar to Athena Hive-like key names where field names are part of the object key, modify the KCQL syntax as follows:
This will result in object keys like:
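A sketch of both pieces; the PROPERTIES('partition.include.keys'=true) clause is an assumption about how the connector is told to emit key=value path segments, and the field names are illustrative:

```properties
connect.s3.kcql=INSERT INTO my-bucket:sales SELECT * FROM sales PARTITIONBY customer_id, transaction_year, product_category, customer_region STOREAS `JSON` PROPERTIES('partition.include.keys'=true)
# Object keys now carry the field names, Hive-style:
#   sales/customer_id=123/transaction_year=2024/product_category=electronics/customer_region=emea/sales+2+0000000100.json
```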
Organizing data into time-based intervals within custom object keys can be highly beneficial. To achieve time-based intervals with a custom object key naming, the connector supports a complementary Kafka Connect Single Message Transformer (SMT) plugin designed to streamline this process. You can find the transformer plugin and documentation .
Consider an example where you need the object key to include the wallclock time (the time when the message was processed) and create an hourly window based on a field called `timestamp`. Here's the connector configuration to achieve this:
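A sketch of such a configuration; the connector name, bucket, and topic are illustrative, and the InsertWallclock parameter names (header.name, date.format) are assumptions:

```properties
name=s3-sink-hourly
connector.class=io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector
topics=sales
transforms=convertTs,insertWallclock
# Format the `timestamp` value field as an hourly bucket string
transforms.convertTs.type=org.apache.kafka.connect.transforms.TimestampConverter$Value
transforms.convertTs.field=timestamp
transforms.convertTs.target.type=string
transforms.convertTs.format=yyyy-MM-dd-HH
# Insert the processing-time wallclock as a message header
transforms.insertWallclock.type=io.lenses.connect.smt.header.InsertWallclock
transforms.insertWallclock.header.name=wallclock
transforms.insertWallclock.date.format=yyyy-MM-dd-HH
# Partition by both the formatted field and the wallclock header
connect.s3.kcql=INSERT INTO my-bucket:prefix SELECT * FROM sales PARTITIONBY timestamp, _header.wallclock STOREAS `JSON`
```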
In this configuration:
The TimestampConverter SMT is used to convert the timestamp field in the Kafka message's value into a string value based on the specified format pattern (yyyy-MM-dd-HH). This allows us to format the timestamp to represent an hourly interval.
The InsertWallclock SMT incorporates the current wallclock time in the specified format (yyyy-MM-dd-HH).
The PARTITIONBY clause leverages the timestamp field and the wallclock header to craft the object key, providing precise control over data partitioning.
MQTT
This page describes the usage of the Stream Reactor MQTT Source Connector.
A Kafka Connect source connector to read events from MQTT and push them to Kafka.
Connector Class
JMS
This page describes the usage of the Stream Reactor JMS Sink Connector.
Connector Class
Example
InsertFieldTimestampHeaders
Inserts the datetime as a message header from a value field.
This Kafka Connect Single Message Transform (SMT) facilitates the insertion of date and time components (year, month, day, hour, minute, second) as headers into Kafka messages using a timestamp field within the message payload. The timestamp field can be in various valid formats, including long integers, strings, or date objects. The timestamp field can originate from either the record Key or the record Value. When extracting from the record Key, prefix the field with _key.; otherwise, extract from the record Value by default or explicitly using the field without prefixing. For string-formatted fields, specify a format.from.pattern parameter to define the parsing pattern. Long integer fields are assumed to be Unix timestamps; the desired Unix precision can be specified using the unix.precision parameter.
The headers inserted are of type STRING. By using this SMT, you can partition the data by yyyy-MM-dd/HH or yyyy/MM/dd/HH, for example, and only use one SMT.
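A sketch, assuming a record Value field named created_at holding a Unix timestamp in milliseconds; the option names follow the parameters described above but their exact spelling is an assumption:

```properties
transforms=fieldTs
transforms.fieldTs.type=io.lenses.connect.smt.header.InsertFieldTimestampHeaders
transforms.fieldTs.field=created_at
# Long fields are treated as Unix timestamps; declare the precision
transforms.fieldTs.unix.precision=milliseconds
transforms.fieldTs.date.format=yyyy-MM-dd
# For a record-Key field, prefix the field name: field=_key.created_at
```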
TimestampConverter
SMT that allows the user to specify the format of the timestamp inserted as a header. It also avoids the synchronization block requirement for converting to a string representation of the timestamp.
An adapted version of the Apache Kafka TimestampConverter SMT. The SMT adds a few more features to the original:
allows nested fields resolution (e.g. a.b.c)
uses _key or _value as prefix to understand the field to convert is part of the record Key or Value
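A sketch illustrating both additions (nested field resolution and the _key/_value prefix); the header.name and format.to.pattern option names are assumptions:

```properties
transforms=ts
transforms.ts.type=io.lenses.connect.smt.header.TimestampConverter
transforms.ts.header.name=ts
# Nested field a.b.c inside the record Value; use _key. for the record Key
transforms.ts.field=_value.order.created_at
transforms.ts.target.type=string
transforms.ts.format.to.pattern=yyyy-MM-dd-HH
```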
FTP
This page describes the usage of the Stream Reactor FTP Source Connector.
Provide the remote directories and, at the specified intervals, the list of files in the directories is refreshed. Files are downloaded when they were not known before, or when their timestamp or size has changed. Only files with a timestamp younger than the specified maximum age are considered. Hashes of the files are maintained and used to check for content changes. Changed files are then fed into Kafka, either as a whole (update) or only the appended part (tail), depending on the configuration. Optionally, file bodies can be transformed through a pluggable system prior to putting them into Kafka.
name=influxdb
connector.class=io.lenses.streamreactor.connect.influx.InfluxSinkConnector
tasks.max=1
topics=influx
connect.influx.url=http://influxdb:8086
connect.influx.db=mydb
connect.influx.username=admin
connect.influx.kcql=INSERT INTO influxMeasure SELECT * FROM influx WITHTIMESTAMP sys_time()
INSERT INTO <your-measure>
SELECT FIELD, ...
FROM kafka_topic_name
[WITHTIMESTAMP FIELD|sys_time]
[WITHTAG(FIELD|(constant_key=constant_value)]
-- Insert mode, select all fields from topicA and write to indexA
INSERT INTO measureA SELECT * FROM topicA
-- Insert mode, select 3 fields and rename from topicB and write to indexB,
-- use field Y as the point measurement
INSERT INTO measureB SELECT x AS a, y AS b, c FROM topicB WITHTIMESTAMP y
-- Insert mode, select 3 fields and rename from topicB and write to indexB,
-- use field Y as the current system time for Point measurement
INSERT INTO measureB SELECT x AS a, y AS b, z FROM topicB WITHTIMESTAMP sys_time()
-- Tagging using constants
INSERT INTO measureA SELECT * FROM topicA WITHTAG (DataMountaineer=awesome, Influx=rulz!)
-- Tagging using fields in the payload. Say we have a Payment structure
-- with these fields: amount, from, to, note
INSERT INTO measureA SELECT * FROM topicA WITHTAG (from, to)
-- Tagging using a combination of fields in the payload and constants.
-- Say we have a Payment structure with these fields: amount, from, to, note
INSERT INTO measureA SELECT * FROM topicA WITHTAG (from, to, provider=DataMountaineer)
Optional Java date time formatter for the year component.
String
yyyy
Low
month.format
Optional Java date time formatter for the month component.
String
MM
Low
day.format
Optional Java date time formatter for the day component.
String
dd
Low
hour.format
Optional Java date time formatter for the hour component.
String
HH
Low
minute.format
Optional Java date time formatter for the minute component.
String
mm
Low
second.format
Optional Java date time formatter for the second component.
String
ss
Low
timezone
Optional. Sets the timezone. It can be any valid Java timezone.
String
UTC
Low
locale
Optional. Sets the locale. It can be any valid Java locale.
String
en
Low
rolling.window.type
Sets the rolling window type. It can be hours, minutes or seconds.
String
minutes
hours, minutes, seconds
rolling.window.size
Sets the window size. It can be any positive integer and, depending on rolling.window.type, it has an upper bound: 60 for seconds and minutes, and 24 for hours.
Int
15
azure.secret.id
Azure secret id for the service principal. Valid if auth.method is 'credentials'.
file.dir
The base location for any files to be stored.
connect.influx.password
The password for the influxdb user.
password
connect.influx.kcql
KCQL expression describing field selection and target measurements.
string
connect.progress.enabled
Enables the output for how many records have been processed by the connector
boolean
false
connect.influx.error.policy
Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP - the error is swallowed; THROW - the error is allowed to propagate; RETRY - the exception causes the Connect framework to retry the message. The number of retries is set by connect.influx.max.retries. The error will be logged automatically.
string
THROW
connect.influx.retry.interval
The time in milliseconds between retries.
int
60000
connect.influx.max.retries
The maximum number of times to try the write again.
int
20
connect.influx.retention.policy
Determines how long InfluxDB keeps the data. The options for specifying the duration of the retention policy are: m (minutes), h (hours), d (days), w (weeks) and INF (infinite). Note that the minimum retention period is one hour. The default retention policy is autogen from InfluxDB 1.0 onwards, or default for any previous version.
string
autogen
connect.influx.consistency.level
Specifies the write consistency. If any write operations do not meet the configured consistency guarantees, an error will occur and the data will not be indexed. The default consistency-level is ALL.
string
ALL
AWS region for the Secrets Manager.
file.dir
The base location for any files to be stored.
file.write
Writes secrets to a file on the given path. Required for Java trust stores, key stores and certs that need to be loaded from a file. For ease of use of the secret provider, this is disabled by default.
false
secret.default.ttl
If no TTL is configured in AWS Secrets Manager, apply a default TTL (in milliseconds).
(not enabled)
aws.endpoint.override
Specify the secret provider endpoint.
(not enabled)
secret.type
Specify the type of secrets stored in Secret Manager. Defaults to JSON, to enable String secret values set this property as STRING.
JSON
BASE64_FILE
aws.auth.method
AWS authentication method: 'credentials' to use the provided credentials, or 'default' for the standard AWS provider chain.
credentials
aws.access.key
AWS client key. Valid if auth.method is 'credentials'.
aws.secret.key
AWS secret key. Valid if auth.method is 'credentials'.
aws.region
wallclock_hour
wallclock_minute
wallclock_second
Optional Java date time formatter for the year component.
String
yyyy
Low
month.format
Optional Java date time formatter for the month component.
String
MM
Low
day.format
Optional Java date time formatter for the day component.
String
dd
Low
hour.format
Optional Java date time formatter for the hour component.
String
HH
Low
minute.format
Optional Java date time formatter for the minute component.
String
mm
Low
second.format
Optional Java date time formatter for the second component.
String
ss
Low
timezone
Optional. Sets the timezone. It can be any valid Java timezone.
String
UTC
Low
locale
Optional. Sets the locale. It can be any valid Java locale.
String
en
Low
rolling.window.type
Sets the rolling window type. It can be hours, minutes or seconds.
String
minutes
hours, minutes, seconds
rolling.window.size
Sets the window size. It can be any positive integer and, depending on rolling.window.type, it has an upper bound: 60 for seconds and minutes, and 24 for hours.
Int
15
header.prefix.name
Optional header prefix.
String
Low
date.format
Optional Java date time formatter.
String
yyyy-MM-dd
Low
year.format
connect.servicebus.sink.retries.timeout
Timeout (in milliseconds) between retries if a message fails to be delivered to Service Bus.
int
500
servicebus.type
Specifies Service Bus type: QUEUE or TOPIC
string
batch.enabled
Specifies if the Connector can send messages in batch, see Azure Service Bus
boolean
true
connect.servicebus.connection.string
Specifies the Connection String to connect to Service Bus
string
connect.servicebus.kcql
Comma-separated output KCQL queries
string
connect.servicebus.sink.retries.max
Number of retries if a message fails to be delivered to Service Bus.
connect.servicebus.kcql=INSERT INTO azure-queue SELECT * FROM kafka-topic PROPERTIES('servicebus.type'='QUEUE');
connect.servicebus.kcql=INSERT INTO azure-topic SELECT * FROM kafka-topic PROPERTIES('servicebus.type'='TOPIC');
CSV (including headers)
Text
BYTES
To adjust partitioning, use the PARTITIONBY clause in the KCQL configuration. This can use the Kafka message's key, value, or headers for partitioning.
You can specify multiple KCQL statements separated by ;
However, you cannot route the same source to different topics; for this, use a separate connector instance.
The following KCQL is supported:
The selection of fields from the JMS message is not supported.
Examples:
Keyed JSON format
To facilitate scenarios like retaining the latest value for a given device identifier, or support Kafka Streams joins without having to re-map the topic data, the connector supports WITHKEY in the KCQL syntax.
Multiple key fields are supported using a delimiter:
The resulting Kafka record key content will be the string concatenation of the values of the fields. Optionally, the delimiter can be set via the KEYDELIMITER keyword.
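As an illustrative sketch (the topic names, field names and delimiter here are hypothetical), a KCQL statement combining WITHKEY and KEYDELIMITER might look like:

```sql
-- Key each Kafka record on deviceId and timestamp, joined with '-'
INSERT INTO kafka_topic
SELECT * FROM /mqtt/sensors
WITHKEY(deviceId, timestamp)
KEYDELIMITER='-'
```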
Shared and Wildcard Subscriptions
The connector supports wildcard and shared subscriptions, but the KCQL command must be placed inside single quotes.
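For instance (the topic and group names are illustrative), the whole KCQL command is wrapped in single quotes when it contains a shared or wildcard subscription:

```properties
connect.mqtt.kcql='INSERT INTO kafka_topic SELECT * FROM $share/connect-group/sensors/+/data'
```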
Dynamic target topics
When using wildcard subscriptions, you can dynamically route messages to a Kafka topic with the same name as the MQTT topic by using `$` in the KCQL target statement.
You must use a wildcard or a shared subscription format.
Kafka topic names do not support the / character; in the resulting topic names, / is replaced by _. For example:
/mqttSourceTopic/A/test would become mqttSourceTopic_A_test.
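A sketch of this routing, reusing the example topic above:

```properties
# $ routes each matched MQTT topic to the Kafka topic of the same name;
# /mqttSourceTopic/A/test becomes mqttSourceTopic_A_test
connect.mqtt.kcql=INSERT INTO $ SELECT * FROM /mqttSourceTopic/+/test
```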
Message converters
The connector supports converters to handle different message payload formats in the source topic. See source record converters.
You can specify multiple KCQL statements separated by ; to have a connector sink multiple topics. The connector properties topics or topics.regex are required to be set to a value that matches the KCQL statements.
The following KCQL is supported:
Examples:
JMS Topics and Queues
The sink can write to either topics or queues, specified by the WITHTYPE clause.
JMS Payload
When a message is sent to a JMS target it can be one of the following:
Provides the full class name of the ConnectionFactory implementation to use, e.g. org.apache.activemq.ActiveMQConnectionFactory.
string
The list of headers inserted are:
date
year
month
day
hour
minute
second
All headers can be prefixed with a custom prefix. For example, if the prefix is wallclock_, then the headers will be:
wallclock_date
wallclock_year
wallclock_month
wallclock_day
wallclock_hour
wallclock_minute
wallclock_second
When used with the Lenses connectors for S3, GCS or Azure data lake, the headers can be used to partition the data. Considering the headers have been prefixed by _, here are a few KCQL examples:
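For instance, an S3 sink could partition objects by the prefixed date and hour headers (bucket and topic names are illustrative):

```properties
connect.s3.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header._date, _header._hour
```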
Transform Type Class
Configuration
Name
Description
Type
Default
field
The field name. If the field is part of the record Key, prefix it with _key; otherwise prefix it with _value. If neither prefix is used, the field is resolved against the record Value.
String
format.from.pattern
Optional DateTimeFormatter-compatible format for the timestamp. Used to parse the input if the input is a string. Multiple (fallback) patterns can be added, comma-separated.
String
unix.precision
Optional. The desired Unix precision for the timestamp: seconds, milliseconds, microseconds, or nanoseconds. Used to parse the input if the input is a Long.
String
Example
To use the record Value field named created_at as the unix timestamp, use the following:
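A minimal sketch of such a configuration; the SMT class name shown here is an assumption, so check it against the Transform Type Class listed above:

```properties
transforms=insertFieldTs
# assumed class name for this SMT
transforms.insertFieldTs.type=io.lenses.connect.smt.header.InsertFieldTimestampHeaders
# resolve the timestamp from the record Value field named created_at
transforms.insertFieldTs.field=created_at
```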
To use the record Key field named created_at as the unix timestamp, use the following:
To prefix the headers with wallclock_, use the following:
To change the date format, use the following:
To use the timezone Asia/Kolkata, use the following:
To facilitate S3, GCS, or Azure Data Lake partitioning using a Hive-like partition name format, such as date=yyyy-MM-dd / hour=HH, employ the following SMT configuration for a partition strategy.
and in the KCQL setting utilise the headers as partitioning keys:
Configuration for format.from.pattern
Configuring multiple format.from.pattern items requires careful thought as to ordering and may indicate that your Kafka topics or data processing techniques are not aligning with best practices. Ideally, each topic should have a single, consistent message format to ensure data integrity and simplify processing.
Multiple Patterns Support
The format.from.pattern field supports multiple DateTimeFormatter patterns in a comma-separated list to handle various timestamp formats. Patterns containing commas should be enclosed in double quotes. For example:
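An illustrative value with fallback patterns, where the pattern containing a comma is enclosed in double quotes:

```properties
format.from.pattern=yyyy-MM-dd'T'HH:mm:ss,"EEE, dd MMM yyyy HH:mm:ss",yyyy-MM-dd
```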
Best Practices
While this flexibility can be useful, it is generally not recommended due to potential complexity and inconsistency. Ideally, a topic should have a single message format to align with Kafka best practices, ensuring consistency and simplifying data processing.
Configuration Order
The order of patterns in format.from.pattern matters. Less granular formats should follow more specific ones to avoid data loss. For example, place yyyy-MM-dd after yyyy-MM-dd'T'HH:mm:ss to ensure detailed timestamp information is preserved.
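Following that rule, the more specific pattern is listed first:

```properties
format.from.pattern=yyyy-MM-dd'T'HH:mm:ss,yyyy-MM-dd
```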
allows conversion from one string representation to another (e.g. yyyy-MM-dd HH:mm:ss to yyyy-MM-dd)
allows conversion using a rolling window boundary (e.g. every 15 minutes, or one hour)
Transform Type Class
Configuration
Name
Description
Type
Default
Valid Values
header.name
The name of the header to insert the timestamp into.
String
field
The field path containing the timestamp, or empty if the entire value is a timestamp. Prefix the path with the literal string _key, _value or _timestamp to specify whether the record Key, Value or Timestamp is used as the source. If not specified, _value is implied.
String
Example
To convert to and from a string representation of the date and time in the format yyyy-MM-dd HH:mm:ss.SSS, use the following configuration:
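A sketch of such a configuration; the class name and the format.to.pattern property name are assumptions, so verify them against the Transform Type Class and option reference above:

```properties
transforms=tsConvert
# assumed class name for this SMT
transforms.tsConvert.type=io.lenses.connect.smt.header.TimestampConverter
transforms.tsConvert.header.name=ts
transforms.tsConvert.field=_value.created_at
transforms.tsConvert.format.from.pattern=yyyy-MM-dd HH:mm:ss.SSS
# assumed property name for the output representation
transforms.tsConvert.format.to.pattern=yyyy-MM-dd HH:mm:ss.SSS
```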
To convert to and from a string representation while applying an hourly rolling window:
To convert to and from a string representation while applying an hourly rolling window and timezone:
To convert to and from a string representation while applying a 15 minutes rolling window:
To convert to and from a Unix timestamp, use the following:
Here is an example using the record timestamp field:
Configuration for format.from.pattern
Configuring multiple format.from.pattern items requires careful thought as to ordering and may indicate that your Kafka topics or data processing techniques are not aligning with best practices. Ideally, each topic should have a single, consistent message format to ensure data integrity and simplify processing.
Multiple Patterns Support
The format.from.pattern field supports multiple DateTimeFormatter patterns in a comma-separated list to handle various timestamp formats. Patterns containing commas should be enclosed in double quotes. For example:
Best Practices
While this flexibility can be useful, it is generally not recommended due to potential complexity and inconsistency. Ideally, a topic should have a single message format to align with Kafka best practices, ensuring consistency and simplifying data processing.
Configuration Order
The order of patterns in format.from.pattern matters. Less granular formats should follow more specific ones to avoid data loss. For example, place yyyy-MM-dd after yyyy-MM-dd'T'HH:mm:ss to ensure detailed timestamp information is preserved.
Each Kafka record represents a file and is structured as follows.
The format of the keys is configurable through connect.ftp.keystyle=string|struct. It can be a string with the file name, or a FileInfo structure with the name: string and offset: long. The offset is always 0 for files that are updated as a whole, and hence only relevant for tailed files.
The values of the records contain the body of the file as bytes.
Tailing Versus Update as a Whole
The following rules are used.
Tailed files are only allowed to grow. Bytes that have been appended to it since the last inspection are yielded. Preceding bytes are not allowed to change;
Updated files can grow, shrink and change anywhere. The entire contents are yielded.
Data converters
Instead of dumping whole file bodies (and risking exceeding Kafka's message.max.bytes), one might want to give an interpretation to the data contained in the files before putting it into Kafka. For example, if the files fetched from the FTP server are comma-separated values (CSVs), one might prefer to have a stream of CSV records instead. To allow this, the connector provides a pluggable conversion of SourceRecords. Right before sending a SourceRecord to the Connect framework, it is run through an object that implements:
The default object that is used is a pass-through converter, an instance of:
To override it, create your own implementation of SourceRecordConverter and place the jar in the plugin.path.
For more examples of using the FTP Kafka connector, read this blog.
Option Reference
Name
Description
Type
Default Value
connect.ftp.address
host[:port] of the ftp server
string
connect.ftp.user
Username to connect with
string
connect.ftp.password
Password to connect with
string
GCP PubSub
This page describes the usage of the Stream Reactor Google PubSub Source Connector.
The Kafka connector is designed to seamlessly ingest records from GCP Pub/Sub subscriptions into your Kafka cluster. This makes it useful for backing up or streaming data from Pub/Sub to your Kafka infrastructure. The connector provides robust support for at-least-once semantics: it ensures that each record reaches the Kafka topic at least once.
Connector Class
Example
For more examples see the .
KCQL Support
You can specify multiple KCQL statements separated by ; to have the connector write to multiple topics. However, you cannot route the same source to different topics; for this, use a separate connector instance.
The connector uses a SQL-like syntax to configure the connector behaviour. The full KCQL syntax is:
Please note that you can employ escaping within KCQL for the INSERT INTO and SELECT * FROM clauses when necessary. For example, if you need to use a topic name that contains a hyphen, you can escape it as follows:
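For example (the names are illustrative), backticks escape a name containing a hyphen:

```sql
INSERT INTO `my-kafka-topic` SELECT * FROM `my-pubsub-subscription`
```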
The connector does not support multiple KCQL statements that reference the same source location; to use multiple statements, configure each one in a separate connector instance.
Source Subscription ID and Target Topic
The source and target of the data are specified via the INSERT INTO... SELECT * FROM clause. The connector will write all the records to the given topic, from the given subscription:
Properties
The PROPERTIES clause is optional and adds a layer of configurability to the connector. It enhances versatility by permitting the application of multiple configurations (delimited by ',').
Properties can be defined in any order.
The following properties are supported:
Name
Description
Type
Default Value
Auth Mode
The connector offers three distinct authentication modes:
Default: This mode relies on the default GCP authentication chain, simplifying the authentication process.
File: This mode uses a local (to the connect worker) path for a file containing GCP authentication credentials.
Credentials: In this mode, explicit configuration of a GCP Credentials string is required for authentication.
The simplest example to configure in the connector is the "Default" mode, as this requires no other configuration.
When selecting the "Credentials" mode, it is essential to provide the necessary credentials. Alternatively, if you prefer not to configure these properties explicitly, the connector will follow the credentials retrieval order as described .
Here's an example configuration for the "Credentials" mode:
And here is an example configuration using the "File" mode:
Remember that when using file mode, the file needs to exist on every worker node in your Kafka Connect cluster and be readable by the Kafka Connect process.
For enhanced security and flexibility when using the "Credentials" mode, it is highly advisable to utilize Connect Secret Providers.
Output Modes
Two modes are available: Default Mode and Compatibility Mode.
Compatibility Mode is intended to ensure compatibility with existing tools, while Default Mode offers a simpler modern redesign of the functionality.
You can choose whichever suits your requirements.
Default Mode
Configuration
Record Schema
Each Pub/Sub message is transformed into a single Kafka record, structured as follows:
Kafka Key: A String of the Pub/Sub MessageID.
Kafka Value: The Pub/Sub message value as BYTES.
Kafka Headers: Includes the "PublishTimestamp" (in seconds) and all Pub/Sub message attributes mapped as separate headers.
Key Schema
The Kafka Key is mapped from the Pub/Sub MessageID, a unique ID for a Pub/Sub message.
Value Schema
The Kafka Value is mapped from the body of the Pub/Sub message.
Headers Schema
The Kafka Headers include:
PublishTimestamp: Long value representing the time when the Pub/Sub message was published, in seconds.
GCPProjectID: The GCP Project
PubSubTopicID: The Pub/Sub Topic ID.
Compatibility Mode
Configuration
Record Schema
Each Pub/Sub message is transformed into a single Kafka record, structured as follows:
Kafka Key: Comprises the project ID, message ID, and subscription ID of the Pub/Sub message.
Kafka Value: Contains the message data and attributes from the Pub/Sub message.
Key Schema
The Key is a structure with these fields:
Field Name
Schema Type
Description
Value Schema
The Value is a structure with these fields:
Field Name
Schema Type
Description
Option Reference
Name
Description
Type
Available Values
Default Value
InsertRollingFieldTimestampHeaders
Inserts the datetime as a message header from a value field and a rolling window configuration.
A Kafka Connect Single Message Transform (SMT) that inserts date, year, month, day, hour, minute and second headers using a timestamp field from the record payload and a rolling time window configuration. The timestamp field can be in various valid formats, including long integers, strings, or date objects. The timestamp field can originate from either the record Key or the record Value. When extracting from the record Key, prefix the field with _key.; otherwise, extract from the record Value by default or explicitly using the field without prefixing. For string-formatted fields, specify a format.from.pattern parameter to define the parsing pattern. Long integer fields are assumed to be Unix timestamps; the desired Unix precision can be specified using the unix.precision parameter.
The headers inserted are of type STRING. By using this SMT, you can partition the data by yyyy-MM-dd/HH or yyyy/MM/dd/HH, for example, and only use one SMT.
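A minimal sketch, assuming the class name io.lenses.connect.smt.header.InsertRollingFieldTimestampHeaders and an illustrative field name; verify the class against the Transform Type Class listed below:

```properties
transforms=rollingFieldTs
# assumed class name for this SMT
transforms.rollingFieldTs.type=io.lenses.connect.smt.header.InsertRollingFieldTimestampHeaders
transforms.rollingFieldTs.field=created_at
# roll the window every 15 minutes
transforms.rollingFieldTs.rolling.window.type=minutes
transforms.rollingFieldTs.rolling.window.size=15
```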
The list of headers inserted are:
date
year
month
day
All headers can be prefixed with a custom prefix. For example, if the prefix is wallclock_, then the headers will be:
wallclock_date
wallclock_year
wallclock_month
wallclock_day
When used with the Lenses connectors for S3, GCS or Azure data lake, the headers can be used to partition the data. Considering the headers have been prefixed by _, here are a few KCQL examples:
Transform Type Class
Configuration
Name
Description
Type
Default
To store the epoch value, use the following configuration:
To prefix the headers with wallclock_, use the following:
To change the date format, use the following:
To use the timezone Asia/Kolkata, use the following:
To facilitate S3, GCS, or Azure Data Lake partitioning using a Hive-like partition name format, such as date=yyyy-MM-dd / hour=HH, employ the following SMT configuration for a partition strategy.
and in the KCQL setting utilise the headers as partitioning keys:
Configuration for format.from.pattern
Configuring multiple format.from.pattern items requires careful thought as to ordering and may indicate that your Kafka topics or data processing techniques are not aligning with best practices. Ideally, each topic should have a single, consistent message format to ensure data integrity and simplify processing.
Multiple Patterns Support
The format.from.pattern field supports multiple DateTimeFormatter patterns in a comma-separated list to handle various timestamp formats. Patterns containing commas should be enclosed in double quotes. For example:
Best Practices
While this flexibility can be useful, it is generally not recommended due to potential complexity and inconsistency. Ideally, a topic should have a single message format to align with Kafka best practices, ensuring consistency and simplifying data processing.
Configuration Order
The order of patterns in format.from.pattern matters. Less granular formats should follow more specific ones to avoid data loss. For example, place yyyy-MM-dd after yyyy-MM-dd'T'HH:mm:ss to ensure detailed timestamp information is preserved.
This page describes the usage of the Stream Reactor Redis Sink Connector.
This connector has been retired as of version 11.0.0.
{
"key": <the message Key, which can be a primitive or a complex object>,
"value": <the message Key, which can be a primitive or a complex object>,
"headers": {
"header1": "value1",
"header2": "value2"
},
"metadata": {
"offset": 821122,
"partition": 3,
"timestamp": 1695645345,
"topic": "source_topic"
}
}
INSERT INTO lensesioaws
SELECT * FROM payments
STOREAS AVRO
PROPERTIES (
'store.envelope'=true
);
connect.s3.kcql=INSERT INTO lensesioaws SELECT * FROM payments STOREAS AVRO PROPERTIES ('store.envelope'=true)
name=mqtt-source
connector.class=io.lenses.streamreactor.connect.mqtt.source.MqttSourceConnector
tasks.max=1
connect.mqtt.kcql=INSERT INTO mqtt SELECT * FROM /mjson WITHCONVERTER=`io.lenses.streamreactor.connect.converters.source.JsonSimpleConverter`
connect.mqtt.client.id=dm_source_id
connect.mqtt.hosts=tcp://mqtt:1883
connect.mqtt.service.quality=1
INSERT INTO <your-kafka-topic>
SELECT *
FROM <your-mqtt-topic>
[WITHCONVERTER=`myclass`]
-- Insert mode, select all fields from topicA
-- and write to topic topic with converter myclass
INSERT INTO topic SELECT * FROM /mqttTopicA [WITHCONVERTER=myclass]
-- wildcard
INSERT INTO topic SELECT * FROM /mqttTopicA/+/sensors [WITHCONVERTER=`myclass`]
name=jms
connector.class=io.lenses.streamreactor.connect.jms.sink.JMSSinkConnector
tasks.max=1
topics=orders
connect.jms.url=tcp://activemq:61616
connect.jms.initial.context.factory=org.apache.activemq.jndi.ActiveMQInitialContextFactory
connect.jms.connection.factory=ConnectionFactory
connect.jms.kcql=INSERT INTO orders SELECT * FROM orders WITHTYPE QUEUE WITHFORMAT JSON
INSERT INTO <jms-destination>
SELECT FIELD, ...
FROM <your-kafka-topic>
[WITHFORMAT AVRO|JSON|MAP|OBJECT]
WITHTYPE TOPIC|QUEUE
-- Select all fields from topicA and write to jmsA queue
INSERT INTO jmsA SELECT * FROM topicA WITHTYPE QUEUE
-- Select 3 fields and rename from topicB and write
-- to jmsB topic as JSON in a TextMessage
INSERT INTO jmsB SELECT x AS a, y, z FROM topicB WITHFORMAT JSON WITHTYPE TOPIC
connect.s3.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header._date, _header._hour
connect.s3.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header._year, _header._month, _header._day, _header._hour
name=ftp-source
connector.class=io.lenses.streamreactor.connect.ftp.source.FtpSourceConnector
tasks.max=1
#server settings
connect.ftp.address=localhost:21
connect.ftp.user=ftp
connect.ftp.password=ftp
#refresh rate, every minute
connect.ftp.refresh=PT1M
#ignore files older than 14 days.
connect.ftp.file.maxage=P14D
#monitor /forecasts/weather/ and /logs/ for appends to files.
#any updates go to the topics `weather` and `error-logs` respectively.
connect.ftp.monitor.tail=/forecasts/weather/:weather,/logs/:error-logs
#keep an eye on /statuses/, files are retrieved as a whole and sent to topic `status`
connect.ftp.monitor.update=/statuses/:status
#keystyle controls the format of the key and can be string or struct.
#string only provides the file name
#struct provides a structure with the filename and offset
connect.ftp.keystyle=struct
Provides the time interval to establish the mqtt connection
int
3000
connect.mqtt.clean
Sets the MQTT clean session flag.
boolean
true
connect.mqtt.keep.alive
The keep-alive mechanism ensures that the connection stays open and that broker and client remain connected. The interval is the longest period of time the broker and client can endure without exchanging a message.
int
5000
connect.mqtt.client.id
Contains the Mqtt session client id
string
connect.mqtt.error.policy
Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP - the error is swallowed; THROW - the error is allowed to propagate; RETRY - the exception causes the Connect framework to retry the message. The number of retries is set by connect.mqtt.max.retries. The error will be logged automatically.
string
THROW
connect.mqtt.retry.interval
The time in milliseconds between retries.
int
60000
connect.mqtt.max.retries
The maximum number of times to try the write again.
int
20
connect.mqtt.retained.messages
Specifies the Mqtt retained flag.
boolean
false
connect.mqtt.converter.throw.on.error
If set to false, the conversion exception is swallowed and processing continues, but the message is lost. If set to true, the exception is thrown. Default is false.
boolean
false
connect.converter.avro.schemas
If the AvroConverter is used you need to provide an avro Schema to be able to read and translate the raw bytes to an avro record. The format is $MQTT_TOPIC=$PATH_TO_AVRO_SCHEMA_FILE in case of source converter, or $KAFKA_TOPIC=PATH_TO_AVRO_SCHEMA in case of sink converter
string
connect.mqtt.log.message
Logs received MQTT messages
boolean
false
connect.mqtt.kcql
Contains the Kafka Connect Query Language (KCQL) statements describing the source MQTT topics and the target Kafka topics.
string
connect.mqtt.polling.timeout
Provides the timeout to poll incoming messages
int
1000
connect.mqtt.share.replicate
Replicate the shared subscriptions to all tasks instead of distributing them
boolean
false
connect.progress.enabled
Enables the output for how many records have been processed
boolean
false
connect.mqtt.ssl.ca.cert
Provides the path to the CA certificate file to use with the Mqtt connection
string
connect.mqtt.ssl.cert
Provides the path to the certificate file to use with the Mqtt connection
string
connect.mqtt.ssl.key
Certificate private key file path.
string
connect.mqtt.process.duplicates
Process duplicate messages
boolean
false
ConnectionFactory
connect.jms.kcql
The KCQL statements describing the source Kafka topics and the target JMS destinations.
string
connect.jms.subscription.name
The subscription name to use when subscribing to a topic. Specifying this creates a durable subscription for topics.
string
connect.jms.password
Provides the password for the JMS connection
password
connect.jms.username
Provides the user for the JMS connection
string
connect.jms.error.policy
Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP - the error is swallowed; THROW - the error is allowed to propagate; RETRY - the exception causes the Connect framework to retry the message. The number of retries is set by connect.jms.max.retries. The error will be logged automatically.
string
THROW
connect.jms.retry.interval
The time in milliseconds between retries.
int
60000
connect.jms.max.retries
The maximum number of times to try the write again.
int
20
connect.jms.destination.selector
Selector to use for destination lookup. Either CDI or JNDI.
string
CDI
connect.jms.initial.context.extra.params
List (comma-separated) of extra properties as key/value pairs with a colon delimiter to supply to the initial context e.g. SOLACE_JMS_VPN:my_solace_vp
list
[]
connect.jms.batch.size
The number of records to poll for on the target JMS destination in each Connect poll.
int
100
connect.jms.polling.timeout
Provides the timeout to poll incoming messages
long
1000
connect.jms.source.default.converter
Contains the canonical class name for the default converter of raw JMS message bytes to a SourceRecord, e.g. io.lenses.streamreactor.connect.converters.source.AvroConverter. Per-destination overrides can still be made using connect.jms.source.converters.
string
connect.jms.converter.throw.on.error
If set to false, the conversion exception is swallowed and processing continues, but the message is lost. If set to true, the exception is thrown. Default is false.
boolean
false
connect.converter.avro.schemas
If the AvroConverter is used you need to provide an avro Schema to be able to read and translate the raw bytes to an avro record. The format is $MQTT_TOPIC=$PATH_TO_AVRO_SCHEMA_FILE
string
connect.jms.headers
Contains a collection of static JMS headers included in every SinkRecord. The format is connect.jms.headers="$MQTT_TOPIC=rmq.jms.message.type:TextMessage,rmq.jms.message.priority:2;$MQTT_TOPIC2=rmq.jms.message.type:JSONMessage"
string
connect.progress.enabled
Enables logging of how many records have been processed.
boolean
false
connect.jms.evict.interval.minutes
The interval, in minutes, at which uncommitted messages are removed from the internal cache. Each JMS message is linked to the Kafka record to be published; failure to publish a record to Kafka means the JMS message will not be acknowledged.
int
10
connect.jms.evict.threshold.minutes
The number of minutes after which an uncommitted entry becomes evictable from the connector cache.
int
10
connect.jms.scale.type
Specifies how connector task parallelization is decided. Available values are kcql and default. If kcql is provided, parallelization is based on the number of KCQL statements written; otherwise it is driven by the connector's tasks.max setting.
milliseconds
header.prefix.name
Optional header prefix.
String
date.format
Optional Java date time formatter.
String
yyyy-MM-dd
year.format
Optional Java date time formatter for the year component.
String
yyyy
month.format
Optional Java date time formatter for the month component.
String
MM
day.format
Optional Java date time formatter for the day component.
String
dd
hour.format
Optional Java date time formatter for the hour component.
String
HH
minute.format
Optional Java date time formatter for the minute component.
String
mm
second.format
Optional Java date time formatter for the second component.
String
ss
timezone
Optional. Sets the timezone. It can be any valid Java timezone.
String
UTC
locale
Optional. Sets the locale. It can be any valid Java locale.
String
en
target.type
Sets the desired timestamp representation.
String
string,unix,date,time,timestamp
format.from.pattern
Sets the format of the timestamp when the input is a string. The format requires a Java DateTimeFormatter-compatible pattern. Multiple (fallback) patterns can be added, comma-separated.
String
format.to.pattern
Sets the format of the timestamp when the output is a string. The format requires a Java DateTimeFormatter-compatible pattern.
String
rolling.window.type
An optional parameter for the rolling time window type. When set it will adjust the output value according to the time window boundary.
String
none
none, hours, minutes, seconds
rolling.window.size
An optional positive integer parameter for the rolling time window size. When rolling.window.type is defined this setting is required. The value is bound by the rolling.window.type configuration: if the type is minutes or seconds then the value cannot be bigger than 60, and if the type is hours then the maximum value is 24.
Int
15
unix.precision
The desired Unix precision for the timestamp. Used to generate the output when type=unix or used to parse the input if the input is a Long. This SMT will cause precision loss during conversions from, and to, values with sub-millisecond components.
String
milliseconds
seconds, milliseconds, microseconds, nanoseconds
timezone
Sets the timezone. It can be any valid Java timezone. Only override it when target.type is set to date, time, or string; otherwise an exception is raised.
String
UTC
connect.ftp.refresh
ISO8601 duration controlling how often the server is polled.
string
connect.ftp.file.maxage
ISO8601 duration specifying the maximum age of files to be processed.
string
connect.ftp.keystyle
SourceRecord key style, either string or struct.
string
connect.ftp.protocol
Protocol to use, FTP or FTPS
string
ftp
connect.ftp.timeout
FTP connection timeout in milliseconds.
int
30000
connect.ftp.filter
Regular expression to use when selecting files for processing
string
.*
connect.ftp.monitor.tail
Comma-separated list of path:destinationtopic pairs; the tail of each file is tracked.
string
connect.ftp.monitor.update
Comma-separated list of path:destinationtopic pairs; the whole file is tracked.
Optional Java date time formatter for the year component.
String
yyyy
month.format
Optional Java date time formatter for the month component.
String
MM
day.format
Optional Java date time formatter for the day component.
String
dd
hour.format
Optional Java date time formatter for the hour component.
String
HH
minute.format
Optional Java date time formatter for the minute component.
String
mm
second.format
Optional Java date time formatter for the second component.
String
ss
timezone
Optional. Sets the timezone. It can be any valid Java timezone.
String
UTC
locale
Optional. Sets the locale. It can be any valid Java locale.
String
en
rolling.window.type
Sets the window type. It can be fixed or rolling.
String
minutes
rolling.window.size
Sets the window size. It can be any positive integer, and depending on the window.type it has an upper bound, 60 for seconds and minutes, and 24 for hours.
Int
15
## Example
field
The field name. If the field is part of the record Key, prefix it with _key; otherwise use _value. If neither _key nor _value is used, the field is resolved against the record Value.
String
format.from.pattern
Optional DateTimeFormatter-compatible format for the timestamp. Used to parse the input if the input is a string. Multiple (fallback) patterns can be added, comma-separated.
String
unix.precision
Optional. The desired Unix precision for the timestamp: seconds, milliseconds, microseconds, or nanoseconds. Used to parse the input if the input is a Long.
String
milliseconds
connect.s3.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header._date, _header._hour
connect.s3.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header._year, _header._month, _header._day, _header._hour
PubSubSubscriptionID: The Pub/Sub Subscription ID.
All Pub/Sub message attributes: Each attribute from the Pub/Sub message is mapped as a separate header.
For “auth.mode” file: Local file path for file containing GCP authentication credentials.
string
(Empty)
connect.pubsub.gcp.project.id
GCP Project ID.
string
(Empty)
connect.pubsub.kcql
Kafka Connect Query Language (KCQL) Configuration to control the connector behaviour
string
connect.pubsub.output.mode
Output mode. Please see documentation below.
Default or Compatibility
Default
batch.size
The maximum number of messages the connector will retrieve and process at one time per polling request (per KCQL mapping).
int
1000
cache.ttl
The maximum amount of time (in milliseconds) to store message data to allow acknowledgement of a message.
long
1 hour
queue.max
Data is loaded into a queue asynchronously so that it stands ready when the poll call is activated. Control the maximum number of records to hold in the queue per KCQL mapping.
int
ProjectId
String
The Pub/Sub project containing the topic from which messages are polled.
TopicId
String
The Pub/Sub topic containing the messages.
SubscriptionId
String
The Pub/Sub subscription of the Pub/Sub topic.
MessageId
String
MessageData
Optional String
The body of the Pub/Sub message.
AttributeMap
Optional String
The attribute map associated with the Pub/Sub message.
connect.pubsub.gcp.auth.mode
Specifies the authentication mode for connecting to GCP.
string
Credentials, File or Default
Default
connect.pubsub.gcp.credentials
For “auth.mode” credentials: GCP Authentication credentials string.
You can specify multiple KCQL statements separated by ; to have a connector sink multiple topics. The connector properties topics or topics.regex are required to be set to a value that matches the KCQL statements.
The following KCQL is supported:
Cache mode
The purpose of this mode is to cache in Redis [Key-Value] pairs. Imagine a Kafka topic with currency foreign exchange rate messages:
You may want to store in Redis: the symbol as the Key and the price as the Value. This will effectively make Redis a caching system, which multiple other applications can access to get the (latest) value. To achieve that using this particular Kafka Redis Sink Connector, you need to specify the KCQL as:
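A minimal KCQL sketch for this cache mode (the topic and field names yahoo-fx, price and symbol are illustrative, and the exact clauses may vary by connector version):

```sql
-- Cache the latest price per symbol; the symbol value becomes the Redis key
SELECT price FROM yahoo-fx PK symbol
```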
This will update the keys USDGBP, EURGBP with the relevant price using the (default) JSON format:
Composite keys are supported with the PK clause, a delimiter can be set with the optional configuration property connect.redis.pk.delimiter.
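For example, a composite key built from two fields joined with a custom delimiter (field names are illustrative):

```properties
connect.redis.kcql=SELECT price FROM yahoo-fx PK symbol, exchange
connect.redis.pk.delimiter=.
```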
Sorted Sets
To insert messages from a Kafka topic into 1 Sorted Set use the following KCQL syntax:
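A sketch of such a mapping (the source topic name cpuTopic is illustrative):

```sql
INSERT INTO cpu_stats SELECT * FROM cpuTopic STOREAS SortedSet(score=timestamp)
```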
This will create and add entries to the (sorted set) named cpu_stats. The entries will be ordered in the Redis set based on the score that we define it to be the value of the timestamp field of the AVRO message from Kafka. In the above example, we are selecting and storing all the fields of the Kafka message.
The TTL statement allows setting a time to live on the sorted set. If not specified, no TTL is set.
Multiple Sorted Sets
The connector can create multiple sorted sets by promoting each value of one field from the Kafka message into its own Sorted Set, and selecting which values to store in the sorted sets. Use the PK (primary key) KCQL clause to define the field.
Notice we have dropped the INSERT clause.
The connector can also prefix the name of the Key using the INSERT statement for Multiple SortedSets:
This will create keys named FX-USDGBP, FX-EURGBP etc.
The TTL statement allows setting a time to live on the sorted set. If not specified, no TTL is set.
Geospatial add
To insert messages from a Kafka topic with GEOADD use the following KCQL syntax:
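A sketch of a GEOADD mapping (topic, field and parameter names are illustrative; check the connector reference for the exact STOREAS parameters):

```sql
INSERT INTO cities SELECT town, country FROM addressTopic PK country STOREAS GeoAdd(longitudeField=lng, latitudeField=lat)
```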
Streams
To insert messages from a Kafka topic to a Redis Stream use the following KCQL syntax:
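A sketch of a Stream mapping (the stream and topic names are illustrative):

```sql
INSERT INTO myStream SELECT * FROM cpuTopic STOREAS STREAM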
PubSub
To insert a message from a Kafka topic to a Redis PubSub use the following KCQL syntax:
The channel to write to in Redis is determined by a field in the payload of the Kafka message set in the KCQL statement, in this case, a field called myfield.
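A sketch of such a PubSub mapping (the topic name is illustrative; myfield comes from the explanation above):

```sql
SELECT * FROM cpuTopic STOREAS PubSub(channel=myfield)
```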
The name of the security provider used for SSL connections. Default value is the default security provider of the JVM.
string
ssl.protocol
The SSL protocol used to generate the SSLContext. Default setting is TLS, which is fine for most cases. Allowed values in recent JVMs are TLS, TLSv1.1 and TLSv1.2. SSL, SSLv2 and SSLv3 may be supported in older JVMs, but their usage is discouraged due to known security vulnerabilities.
string
Azure Service Bus
This page describes the usage of the Stream Reactor Azure Service Bus Source Connector.
This Kafka connector is designed to effortlessly ingest records from Azure Service Bus into your Kafka cluster. It leverages the Microsoft Azure API to transfer data from Service Buses, preserving both payloads and metadata (see Payload support). It provides an AT-LEAST-ONCE guarantee: the data is committed (marked as read) in Service Bus once the connector verifies it was successfully written to the designated Kafka topic. The Azure Service Bus Source Connector supports both types of Service Bus: Queues and Topics.
Connector Class
Full Config Example
For more examples see the .
The following example presents all the mandatory configuration properties for the Service Bus connector. There are also optional parameters, listed in the Option Reference below. Feel free to tweak the configuration to your requirements.
KCQL support
You can specify multiple KCQL statements separated by ; to have the connector sink into multiple topics. However, you cannot route the same source to different topics; for this, use a separate connector instance.
The following KCQL is supported:
It allows you to map a Service Bus named <your-service-bus> to a Kafka topic named <your-kafka-topic> using the specified PROPERTIES.
The selection of fields from the Service Bus message is not supported.
Payload support
Azure Service Bus Connector follows specific pattern (Schema) of messages. Please look below for the format of the data transferred to Kafka Topics specified in the KCQL config.
Key Format (Schema)
Field Name
Schema Type
Description
Payload Format (Schema)
Authentication
You can connect to an Azure Service Bus by passing your connection string in configuration. The connection string can be found in the Shared access policies section of your Azure Portal.
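A sketch of the configuration, assuming the connector exposes the connection string via a connect.servicebus.connection.string property (check the Option Reference for the exact name; namespace and key are placeholders):

```properties
connect.servicebus.connection.string=Endpoint=sb://my-namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=<access-key>
```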
Learn more about different methods of connecting to Service Bus on the .
Queues & Topics
The Azure Service Bus Connector connects to Service Bus via the Microsoft API. In order to smoothly configure your mappings you have to pay attention to the PROPERTIES part of your KCQL mappings. There are two cases here: reading from a Service Bus of type QUEUE and of type TOPIC. Please refer to the relevant sections below. In case of further questions, check the Azure Service Bus documentation to learn more about those mechanisms.
Reading from Queues
In order to read from a queue, there is an additional parameter that you need to pass with your KCQL mapping in the PROPERTIES part. This parameter is servicebus.type and it can take one of two values depending on the type of the Service Bus: QUEUE or TOPIC. For a Queue, pass QUEUE.
This is sufficient to enable you to create the mapping with your queue.
Reading from Topics
In order to read from a topic, there are two additional parameters that you need to pass with your KCQL mapping in the PROPERTIES part:
Parameter servicebus.type, which can take one of two values depending on the type of the Service Bus: QUEUE or TOPIC. For a Topic, pass TOPIC.
Parameter subscription.name which takes the (case-sensitive) value of a subscription name that you've created for this topic for the connector to use. Please use Azure Portal to create one.
Make sure your subscription exists, otherwise you will get an error similar to this:
Caused by: com.azure.core.amqp.exception.AmqpException: The messaging entity 'streamreactor:Topic:my-topic|lenses' could not be found. To know more visit .
Create the subscription per topic in the Azure Portal.
This is sufficient to enable you to create the mapping with your topic.
Option Reference
KCQL Properties
Please find below all the necessary KCQL properties:
Name
Description
Type
Default Value
Configuration parameters
Please find below all the relevant configuration parameters:
Name
Description
Type
Default Value
MongoDB
This page describes the usage of the Stream Reactor MongoDB Sink Connector.
Connector Class
Example
For more examples see the .
KCQL support
You can specify multiple KCQL statements separated by ; to have a connector sink multiple topics. The connector properties topics or topics.regex are required to be set to a value that matches the KCQL statements.
The following KCQL is supported:
Examples:
Insert Mode
Insert is the default write mode of the sink.
Upsert Mode
The connector supports upserts, which replace the existing row if a match is found on the primary keys. If records are delivered with the same field or group of fields that are used as the primary key on the target table, but different values, the existing record in the target table will be updated.
Batching
The BATCH clause controls the batching of writes to MongoDB.
TLS/SSL
TLS/SSL is supported by setting ?ssl=true in the connect.mongo.connection option. The MongoDB driver will then attempt to load the truststore and keystore using the JVM system properties.
You need to set JVM system properties to ensure that the client is able to validate the SSL certificate presented by the server:
javax.net.ssl.trustStore: the path to a trust store containing the certificate of the signing authority
javax.net.ssl.trustStorePassword: the password to access this trust store
javax.net.ssl.keyStore: the path to a key store containing the client’s SSL certificates
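The steps above can be sketched as worker JVM options plus the connection option (paths and passwords are placeholders):

```properties
# Passed to the Connect worker JVM, e.g. appended to KAFKA_OPTS
-Djavax.net.ssl.trustStore=/var/private/ssl/truststore.jks
-Djavax.net.ssl.trustStorePassword=<truststore-password>
-Djavax.net.ssl.keyStore=/var/private/ssl/keystore.jks
-Djavax.net.ssl.keyStorePassword=<keystore-password>

# Connector option enabling TLS
connect.mongo.connection=mongodb://mongo:27017/?ssl=true
```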
Authentication Mechanism
All authentication methods are supported: X.509, LDAP Plain, Kerberos (GSSAPI), MongoDB-CR and SCRAM-SHA-1. The default as of MongoDB version 3.0 is SCRAM-SHA-1. To set the authentication mechanism, set authMechanism in the connect.mongo.connection option.
The mechanism can be set either in the connection string, which requires the password to be in plain text in the connection string, or via the connect.mongo.auth.mechanism option.
If the username option is set, it overrides the username/password set in the connection string, and connect.mongo.auth.mechanism takes precedence.
e.g.
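A sketch of setting the mechanism via the dedicated options rather than the connection string (host and credentials are placeholders):

```properties
connect.mongo.connection=mongodb://mongo:27017
connect.mongo.username=myuser
connect.mongo.password=mypassword
connect.mongo.auth.mechanism=SCRAM-SHA-1
```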
JSON Field dates
List of fields that should be converted to ISO Date on MongoDB insertion (comma-separated field names), for JSON topics only. Field values may be an epoch time or an ISO8601 datetime string with an offset (offset or ‘Z’ required). If the string does not parse to ISO, it will be written as a string instead.
Subdocument fields can be referred to in the following examples:
topLevelFieldName
topLevelSubDocument.FieldName
topLevelParent.subDocument.subDocument2.FieldName
If a field is converted to ISODate and that same field is named as a PK, then the PK field is also written as an ISODate.
This is controlled via the connect.mongo.json_datetime_fields option.
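For instance, to convert a top-level field and a subdocument field (field names are illustrative):

```properties
connect.mongo.json_datetime_fields=created_at,order.timestamp
```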
Kafka payload support
This sink supports the following Kafka payloads:
Schema.Struct and Struct (Avro)
Schema.Struct and JSON
No Schema and JSON
Error policies
The connector supports .
Option Reference
Name
Description
Type
Default Value
JMS
This page describes the usage of the Stream Reactor JMS Source Connector.
A Kafka Connect JMS source connector to subscribe to messages on JMS queues and topics and write them to a Kafka topic.
The connector uses the standard JMS protocols and has been tested against ActiveMQ.
The connector allows for the JMS initial.context.factory and connection.factory to be set according to your JMS provider. The appropriate implementation jars must be added to the CLASSPATH of the connect workers or placed in the plugin.path of the connector.
Each JMS message is committed only when it has been written to Kafka. If a failure happens when writing to Kafka, e.g. the message is too large, then the JMS message will not be acknowledged. It will stay in the queue so it can be actioned upon.
The schema of the messages is fixed and can be found under Data Types unless a converter is used.
You must provide the JMS implementation jars for your JMS service.
Connector Class
Example
For more examples see the .
KCQL support
You can specify multiple KCQL statements separated by ; to have the connector sink into multiple topics.
However, you cannot route the same source to different topics; for this, use a separate connector instance.
The following KCQL is supported:
The selection of fields from the JMS message is not supported.
Examples:
Destination types
The connector supports both TOPICS and QUEUES, controlled by the WITHTYPE KCQL clause.
Message Converters
The connector supports converters to handle different message payload formats in the source topic or queue.
If no converter is provided, the JMS message is converted to a Kafka Struct representation.
See .
Data type conversion
Name
Type
Option Reference
Name
Description
Type
Default Value
MQTT
This page describes the usage of the Stream Reactor MQTT Sink Connector.
Connector Class
Example
Deprecated Cassandra
This page describes the usage of the Stream Reactor Cassandra Sink Connector part of the kafka-connect-cassandra-*** artifact.
The connector converts the value of Kafka messages to JSON and uses the Cassandra JSON insert feature to write records.
name=GcpPubSubSourceDemo
connector.class=io.lenses.streamreactor.connect.gcp.pubsub.source.GCPPubSubSourceConnector
topics=kafka_topic_to_write_to
tasks.max=1
connect.pubsub.gcp.auth.mode=File
connect.pubsub.gcp.file=/path/to/gcp-service-account-key.json
connect.pubsub.gcp.project.id=gcp-project-id
connect.pubsub.kcql=insert into `kafka_topic_to_write_to` select * from `gcp-subscription-id`
INSERT INTO kafka-topic
SELECT *
FROM subscriptionId
[PROPERTIES(
'property.1'=x,
'property.2'=x,
)]
INSERT INTO `my-topic-with-hyphen`
SELECT *
FROM bucketAddress:pathPrefix
INSERT INTO my-topic SELECT * FROM subscriptionId;
The store password for the key store file. This is optional for client and only needed if ssl.keystore.location is configured.
password
ssl.keystore.location
The location of the key store file. This is optional for client and can be used for two-way authentication for client.
string
ssl.truststore.password
The password for the trust store file. If a password is not set access to the truststore is still available, but integrity checking is disabled.
password
ssl.keymanager.algorithm
The algorithm used by key manager factory for SSL connections. Default value is the key manager factory algorithm configured for the Java Virtual Machine.
string
SunX509
ssl.trustmanager.algorithm
The algorithm used by trust manager factory for SSL connections. Default value is the trust manager factory algorithm configured for the Java Virtual Machine.
string
PKIX
ssl.keystore.type
The file format of the key store file. This is optional for client.
string
JKS
ssl.cipher.suites
A list of cipher suites. This is a named combination of authentication, encryption, MAC and key exchange algorithm used to negotiate the security settings for a network connection using TLS or SSL network protocol. By default all the available cipher suites are supported.
list
ssl.endpoint.identification.algorithm
The endpoint identification algorithm to validate server hostname using server certificate.
string
https
ssl.truststore.type
The file format of the trust store file.
string
JKS
ssl.enabled.protocols
The list of protocols enabled for SSL connections.
list
[TLSv1.2, TLSv1.1, TLSv1]
ssl.key.password
The password of the private key in the key store file. This is optional for client.
password
ssl.secure.random.implementation
The SecureRandom PRNG implementation to use for SSL cryptography operations.
string
connect.redis.kcql
KCQL expression describing field selection and routes.
string
connect.redis.host
Specifies the redis server
string
connect.redis.port
Specifies the redis connection port
int
connect.redis.password
Provides the password for the redis connection.
password
connect.redis.ssl.enabled
Enables ssl for the redis connection
boolean
false
connect.redis.error.policy
Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP - the error is swallowed; THROW - the error is allowed to propagate; RETRY - the exception causes the Connect framework to retry the message. The number of retries is based on connect.redis.max.retries. The error will be logged automatically.
string
THROW
connect.redis.retry.interval
The time in milliseconds between retries.
int
60000
connect.redis.max.retries
The maximum number of times to try the write again.
int
20
connect.progress.enabled
Enables logging of how many records have been processed.
javax.net.ssl.keyStorePassword: the password to access this key store
ssl.key.password
The password of the private key in the key store file. This is optional for client.
password
ssl.keystore.type
The file format of the key store file. This is optional for client.
string
JKS
ssl.truststore.location
The location of the trust store file.
string
ssl.endpoint.identification.algorithm
The endpoint identification algorithm to validate server hostname using server certificate.
string
https
ssl.protocol
The SSL protocol used to generate the SSLContext. Default setting is TLS, which is fine for most cases. Allowed values in recent JVMs are TLS, TLSv1.1 and TLSv1.2. SSL, SSLv2 and SSLv3 may be supported in older JVMs, but their usage is discouraged due to known security vulnerabilities.
string
TLS
ssl.trustmanager.algorithm
The algorithm used by trust manager factory for SSL connections. Default value is the trust manager factory algorithm configured for the Java Virtual Machine.
string
PKIX
ssl.secure.random.implementation
The SecureRandom PRNG implementation to use for SSL cryptography operations.
string
ssl.truststore.type
The file format of the trust store file.
string
JKS
ssl.keymanager.algorithm
The algorithm used by key manager factory for SSL connections. Default value is the key manager factory algorithm configured for the Java Virtual Machine.
string
SunX509
ssl.provider
The name of the security provider used for SSL connections. Default value is the default security provider of the JVM.
string
ssl.keystore.location
The location of the key store file. This is optional for client and can be used for two-way authentication for client.
string
ssl.truststore.password
The password for the trust store file. If a password is not set access to the truststore is still available, but integrity checking is disabled.
password
connect.mongo.connection
The mongodb connection in the format mongodb://[username:password@]host1[:port1],host2[:port2],…[,hostN[:portN]]][/[database][?options]].
string
connect.mongo.db
The mongodb target database.
string
connect.mongo.username
The username to use when authenticating
string
connect.mongo.password
The password to use when authenticating.
password
connect.mongo.auth.mechanism
The authentication mechanism to use when connecting to MongoDB.
string
SCRAM-SHA-1
connect.mongo.error.policy
Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP - the error is swallowed; THROW - the error is allowed to propagate; RETRY - the exception causes the Connect framework to retry the message. The number of retries is based on connect.mongo.max.retries. The error will be logged automatically.
string
THROW
connect.mongo.max.retries
The maximum number of times to try the write again.
int
20
connect.mongo.retry.interval
The time in milliseconds between retries.
int
60000
connect.mongo.kcql
KCQL expression describing field selection and data routing to the target mongo db.
string
connect.mongo.json_datetime_fields
List of fields that should be converted to ISODate on Mongodb insertion (comma-separated field names). For JSON topics only. Field values may be an integral epoch time or an ISO8601 datetime string with an offset (offset or ‘Z’ required). If string does not parse to ISO, it will be written as a string instead. Subdocument fields can be referred to as in the following examples: “topLevelFieldName”, “topLevelSubDocument.FieldName”, “topLevelParent.subDocument.subDocument2.FieldName”, (etc.) If a field is converted to ISODate and that same field is named as a PK, then the PK field is also written as an ISODate.
list
[]
connect.progress.enabled
Enables logging of how many records have been processed.
boolean
false
ssl.cipher.suites
A list of cipher suites. This is a named combination of authentication, encryption, MAC and key exchange algorithm used to negotiate the security settings for a network connection using TLS or SSL network protocol. By default all the available cipher suites are supported.
list
ssl.enabled.protocols
The list of protocols enabled for SSL connections.
list
[TLSv1.2, TLSv1.1, TLSv1]
ssl.keystore.password
The store password for the key store file. This is optional for client and only needed if ssl.keystore.location is configured.
Subscription name to use when subscribing to a topic. Specifying this makes the subscription durable for topics.
string
connect.jms.password
Provides the password for the JMS connection
password
connect.jms.username
Provides the user for the JMS connection
string
connect.jms.error.policy
Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP - the error is swallowed; THROW - the error is allowed to propagate; RETRY - the exception causes the Connect framework to retry the message. The number of retries is based on connect.jms.max.retries. The error will be logged automatically.
string
THROW
connect.jms.retry.interval
The time in milliseconds between retries.
int
60000
connect.jms.max.retries
The maximum number of times to try the write again.
int
20
connect.jms.destination.selector
Selector to use for destination lookup. Either CDI or JNDI.
string
CDI
connect.jms.initial.context.extra.params
List (comma-separated) of extra properties as key/value pairs with a colon delimiter to supply to the initial context e.g. SOLACE_JMS_VPN:my_solace_vp
list
[]
connect.jms.batch.size
The number of records to poll for on the target JMS destination in each Connect poll.
int
100
connect.jms.polling.timeout
Provides the timeout to poll incoming messages
long
1000
connect.jms.source.default.converter
The canonical class name of the default converter used to convert raw JMS message bytes to a SourceRecord, e.g. io.lenses.streamreactor.connect.converters.source.AvroConverter. The default can still be overridden via connect.jms.source.converters.
string
connect.jms.converter.throw.on.error
If set to false, the conversion exception is swallowed and processing carries on, but the message is lost. If set to true, the exception is thrown. Default is false.
boolean
false
connect.converter.avro.schemas
If the AvroConverter is used you need to provide an avro Schema to be able to read and translate the raw bytes to an avro record. The format is $MQTT_TOPIC=$PATH_TO_AVRO_SCHEMA_FILE
string
connect.jms.headers
Contains a collection of static JMS headers included in every SinkRecord. The format is connect.jms.headers="$MQTT_TOPIC=rmq.jms.message.type:TextMessage,rmq.jms.message.priority:2;$MQTT_TOPIC2=rmq.jms.message.type:JSONMessage"
string
connect.progress.enabled
Enables logging of how many records have been processed.
boolean
false
connect.jms.evict.interval.minutes
The interval, in minutes, at which uncommitted messages are removed from the internal cache. Each JMS message is linked to the Kafka record to be published; failure to publish a record to Kafka means the JMS message will not be acknowledged.
int
10
connect.jms.evict.threshold.minutes
The number of minutes after which an uncommitted entry becomes evictable from the connector cache.
int
10
connect.jms.scale.type
Specifies how connector task parallelization is decided. Available values are kcql and default. If kcql is provided, parallelization is based on the number of KCQL statements written; otherwise it is driven by the connector's tasks.max setting.
connect.servicebus.kcql=INSERT INTO kafka-topic SELECT * FROM azure-queue PROPERTIES('servicebus.type'='QUEUE');
connect.servicebus.kcql=INSERT INTO kafka-topic SELECT * FROM azure-topic PROPERTIES('servicebus.type'='TOPIC','subscription.name'='subscription1');
name=mongo
connector.class=io.lenses.streamreactor.connect.mongodb.sink.MongoSinkConnector
tasks.max=1
topics=orders
connect.mongo.kcql=INSERT INTO orders SELECT * FROM orders
connect.mongo.db=connect
connect.mongo.connection=mongodb://mongo:27017
INSERT | UPSERT
INTO <collection_name>
SELECT FIELD, ...
FROM <kafka-topic>
BATCH = 100
-- Select all fields from topic fx_prices and insert into the fx collection
INSERT INTO fx SELECT * FROM fx_prices
-- Select all fields from topic fx_prices and upsert into the fx collection,
-- The assumption is there will be a ticker field in the incoming json:
UPSERT INTO fx SELECT * FROM fx_prices PK ticker
name=jms-source
connector.class=io.lenses.streamreactor.connect.jms.source.JMSSourceConnector
tasks.max=1
connect.jms.kcql=INSERT INTO jms SELECT * FROM jms-queue WITHTYPE QUEUE
connect.jms.initial.context.factory=org.apache.activemq.jndi.ActiveMQInitialContextFactory
connect.jms.url=tcp://activemq:61616
connect.jms.connection.factory=ConnectionFactory
INSERT INTO kafka_topic
SELECT *
FROM jms_destination
WITHTYPE [TOPIC|QUEUE]
[WITHCONVERTER=`myclass`]
-- Select from a JMS queue and write to a Kafka topic
INSERT INTO topicA SELECT * FROM jms_queue WITHTYPE QUEUE
-- Select from a JMS topic and write to a Kafka topic with an Avro converter
INSERT INTO topicA
SELECT * FROM jms_topic
WITHTYPE TOPIC
WITHCONVERTER=`io.lenses.streamreactor.connect.converters.source.AvroConverter`
You can specify multiple KCQL statements separated by ; to have a connector sink multiple topics. The connector properties topics or topics.regex are required to be set to a value that matches the KCQL statements.
The following KCQL is supported:
Examples:
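A sketch of sinking two topics with one connector by separating KCQL statements with ; (topic names are illustrative; note that topics must list every topic referenced in the KCQL):

```properties
name=mqtt-sink
connector.class=io.lenses.streamreactor.connect.mqtt.sink.MqttSinkConnector
topics=orders,invoices
connect.mqtt.kcql=INSERT INTO /lenses/orders SELECT * FROM orders;INSERT INTO /lenses/invoices SELECT * FROM invoices
```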
Dynamic targets
The connector can route the messages to specific MQTT targets. These are the possible options:
Given Constant Topic:
Route messages to a specified MQTT topic using a constant value:
Using a Field Value for Topic:
Direct messages to an MQTT topic based on a specified field's value. Example expression: fieldA.fieldB.fieldC.
Utilizing Message Key as Topic:
Determine the MQTT topic by the incoming message key, expected to be a string.
Leveraging Kafka Message Topic:
Set the MQTT topic using the incoming Kafka message’s original topic.
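The first two routing options can be sketched in KCQL as follows (topic and field names are illustrative; the field-based form requires the mqtt.target.from.field property):

```sql
-- constant MQTT topic
INSERT INTO `/lenses/orders` SELECT * FROM orders

-- MQTT topic taken from a field inside the record
INSERT INTO `fieldA.fieldB.fieldC`
SELECT * FROM orders
PROPERTIES('mqtt.target.from.field'='true')
```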
Kafka payload support
The sink publishes each Kafka record to MQTT according to the type of the record’s value:
Payload type
Action taken by the sink
Binary (byte[])
Forwarded unchanged (“pass-through”).
String
Published as its UTF-8 byte sequence.
Connect Struct produced by Avro, Protobuf, or JSON Schema converters
Converted to JSON, then published as the JSON string’s UTF-8 bytes.
java.util.Map or other Java Collection
Serialized to JSON and published as the JSON string’s UTF-8 bytes.
In short, non-binary objects are first turned into a JSON string; everything that reaches MQTT is ultimately a sequence of bytes.
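The table above can be sketched in a few lines of Python; this is assumed behaviour for illustration, not the connector's actual implementation:

```python
import json

def to_mqtt_bytes(value):
    """Sketch of how the sink turns a Kafka record value into MQTT payload bytes."""
    if isinstance(value, (bytes, bytearray)):
        return bytes(value)                   # binary: forwarded unchanged
    if isinstance(value, str):
        return value.encode("utf-8")          # string: published as UTF-8 bytes
    return json.dumps(value).encode("utf-8")  # struct/map/collection: JSON, then UTF-8

print(to_mqtt_bytes({"id": 1}))  # b'{"id": 1}'
```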
Version 9 introduces two breaking changes that affect how you route data and project fields in KCQL. Review the sections below and update your configurations before restarting the connector.
WITHTARGET is no longer used
Before (v < 9.0.0)
After (v ≥ 9.0.0)
INSERT INTO SELECT * FROM control.boxes.test
WITHTARGET ${path}
INSERT INTO ${path}
SELECT * FROM control.boxes.test
PROPERTIES('mqtt.target.from.field'='true')
Migration step
Delete every WITHTARGET … clause.
Move the placeholder (or literal) that held the target path into the INSERT INTO expression.
Add mqtt.target.from.field=true to the KCQL PROPERTIES list.
KCQL field projections are ignored
Before (v < 9.0.0)
After (v ≥ 9.0.0)
Handled directly in the KCQL SELECT list:
SELECT id, temp AS temperature …
The connector passes the full record. Any projection, renaming, or value transformation must be done with a Kafka Connect Single Message Transformer (SMT), KStreams, or another preprocessing step.
Migration step
Remove field lists and aliases from KCQL.
Attach an SMT such as org.apache.kafka.connect.transforms.ExtractField$Value, org.apache.kafka.connect.transforms.MaskField$Value, or your own custom SMT to perform the same logic.
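As a sketch, a projection such as SELECT id, temp AS temperature could be replicated with the built-in ReplaceField SMT (property names follow recent Kafka versions, which use include/renames; older versions use whitelist instead of include):

```properties
transforms=project
transforms.project.type=org.apache.kafka.connect.transforms.ReplaceField$Value
# keep only these fields from the record value
transforms.project.include=id,temp
# rename temp to temperature
transforms.project.renames=temp:temperature
```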
As of Stream-Reactor version 10, the sink will be deprecated. No other changes or fixes will be provided. Follow the Cassandra link to see the new sink implementation details.
You can specify multiple KCQL statements separated by ; to have a connector sink multiple topics. The connector properties topics or topics.regex are required to be set to a value that matches the KCQL statements.
The following KCQL is supported:
Examples:
Deletion in Cassandra
Compacted topics in Kafka retain the last message per key. Deletion in Kafka occurs by tombstoning: if compaction is enabled on the topic and a message is sent with a null payload, Kafka flags this record for deletion and it is compacted/removed from the topic.
Deletion in Cassandra is supported based on fields in the key of messages with an empty/null payload. A Cassandra CQL delete statement must be provided, with parameters to which field values from the key are bound. For example, with the delete statement of:
If a message was received with an empty/null value and key fields key.id and key.product the final bound Cassandra statement would be:
Deletion will only occur if a message with an empty payload is received from Kafka.
Ensure your ordinal position of the connect.cassandra.delete.struct_flds matches the binding order in the Cassandra delete statement!
Initial contact point host for Cassandra including port.
string
localhost
connect.cassandra.port
Cassandra native port.
int
9042
connect.cassandra.key.space
Keyspace to write to.
string
Hashicorp Vault
This page describes how to retrieve secrets from Hashicorp Vault for use in Kafka Connect.
Secure secrets in Hashicorp Vault and use them in Kafka Connect.
Secrets will only be reloaded if the Connector restarts.
From Version 2.2.0, the secret provider does not write secrets to files by default. If you require this behaviour (for trust stores, key stores or certs) you can enable this by adding the property file.write=true.
Authentication
Multiple authentication methods are supported:
approle
userpass
kubernetes
Configuring the plugin
Name
Description
Example Worker Properties
Usage
To use this provider in a connector, reference the Hashicorp Vault containing the secret and the key name for the value of the connector property.
The indirect reference is in the form ${provider:path:key} where:
provider is the name of the provider in the worker property file set above
path is the path of the secret in Hashicorp Vault
key is the name of the key within the secret to retrieve. Vault can store multiple keys under a path.
For example, if we store two secrets as keys:
my_username_key with the value lenses and
my_password_key with the value my-secret-password
in a secret called secret/my-vault-secret we would set:
This would resolve at runtime to:
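A sketch of the substitution, using an illustrative connector property name and assuming the provider was registered as vault in the worker properties:

```properties
# in the connector configuration:
connect.mongo.username=${vault:secret/my-vault-secret:my_username_key}
connect.mongo.password=${vault:secret/my-vault-secret:my_password_key}

# resolved at runtime to:
connect.mongo.username=lenses
connect.mongo.password=my-secret-password
```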
Data encoding
The provider handles the following types:
utf_8
base64
The provider will look for keys prefixed with:
UTF8
UTF_FILE
BASE64
The UTF8 prefix means the value returned is the string retrieved for the secret key. The BASE64 prefix means the value returned is the base64-decoded string retrieved for the secret key.
If the value for the tag is UTF8_FILE, the string contents are written to a file. The value returned for the connector configuration key will be the location of the file. The file location is determined by the file.dir configuration option given to the provider via the Connect worker.properties file.
If the value for the tag is BASE64_FILE, the string contents are base64-decoded and written to a file. The value returned for the connector configuration key will be the location of the file. For example, if a connector needs a PEM file on disk, set the prefix as BASE64_FILE. The file location is determined by the file.dir configuration option given to the provider via the Connect worker.properties file.
If no prefix is found the contents of the secret string are returned.
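The prefix handling can be sketched as follows; this is assumed behaviour for illustration (the file-writing UTF8_FILE/BASE64_FILE variants are omitted):

```python
import base64

def resolve_secret(prefix, raw):
    """Sketch of how a prefixed secret value is resolved."""
    if prefix == "UTF8":
        return raw                                    # returned as-is
    if prefix == "BASE64":
        return base64.b64decode(raw).decode("utf-8")  # decoded before returning
    return raw                                        # no prefix: contents returned unchanged

print(resolve_secret("UTF8", "my-secret-password"))
encoded = base64.b64encode(b"my-secret-password").decode("ascii")
print(resolve_secret("BASE64", encoded))  # both print my-secret-password
```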
Elasticsearch
This page describes the usage of the Stream Reactor Elasticsearch Sink Connector.
Connector Class
Elasticsearch 6
INSERT INTO lenses-io-demo ...
INSERT INTO `<path to field>`
SELECT * FROM control.boxes.test
PROPERTIES('mqtt.target.from.field'='true')
name=mqtt
connector.class=io.lenses.streamreactor.connect.mqtt.sink.MqttSinkConnector
tasks.max=1
topics=orders
connect.mqtt.hosts=tcp://mqtt:1883
connect.mqtt.clean=true
connect.mqtt.timeout=1000
connect.mqtt.keep.alive=1000
connect.mqtt.service.quality=1
connect.mqtt.client.id=dm_sink_id
connect.mqtt.kcql=INSERT INTO /lenses/orders SELECT * FROM orders
INSERT
INTO <mqtt-topic>
SELECT * //no field projection supported
FROM <kafka-topic>
//no WHERE clause supported
-- Insert into /landoop/demo all fields from kafka_topicA
INSERT INTO `/landoop/demo` SELECT * FROM kafka_topicA
-- Insert into /landoop/demo all fields from dynamic field
INSERT INTO `<field path>` SELECT * FROM control.boxes.test PROPERTIES('mqtt.target.from.field'='true')
name=cassandra-sink
connector.class=io.lenses.streamreactor.connect.cassandra.sink.CassandraSinkConnector
tasks.max=1
topics=orders
connect.cassandra.kcql=INSERT INTO orders SELECT * FROM orders
connect.cassandra.port=9042
connect.cassandra.key.space=demo
connect.cassandra.contact.points=cassandra
INSERT INTO <your-cassandra-table>
SELECT FIELD,...
FROM <your-table>
[TTL=Time to live]
-- Insert mode, select all fields from topicA and
-- write to tableA
INSERT INTO tableA SELECT * FROM topicA
-- Insert mode, select 3 fields and rename from topicB
-- and write to tableB
INSERT INTO tableB SELECT x AS a, y, c FROM topicB
-- Insert mode, select 3 fields and rename from topicB
-- and write to tableB with TTL
INSERT INTO tableB SELECT x, y FROM topicB TTL=100000
DELETE FROM orders WHERE id = ? and product = ?
# Message
# '{ "key": { "id" : 999, "product" : "DATAMOUNTAINEER" }, "value" : null }'
# DELETE FROM orders WHERE id = 999 and product = "DATAMOUNTAINEER"
# connect.cassandra.delete.enabled=true
# connect.cassandra.delete.statement=DELETE FROM orders WHERE id = ? and product = ?
# connect.cassandra.delete.struct_flds=id,product
connect.cassandra.username
Username to connect to Cassandra with.
string
connect.cassandra.password
Password for the username to connect to Cassandra with.
password
connect.cassandra.ssl.enabled
Secure Cassandra driver connection via SSL.
boolean
false
connect.cassandra.trust.store.path
Path to the client Trust Store.
string
connect.cassandra.trust.store.password
Password for the client Trust Store.
password
connect.cassandra.trust.store.type
Type of the Trust Store, defaults to JKS
string
JKS
connect.cassandra.key.store.type
Type of the Key Store, defaults to JKS
string
JKS
connect.cassandra.ssl.client.cert.auth
Enable client certification authentication by Cassandra. Requires KeyStore options to be set.
boolean
false
connect.cassandra.key.store.path
Path to the client Key Store.
string
connect.cassandra.key.store.password
Password for the client Key Store
password
connect.cassandra.consistency.level
Consistency refers to how up-to-date and synchronized a row of Cassandra data is on all of its replicas. Cassandra offers tunable consistency. For any given read or write operation, the client application decides how consistent the requested data must be.
string
connect.cassandra.fetch.size
The number of records the Cassandra driver will return at once.
int
5000
connect.cassandra.load.balancing.policy
Cassandra Load balancing policy. ROUND_ROBIN, TOKEN_AWARE, LATENCY_AWARE or DC_AWARE_ROUND_ROBIN. TOKEN_AWARE and LATENCY_AWARE use DC_AWARE_ROUND_ROBIN
string
TOKEN_AWARE
connect.cassandra.error.policy
Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP - the error is swallowed THROW - the error is allowed to propagate. RETRY - The exception causes the Connect framework to retry the message. The number of retries is set by connect.cassandra.max.retries. All errors will be logged automatically, even if the code swallows them.
string
THROW
connect.cassandra.max.retries
The maximum number of times to try the write again.
int
20
connect.cassandra.retry.interval
The time in milliseconds between retries.
int
60000
connect.cassandra.threadpool.size
The sink inserts all the data concurrently. To fail fast in case of an error, the sink has its own thread pool. Set the value to zero and the thread pool will default to 4 × the number of CPUs. Set a value greater than 0 and that will be the size of the thread pool.
int
0
connect.cassandra.delete.struct_flds
Fields in the key struct data type used in the delete statement. Comma-separated in the order they are found in connect.cassandra.delete.statement. Keep the default value to use the record key as a primitive type.
list
[]
connect.cassandra.delete.statement
Delete statement for cassandra
string
connect.cassandra.kcql
KCQL expression describing field selection and routes.
string
connect.cassandra.default.value
By default a column omitted from the JSON map will be set to NULL. Alternatively, if set to UNSET, the pre-existing value will be preserved.
string
connect.cassandra.delete.enabled
Enables row deletion from cassandra
boolean
false
connect.progress.enabled
Enables the output for how many records have been processed
boolean
false
connect.mqtt.service.quality
Specifies the Mqtt quality of service
int
connect.mqtt.timeout
Provides the time interval to establish the mqtt connection
int
3000
connect.mqtt.clean
Sets the MQTT clean session flag, determining whether the broker discards session state when the client disconnects.
boolean
true
connect.mqtt.keep.alive
The keep alive functionality assures that the connection is still open and both broker and client are connected to the broker during the establishment of the connection. The interval is the longest possible period of time, which broker and client can endure without sending a message.
int
5000
connect.mqtt.client.id
Contains the Mqtt session client id
string
connect.mqtt.error.policy
Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP - the error is swallowed. THROW - the error is allowed to propagate. RETRY - the exception causes the Connect framework to retry the message. The number of retries is set by connect.mqtt.max.retries. All errors will be logged automatically, even if the code swallows them.
string
THROW
connect.mqtt.retry.interval
The time in milliseconds between retries.
int
60000
connect.mqtt.max.retries
The maximum number of times to try the write again.
int
20
connect.mqtt.retained.messages
Specifies the Mqtt retained flag.
boolean
false
connect.mqtt.converter.throw.on.error
If set to false, the conversion exception is swallowed and processing carries on, BUT the message is lost! If set to true, the exception is thrown. Default is false.
boolean
false
connect.converter.avro.schemas
If the AvroConverter is used you need to provide an Avro schema to be able to read and translate the raw bytes to an Avro record. The format is $MQTT_TOPIC=$PATH_TO_AVRO_SCHEMA_FILE for the source converter, or $KAFKA_TOPIC=$PATH_TO_AVRO_SCHEMA_FILE for the sink converter.
string
connect.mqtt.kcql
Contains the Kafka Connect Query Language describing the MQTT source and the target Kafka topics
string
connect.progress.enabled
Enables the output for how many records have been processed
boolean
false
connect.mqtt.ssl.ca.cert
Provides the path to the CA certificate file to use with the Mqtt connection
string
connect.mqtt.ssl.cert
Provides the path to the certificate file to use with the Mqtt connection
string
connect.mqtt.ssl.key
Certificate private [config] key file path.
string
cert
token
ldap
gcp
awsiam
jwt
github
Set a global namespace to the Vault server instance. Requires Vault Enterprise Pro
vault.pem
File containing the Vault Server certificate content as string
vault.client.pem
File containing the Client certificate string content as string
vault.engine.version
KV Secrets Engine version of the Vault server instance. Default is 2
vault.ssl.truststore.location
The location of the trust store file
vault.ssl.keystore.location
The location of the key store file
vault.ssl.keystore.password
The password for the key store file
secret.default.ttl
If no TTL is configured in Vault, apply a default TTL.
app.role.id
Use when vault.auth.method is approle or kubernetes to specify the Vault App role id
app.role.secret.id
Use when vault.auth.method is approle to specify the Vault App role secret id
app.role.path
Use when vault.auth.method is approle to specify the Vault App role path
username
Use when vault.auth.method is userpass to specify the username to connect to Vault
password
Use when vault.auth.method is userpass to specify the password to connect to Vault
mount
Use when vault.auth.method is userpass to specify the mount name of the userpass authentication back end
ldap.username
Use when vault.auth.method is ldap to specify the LDAP username to connect to Vault with
ldap.password
Use when vault.auth.method is ldap to specify the LDAP password to connect to Vault with
mount
Use when vault.auth.method is ldap to specify the mount name of the ldap authentication back end
jwt.role
Use when vault.auth.method is jwt to specify the role the JWT token belongs to
jwt.provider
Use when vault.auth.method is jwt to specify the provider of the JWT token
jwt
Use when vault.auth.method is jwt to specify the JWT token
gcp.role
Use when vault.auth.method is gcp to specify the gcp role used for authentication
gcp.jwt
Use when vault.auth.method is gcp to specify the JWT token
cert.mount
Use when vault.auth.method is cert to specify the mount name of the cert authentication back end
github.token
Use when vault.auth.method is github to specify the github app-id to use for authentication
github.mount
Use when vault.auth.method is github to specify the mount name of the github authentication back end
kubernetes.role
Use when vault.auth.method is kubernetes to specify the kubernetes role for authentication
kubernetes.token.path
Use when vault.auth.method is kubernetes to specify the path to the service account token. Default is /var/run/secrets/kubernetes.io/serviceaccount/token
kubernetes.auth.path
Use when vault.auth.method is kubernetes to specify a custom mount path
aws.role
Use when vault.auth.method is awsiam.
Name of the role to log in with. If role is not specified, the login endpoint uses the role bearing the friendly name (i.e., role name or username) of the IAM authenticated principal, or, if using the ec2 auth method, the role bearing the name of the AMI ID of the EC2 instance
aws.request.url
Use when vault.auth.method is awsiam.
Base64-encoded HTTP URL used in the signed request (i.e. the base64 encoding of https://sts.amazonaws.com), as most requests will probably use POST with an empty URI
aws.request.body
Use when vault.auth.method is awsiam.
Base64-encoded body of the signed request i.e. base64 of Action=GetCallerIdentity&Version=2011-06-15
aws.request.headers
Use when vault.auth.method is awsiam to specify any request headers
aws.mount
Use when vault.auth.method is awsiam. The AWS auth mount. Default is “aws”
BASE64_FILE
file.dir
The base location for any files to be stored
file.write
Writes secrets to file on path. Required for Java trust stores, key stores, certs that need to be loaded from file. For ease of use for the secret provider, this is disabled by default.
vault.auth.method
Available values are approle, userpass, kubernetes, cert, token, ldap, gcp, awsiam, jwt, github
vault.addr
Address of the Vault server
vault.token
Use when ‘vault.auth.method’ is ‘token’ to specify the token value
You can specify multiple KCQL statements separated by ; to have a connector sink multiple topics. The connector properties topics or topics.regex are required to be set to a value that matches the KCQL statements.
The following KCQL is supported:
Examples:
Kafka Tombstone Handling
It is possible to configure how the Connector handles a null value payload (called Kafka tombstones). Please use the behavior.on.null.values property in your KCQL with one of the possible values:
IGNORE (ignores tombstones entirely)
FAIL (throws Exception if tombstone happens)
DELETE (deletes the document with the specified id)
Example:
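For example, to delete the corresponding document when a tombstone is received (the index and topic names are illustrative):

```sql
INSERT INTO indexA SELECT * FROM topicA PROPERTIES('behavior.on.null.values'='DELETE')
```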
Primary Keys
The PK keyword allows you to specify fields that will be used to generate the key value in Elasticsearch. The values of the selected fields are concatenated and separated by a hyphen (-).
If no fields are defined, the connector defaults to using the topic name, partition, and message offset to construct the key.
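The key construction rules can be sketched as follows; the PK concatenation is as described above, while the exact format of the default topic/partition/offset key is an assumption:

```python
def elastic_doc_id(pk_values, topic, partition, offset):
    """Sketch of how the sink builds the Elasticsearch document id."""
    if pk_values:
        # PK fields: values concatenated and separated by a hyphen
        return "-".join(str(v) for v in pk_values)
    # no PK fields: topic name, partition and offset form the key
    return f"{topic}+{partition}+{offset}"

print(elastic_doc_id(["GBPUSD", 2024], "fx_prices", 0, 42))  # GBPUSD-2024
print(elastic_doc_id([], "fx_prices", 0, 42))
```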
Field Prefixes
When defining fields, specific prefixes can be used to determine where the data should be extracted from:
_key Prefix
Specifies that the value should be extracted from the message key.
If a path is provided after _key, it identifies the location within the key where the field value resides.
If no path is provided, the entire message key is used as the value.
_value Prefix
Specifies that the value should be extracted from the message value.
The remainder of the path identifies the specific location within the message value to extract the field.
_header Prefix
Specifies that the value should be extracted from the message header.
The remainder of the path indicates the name of the header to be used for the field value.
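A sketch combining the three prefixes in a single PK clause (field and header names are illustrative):

```sql
INSERT INTO indexA
SELECT * FROM topicA
PK _key.deviceId, _value.customer.id, _header.region
```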
Insert and Upsert modes
INSERT writes new records to Elastic, replacing existing records with the same ID set by the PK (Primary Key) keyword. UPSERT replaces an existing record if a match is found, or inserts a new one if none is found.
Document Type
WITHDOCTYPE allows you to associate a document type to the document inserted.
Index Suffix
WITHINDEXSUFFIX allows you to specify a suffix for your index; date formats are supported.
Example:
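For instance, a date pattern such as WITHINDEXSUFFIX=_suffix_{YYYY-MM-dd} resolves to a concrete index name per day; a sketch of the resolution for a fixed date (index name illustrative):

```python
from datetime import date

index, d = "fx", date(2024, 1, 15)
# {YYYY-MM-dd} is filled in from the current date; shown here for a fixed one
resolved = index + "_suffix_" + d.strftime("%Y-%m-%d")
print(resolved)  # fx_suffix_2024-01-15
```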
Index Names
Static Index Names
To use a static index name, define the target index in the KCQL statement without any prefixes:
This will consistently create an index named index_name for any messages consumed from topicA.
Extracting Index Names from Headers, Keys, and Values
Headers
To extract an index name from a message header, use the _header prefix followed by the header name:
This statement extracts the value from the gate header field and uses it as the index name.
For headers with names that include dots, enclose the entire target in backticks (`) and each segment that consists of a field name in single quotes ('):
In this case, the value of the header named prefix.abc.suffix is used to form the index name.
Keys
To use the full value of the message key as the index name, use the _key prefix:
For example, if the message key is "freddie", the resulting index name will be freddie.
Values
To extract an index name from a field within the message value, use the _value prefix followed by the field name:
This example uses the value of the name field from the message's value. If the field contains "jason", the index name will be jason.
Nested Fields in Values
To access nested fields within a value, specify the full path using dot notation:
If the firstName field is nested within the name structure, its value (e.g., "hans") will be used as the index name.
Fields with Dots in Their Names
For field names that include dots, enclose the entire target in backticks (`) and each segment that consists of a field name in single quotes ('):
If the value structure contains:
The extracted index name will be hans.
Auto Index Creation
The Sink will automatically create missing indexes at startup.
Please note that this feature is not compatible with index names extracted from message headers/keys/values.
Options Reference
Name
Description
Type
Default Value
connect.elastic.protocol
URL protocol (http, https)
string
http
connect.elastic.hosts
List of hostnames for the Elasticsearch cluster nodes, not including protocol or port.
string
localhost
connect.elastic.port
Port on which the Elasticsearch node listens
string
KCQL Properties
Name
Description
Type
Default Value
behavior.on.null.values
Specifies behavior on Kafka tombstones: IGNORE , DELETE or FAIL
String
IGNORE
SSL Configuration Properties
Property Name
Description
ssl.truststore.location
Path to the truststore file containing the trusted CA certificates for verifying broker certificates.
ssl.truststore.password
Password for the truststore file to protect its integrity.
ssl.truststore.type
Type of the truststore (e.g., JKS, PKCS12). Default is JKS.
ssl.keystore.location
Path to the keystore file containing the client’s private key and certificate chain for client authentication.
ssl.keystore.password
Password for the keystore to protect the private key.
ssl.keystore.type
Type of the keystore (e.g., JKS, PKCS12). Default is JKS.
SSL Configuration
Enabling SSL connections between Kafka Connect and Elasticsearch ensures that the communication between these services is secure, protecting sensitive data from being intercepted or tampered with. SSL (or TLS) encrypts data in transit, verifying the identity of both parties and ensuring data integrity.
While newer versions of Elasticsearch have SSL enabled by default for internal communication, it’s still necessary to configure SSL for client connections, such as those from Kafka Connect. Even if Elasticsearch has SSL enabled by default, Kafka Connect still needs these configurations to establish a secure connection. By setting up SSL in Kafka Connect, you ensure:
Data encryption: Prevents unauthorized access to data being transferred.
Authentication: Confirms that Kafka Connect and Elasticsearch are communicating with trusted entities.
Compliance: Meets security standards for regulatory requirements (such as GDPR or HIPAA).
Configuration Example
Terminology:
Truststore: Holds certificates to check if the node’s certificate is valid.
Keystore: Contains your client’s private key and certificate to prove your identity to the node.
SSL Protocol: Use TLSv1.2 or TLSv1.3 for up-to-date security.
Password Security: Protect passwords by encrypting them or using secure methods like environment variables or secret managers.
Azure CosmosDB
This page describes the usage of the Stream Reactor Azure CosmosDB Sink Connector.
Version 10.0.0 introduces breaking changes: the connector has been renamed from DocumentDB, uses the official CosmosDB SDK, and supports new bulk and key strategies.
A Kafka Connect sink connector for writing records from Kafka to Azure CosmosDB using the SQL API.
Connector Class
Example
For more examples see the .
KCQL support
You can specify multiple KCQL statements separated by ; to have a connector sink multiple topics. The connector properties topics or topics.regex are required to be set to a value that matches the KCQL statements.
The following KCQL is supported:
Examples:
Insert Mode
Insert is the default write mode of the sink. It inserts messages from Kafka topics into CosmosDB.
Upsert Mode
The Sink supports CosmosDB upsert functionality which replaces the existing row if a match is found on the primary keys.
This mode works with at least once delivery semantics on Kafka as the order is guaranteed within partitions. If the same record is delivered twice to the sink, it results in an idempotent write: the existing record is updated with the values of the second record, which are the same.
If records are delivered with the same field or group of fields that are used as the primary key on the target table, but different values, the existing record in the target table will be updated.
Since records are delivered in the order they were written per partition the write is idempotent on failure or restart. Redelivery produces the same result.
Bulk Mode
Bulk mode enables efficient batching of writes to CosmosDB, reducing API calls and improving throughput. Enable it with:
When enabled, you can control batching behavior using the KCQL PROPERTIES clause (flush.size, flush.count, flush.interval). If disabled, records are written individually.
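For example, the connector property named in the options reference below:

```properties
connect.cosmosdb.bulk.enabled=true
```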
Custom Key Strategy
You can control how the connector populates the id field in CosmosDB documents using:
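A sketch of the key-strategy properties described in the Key Population Strategy section below (the path value is illustrative):

```properties
# extract the document id from a field inside the record value
connect.cosmosdb.key.source=ValuePath
connect.cosmosdb.key.path=id
```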
connect.cosmosdb.retry.interval (milliseconds between retries)
These settings control how the connector responds to errors during writes.
Throughput
You can set the manual throughput (RU/s) for new CosmosDB collections with:
The default is 400 RU/s, which is the minimum allowed by Azure Cosmos DB and is cost-effective for most workloads.
Proxy
If you need to connect via a proxy, specify the proxy details with:
Progress Reporting
Enable progress reporting to log how many records have been processed:
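```properties
connect.progress.enabled=true
```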
Option Reference
Name
Description
Type
Default Value
KCQL Properties
Note: The following KCQL PROPERTIES options are only available when connect.cosmosdb.bulk.enabled=true. If bulk mode is disabled, records are written individually.
The CosmosDB Sink Connector supports the following KCQL PROPERTIES options to control file flushing behavior:
Property
Description
Type
Default Value
Flush Options Explained:
Flush by Count: Triggers a flush after a specified number of records have been written.
Flush by Size: Initiates a flush once a predetermined size (in bytes) has been reached.
Flush by Interval: Enforces a flush after a defined time interval (in seconds), acting as a fail-safe to ensure timely data management even if other flush conditions are not met.
You can use these properties in your KCQL statement's PROPERTIES clause, for example:
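A sketch with illustrative values (collection and topic names are hypothetical; flush.size is in bytes and flush.interval in seconds):

```sql
INSERT INTO myCollection
SELECT * FROM kafka-topic
PROPERTIES('flush.count'='1000', 'flush.size'='5000000', 'flush.interval'='300')
```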
Kafka payload support
This sink supports the following Kafka payloads:
Schema.Struct and Struct (Avro)
Schema.Struct and JSON
No Schema and JSON
Error policies
The connector supports the standard Stream Reactor error policies (NOOP, THROW, RETRY).
Endpoint and Master Key Configuration
To connect to your Azure CosmosDB instance, the connector requires two essential configuration properties:
connect.cosmosdb.endpoint: This specifies the URI of your CosmosDB account. It should point to the connection endpoint provided in the Azure CosmosDB dashboard.
connect.cosmosdb.master.key: This is the authentication key used to access the database. Note: Azure often refers to this as the primary key, which can be confusing—the master.key in the connector configuration corresponds to Azure's primary key value.
Both properties are mandatory and must be set to establish a connection with CosmosDB.
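A sketch of the two mandatory properties (the account name and key placeholders must be replaced with the values from the Azure CosmosDB dashboard):

```properties
connect.cosmosdb.endpoint=https://my-account.documents.azure.com:443/
connect.cosmosdb.master.key=<primary key from the Azure portal>
```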
Key Population Strategy
The connector offers flexible options for populating the id field of documents in CosmosDB. The behavior is controlled by two configurations:
connect.cosmosdb.key.source
Description: Defines the strategy used to extract or generate the document ID.
Valid Values:
Key (default): Use the Kafka record key.
connect.cosmosdb.key.path
Description: Specifies the field path to extract the key from, when using KeyPath or ValuePath.
Default: id
Example: If using ValuePath with connect.cosmosdb.key.path=id, the connector will use value.id
Cassandra
This page describes the usage of the Stream Reactor Cassandra Source Connector.
Kafka Connect Cassandra is a Source Connector for reading data from Cassandra and writing to Kafka.
Connector Class
Example
For more examples see the .
KCQL support
You can specify multiple KCQL statements separated by ; to have the connector sink into multiple topics.
However, you cannot route the same source to different topics; for this, use a separate connector instance.
The following KCQL is supported:
Examples:
Keyed JSON Format
The connector can write JSON to your Kafka topic using the WITHFORMAT JSON clause but the key and value converters must be set:
In order to facilitate scenarios like retaining the latest value for a given device identifier, or to support Kafka Streams joins without having to re-map the topic data, the connector supports WITHKEY in the KCQL syntax.
Multiple key fields are supported using a delimiter:
The resulting Kafka record key content will be the string concatenation for the values of the fields specified. Optionally the delimiter can be set via the KEYDELIMITER keyword.
Keying is only supported in conjunction with the WITHFORMAT JSON clause
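A syntax sketch of keyed JSON output (table, topic, and field names are illustrative); with two key fields, the record key becomes the concatenation of both values separated by the delimiter:

```sql
INSERT INTO device-events
SELECT * FROM device_table
PK created
WITHFORMAT JSON
WITHKEY(device_id, created)
```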
Incremental mode
This mode tracks new records added to a table. The column to track is identified by the PK clause in the KCQL statement. Only one column can be used to track new records. The supported Cassandra column data types are:
TIMESTAMP
TIMEUUID
TOKEN
DSESEARCHTIMESTAMP
If set to TOKEN this column value is wrapped inside Cassandra's token function which needs unwrapping with the WITHUNWRAP command.
You must use the Byte Order Partitioner for the TOKEN mode to work correctly or data will be missing from the Kafka topic. This is not recommended due to the creation of hotspots in Cassandra.
DSESEARCHTIMESTAMP will make DSE Search queries using Solr instead of a native Cassandra query.
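A sketch of incremental mode in KCQL (table, topic, and column names are illustrative):

```sql
-- track new rows via a timestamp column
INSERT INTO topicA SELECT * FROM tableA PK created INCREMENTALMODE=TIMESTAMP

-- TOKEN mode: the PK column is wrapped in Cassandra's token() function,
-- so it must be unwrapped with WITHUNWRAP
INSERT INTO topicA SELECT * FROM tableA PK id WITHUNWRAP INCREMENTALMODE=TOKEN
```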
Bulk Mode
The connector constantly loads the entire table.
Controlling throughput
The connector can be configured to:
Start from a particular offset - connect.cassandra.initial.offset
Increase or decrease the poll interval - connect.cassandra.import.poll.interval
Set a slice duration to query for in milliseconds - connect.cassandra.slice.duration
For a more detailed explanation of how to use these options, see the configuration reference.
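A sketch of the three throttling properties together (all values are illustrative; the intervals are in milliseconds):

```properties
# start reading from this point in time rather than from the beginning
connect.cassandra.initial.offset=2018-01-22 12:29:59.0000000Z
# poll Cassandra every 10 seconds
connect.cassandra.import.poll.interval=10000
# query for 10-second slices of data at a time
connect.cassandra.slice.duration=10000
```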
name=elastic
connector.class=io.lenses.streamreactor.connect.elastic7.ElasticSinkConnector
tasks.max=1
topics=orders
connect.elastic.protocol=http
connect.elastic.hosts=elastic
connect.elastic.port=9200
connect.elastic.cluster.name=elasticsearch
connect.elastic.kcql=INSERT INTO orders SELECT * FROM orders
connect.progress.enabled=true
INSERT | UPSERT
INTO <elastic_index >
SELECT FIELD, ...
FROM kafka_topic
[PK FIELD,...]
[WITHDOCTYPE=<your_document_type>]
[WITHINDEXSUFFIX=<your_suffix>]
-- Insert mode, select all fields from topicA and write to indexA
INSERT INTO indexA SELECT * FROM topicA
-- Insert mode, select 3 fields and rename from topicB
-- and write to indexB
INSERT INTO indexB SELECT x AS a, y, z AS c FROM topicB PK y
-- UPSERT
UPSERT INTO indexC SELECT id, string_field FROM topicC PK id
INSERT INTO indexA SELECT * FROM topicA PROPERTIES ('behavior.on.null.values'='IGNORE')
WITHINDEXSUFFIX=_suffix_{YYYY-MM-dd}
INSERT INTO index_name SELECT * FROM topicA
INSERT INTO _header.gate SELECT * FROM topicA
INSERT INTO `_header.'prefix.abc.suffix'` SELECT * FROM topicA
INSERT INTO _key SELECT * FROM topicA
INSERT INTO _value.name SELECT * FROM topicA
INSERT INTO _value.name.firstName SELECT * FROM topicA
INSERT INTO `_value.'customer.name'.'first.name'` SELECT * FROM topicA
{
"customer.name": {
"first.name": "hans"
}
}
ssl.truststore.location=/path/to/truststore.jks
ssl.truststore.password=your_truststore_password
ssl.truststore.type=JKS # Can also be PKCS12 if applicable
ssl.keystore.location=/path/to/keystore.jks
ssl.keystore.password=your_keystore_password
ssl.keystore.type=JKS # Can also be PKCS12 if applicable
ssl.protocol=TLSv1.2 # Or TLSv1.3 for stronger security
ssl.trustmanager.algorithm=PKIX # Default algorithm for managing certificates
ssl.keymanager.algorithm=PKIX # Default algorithm for managing certificates
Name of the Elasticsearch cluster, used in local mode for setting the connection
string
elasticsearch
connect.elastic.write.timeout
The time to wait in millis. Default is 5 minutes.
int
300000
connect.elastic.batch.size
How many records to process at one time. As records are pulled from Kafka there can be 100k+ of them, which is not feasible to send to Elasticsearch at once
int
4000
connect.elastic.use.http.username
Username if HTTP Basic Auth is required. Default is null.
string
connect.elastic.use.http.password
Password if HTTP Basic Auth is required. Default is null.
string
connect.elastic.error.policy
Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP - the error is swallowed; THROW - the error is allowed to propagate; RETRY - the exception causes the Connect framework to retry the message. The number of retries is set by connect.elastic.max.retries. The error will be logged automatically.
string
THROW
connect.elastic.max.retries
The maximum number of times to try the write again.
int
20
connect.elastic.retry.interval
The time in milliseconds between retries.
int
60000
connect.elastic.kcql
KCQL expression describing field selection and routes.
string
connect.elastic.pk.separator
Separator used when there is more than one field in the PK
string
-
connect.progress.enabled
Enables the output for how many records have been processed
boolean
false
ssl.protocol
The SSL protocol used for secure connections (e.g., TLSv1.2, TLSv1.3). Default is TLS.
ssl.trustmanager.algorithm
Algorithm used by the TrustManager to manage certificates. Default value is the trust manager factory algorithm configured for the Java Virtual Machine.
ssl.keymanager.algorithm
Algorithm used by the KeyManager to manage certificates. Default value is the key manager factory algorithm configured for the Java Virtual Machine.
connect.cosmosdb.error.threshold (number of errors tolerated before failing)
connect.cosmosdb.proxy
Specifies the connection proxy details.
string
Database & Throughput
connect.cosmosdb.db
The Azure CosmosDB target database.
string
connect.cosmosdb.db.create
If set to true, the database will be created if it does not exist. With the default (false), an exception is raised instead.
boolean
false
connect.cosmosdb.collection.throughput
The manual throughput to provision for new Cosmos DB collections (RU/s). The default is 400 RU/s, which is the minimum allowed by Azure Cosmos DB and is cost-effective for most workloads.
int
400
Key & Partitioning
connect.cosmosdb.key.source
The source of the key. There are 4 possible values: Key, Metadata, KeyPath or ValuePath.
string
Key
connect.cosmosdb.key.path
When used with key.source configurations of KeyPath or ValuePath, this is the path to the field in the object that will be used as the key. Defaults to 'id'.
string
id
Write & Consistency
connect.cosmosdb.consistency.level
Determines the write visibility. There are four possible values: Strong, BoundedStaleness, Session or Eventual.
string
Session
connect.cosmosdb.bulk.enabled
Enable bulk mode to reduce chatter.
boolean
false
connect.cosmosdb.kcql
KCQL expression describing field selection and data routing to the target CosmosDb.
string
Error Handling & Retries
connect.cosmosdb.error.policy
Specifies the action to be taken if an error occurs while inserting the data. Options: NOOP (swallow error), THROW (propagate error), RETRY (retry message). The number of retries is based on the error.
string
THROW
connect.cosmosdb.max.retries
The maximum number of times to try the write again.
int
20
connect.cosmosdb.retry.interval
The time in milliseconds between retries.
int
60000
connect.cosmosdb.error.threshold
The number of errors to tolerate before failing the sink.
int
5
Queue & Performance
connect.cosmosdb.flush.count.enable
Flush on count can be disabled by setting this property to 'false'.
boolean
true
connect.cosmosdb.upload.sync.period
The time in milliseconds to wait before sending the request.
int
100
connect.cosmosdb.executor.threads
The number of threads to use for processing the records.
int
1
connect.cosmosdb.max.queue.size
The maximum number of records to queue per topic before blocking. If the queue limit is reached, the connector will throw a RetriableException and the connector's retry settings will be used.
int
1000000
connect.cosmosdb.max.queue.offer.timeout.ms
The maximum time in milliseconds to wait for the queue to accept a record. If the queue does not accept the record within this time, the connector will throw a RetriableException and the connector's retry settings will be used.
int
120000
Monitoring & Progress
connect.progress.enabled
Enables the output for how many records have been processed.
boolean
false
Key: Use the Kafka record key as the document ID.
Metadata: Use Kafka metadata (topic/partition/offset).
KeyPath: Extract from a field within the Kafka key.
ValuePath: Extract from a field within the Kafka value.
The selected key strategy determines what is used as the document ID.
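A sketch of a ValuePath configuration, assuming each record value carries a hypothetical customer_id field:

```properties
# Use a field inside the record value as the Cosmos DB document key
connect.cosmosdb.key.source=ValuePath
connect.cosmosdb.key.path=customer_id
```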
Authentication & Connection
connect.cosmosdb.endpoint
The Azure CosmosDB end point.
string
connect.cosmosdb.master.key
The connection master key.
password
flush.size
Specifies the size (in bytes) for the flush operation.
Long
(connector default)
flush.count
Specifies the number of records for the flush operation.
Int
(connector default)
flush.interval
Specifies the interval (in seconds) for the flush operation.
Password for the username to connect to Cassandra with.
password
connect.cassandra.ssl.enabled
Secure Cassandra driver connection via SSL.
boolean
false
connect.cassandra.trust.store.path
Path to the client Trust Store.
string
connect.cassandra.trust.store.password
Password for the client Trust Store.
password
connect.cassandra.trust.store.type
Type of the Trust Store, defaults to JKS
string
JKS
connect.cassandra.key.store.type
Type of the Key Store, defaults to JKS
string
JKS
connect.cassandra.ssl.client.cert.auth
Enable client certification authentication by Cassandra. Requires KeyStore options to be set.
boolean
false
connect.cassandra.key.store.path
Path to the client Key Store.
string
connect.cassandra.key.store.password
Password for the client Key Store
password
connect.cassandra.consistency.level
Consistency refers to how up-to-date and synchronized a row of Cassandra data is on all of its replicas. Cassandra offers tunable consistency. For any given read or write operation, the client application decides how consistent the requested data must be.
string
connect.cassandra.fetch.size
The number of records the Cassandra driver will return at once.
int
5000
connect.cassandra.load.balancing.policy
Cassandra load balancing policy. ROUND_ROBIN, TOKEN_AWARE, LATENCY_AWARE or DC_AWARE_ROUND_ROBIN. TOKEN_AWARE and LATENCY_AWARE use DC_AWARE_ROUND_ROBIN as the child policy.
string
TOKEN_AWARE
connect.cassandra.error.policy
Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP - the error is swallowed THROW - the error is allowed to propagate. RETRY - The exception causes the Connect framework to retry the message. The number of retries is set by connect.cassandra.max.retries. All errors will be logged automatically, even if the code swallows them.
string
THROW
connect.cassandra.max.retries
The maximum number of times to try the write again.
int
20
connect.cassandra.retry.interval
The time in milliseconds between retries.
int
60000
connect.cassandra.task.buffer.size
The size of the queue the source reader writes to.
int
10000
connect.cassandra.assigned.tables
The tables a task has been assigned.
string
connect.cassandra.batch.size
The number of records the source task should drain from the reader queue.
int
100
connect.cassandra.import.poll.interval
The polling interval between queries against tables for bulk mode.
long
1000
connect.cassandra.time.slice.ms
The range of time in milliseconds the source task will use in the timestamp/timeuuid query
long
10000
connect.cassandra.import.allow.filtering
Enable ALLOW FILTERING in incremental selects.
boolean
true
connect.cassandra.slice.duration
Duration to query for in the target Cassandra table. Used to restrict the query timestamp span
long
10000
connect.cassandra.slice.delay.ms
The delay between the current time and the time range of the query. Used to ensure all of the data in the time slice is available
long
30000
connect.cassandra.initial.offset
The initial timestamp to start querying in Cassandra from (yyyy-MM-dd HH:mm:ss.SSS'Z'). Default 1900-01-01 00:00:00.0000000Z
string
1900-01-01 00:00:00.0000000Z
connect.cassandra.mapping.collection.to.json
Map columns of type Map, List and Set to JSON
boolean
true
connect.cassandra.kcql
KCQL expression describing field selection and routes.
string
TimeUUID
Optional String
UUID
Optional String
Inet
Optional String
Ascii
Optional String
Text
Optional String
Timestamp
Optional String
connect.cassandra.contact.points
Initial contact point host for Cassandra including port.
name=cosmosdb
connector.class=io.lenses.streamreactor.connect.azure.cosmosdb.sink.CosmosDbSinkConnector
tasks.max=1
topics=orders-string
connect.cosmosdb.kcql=INSERT INTO orders SELECT * FROM orders-string
connect.cosmosdb.db=dm
connect.cosmosdb.endpoint=[YOUR_AZURE_ENDPOINT]
connect.cosmosdb.db.create=true
connect.cosmosdb.master.key=[YOUR_MASTER_KEY]
connect.cosmosdb.batch.size=10
INSERT | UPSERT
INTO <your-collection>
SELECT FIELD, ...
FROM kafka_topic
[PK FIELDS,...]
-- Insert mode, select all fields from topicA
-- and write to tableA
INSERT INTO collectionA SELECT * FROM topicA
-- UPSERT mode, select 3 fields and
-- rename from topicB and write to tableB
-- with primary key as the field id from the topic
UPSERT INTO tableB SELECT x AS a, y, z AS c FROM topicB PK id
connect.cosmosdb.bulk.enabled=true
connect.cosmosdb.collection.throughput=400
connect.cosmosdb.proxy=<proxy-uri>
connect.progress.enabled=true
INSERT INTO myCollection SELECT * FROM myTopic PROPERTIES('flush.size'=1000000, 'flush.interval'=30, 'flush.count'=5000)
name=cassandra
connector.class=io.lenses.streamreactor.connect.cassandra.source.CassandraSourceConnector
connect.cassandra.key.space=demo
connect.cassandra.kcql=INSERT INTO orders-topic SELECT * FROM orders PK created INCREMENTALMODE=TIMEUUID
connect.cassandra.contact.points=cassandra
INSERT INTO <your-topic>
SELECT FIELD,...
FROM <your-cassandra-table>
[PK FIELD]
[WITHFORMAT JSON]
[INCREMENTALMODE=TIMESTAMP|TIMEUUID|TOKEN|DSESEARCHTIMESTAMP]
[WITHKEY(<your-key-field>)]
-- Select all columns from table orders and insert into a topic
-- called orders-topic, use column created to track new rows.
-- Incremental mode set to TIMEUUID
INSERT INTO orders-topic SELECT * FROM orders PK created INCREMENTALMODE=TIMEUUID
-- Select created, product, price from table orders and insert
-- into a topic called orders-topic, use column created to track new rows.
INSERT INTO orders-topic SELECT created, product, price FROM orders PK created
INSERT INTO <topic>
SELECT a, b, c, d
FROM keyspace.table
WHERE solr_query= 'pkCol:{2020-03-23T15:02:21Z TO 2020-03-23T15:30:12.989Z]'
INCREMENTALMODE=DSESEARCHTIMESTAMP
HTTP
This page describes the usage of the Stream Reactor HTTP Sink Connector.
A Kafka Connect sink connector for writing records from Kafka to HTTP endpoints.
Features
Support for Json/Avro/String/Protobuf messages via Kafka Connect (in conjunction with converters for Schema-Registry based data storage).
URL, header and content templating ability give you full control of the HTTP request.
Configurable batching of messages, even allowing you to combine them into a single request selecting which data to send with your HTTP request.
Connector Class
Example
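A minimal sketch of an HTTP sink configuration. The endpoint is a placeholder, and the property names follow recent Stream Reactor releases; check the option reference for your version:

```properties
name=http-sink
connector.class=io.lenses.streamreactor.connect.http.sink.HttpSinkConnector
tasks.max=1
topics=orders
connect.http.method=POST
connect.http.endpoint=http://example.com/receive
connect.http.request.content=Static content to send
```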
For more examples see the Configuration Examples section below.
Content Template
The Lenses HTTP sink comes with multiple options for content templating of the HTTP request.
Static Templating
If you do not wish any part of the key, value, headers or other data to form a part of the message, you can use static templating:
Single Message Templating
When you are confident you will be generating a single HTTP request per Kafka message, then you can use the simpler templating.
In your configuration, in the content property of your config, you can define template substitutions like the following example:
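For instance, a template drawing on fields of the record value might look like this (the field names are hypothetical; the substitution keys follow the Available Keys table below):

```properties
# {{value.<field>}} is replaced per record; the XML wrapper is arbitrary text
connect.http.request.content=<note><to>{{value.name}}</to><body>{{value.message}}</body></note>
```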
(Note: the XML is only an example; your template can consist of any text format that can be submitted in an HTTP request.)
Multiple Message Templating
To collapse multiple messages into a single HTTP request, you can use the multiple messaging template. This is automatic if the template has a messages tag. See the below example:
Again, this is an XML example but your message body can consist of anything including plain text, json or yaml.
Your connector configuration will look like this:
The final result will be HTTP requests with bodies like this:
Available Keys
When using simple and multiple message templating, the following are available:
Field
Usage Example
URL Templating
URL including protocol (e.g. http://lenses.io). Template variables can be used.
The URL is also a template, so it can contain substitutions from the message key/value/headers etc. If you are batching multiple Kafka messages into a single request, the first message will be used for the substitution of the URL.
Authentication Options
Currently, the HTTP Sink supports no authentication, BASIC HTTP authentication, or OAuth2 authentication.
No Authentication (Default)
By default, no authentication is set. This can be also done by providing a configuration like this:
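A sketch of the explicit form (assuming none is the accepted value in your release):

```properties
connect.http.authentication.type=none
```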
BASIC HTTP Authentication
BASIC auth can be configured by providing a configuration like this:
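A sketch with placeholder credentials; the exact username and password property names are assumptions and may vary by release:

```properties
connect.http.authentication.type=basic
connect.http.authentication.basic.username=myUser
connect.http.authentication.basic.password=myPassword
```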
OAuth2 Authentication
OAuth auth can be configured by providing a configuration like this:
Headers List
To customise the headers sent with your HTTP request you can supply a Headers List.
Each header key and value is also a template, so it can contain substitutions from the message key/value/headers etc. If you are batching multiple Kafka messages into a single request, the first message will be used for the substitution of the headers.
Example:
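A sketch carrying one static and one templated header; the comma-separated key:value list format is an assumption, so check the option reference for your release:

```properties
connect.http.request.headers=Content-Type:application/json,X-Source-Topic:{{topic}}
```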
SSL Configuration
Enabling SSL connections between Kafka Connect and the HTTP endpoint ensures that the communication between these services is secure, protecting sensitive data from being intercepted or tampered with. SSL (or TLS) encrypts data in transit, verifying the identity of both parties and ensuring data integrity. Please check out the SSL Configuration Properties section in order to set it up.
Batch Configuration
The connector offers three distinct flush options for data management:
Flush by Count - triggers a flush after a specified number of records have been batched.
Flush by Size - initiates a flush once a predetermined batch size (in bytes) has been attained.
Flush by Interval - enforces a flush after a defined time interval (in seconds).
It's worth noting that the interval flush is a continuous process that acts as a fail-safe mechanism, ensuring that batches are periodically flushed, even if the other flush options are not configured or haven't reached their thresholds.
Consider a scenario where the flush size is set to 10MB, and only 9.8MB of data has been batched, with no new Kafka messages arriving for an extended period of 6 hours. To prevent undue delays, the interval flush guarantees that the batch is sent after the specified time interval has elapsed. This ensures the timely delivery of data even in situations where other flush conditions are not met.
The flush options are configured using the batchCount, batchSize and timeInterval properties. The settings are optional and if not specified the defaults are:
Field
Default
Configuration Examples
Some configuration examples follow on how to apply this connector to different message types.
These include converters, which are required to instruct Kafka Connect on how to read the source content.
Static string template
In this case the converters are irrelevant as we are not using the message content to populate our message template.
Dynamic string template
The HTTP request body contains the value of the message, which is retained as a string value via the StringConverter.
Fields from the AVRO message are substituted into the message body in the following example:
Error/Success Reporter
Starting from version 8.1, as a pilot release, the connector offers functionality called Reporter which (if enabled) writes Success and Error processing reports to a specified Kafka topic. Reports don't have a key; details about the status can be found in the message headers and value.
To enable this functionality, set one (or both, if full reporting is required) of the properties below:
These settings configure the Kafka producer for success and error reports. Full configuration options are available in the Error Reporter Properties and Success Reporter Properties sections. Three examples follow:
Plain Error Reporting
This is the most common scenario for on-premises Kafka clusters used just for monitoring.
Error Reporting using SASL
Using SASL provides a secure and standardized method for authenticating connections to an external Kafka cluster. It is especially valuable when connecting to clusters that require secure communication, as it supports mechanisms like SCRAM, GSSAPI (Kerberos), and OAuth, ensuring that only authorized clients can access the cluster. Additionally, SASL can help safeguard credentials during transmission, reducing the risk of unauthorized access.
Error Reporting using SSL
Using SSL ensures secure communication between clients and the Kafka cluster by encrypting data in transit. This prevents unauthorized parties from intercepting or tampering with sensitive information. SSL also supports mutual authentication, allowing both the client and server to verify each other’s identities, which enhances trust and security in the connection.
Options
Configuration parameters
This sink connector supports the following options as part of its configuration:
Field
Type
Required
Values (Default)
SSL Configuration Properties
Error Reporter Properties
The error reporter can also be configured with SSL properties. See the SSL Configuration Properties section. In this case all properties should be prefixed with connect.reporting.error.config to ensure they apply to the error reporter.
Success Reporter Properties
The success reporter can also be configured with SSL properties. See the SSL Configuration Properties section. In this case all properties should be prefixed with connect.reporting.success.config to ensure they apply to the success reporter.
AWS S3
This page describes the usage of the Stream Reactor AWS S3 Source Connector.
This connector is also available on the AWS Marketplace.
Objects that have been archived to AWS Glacier storage class are skipped, in order to load these objects you must manually restore the objects. Skipped objects are logged in the Connect workers log files.
Google BigQuery
The Google BigQuery sink connector is an open-source connector imported from Confluent (originally developed by WePay) that enables you to export data from Apache Kafka® topics to Google BigQuery tables.
Overview
The BigQuery sink connector allows you to:
Partition
{{partition}}
Offset
{{offset}}
Timestamp
{{timestamp}}
connect.http.authentication.type
Authentication
No
(none)
connect.http.request.headers
List[String]
No
connect.http.batch.count
Int
No
The number of records to batch before sending the request, see
connect.http.batch.size
Int
No
The size of the batch in bytes before sending the request, see
connect.http.time.interval
Int
No
The time interval in milliseconds to wait before sending the request
connect.http.upload.sync.period
Int
No
Upload Sync Period (100) - polling time period for uploads in milliseconds
connect.http.error.threshold
Int
No
The number of errors to tolerate before failing the sink (5)
connect.http.retry.mode
String
No
The HTTP retry mode. It can be one of: Fixed or Exponential (default)
connect.http.retries.on.status.codes
List[String]
No
The status codes to retry on (default codes are: 408, 429, 500, 502, 503, 504)
connect.http.retries.max.retries
Int
No
The maximum number of retries to attempt (default is 5)
connect.http.retry.fixed.interval.ms
Int
No
The fixed duration to wait before retrying HTTP requests. The default is 10000 (10 seconds)
connect.http.retries.max.timeout.ms
Int
No
The maximum time in milliseconds to retry a request when Exponential retry is set. Backoff is used to increase the time between retries, up to this maximum (30000)
connect.http.connection.timeout.ms
Int
No
The HTTP connection timeout in milliseconds (10000)
connect.http.max.queue.size
Int
No
For each processed topic, the connector maintains an internal queue. This value specifies the maximum number of entries allowed in the queue before the enqueue operation blocks. The default is 100,000.
connect.http.max.queue.offer.timeout.ms
Int
No
The maximum time window, specified in milliseconds, to wait for the internal queue to accept new records.
The SSL protocol used for secure connections (e.g., TLSv1.2, TLSv1.3). Default is TLSv1.3.
ssl.keymanager.algorithm
Algorithm used by the KeyManager to manage certificates. Default value is the key manager factory algorithm configured for the Java Virtual Machine.
ssl.trustmanager.algorithm
Algorithm used by the TrustManager to manage certificates. Default value is the trust manager factory algorithm configured for the Java Virtual Machine.
You can specify multiple KCQL statements separated by ; to have the connector sink into multiple topics. However, you cannot route the same source to different topics; to do this, use a separate connector instance.
The connector uses a SQL-like syntax to configure the connector behaviour. The full KCQL syntax is:
Please note that you can employ escaping within KCQL for the INSERT INTO, SELECT * FROM, and PARTITIONBY clauses when necessary. For example, if you need to use a topic name that contains a hyphen, you can escape it as follows:
The connector does not support multiple KCQL statements that reference the same source location; to use multiple statements, configure each one in a separate connector instance.
Source Bucket & Path
The S3 source location is defined within the FROM clause. The connector will read all objects from the given location considering the data partitioning and ordering options. Each data partition will be read by a single connector task.
The FROM clause format is:
If your data in AWS was not written by the Lenses AWS sink, set the connector to traverse a folder hierarchy in a bucket and load objects based on their last modified timestamp. If LastModified sorting is used, ensure objects do not arrive late, or use a post-processing step to handle them.
connect.s3.source.partition.extractor.regex=none
connect.s3.source.ordering.type=LastModified
To load in alphanumeric order, set the ordering type to AlphaNumeric.
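For example:

```properties
connect.s3.source.ordering.type=AlphaNumeric
```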
Target Bucket & Path
The target Kafka topic is specified via the INSERT INTO clause. The connector will write all the records to the given topic:
S3 Object formats
The connector supports a range of storage formats, each with its own distinct functionality:
JSON: The connector will read objects containing JSON content, each line representing a distinct record.
Avro: The connector will read Avro-stored messages from S3 and translate them into Kafka’s native format.
Parquet: The connector will read Parquet-stored messages from S3 and translate them into Kafka’s native format.
Text: The connector will read objects containing lines of text, each line representing a distinct record.
CSV: The connector will read objects containing lines of text, each line representing a distinct record.
CSV_WithHeaders: The connector will read objects containing lines of text, each line representing a distinct record while skipping the header row.
Bytes: The connector will read objects containing bytes, each object is translated to a Kafka message.
Use the STOREAS clause to configure the storage format. The following options are available:
Text Processing
When using Text storage, the connector provides additional configuration options to finely control how text content is processed.
Regex
In Regex mode, the connector applies a regular expression pattern, and only when a line matches the pattern is it considered a record. For example, to include only lines that start with a number, you can use the following configuration:
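A KCQL sketch with placeholder bucket and topic names:

```properties
# Only lines starting with a digit become records
connect.s3.source.kcql=INSERT INTO my-topic SELECT * FROM my-bucket:my-prefix STOREAS `Text` PROPERTIES('read.text.mode'='Regex', 'read.text.regex'='^[0-9].*')
```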
Start-End line
In Start-End Line mode, the connector reads text content between specified start and end lines, inclusive. This mode is useful when you need to extract records that fall within defined boundaries. For instance, to read records where the first line is ‘SSM’ and the last line is an empty line (’’), you can configure it as follows:
To trim the start and end lines, set the read.text.trim property to true:
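A combined sketch; the read.text.start.line and read.text.end.line property names are assumptions mirroring the documented read.text.start.tag property:

```properties
# Records span from a line 'SSM' to the next empty line, inclusive, trimmed
connect.s3.source.kcql=INSERT INTO my-topic SELECT * FROM my-bucket:my-prefix STOREAS `Text` PROPERTIES('read.text.mode'='StartEndLine', 'read.text.start.line'='SSM', 'read.text.end.line'='', 'read.text.trim'='true')
```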
Start-End tag
In Start-End Tag mode, the connector reads text content between specified start and end tags, inclusive. This mode is particularly useful when a single line of text in S3 corresponds to multiple output Kafka messages. For example, to read XML records enclosed between ‘’ and ‘’, configure it as follows:
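A sketch with hypothetical <SSM> tags; read.text.end.tag is assumed to mirror the documented read.text.start.tag property:

```properties
# Each <SSM>...</SSM> span becomes one Kafka message, even within a single line
connect.s3.source.kcql=INSERT INTO my-topic SELECT * FROM my-bucket:my-prefix STOREAS `Text` PROPERTIES('read.text.mode'='StartEndTag', 'read.text.start.tag'='<SSM>', 'read.text.end.tag'='</SSM>')
```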
Storage output matrix
Depending on the storage format of Kafka topics’ messages, the need for replication to a different cluster, and the specific data analysis requirements, there exists a guideline on how to effectively utilize converters for both sink and source operations. This guidance aims to optimize performance and minimize unnecessary CPU and memory usage.
S3 Storage Format
Kafka Output Format
Restore or replicate cluster
Analytics
Sink Converter
Source Converter
JSON
STRING
Same,Other
Yes, No
StringConverter
StringConverter
AVRO,Parquet
STRING
Projections
Currently, the connector does not offer support for SQL projection; consequently, anything other than a SELECT * query is disregarded. The connector will faithfully write all the record fields to Kafka exactly as they are.
Ordering
The S3 sink employs zero-padding in object names to ensure precise ordering, leveraging optimizations offered by the S3 API, guaranteeing the accurate sequence of objects.
When using the S3 source alongside the S3 sink, the connector can adopt the same ordering method, ensuring data processing follows the correct chronological order. However, there are scenarios where S3 data is generated by applications that do not maintain lexical object key name order.
In such cases, to process objects in the correct sequence, the source needs to list all objects in the bucket and sort them based on their last modified timestamp. To enable this behavior, set connect.s3.source.ordering.type to LastModified. This ensures that the source correctly arranges and processes the data based on the timestamps of the objects.
If using LastModified sorting, ensure objects do not arrive late, or use a post-processing step to handle them.
Throttling
The number of object keys the source reads from S3 in a single poll can be limited. The default value, if not specified, is 1000.
To limit the number of result rows returned from the source in a single poll operation, you can use the LIMIT clause. The default value, if not specified, is 10000.
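For example, with placeholder bucket and topic names:

```properties
# Return at most 5000 rows per poll
connect.s3.source.kcql=INSERT INTO my-topic SELECT * FROM my-bucket LIMIT 5000
```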
Object Extension Filtering
The AWS S3 Source Connector allows you to filter the objects to be processed based on their extensions. This is controlled by two properties: connect.s3.source.extension.excludes and connect.s3.source.extension.includes.
Excluding Object Extensions
The connect.s3.source.extension.excludes property is a comma-separated list of object extensions to exclude from the source object search. If this property is not configured, all objects are considered. For example, to exclude .txt and .csv objects, you would set this property as follows:
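For example (whether the values carry a leading dot may depend on the release):

```properties
connect.s3.source.extension.excludes=txt,csv
```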
Including Object Extensions
The connect.s3.source.extension.includes property is a comma-separated list of object extensions to include in the source object search. If this property is not configured, all objects are considered. For example, to include only .json and .xml objects, you would set this property as follows:
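For example:

```properties
connect.s3.source.extension.includes=json,xml
```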
Note: If both connect.s3.source.extension.excludes and connect.s3.source.extension.includes are set, the connector first applies the exclusion filter and then the inclusion filter.
Post-Processing Options
Post-processing options offer flexibility in managing how objects are handled after they have been processed. By configuring these options, users can automate tasks such as deleting objects to save storage space or moving files to an archive for compliance and data retention purposes. These features are crucial for efficient data lifecycle management, particularly in environments where storage considerations or regulatory requirements dictate the need for systematic handling of processed data.
Use Cases for Post-Processing Options
Deleting Objects After Processing
For scenarios where freeing up storage is critical and reprocessing is not necessary, configure the connector to delete objects after they are processed. This option is particularly useful in environments with limited storage capacity or where processed data is redundantly stored elsewhere.
Example:
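A sketch; the post-processing action property name is an assumption based on recent releases:

```properties
# Permanently delete each object once it has been processed
connect.s3.source.post.process.action=DELETE
```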
Result: Objects are permanently removed from the S3 bucket after processing, effectively reducing storage usage and preventing reprocessing.
Moving Objects to an Archive Bucket
To preserve processed objects for archiving or compliance reasons, set the connector to move them to a designated archive bucket. This use case applies to organizations needing data retention strategies or for regulatory adherence by keeping processed records accessible but not in active use.
Example:
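A sketch; the action, bucket and prefix property names are assumptions based on recent releases:

```properties
# Move processed objects to an archive bucket under a processed/ prefix
connect.s3.source.post.process.action=MOVE
connect.s3.source.post.process.action.bucket=archive-bucket
connect.s3.source.post.process.action.prefix=processed/
```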
Result: Objects are transferred to an archive-bucket, stored with an updated path that includes the processed/ prefix, maintaining an organized archive structure.
Properties
The PROPERTIES clause is optional and adds a layer of configuration to the connector. It enhances versatility by permitting the application of multiple configurations (delimited by ‘,’). The following properties are supported:
Name
Description
Type
Available Values
read.text.mode
Controls how Text content is read
Enum
Regex, StartEndTag, StartEndLine
read.text.regex
Regular Expression for Text Reading (if applicable)
String
read.text.start.tag
Start Tag for Text Reading (if applicable)
String
Authentication
The connector offers two distinct authentication modes:
Default: This mode relies on the default AWS authentication chain, simplifying the authentication process.
Credentials: In this mode, explicit configuration of AWS Access Key and Secret Key is required for authentication.
When selecting the “Credentials” mode, it is essential to provide the necessary access key and secret key properties. Alternatively, if you prefer not to configure these properties explicitly, the connector will follow the credentials retrieval order as described here.
Here’s an example configuration for the “Credentials” mode:
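A sketch with placeholder values; the secret key property name is assumed to mirror the documented connect.s3.aws.access.key property:

```properties
connect.s3.aws.auth.mode=Credentials
connect.s3.aws.access.key=$AWS_ACCESS_KEY
connect.s3.aws.secret.key=$AWS_SECRET_KEY
```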
For enhanced security and flexibility when using the “Credentials” mode, it is highly advisable to utilize Connect Secret Providers. This approach ensures robust security practices while handling access credentials.
API Compatible systems
The connector can also be used against API compatible systems provided they implement the following:
Option Reference
Name
Description
Type
Available Values
Default Value
connect.s3.aws.auth.mode
Specifies the AWS authentication mode for connecting to S3.
string
"Credentials", "Default"
"Default"
connect.s3.aws.access.key
Access Key for AWS S3 Credentials
string
Stream data from Kafka topics to BigQuery tables
Automatically create tables based on topic data
Configure data delivery semantics (at-least-once or exactly-once)
Perform schema evolution when topic schemas change
Prerequisites
Before using the BigQuery sink connector, ensure you have:
A Google Cloud Platform (GCP) account
A BigQuery project with appropriate permissions
Service account credentials with access to BigQuery
Kafka topics with data to be exported
Configuration
Basic Configuration
Here's a basic configuration for the BigQuery sink connector:
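A minimal sketch, assuming the commonly used BigQuery sink connector class; project, dataset, topic, and file paths are placeholders:

```properties
name=bigquery-sink
# connector class is an assumption; verify against your installed plugin
connector.class=com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
tasks.max=1
topics=my-topic
project=my-gcp-project
defaultDataset=my_dataset
keySource=FILE
keyfile=/path/to/service-account-key.json
```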
Features of Google BigQuery Sink Connector
Multiple tasks support: Configure using tasks.max parameter for performance optimization when parsing multiple files
InsertAll API features: Supports insert operations with built-in duplicate detection capabilities
Real-time streaming: Records are inserted one at a time and available immediately for querying
Multi-topic support: Can stream from multiple topics to corresponding BigQuery tables
Parallel processing: Uses an internal thread pool (default: 10 threads, configurable) for scalable record streaming
Important Configuration Properties
Property
Description
Type
Default
Importance
defaultDataset
The default dataset to be used. Replaced the datasets parameter of older versions of this connector.
string
-
high
project
The BigQuery project to write to.
string
-
high
Data Mapping
Data Type Conversions
The connector maps Kafka Connect schema types to BigQuery data types as follows:
BigQuery Data Type
Connector Mapping
STRING
String
INTEGER
INT8
INTEGER
INT16
INTEGER
INT32
INTEGER
INT64
FLOAT
FLOAT32
Schema Evolution
When schema evolution is enabled (using allowNewBigQueryFields, allowBigQueryRequiredFieldRelaxation, and allowSchemaUnionization), the connector can handle schema changes:
New fields added to the Kafka topic can be added to the BigQuery table
Field constraints can be relaxed from REQUIRED to NULLABLE
Schemas can be unionized when records in the same batch have different schemas
Usage Examples
Basic Example
Example with Batch Loading
Example with Upsert Functionality
Troubleshooting
Common Issues
Authentication errors: Ensure your service account key file is correct and has appropriate permissions.
Schema compatibility issues: When schema updates are enabled, existing data might not be compatible with new schemas.
Quota limitations: BigQuery has quotas for API requests; consider adjusting threadPoolSize and queueSize.
Table creation failures: Ensure autoCreateTables is only used with Schema Registry-based inputs (Avro, Protobuf, or JSON Schema).
Performance issues: For high-volume data, consider using batch loading via GCS instead of streaming inserts.
Logging
To enable detailed logging for troubleshooting:
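A sketch assuming the Connect worker is configured with Log4j and that the connector code lives under the package shown; adjust the logger name to match your installation:

```properties
# add to the worker's Log4j configuration; logger name is an assumption
log4j.logger.com.wepay.kafka.connect.bigquery=DEBUG
```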
Limitations
The BigQuery Sink connector has the following limitations:
The connector does not support schemas with recursion.
The connector does not support schemas having float fields with NaN or +Infinity values.
Auto schema update does not support removing columns.
Auto schema update does not support recursive schemas.
When the connector is configured with upsertEnabled or deleteEnabled, it does not support Single Message Transformations (SMTs) that modify the topic name. Additionally, the following transformations are not allowed:
io.debezium.transforms.ByLogicalTableRouter
Upgrading to 2.x.x
The following changes aren’t backward compatible in the BigQuery connector:
datasets was removed and defaultDataset has been introduced. The connector now infers the dataset from the topic name if the topic is in the form <dataset>:<tableName>. If the topic name is in the form <tablename>, the connector defaults to defaultDataset.
topicsToTables was removed. You should use SMT RegexRouter to route topics to tables.
autoUpdateSchemas was replaced by allowNewBigQueryFields and allowBigQueryRequiredFieldRelaxation.
value.converter.enhanced.avro.schema.support should be set to false or removed. If this property is not removed or set to false, you may receive the following error:
GCP Storage
This page describes the usage of the Stream Reactor GCP Storage Source Connector.
Connector Class
Example
For more examples, see the Cloud Storage Examples page.
KCQL Support
You can specify multiple KCQL statements separated by ; to have the connector sink into multiple topics.
However, you cannot route the same source to different topics; for this, use a separate connector instance.
The connector uses a SQL-like syntax to configure the connector behaviour. The full KCQL syntax is:
Please note that you can employ escaping within KCQL for the INSERT INTO, SELECT * FROM, and PARTITIONBY clauses when necessary. For example, if you need to use a topic name that contains a hyphen, you can escape it as follows:
Source Bucket & Path
The GCP Storage source location is defined within the FROM clause. The connector will read all objects from the given location considering the data partitioning and ordering options. Each data partition will be read by a single connector task.
The FROM clause format is:
If your data in GCS was not written by the Lenses GCS sink, set the connector to traverse a folder hierarchy in a bucket and load objects based on their last modified timestamp. If LastModified sorting is used, ensure objects do not arrive late, or use a post-processing step to handle them.
The target Kafka topic is specified via the INSERT INTO clause. The connector will write all the records to the given topic:
GCP Storage object formats
The connector supports a range of storage formats, each with its own distinct functionality:
JSON: The connector will read objects containing JSON content, each line representing a distinct record.
Avro: The connector will read Avro-stored messages from GCP Storage and translate them into Kafka’s native format.
Parquet: The connector will read Parquet-stored messages from GCP Storage and translate them into Kafka’s native format.
Use the STOREAS clause to configure the storage format. The following options are available:
Text Processing
When using Text storage, the connector provides additional configuration options to finely control how text content is processed.
Regex
In Regex mode, the connector applies a regular expression pattern, and only when a line matches the pattern is it considered a record. For example, to include only lines that start with a number, you can use the following configuration:
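A sketch of this configuration in KCQL (bucket, prefix, and topic names are placeholders); the pattern `^[0-9].*` matches lines that start with a digit:

```properties
connect.gcpstorage.kcql=INSERT INTO my-topic SELECT * FROM my-bucket:my-prefix STOREAS `Text` PROPERTIES ('read.text.mode'='Regex', 'read.text.regex'='^[0-9].*')
```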
Start-End line
In Start-End Line mode, the connector reads text content between specified start and end lines, inclusive. This mode is useful when you need to extract records that fall within defined boundaries. For instance, to read records where the first line is ‘SSM’ and the last line is an empty line (’’), you can configure it as follows:
To trim the start and end lines, set the read.text.trim property to true:
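A sketch combining the Start-End Line settings with trimming (bucket, prefix, and topic names are placeholders):

```properties
connect.gcpstorage.kcql=INSERT INTO my-topic SELECT * FROM my-bucket:my-prefix STOREAS `Text` PROPERTIES ('read.text.mode'='StartEndLine', 'read.text.start.line'='SSM', 'read.text.end.line'='', 'read.text.trim'=true)
```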
Start-End tag
In Start-End Tag mode, the connector reads text content between specified start and end tags, inclusive. This mode is particularly useful when a single line of text in GCP Storage corresponds to multiple output Kafka messages. For example, to read XML records enclosed between a start tag and an end tag, configure it as follows:
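A sketch of this configuration (bucket, prefix, and topic names are placeholders; the `<SSM>` tags are purely illustrative and should be replaced with the tags that delimit your records):

```properties
connect.gcpstorage.kcql=INSERT INTO my-topic SELECT * FROM my-bucket:my-prefix STOREAS `Text` PROPERTIES ('read.text.mode'='StartEndTag', 'read.text.start.tag'='<SSM>', 'read.text.end.tag'='</SSM>')
```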
Storage output matrix
Depending on the storage format of Kafka topics’ messages, the need for replication to a different cluster, and the specific data analysis requirements, there exists a guideline on how to effectively utilize converters for both sink and source operations. This guidance aims to optimize performance and minimize unnecessary CPU and memory usage.
S3 Storage Format
Kafka Output Format
Restore or replicate cluster
Analytics
Sink Converter
Source Converter
Projections
Currently, the connector does not offer support for SQL projection; consequently, anything other than a SELECT * query is disregarded. The connector will faithfully write all the record fields to Kafka exactly as they are.
Ordering
By default, the connector relies on the lexicographical order of object key names to ensure precise ordering, leveraging optimizations offered by the GCS API and guaranteeing the accurate sequence of objects.
When using the GCS source alongside the GCS sink, the connector can adopt the same ordering method, ensuring data processing follows the correct chronological order. However, there are scenarios where GCS data is generated by applications that do not maintain lexical object name order.
In such cases, to process objects in the correct sequence, the source needs to list all objects in the bucket and sort them based on their last modified timestamp. To enable this behavior, set the connect.gcpstorage.source.ordering.type to LastModified. This ensures that the source correctly arranges and processes the data based on the timestamps of the objects.
If using LastModified sorting, ensure objects do not arrive late, or use a post-processing step to handle them.
Throttling
Use the BATCH clause to limit the number of object names the source reads from GCS in a single poll. The default value, if not specified, is 1000:
To limit the number of result rows returned from the source in a single poll operation, you can use the LIMIT clause. The default value, if not specified, is 10000.
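Both throttling clauses can be sketched in a single KCQL statement (bucket, prefix, topic names, and the numeric values are placeholders):

```properties
# BATCH caps object names listed per poll; LIMIT caps result rows per poll
connect.gcpstorage.kcql=INSERT INTO my-topic SELECT * FROM my-bucket:my-prefix BATCH = 500 STOREAS `JSON` LIMIT 1000
```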
Object Extension Filtering
The GCP Storage Source Connector allows you to filter the objects to be processed based on their extensions. This is controlled by two properties: connect.gcpstorage.source.extension.excludes and connect.gcpstorage.source.extension.includes.
Excluding Object Extensions
The connect.gcpstorage.source.extension.excludes property is a comma-separated list of object extensions to exclude from the source object search. If this property is not configured, all objects are considered. For example, to exclude .txt and .csv objects, you would set this property as follows:
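A sketch of that configuration (the extensions are written without leading dots, which is an assumption; verify against your connector version):

```properties
connect.gcpstorage.source.extension.excludes=txt,csv
```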
Including Object Extensions
The connect.gcpstorage.source.extension.includes property is a comma-separated list of object extensions to include in the source object search. If this property is not configured, all objects are considered. For example, to include only .json and .xml objects, you would set this property as follows:
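A sketch of that configuration (again assuming extensions without leading dots):

```properties
connect.gcpstorage.source.extension.includes=json,xml
```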
Note: If both connect.gcpstorage.source.extension.excludes and connect.gcpstorage.source.extension.includes are set, the connector first applies the exclusion filter and then the inclusion filter.
Post-Processing Options
Post-processing options offer flexibility in managing how objects are handled after they have been processed. By configuring these options, users can automate tasks such as deleting objects to save storage space or moving objects to an archive for compliance and data retention purposes. These features are crucial for efficient data lifecycle management, particularly in environments where storage considerations or regulatory requirements dictate the need for systematic handling of processed data.
Use Cases for Post-Processing Options
Deleting objects After Processing
For scenarios where freeing up storage is critical and reprocessing is not necessary, configure the connector to delete objects after they are processed. This option is particularly useful in environments with limited storage capacity or where processed data is redundantly stored elsewhere.
Example:
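A sketch of the KCQL configuration enabling deletion (bucket, prefix, and topic names are placeholders):

```properties
connect.gcpstorage.kcql=INSERT INTO my-topic SELECT * FROM my-bucket:my-prefix PROPERTIES ('post.process.action'=DELETE)
```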
Result: Objects are permanently removed from the GCS bucket after processing, effectively reducing storage usage and preventing reprocessing.
Moving objects to an Archive Bucket
Properties
The PROPERTIES clause is optional and adds a layer of configuration to the connector. It enhances versatility by permitting the application of multiple configurations (delimited by ‘,’). The following properties are supported:
Name
Description
Type
Available Values
Authentication
The connector offers two distinct authentication modes:
Default: This mode relies on the default GCP authentication chain, simplifying the authentication process.
File: This mode uses a local (to the connect worker) path for a file containing GCP authentication credentials.
Credentials: In this mode, explicit configuration of a GCP Credentials string is required for authentication.
The simplest example to configure in the connector is the “Default” mode, as this requires no other configuration.
When selecting the “Credentials” mode, it is essential to provide the necessary credentials. Alternatively, if you prefer not to configure these properties explicitly, the connector will follow the GCP default credentials retrieval order.
Here’s an example configuration for the “Credentials” mode:
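A minimal sketch, with the credentials string and project ID supplied as placeholders (ideally via a Secret Provider):

```properties
connect.gcpstorage.gcp.auth.mode=Credentials
connect.gcpstorage.gcp.credentials=$GCP_CREDENTIALS
connect.gcpstorage.gcp.project.id=$GCP_PROJECT_ID
```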
And here is an example configuration using the “File” mode:
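A minimal sketch; the property name for the credentials file path is an assumption, and the path is a placeholder:

```properties
connect.gcpstorage.gcp.auth.mode=File
# property name assumed; the file must exist on every worker node
connect.gcpstorage.gcp.file=/path/to/gcp-credentials.json
```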
Remember when using file mode the file will need to exist on every worker node in your Kafka connect cluster and be readable by the Kafka Connect process.
For enhanced security and flexibility when using the “Credentials” mode, it is highly advisable to utilize Connect Secret Providers. This approach ensures robust security practices while handling access credentials.
Backup and Restore
When used in tandem with the GCP Storage Sink Connector, the GCP Storage Source Connector becomes a powerful tool for restoring Kafka topics from GCP Storage. To enable this behavior, you should set store.envelope to true. This configuration ensures that the source expects the following data structure in GCP Storage:
When the messages are sent to Kafka, the GCP Storage Source Connector ensures that it correctly maps the key, value, headers, and metadata fields (including timestamp and partition) to their corresponding Kafka message fields. Please note that the envelope functionality can only be used with data stored in GCP Storage as Avro, JSON, or Parquet formats.
Partition Extraction
When the envelope feature is not in use, and data restoration is required, the responsibility falls on the connector to establish the original topic partition value. To ensure that the source correctly conveys the original partitions back to Kafka Connect during reads from the source, a partition extractor can be configured to extract this information from the GCP Storage object key.
To configure the partition extractor, you can utilize the connect.gcpstorage.source.partition.extractor.type property, which supports two options:
hierarchical: This option aligns with the default format used by the sink, topic/partition/offset.json.
regex: When selected, you can provide a custom regular expression to extract the partition information. Additionally, when using the regex option, you must also set the connect.gcpstorage.source.partition.extractor.regex property. It’s important to note that only one lookup group is expected. For an example of a regular expression pattern, please refer to the pattern used for hierarchical, which is:
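A sketch of the regex extractor configuration; the pattern shown is illustrative, assuming the hierarchical topic/partition/offset key layout, with a single lookup group capturing the partition:

```properties
connect.gcpstorage.source.partition.extractor.type=regex
# illustrative pattern, assuming keys like my-topic/3/100.json
connect.gcpstorage.source.partition.extractor.regex=(?i)^(?:.*)\/([0-9]*)\/(?:[0-9]*)[.](?:Json|Avro|Parquet)$
```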
Option Reference
Name
Description
Type
Available Values
Default Value
Cassandra
This page describes the usage of the Stream Reactor Cassandra Sink Connector bundled in kafka-connect-cassandra-sink artifact.
The Stream Reactor Cassandra Sink Connector is designed to move data from Apache Kafka to Apache Cassandra. It supports Apache Cassandra 3.0 and later, DataStax Enterprise 4.7 and later, and DataStax Astra databases.
Input formats: The connector supports Avro, JSON Schema, Protobuf, JSON (schemaless) and primitive data formats. For Schema Registry-based formats, such as Avro, Protobuf, and JSON Schema, it needs the Schema Registry settings.
At-least-once semantics: The Connect framework stores record offsets in Kafka and resumes from the last committed offset on restart. This ensures reliable delivery but may occasionally result in duplicate record processing during failures.
Example
For more examples, see the examples section.
Connection
The connector requires a connection to the database. To enable it, specify the following configuration entries. See Configuration Reference for details.
Security
SSL connection
When the database cluster has enabled client encryption, configure the SSL Keys and certificates:
User-Password or LDAP authentication
When the database is configured with user-password or LDAP authentication, configure the following:
Kerberos authentication
When the database cluster has Kerberos authentication enabled set the following settings:
KCQL support
You can specify multiple KCQL statements separated by ; to have a connector sink multiple topics. The connector properties topics or topics.regex must be set to a value that matches the KCQL statements.
KCQL (Kafka Connect Query Language) is a SQL-like syntax that provides a declarative way to configure connector behavior. It serves as the primary interface for defining how data flows from source topics to target Cassandra tables.
Purpose and Functionality
KCQL statements define three core aspects of data pipeline configuration:
Data Source and Target: Specifies which Kafka topic serves as the data source and which Cassandra table receives the data.
Field Mapping: Controls how fields from the source topic map to columns in the target table, including transformations and selective field inclusion.
Operational Parameters: Configures advanced settings such as consistency levels, delete operations, time-to-live (TTL) settings, and timestamp handling.
Key Capabilities
Flexible Field Mapping: Map source fields to target columns with optional transformations
Consistency Control: Set read and write consistency levels for Cassandra operations
Delete Operations: Enable or disable delete functionality based on message content
Syntax Template
The basic KCQL statement follows this structure:
INSERT INTO: Specifies the target Cassandra keyspace and table where data will be written.
SELECT: Defines field projection and mapping from the source.
FROM: Identifies the source Kafka topic containing the data to be processed.
PROPERTIES: Optional clause for configuring connector behavior such as consistency levels, TTL settings, and operational parameters.
Examples
Table mapping
Kafka to Database Table Mapping
The connector enables mapping fields from Kafka records to database table columns. A single connector can handle multiple topics, where each topic can map to one or more tables. Kafka messages consist of a Key, Value, and Header.
Generic Mapping Syntax
Use the following syntax to map Kafka message fields to database columns:
Prefix the field with:
_value: For fields from the Kafka message's Value component.
_key: For fields from the Kafka message's Key component.
Note: If no prefix is provided, _value is assumed, mapping to the Kafka record's Value component.
The record Key and Value can be mapped to specific columns like this, considering a table with row_key and content columns:
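A sketch of that mapping (keyspace, table, and topic names are placeholders):

```properties
connect.cassandra.kcql=INSERT INTO my_keyspace.my_table SELECT _key as row_key, _value as content FROM my-topic
```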
To map database columns with whitespace to KCQL projections, follow this format:
Fan-out
You can map a topic's data to multiple database tables using multiple KCQL statements in the same connect.cassandra.kcql configuration. For example:
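A sketch of fan-out to two tables from one topic (keyspace, table, and topic names are placeholders):

```properties
connect.cassandra.kcql=INSERT INTO my_keyspace.table1 SELECT * FROM my-topic; INSERT INTO my_keyspace.table2 SELECT * FROM my-topic
```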
User defined types
The connector can map complex types from a Kafka record payload into a user-defined type (UDT) column in the database. The incoming data field names must match the database field names.
Consider the following Kafka record format:
key
value
and database definition:
The mapping configuration should be:
Using now() function
You can leverage the now() function, which returns a TIMEUUID, in the mapping. Here is how to use it in a KCQL projection:
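A sketch of a projection using now(); the column and field names are placeholders:

```properties
# event_id receives a server-generated TIMEUUID
connect.cassandra.kcql=INSERT INTO my_keyspace.my_table SELECT now() as event_id, _value.name as name FROM my-topic
```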
Write Timestamp
To specify an internal write-time timestamp from the database, choose a numeric field from the Kafka record payload. Use the following mapping syntax:
Examples:
Row-Level TTL
To define the time-to-live (TTL) for a database record, you can optionally map a field from the Kafka record payload. Use the following query:
Examples:
These examples demonstrate specifying TTL units such as 'MINUTES', 'HOURS', or 'DAYS', in addition to the default 'SECONDS'.
Converting date and time for a topic
To configure date and time conversion properties for a topic, use the following SQL command:
Codec Parameter Descriptions:
Parameter
Description
Default
CQL Queries
Advanced CQL Query Configuration for Kafka Records
When a new Kafka record arrives, you have the option to run a custom CQL query. This feature is designed for advanced use cases; typically, the standard Kafka mapping suffices without needing a query. If you specify a query in the topic-to-table mapping, it will take precedence over the default action.
Important: You must include the bound variables used in the query within the mapping column.
Example
Extra settings
Parameter
Description
Default
Java Driver settings
Any specific setting you want to set for the Cassandra client can be passed through the connector configuration by prefixing it accordingly. Refer to the Java Driver documentation for more information.
autoCreateTables
Create BigQuery tables if they don't already exist. This property should only be enabled for Schema Registry-based inputs: Avro, Protobuf, or JSON Schema (JSON_SR). Table creation is not supported for JSON input.
boolean
false
high
gcsBucketName
The name of the bucket where Google Cloud Storage (GCS) blobs are located. These blobs are used to batch-load to BigQuery. This is applicable only if enableBatchLoad is configured.
string
""
high
queueSize
The maximum size (or -1 for no maximum size) of the worker queue for BigQuery write requests before all topics are paused. This is a soft limit; the size of the queue can go over this before topics are paused. All topics resume once a flush is triggered or the size of the queue drops under half of the maximum size.
long
-1
high
bigQueryMessageTimePartitioning
Whether or not to use the message time when inserting records. Default uses the connector processing time.
boolean
false
high
bigQueryPartitionDecorator
Whether or not to append partition decorator to BigQuery table name when inserting records. Setting this to true appends partition decorator to table name (e.g. table$yyyyMMdd depending on the configuration). Setting this to false bypasses the logic to append the partition decorator and uses raw table name for inserts.
boolean
true
high
keySource
Determines whether the keyfile configuration is the path to the credentials JSON file or to the JSON itself. Available values are FILE and JSON. This property is available in BigQuery sink connector version 1.3 (and later).
string
FILE
medium
keyfile
Keyfile can be either a string representation of the Google credentials file or the path to the Google credentials file itself. The string representation of the Google credentials file is supported in BigQuery sink connector version 1.3 (and later).
string
null
medium
bigQueryRetry
The number of retry attempts made for a BigQuery request that fails with a backend error or a quota exceeded error.
int
0
medium
bigQueryRetryWait
The minimum amount of time, in milliseconds, to wait between retry attempts for a BigQuery backend or quota exceeded error.
long
1000
medium
sanitizeTopics
Designates whether to automatically sanitize topic names before using them as table names. If not enabled, topic names are used as table names.
boolean
false
medium
schemaRetriever
A class that can be used for automatically creating tables and/or updating schemas. Note that in version 2.0.0, the SchemaRetriever API changed to retrieve the schema from each SinkRecord, which will help support multiple schemas per topic.
threadPoolSize
The size of the BigQuery write thread pool. This establishes the maximum number of concurrent writes to BigQuery.
int
10
medium
autoCreateBucket
Whether to automatically create the given bucket, if it does not exist.
boolean
true
medium
allowNewBigQueryFields
If true, new fields can be added to BigQuery tables during subsequent schema updates.
boolean
false
medium
allowBigQueryRequiredFieldRelaxation
If true, fields in BigQuery Schema can be changed from REQUIRED to NULLABLE. Note that allowNewBigQueryFields and allowBigQueryRequiredFieldRelaxation replaced the autoUpdateSchemas parameter of older versions of this connector.
boolean
false
medium
allowSchemaUnionization
If true, the existing table schema (if one is present) will be unionized with new record schemas during schema updates. If false, the record of the last schema in a batch will be used for any necessary table creation and schema update attempts. Note that setting allowSchemaUnionization to false and allowNewBigQueryFields and allowBigQueryRequiredFieldRelaxation to true is equivalent to setting autoUpdateSchemas to true in older versions.
boolean
false
medium
auto.register.schemas
Specifies if the Serializer should attempt to register the Schema with Schema Registry.
boolean
true
medium
use.latest.version
Only applies when auto.register.schemas is set to false. If use.latest.version is set to true, then Schema Registry uses the latest version of the schema in the subject for serialization.
boolean
true
medium
timestampPartitionFieldName
The name of the field in the value that contains the timestamp to partition by in BigQuery and enable timestamp partitioning for each table. Leave blank to enable ingestion time partitioning for each table.
string
null
low
clusteringPartitionFieldNames
Comma-separated list of fields where data is clustered in BigQuery.
list
null
low
timePartitioningType
The time partitioning type to use when creating tables. Existing tables will not be altered to use this partitioning type.
string
DAY
low
allBQFieldsNullable
If true, no fields in any produced BigQuery schema are REQUIRED. All non-nullable Avro fields are translated as NULLABLE (or REPEATED, if arrays).
boolean
false
low
avroDataCacheSize
The size of the cache to use when converting schemas from Avro to Kafka Connect.
int
100
low
batchLoadIntervalSec
The interval, in seconds, in which to attempt to run GCS to BigQuery load jobs. Only relevant if enableBatchLoad is configured.
int
120
low
convertDoubleSpecialValues
Designates whether +Infinity is converted to Double.MAX_VALUE and whether -Infinity and NaN are converted to Double.MIN_VALUE to ensure successful delivery to BigQuery.
boolean
false
low
enableBatchLoad
Beta Feature - Use with caution. The sublist of topics to be batch loaded through GCS.
list
""
low
includeKafkaData
Whether to include an extra block containing the Kafka source topic, offset, and partition information in the resulting BigQuery rows.
boolean
false
low
upsertEnabled
Enable upsert functionality on the connector through the use of record keys, intermediate tables, and periodic merge flushes. Row-matching will be performed based on the contents of record keys. This feature won't work with SMTs that change the name of the topic and doesn't support JSON input.
boolean
false
low
deleteEnabled
Enable delete functionality on the connector through the use of record keys, intermediate tables, and periodic merge flushes. A delete will be performed when a record with a null value (that is–a tombstone record) is read. This feature will not work with SMTs that change the name of the topic and doesn't support JSON input.
boolean
false
low
intermediateTableSuffix
A suffix that will be appended to the names of destination tables to create the names for the corresponding intermediate tables. Multiple intermediate tables may be created for a single destination table.
string
"tmp"
low
mergeIntervalMs
How often (in milliseconds) to perform a merge flush, if upsert/delete is enabled. Can be set to -1 to disable periodic flushing.
long
60000
low
mergeRecordsThreshold
How many records to write to an intermediate table before performing a merge flush, if upsert/delete is enabled. Can be set to -1 to disable record count-based flushing.
long
-1
low
kafkaDataFieldName
The Kafka data field name. The default value is null, which means the Kafka Data field will not be included.
string
null
low
kafkaKeyFieldName
The Kafka key field name. The default value is null, which means the Kafka Key field will not be included.
string
null
low
topic2TableMap
Map of topics to tables (optional). Format: comma-separated tuples, e.g. <topic-1>:<table-1>,<topic-2>:<table-2>,... Note that topic name should not be modified using regex SMT while using this option. Also note that SANITIZE_TOPICS_CONFIG would be ignored if this config is set.
string
""
low
csfle.enabled
CSFLE is enabled for the connector if set to True.
boolean
False
low
FLOAT
FLOAT64
BOOLEAN
Boolean
BYTES
Bytes
TIMESTAMP
Logical TIMESTAMP
TIME
Logical TIME
DATE
Logical DATE
FLOAT
Logical Decimal
DATE
Debezium Date
TIME
Debezium MicroTime
TIME
Debezium Time
TIMESTAMP
Debezium MicroTimestamp
TIMESTAMP
Debezium TIMESTAMP
TIMESTAMP
Debezium ZonedTimestamp
To load in alphanumeric order, set the ordering type to AlphaNumeric.
Text: The connector will read objects containing lines of text, each line representing a distinct record.
CSV: The connector will read objects containing lines of text, each line representing a distinct record.
CSV_WithHeaders: The connector will read objects containing lines of text, each line representing a distinct record while skipping the header row.
Bytes: The connector will read objects containing bytes, each object is translated to a Kafka message.
Yes
StringConverter
StringConverter
AVRO,Parquet
STRING
Same,Other
No
ByteArrayConverter
ByteArrayConverter
JSON
JSON
Same,Other
Yes
JsonConverter
StringConverter
JSON
JSON
Same,Other
No
StringConverter
StringConverter
AVRO,Parquet
JSON
Same,Other
Yes,No
JsonConverter
JsonConverter or Avro Converter (Glue, Confluent)
AVRO,Parquet, JSON
BYTES
Same,Other
Yes,No
ByteArrayConverter
ByteArrayConverter
AVRO,Parquet
AVRO
Same
Yes
Avro Converter (Glue, Confluent)
Avro Converter (Glue, Confluent)
AVRO,Parquet
AVRO
Same
No
ByteArrayConverter
ByteArrayConverter
AVRO,Parquet
AVRO
Other
Yes,No
Avro Converter (Glue, Confluent)
Avro Converter (Glue, Confluent)
AVRO,Parquet
Protobuf
Same
Yes
Protobuf Converter (Glue, Confluent)
Protobuf Converter (Glue, Confluent)
AVRO,Parquet
Protobuf
Same
No
ByteArrayConverter
ByteArrayConverter
AVRO,Parquet
Protobuf
Other
Yes,No
Protobuf Converter (Glue, Confluent)
Protobuf Converter (Glue, Confluent)
AVRO,Parquet, JSON
Other
Same, Other
Yes,No
ByteArrayConverter
ByteArrayConverter
To preserve processed objects for archiving or compliance reasons, set the connector to move them to a designated archive bucket. This use case applies to organizations needing data retention strategies or for regulatory adherence by keeping processed records accessible but not in active use.
Example:
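A sketch of the KCQL configuration for the move action (bucket, prefix, and topic names are placeholders):

```properties
connect.gcpstorage.kcql=INSERT INTO my-topic SELECT * FROM my-bucket:my-prefix PROPERTIES ('post.process.action'=MOVE, 'post.process.action.bucket'=archive-bucket, 'post.process.action.prefix'=processed/)
```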
Result: Objects are transferred to an archive-bucket, stored with an updated path that includes the processed/ prefix, maintaining an organized archive structure.
read.text.end.tag
End Tag for Text Reading (if applicable)
String
read.text.buffer.size
Text Buffer Size (for optimization)
Int
read.text.start.line
Start Line for Text Reading (if applicable)
String
read.text.end.line
End Line for Text Reading (if applicable)
String
read.text.trim
Trim Text During Reading
Boolean
store.envelope
Messages are stored as “Envelope”
Boolean
post.process.action
Defines the action to perform on source objects after successful processing.
Enum
DELETE or MOVE
post.process.action.bucket
Specifies the target bucket for the MOVE action (required for MOVE).
String
post.process.action.prefix
Specifies a new prefix for the object’s location when using the MOVE action (required for MOVE).
Enable this feature to periodically scan the specified GCS bucket and path for files with timestamps older than the current watermark. If such files are detected, it performs a "touch" operation to update their last modified timestamp, allowing the connector to process them. Use this option only when there is a possibility of "late arrival" of files, which occurs when the sequence of object creation by producers is not guaranteed.
Boolean
false
For "auth.mode" file: Local file path for file containing GCP authentication credentials.
string
(Empty)
connect.gcpstorage.gcp.project.id
GCP Project ID.
string
(Empty)
connect.gcpstorage.gcp.quota.project.id
GCP Quota Project ID.
string
(Empty)
connect.gcpstorage.endpoint
Endpoint for GCP Storage.
string
connect.gcpstorage.error.policy
Defines the error handling policy when errors occur during data transfer to or from GCP Storage.
string
"NOOP," "THROW," "RETRY"
"THROW"
connect.gcpstorage.max.retries
Sets the maximum number of retries the connector will attempt before reporting an error to the Connect Framework.
int
20
connect.gcpstorage.retry.interval
Specifies the interval (in milliseconds) between retry attempts by the connector.
int
60000
connect.gcpstorage.http.max.retries
Sets the maximum number of retries for the underlying HTTP client when interacting with GCP Storage.
long
5
connect.gcpstorage.http.retry.interval
Specifies the retry interval (in milliseconds) for the underlying HTTP client. An exponential backoff strategy is employed.
long
50
connect.gcpstorage.kcql
Kafka Connect Query Language (KCQL) Configuration to control the connector behaviour
Invalid field name
"com.examples.project-super-important.v1.MyData". Fields must
contain only letters, numbers, and underscores, start with a letter or
underscore, and be at most 300 characters long.
name=gcp-storageSourceConnectorParquet # this can be anything
connector.class=io.lenses.streamreactor.connect.gcp.storage.source.GCPStorageSourceConnector
tasks.max=1
connect.gcpstorage.kcql=insert into $TOPIC_NAME select * from $BUCKET_NAME:$PREFIX_NAME STOREAS `parquet`
connect.gcpstorage.gcp.auth.mode=Credentials
connect.gcpstorage.gcp.credentials=$GCP_CREDENTIALS
connect.gcpstorage.gcp.project.id=$GCP_PROJECT_ID
INSERT INTO $kafka-topic
SELECT *
FROM bucketAddress:pathPrefix
[BATCH=batch]
[STOREAS storage_format]
[LIMIT limit]
[PROPERTIES(
'property.1'=x,
'property.2'=x,
)]
INSERT INTO `my-topic-with-hyphen`
SELECT *
FROM bucketAddress:pathPrefix
FROM [bucketname]:pathprefix
//my-bucket-called-pears:my-folder-called-apples
INSERT INTO my-apples-topic SELECT * FROM my-bucket-called-pears:my-folder-called-apples
{
"key": <the message Key, which can be a primitive or a complex object>,
"value": <the message Value, which can be a primitive or a complex object>,
"headers": {
"header1": "value1",
"header2": "value2"
},
"metadata": {
"offset": 0,
"partition": 0,
"timestamp": 0,
"topic": "topic"
}
}
INSERT INTO `my-topic`
SELECT * FROM `my-gcp-storage-bucket:my-prefix`
PROPERTIES (
'post.process.action'=`MOVE`,
'post.process.action.bucket'=`archive-bucket`,
'post.process.action.prefix'=`processed/`
)
Kafka topic to database table mapping: Controls which Kafka message fields are written to which database table columns
Multi-table mapping: Enables writing a topic to multiple database tables
Date/Time/Timestamp formats: Allows control of the date, time, and timestamp format conversion between Kafka messages and database columns, supporting custom parsing and formatting patterns.
Consistency Level control: Allows configuring the write consistency on a per table mapping
Row Level Time-to-Live: Allows configuring the row-level TTL on a per table mapping
Deletes: Allows configuring the deletes on a per table mapping
TTL Management: Configure automatic data expiration using time-to-live settings
Timestamp Handling: Control how timestamps are processed and stored
Conditional Logic: Apply filtering and conditional processing to incoming data
_header: For fields from the Kafka message's Header.
codec.timeZone
Defines the time zone for conversions without an explicit time zone.
UTC
codec.locale
Specifies the locale for locale-sensitive conversions.
en_US
connect.cassandra.connection.pool.size
The number of connections to maintain in the connection pool.
2
String
connect.cassandra.compression
Compression algorithm to use for the connection. Defaults to LZ4.
LZ4
String
connect.cassandra.query.timeout.ms
The Cassandra driver query timeout in milliseconds.
20000
Int
connect.cassandra.max.batch.size
Number of records to include in a write request to the database table.
64
Int
connect.cassandra.load.balancing.local.dc
The case-sensitive datacenter name for the driver to use for load balancing.
(no default)
String
connect.cassandra.auth.provider
Authentication provider
None
String
connect.cassandra.auth.username
Username for PLAIN (username/password) provider authentication
""
String
connect.cassandra.auth.password
Password for PLAIN (username/password) provider authentication
""
String
connect.cassandra.auth.gssapi.keytab
Kerberos keytab file for GSSAPI provider authentication
""
String
connect.cassandra.auth.gssapi.principal
Kerberos principal for GSSAPI provider authentication
""
String
connect.cassandra.auth.gssapi.service
SASL service name to use for GSSAPI provider authentication
dse
String
connect.cassandra.ssl.enabled
Secure Cassandra driver connection via SSL.
false
String
connect.cassandra.ssl.provider
The SSL provider to use for the connection. Available values are None, JDK or OpenSSL. Defaults to None.
None
String
connect.cassandra.ssl.truststore.path
Path to the client Trust Store.
(no default)
String
connect.cassandra.ssl.truststore.password
Password for the client Trust Store.
(no default)
String
connect.cassandra.ssl.keystore.path
Path to the client Key Store.
(no default)
String
connect.cassandra.ssl.keystore.password
Password for the client Key Store
(no default)
String
connect.cassandra.ssl.cipher.suites
The SSL cipher suites to use for the connection.
(no default)
String
connect.cassandra.ssl.hostname.verification
Enable hostname verification for the connection.
true
String
connect.cassandra.ssl.openssl.key.cert.chain
Path to the OpenSSL key certificate chain file for the connection.
(no default)
String
connect.cassandra.ssl.openssl.private.key
Path to the OpenSSL private key file for the connection.
(no default)
String
connect.cassandra.ignore.errors.mode
Can be one of 'none', 'all' or 'driver'
none
String
connect.cassandra.retry.interval
The time in milliseconds between retries.
60000
String
connect.cassandra.error.policy
Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP - the error is swallowed; THROW - the error is allowed to propagate; RETRY - The exception causes the Connect framework to retry the message. The number of retries is set by connect.cassandra.max.retries. All errors will be logged automatically, even if the code swallows them.
THROW
String
connect.cassandra.max.retries
The maximum number of times to try the write again.
20
Int
connect.cassandra.kcql
KCQL expression describing field selection and routes.
(no default)
String
connect.cassandra.progress.enabled
Enables logging of the number of records that have been processed.
false
Boolean
Tweaks the Cassandra driver settings.
Refer to the
codec.timestamp
Defines the pattern for converting strings to CQL timestamps. Options include specific date-time patterns like yyyy-MM-dd HH:mm:ss or pre-defined formats such as ISO_ZONED_DATE_TIME and ISO_INSTANT. The special formatter CQL_TIMESTAMP supports all CQL timestamp formats.
CQL_TIMESTAMP
codec.date
Specifies the pattern for converting strings to CQL dates. Options include date-time patterns like yyyy-MM-dd and formatters such as ISO_LOCAL_DATE.
ISO_LOCAL_DATE
codec.time
Sets the pattern for converting strings to CQL time, using patterns like HH:mm:ss or formatters such as ISO_LOCAL_TIME.
ISO_LOCAL_TIME
codec.unit
For digit-only inputs not parsed by codec.timestamp, this sets the time unit for conversion. Accepts all TimeUnit enum constants.
consistencyLevel
Query Consistency Level Options:
ALL
EACH_QUORUM
QUORUM
LOCAL_QUORUM
ONE
TWO
THREE
LOCAL_ONE
ANY
ttl
Specify the number of seconds before data is automatically deleted from the DSE table. When set, all rows in the topic table will share this TTL value.
-1
nullToUnset
When handling nulls in Kafka, it's advisable to treat them as UNSET in DSE. DataStax suggests sticking with the default setting to minimize the creation of unnecessary tombstones.
true
deletesEnabled
Enable this feature to treat records as deletes if, after mapping, only the primary key columns have non-null values. This prevents the insertion or update of nulls in regular columns.
connect.cassandra.contact.points
A comma-separated list of host names or IP addresses
localhost
String
connect.cassandra.port
Cassandra native port.
9042
String
connect.cassandra.max.concurrent.requests
Maximum number of requests to send to database at the same time.
Enable this feature to periodically scan the specified S3 bucket and path for files with timestamps older than the current watermark. If such files are detected, it performs a "touch" operation to update their last modified timestamp, allowing the connector to process them. Use this option only when there is a possibility of "late arrival" of files, which occurs when the sequence of object creation by producers is not guaranteed.
Boolean
false
connect.s3.aws.secret.key
Secret Key for AWS S3 Credentials
string
connect.s3.aws.region
AWS Region for S3 Bucket
string
connect.s3.pool.max.connections
Maximum Connections in the Connection Pool
int
-1 (undefined)
50
connect.s3.custom.endpoint
Custom Endpoint URL for S3 (if applicable)
string
connect.s3.kcql
Kafka Connect Query Language (KCQL) Configuration to control the connector behaviour
Multiplier applied to the retry delay when no results are found.
double
2.0 Multiplier (x)
connect.s3.source.write.watermark.header
Write the record with Kafka headers that include details of the source file and line number.
boolean
true, false
false
connect.s3.source.late.arrival.interval
Specify the delay interval, in seconds, for processing objects or files older than the connector watermark.
Available since version 11.3.0.
Int
300
Simple configuration
name=cassandra-sink
connector.class=io.lenses.streamreactor.connect.cassandra.CassandraSinkConnector
tasks.max=1
topics=orders
connect.cassandra.kcql=INSERT INTO mykeyspace.orders SELECT _key.bigint as bigintcol, _value.boolean as booleancol, _key.double as doublecol, _value.float as floatcol, _key.int as intcol, _value.smallint as smallintcol, _key.text as textcol, _value.tinyint as tinyintcol FROM orders
connect.cassandra.port=9042
connect.cassandra.contact.points=cassandra
connect.cassandra.ssl.provider=
connect.cassandra.ssl.cipher.suites=
connect.cassandra.ssl.hostname.verification=true
connect.cassandra.ssl.keystore.path=
connect.cassandra.ssl.keystore.password=
connect.cassandra.ssl.truststore.password=
connect.cassandra.ssl.truststore.path=
# Path to the SSL certificate file, when using OpenSSL.
connect.cassandra.ssl.openssl.key.cert.chain=
# Path to the private key file, when using OpenSSL.
connect.cassandra.ssl.openssl.private.key=
connect.cassandra.auth.provider=
connect.cassandra.auth.username=
connect.cassandra.auth.password=
# for SASL authentication which requires connect.cassandra.auth.provider
# set to GSSAPI
connect.cassandra.auth.gssapi.keytab=
connect.cassandra.auth.gssapi.principal=
connect.cassandra.auth.gssapi.service=
INSERT INTO <keyspace>.<your-cassandra-table>
SELECT <field projection, ...>
FROM <your-kafka-topic>
PROPERTIES(<a set of keys to control the connector behaviour>)
INSERT INTO mykeyspace.types
SELECT
_key.bigint as bigintcol
, _value.boolean as booleancol
, _key.double as doublecol
, _value.float as floatcol
, _header.int as intcol
FROM myTopic
INSERT INTO mykeyspace.types
SELECT
_value.bigint as bigintcol
, _value.double as doublecol
, _value.ttlcol as message_internal_ttl
, _value.timestampcol as message_internal_timestamp
FROM myTopic
PROPERTIES('ttlTimeUnit'='MILLISECONDS', 'timestampTimeUnit'='MICROSECONDS')
INSERT INTO mykeyspace.CASE_SENSITIVE
SELECT
`_key.'bigint field'` as 'bigint col',
`_key.'boolean-field'` as 'boolean-col',
`_value.'INT FIELD'` as 'INT COL',
`_value.'TEXT.FIELD'` as 'TEXT.COL'
FROM mytopic
INSERT INTO mykeyspace.tableA
SELECT
_key.my_pk as my_pk
, _value.my_value as my_value
FROM topicA
PROPERTIES(
'query'='INSERT INTO mykeyspace.pk_value (my_pk, my_value) VALUES (:my_pk, :my_value)',
'deletesEnabled' ='true'
)
INSERT INTO ${target}
SELECT _value/_key/_header.{field_path} AS ${table_column}
FROM mytopic
INSERT INTO ${target}
SELECT
_key as row_key
, _value as content
FROM mytopic
INSERT INTO ${target}
SELECT
`_key.'bigint field'` as 'bigint col',
`_key.'boolean-field'` as 'boolean-col',
`_value.'INT FIELD'` as 'INT COL',
`_value.'TEXT.FIELD'` as 'TEXT.COL'
FROM myTopic
INSERT INTO stocks_keyspace.stocks_by_symbol
SELECT _value.symbol AS ticker,
_value.ts AS ts,
_value.exchange AS exchange,
_value.value AS value
FROM stocks;
INSERT INTO stocks_keyspace.stocks_by_exchange
SELECT _value.symbol AS ticker,
_value.ts AS ts,
_value.exchange AS exchange,
_value.value AS value
FROM stocks;
CREATE TYPE stocks_keyspace.stocks_type (
symbol text,
ts timestamp,
exchange text,
value double
);
CREATE TABLE stocks_keyspace.stocks_table (
name text PRIMARY KEY,
stocks FROZEN<stocks_type>
);
INSERT INTO stocks_keyspace.stocks_table
SELECT _key AS name, _value AS stocks
FROM stocks
INSERT INTO ${target}
SELECT
`now()` as loaded_at
FROM myTopic
SELECT [_value/_key/_header].{path} AS message_internal_timestamp
FROM myTopic
--You may optionally specify the time unit by using:
PROPERTIES('timestampTimeUnit'='MICROSECONDS')
SELECT _header.timestampcolumn AS message_internal_timestamp FROM myTopic
SELECT _value.timestampcol AS message_internal_timestamp FROM myTopic
SELECT _key.timestampcol AS message_internal_timestamp FROM myTopic
SELECT [_value/_key/_header].{path} as message_internal_ttl
FROM myTopic
-- Optional: Specify the TTL unit
PROPERTIES('ttlTimeUnit'='SECONDS')
SELECT _header.ttl as message_internal_ttl
FROM myTopic
PROPERTIES('ttlTimeUnit'='MINUTES')
SELECT _key.expiry as message_internal_ttl
FROM myTopic
PROPERTIES('ttlTimeUnit'='HOURS')
SELECT _value.ttl_duration as message_internal_ttl
FROM myTopic
PROPERTIES('ttlTimeUnit'='DAYS')
INSERT INTO ${target}
SELECT
_value.bigint as some_name,
_value.int as some_name2
FROM myTopic
PROPERTIES('query'='INSERT INTO %s.types (bigintCol, intCol) VALUES (:some_name, :some_name2)')
connect.cassandra.driver.{driver_setting}
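For example, to override a driver option (the setting path below follows the DataStax Java driver configuration naming and is an assumption, not taken from this page):

```properties
# Hypothetical driver override: raises the request timeout to 10 seconds
connect.cassandra.driver.basic.request.timeout=10 seconds
```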
INSERT INTO `my-topic`
SELECT * FROM `my-s3-bucket:my-prefix`
PROPERTIES (
'post.process.action'=`MOVE`,
'post.process.action.bucket'=`archive-bucket`,
'post.process.action.prefix'=`processed/`
)
This page describes the usage of the Stream Reactor Azure Datalake Gen 2 Sink Connector.
This Kafka Connect sink connector facilitates the seamless transfer of records from Kafka to Azure Data Lake Buckets. It offers robust support for various data formats, including AVRO, Parquet, JSON, CSV, and Text, making it a versatile choice for data storage. Additionally, it ensures the reliability of data transfer with built-in support for exactly-once semantics.
Connector Class
Example
For more examples see the .
KCQL Support
You can specify multiple KCQL statements separated by ; to have a connector sink multiple topics. The connector properties topics or topics.regex are required to be set to a value that matches the KCQL statements.
The connector uses KCQL to map topics to Datalake buckets and paths. The full KCQL syntax is:
Please note that you can employ escaping within KCQL for the INSERT INTO, SELECT * FROM, and PARTITIONBY clauses when necessary. For example, an incoming Kafka message stored as JSON can use fields containing .:
In this case, you can use the following KCQL statement:
Target Bucket and Path
The target bucket and path are specified in the INSERT INTO clause. The path is optional and if not specified, the connector will write to the root of the bucket and append the topic name to the path.
Here are a few examples:
SQL Projection
Currently, the connector does not offer support for SQL projection; consequently, anything other than a SELECT * query is disregarded. The connector will faithfully write all fields from Kafka exactly as they are.
Source Topic
To avoid runtime errors, make sure the topics or topics.regex setting matches your KCQL statements. If the connector receives data for a topic without matching KCQL, it will throw an error. When using a regex to select topics, follow this KCQL pattern:
In this case the topic name will be appended to the $target destination.
KCQL Properties
The PROPERTIES clause is optional and adds a layer of configuration to the connector. It enhances versatility by permitting the application of multiple configurations (delimited by ‘,’). The following properties are supported:
Name
Description
Type
Available Values
Default Value
The sink connector optimizes performance by padding the output files, a practice that proves beneficial when using the Datalake Source connector to restore data. This file padding ensures that files are ordered lexicographically, allowing the Datalake Source connector to skip the need for reading, sorting, and processing all files, thereby enhancing efficiency.
Partitioning & File names
The object key serves as the filename used to store data in Datalake. There are two options for configuring the object key:
Default: The object key is automatically generated by the connector and follows the Kafka topic-partition structure. The format is $container/[$prefix]/$topic/$partition/offset.extension. The extension is determined by the chosen storage format.
Custom: The object key is driven by the PARTITIONBY clause. The format is either $container/[$prefix]/$topic/customKey1=customValue1/customKey2=customValue2/topic(partition_offset).extension (naming style mimicking Hive-like data partitioning) or $container/[$prefix]/customValue/topic(partition_offset).ext. The extension is determined by the selected storage format.
The Connector automatically adds the topic name to the partition. There is no need to add it to the partition clause. If you want to explicitly add the topic or partition you can do so by using _topic and _partition.
The partition clause works on Header, Key and Values fields of the Kafka message.
Custom keys and values can be extracted from the Kafka message key, message value, or message headers, as long as the headers are of types that can be converted to strings. There is no fixed limit to the number of elements that can form the object key, but you should be aware of Azure Datalake key length restrictions.
To extract fields from the message values, simply use the field names in the PARTITIONBY clause. For example:
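A minimal sketch, assuming the message value contains primitive fields named region and department (hypothetical names):

```sql
INSERT INTO testcontainer:prefix
SELECT * FROM topicA
PARTITIONBY region, department
```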
However, note that the message fields must be of primitive types (e.g., string, int, long) to be used for partitioning.
You can also use the entire message key as long as it can be coerced into a primitive type:
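For instance, partitioning by the whole message key (assuming the key is a primitive such as a string):

```sql
INSERT INTO testcontainer:prefix
SELECT * FROM topicA
PARTITIONBY _key
```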
In cases where the Kafka message Key is not a primitive but a complex object, you can use individual fields within the message Key to create the Datalake object key name:
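A sketch, assuming the message key is a structure containing a field named region (hypothetical name):

```sql
INSERT INTO testcontainer:prefix
SELECT * FROM topicA
PARTITIONBY _key.region
```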
Kafka message headers can also be used in the Datalake object key definition, provided the header values are of primitive types easily convertible to strings:
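For example, assuming a string-convertible header named tenant (hypothetical name):

```sql
INSERT INTO testcontainer:prefix
SELECT * FROM topicA
PARTITIONBY _header.tenant
```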
Customizing the object key can leverage various components of the Kafka message. For example:
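A combined sketch drawing on the value, key, and headers at once (all field names hypothetical):

```sql
INSERT INTO testcontainer:prefix
SELECT * FROM topicA
PARTITIONBY region, _key.accountId, _header.tenant
```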
This flexibility allows you to tailor the object key to your specific needs, extracting meaningful information from Kafka messages to structure Datalake object keys effectively.
To enable Athena-like partitioning, use the following syntax:
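A sketch only: the PROPERTIES key 'partition.include.keys' below is an assumption not confirmed by this page; the Hive-like key=value object-key layout it is meant to enable is the one described in the partitioning discussion above.

```sql
INSERT INTO testcontainer:prefix
SELECT * FROM topicA
PARTITIONBY region
PROPERTIES('partition.include.keys'=true)
-- assumed resulting object key: prefix/region=emea/topicA(1_42).parquet
```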
Rolling Windows
Storing data in Azure Datalake and partitioning it by time is a common practice in data management. For instance, you may want to organize your Datalake data in hourly intervals. This partitioning can be seamlessly achieved using the PARTITIONBY clause in combination with specifying the relevant time field. However, it’s worth noting that the time field typically doesn’t adjust automatically.
To address this, we offer a Kafka Connect Single Message Transformer (SMT) designed to streamline this process. You can find the transformer plugin and documentation .
Let’s consider an example where you need the object key to include the wallclock time (the time when the message was processed) and create an hourly window based on a field called timestamp. Here’s the connector configuration to achieve this:
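A sketch of such a configuration, under stated assumptions: the connector class and the SMT class names (io.lenses.connect.smt.header.TimestampConverter, io.lenses.connect.smt.header.InsertWallclock) are assumed rather than confirmed on this page, while the property keys follow the names mentioned in the surrounding text (format.to.pattern, format):

```properties
name=datalake-sink-hourly
# Connector class name is an assumption
connector.class=io.lenses.streamreactor.connect.datalake.sink.DatalakeSinkConnector
topics=car_speed_events
transforms=convertTs,insertWallclock
# Converts the epoch-millis 'timestamp' field into an hourly string
transforms.convertTs.type=io.lenses.connect.smt.header.TimestampConverter
transforms.convertTs.field=timestamp
transforms.convertTs.format.to.pattern=yyyy-MM-dd-HH
# Adds the processing-time wallclock as a header, formatted hourly
transforms.insertWallclock.type=io.lenses.connect.smt.header.InsertWallclock
transforms.insertWallclock.format=yyyy-MM-dd-HH
connect.datalake.kcql=INSERT INTO lensesioazure:car_speed SELECT * FROM car_speed_events PARTITIONBY timestamp, _header.wallclock STOREAS `PARQUET`
```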
In this example, the incoming Kafka message’s Value content includes a field called timestamp, represented as a long value indicating the epoch time in milliseconds. The TimestampConverter SMT will expertly convert this into a string value according to the format specified in the format.to.pattern property. Additionally, the insertWallclock SMT will incorporate the current wallclock time in the format you specify in the format property.
The PARTITIONBY clause then leverages both the timestamp field and the wallclock header to craft the object key, providing you with precise control over data partitioning.
Data Storage Format
While the STOREAS clause is optional, it plays a pivotal role in determining the storage format within Azure Datalake. It’s crucial to understand that this format is entirely independent of the data format stored in Kafka. The connector maintains its neutrality towards the storage format at the topic level and relies on the key.converter and value.converter settings to interpret the data.
Supported storage formats encompass:
AVRO
Parquet
JSON
CSV (including headers)
Text
BYTES
Opting for BYTES ensures that each record is stored in its own separate file. This feature proves particularly valuable for scenarios involving the storage of images or other binary data in Datalake. For cases where you prefer to consolidate multiple records into a single binary file, AVRO or Parquet are the recommended choices.
By default, the connector exclusively stores the Kafka message value. However, you can expand storage to encompass the entire message, including the key, headers, and metadata, by configuring the store.envelope property as true. This property operates as a boolean switch, with the default value being false. When the envelope is enabled, the data structure follows this format:
Not supported with a custom partition strategy.
Utilizing the envelope is particularly advantageous in scenarios such as backup and restore or replication, where comprehensive storage of the entire message in Datalake is desired.
Examples
Storing the message Value Avro data as Parquet in Datalake:
The converter also facilitates seamless JSON to AVRO/Parquet conversion, eliminating the need for an additional processing step before the data is stored in Datalake.
Enabling the full message stored as JSON in Datalake:
Enabling the full message stored as AVRO in Datalake:
If the restore (see the Datalake Source documentation) happens on the same cluster, then the most performant way is to use the ByteConverter for both Key and Value and store as AVRO or Parquet:
Flush Options
The connector offers three distinct flush options for data management:
Flush by Count - triggers a file flush after a specified number of records have been written to it.
Flush by Size - initiates a file flush once a predetermined size (in bytes) has been attained.
Flush by Interval - enforces a file flush after a defined time interval (in seconds).
It’s worth noting that the interval flush is a continuous process that acts as a fail-safe mechanism, ensuring that files are periodically flushed, even if the other flush options are not configured or haven’t reached their thresholds.
Consider a scenario where the flush size is set to 10MB, and only 9.8MB of data has been written to the file, with no new Kafka messages arriving for an extended period of 6 hours. To prevent undue delays, the interval flush guarantees that the file is flushed after the specified time interval has elapsed. This ensures the timely management of data even in situations where other flush conditions are not met.
The flush options are configured using the flush.count, flush.size, and flush.interval KCQL Properties (see section). The settings are optional and if not specified the defaults are:
flush.count = 50_000
flush.size = 500000000 (500MB)
flush.interval = 3600 (1 hour)
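For example, all three thresholds can be overridden via KCQL PROPERTIES (the values below are illustrative):

```sql
INSERT INTO testcontainer:prefix
SELECT * FROM topicA
STOREAS `PARQUET`
PROPERTIES(
  'flush.count'=1000,
  'flush.size'=10000000,
  'flush.interval'=300
)
```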
A connector instance can simultaneously operate on multiple topic partitions. When one partition triggers a flush, it will initiate a flush operation for all of them, even if the other partitions are not yet ready to flush.
When connect.datalake.latest.schema.optimization.enabled is set to true, it reduces unnecessary data flushes when writing to Avro or Parquet formats. Specifically, it leverages schema compatibility to avoid flushing data when messages with older but backward-compatible schemas are encountered. Consider the following sequence of messages and their associated schemas:
Flushing By Interval
The next flush time is calculated based on the time the previous flush completed (the last modified time of the file written to Data Lake). Therefore, by design, the sink connector’s behaviour will have a slight drift based on the time it takes to flush records and whether records are present or not. If Kafka Connect makes no calls to put records, the logic for flushing won't be executed. This ensures a more consistent number of records per file.
Compression
AVRO and Parquet offer the capability to compress files as they are written. The Datalake Sink connector provides advanced users with the flexibility to configure compression options.
Here are the available options for the connect.datalake.compression.codec, along with indications of their support by Avro, Parquet and JSON writers:
Compression
Avro Support
Avro (requires compression level)
Parquet Support
JSON
Please note that not all compression libraries are bundled with the Datalake connector. Therefore, you may need to manually add certain libraries to the classpath to ensure they function correctly.
Authentication
The connector offers three distinct authentication modes:
Default: This mode relies on the default Azure authentication chain, simplifying the authentication process.
Connection String: This mode enables simpler configuration by relying on the connection string to authenticate with Azure.
Credentials: In this mode, explicit configuration of Azure Access Key and Secret Key is required for authentication.
When selecting the “Credentials” mode, it is essential to provide the necessary access key and secret key properties. Alternatively, if you prefer not to configure these properties explicitly, the connector will follow the credentials retrieval order as described here.
Here’s an example configuration for the “Credentials” mode:
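A sketch for the “Credentials” mode; only connect.datalake.azure.auth.mode appears in the option reference on this page, so the account name and key property names below are assumptions:

```properties
connect.datalake.azure.auth.mode=Credentials
# Property names below are assumptions
connect.datalake.azure.account.name=$AZURE_ACCOUNT_NAME
connect.datalake.azure.account.key=$AZURE_ACCOUNT_KEY
```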
And here is an example configuration using the “Connection String” mode:
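A sketch for the “Connection String” mode (the connection-string property name below is an assumption):

```properties
connect.datalake.azure.auth.mode=ConnectionString
# Property name below is an assumption
connect.datalake.azure.connection.string=$AZURE_CONNECTION_STRING
```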
For enhanced security and flexibility when using either the “Credentials” or “Connection String” modes, it is highly advisable to utilize Connect Secret Providers.
Error policies
The connector supports .
Indexes Directory
The connector uses the concept of index files that it writes to in order to store information about the latest offsets for Kafka topics and partitions as they are being processed. This allows the connector to quickly resume from the correct position when restarting and provides flexibility in naming the index files.
By default, the root directory for these index files is named .indexes for all connectors. However, each connector will create and store its index files within its own subdirectory inside this .indexes directory.
You can configure the root directory for these index files using the property connect.datalake.indexes.name. This property specifies the path from the root of the data lake filesystem. Note that even if you configure this property, the connector will still create a subdirectory within the specified root directory.
Examples
Option Reference
Name
Description
Type
Available Values
Default Value
GCP Storage
This page describes the usage of the Stream Reactor GCP Storage Sink Connector.
You can specify multiple KCQL statements separated by ; to have a connector sink multiple topics. The connector properties topics or topics.regex are required to be set to a value that matches the KCQL statements.
AWS S3
This page describes the usage of the Stream Reactor AWS S3 Sink Connector.
This Kafka Connect sink connector facilitates the seamless transfer of records from Kafka to AWS S3 Buckets. It offers robust support for various data formats, including AVRO, Parquet, JSON, CSV, and Text, making it a versatile choice for data storage. Additionally, it ensures the reliability of data transfer with built-in support for exactly-once semantics.
Sets the compression level when compression is enabled for data transfer to Datalake.
int
1-9
(Empty)
connect.datalake.seek.max.files
Specifies the maximum threshold for the number of files the connector uses to ensure exactly-once processing of data.
int
5
connect.datalake.indexes.name
Configure the indexes root directory for this connector.
string
".indexes"
connect.datalake.exactly.once.enable
Set to 'false' to disable exactly-once semantics, opting instead for Kafka Connect’s native at-least-once offset management.
boolean
true, false
true
connect.datalake.schema.change.detector
Configure how the file will roll over upon receiving a record with a schema different from the accumulated ones. This property configures schema change detection with default (object equality), version (version field comparison), or compatibility (Avro compatibility checking).
string
default, version, compatibility
default
connect.datalake.skip.null.values
Skip records with null values (a.k.a. tombstone records).
connect.datalake.latest.schema.optimization.enabled
When set to true, reduces unnecessary data flushes when writing to Avro or Parquet formats. Specifically, it leverages schema compatibility to avoid flushing data when messages with older but backward-compatible schemas are encountered.
boolean
true,false
false
padding.type
Specifies the type of padding to be applied.
LeftPad, RightPad, NoOp
LeftPad, RightPad, NoOp
LeftPad
padding.char
Defines the character used for padding.
Char
‘0’
UNCOMPRESSED
✅
✅
✅
SNAPPY
✅
✅
Index Name (connect.datalake.indexes.name)
Resulting Indexes Directory Structure
Description
.indexes (default)
.indexes/<connector_name>/
The default setup, where each connector uses its own subdirectory within .indexes.
custom-indexes
custom-indexes/<connector_name>/
Custom root directory custom-indexes, with a subdirectory for each connector.
indexes/datalake-connector-logs
indexes/datalake-connector-logs/<connector_name>/
Uses a custom subdirectory datalake-connector-logs within indexes, with a subdirectory for each connector.
logs/indexes
logs/indexes/<connector_name>/
Indexes are stored under logs/indexes, with a subdirectory for each connector.
connect.datalake.azure.auth.mode
Specifies the Azure authentication mode for connecting to Datalake.
INSERT INTO bucketAddress[:pathPrefix]
SELECT *
FROM kafka-topic
[[PARTITIONBY (partition[, partition] ...)] | NOPARTITION]
[STOREAS storage_format]
[PROPERTIES(
'property.1'=x,
'property.2'=x,
)]
{
...
"a.b": "value",
...
}
INSERT INTO `container-name`:`prefix` SELECT * FROM `kafka-topic` PARTITIONBY `a.b`
INSERT INTO testcontainer:pathToWriteTo SELECT * FROM topicA;
INSERT INTO testcontainer SELECT * FROM topicA;
INSERT INTO testcontainer:path/To/Write/To SELECT * FROM topicA PARTITIONBY fieldA;
topics.regex = ^sensor_data_\d+$
connect.datalake.kcql= INSERT INTO $target SELECT * FROM `*` ....
{
"key": <the message Key, which can be a primitive or a complex object>,
"value": <the message Key, which can be a primitive or a complex object>,
"headers": {
"header1": "value1",
"header2": "value2"
},
"metadata": {
"offset": 0,
"partition": 0,
"timestamp": 0,
"topic": "topic"
}
}
...
connect.datalake.kcql=INSERT INTO lensesioazure:car_speed SELECT * FROM car_speed_events STOREAS `PARQUET`
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
key.converter=org.apache.kafka.connect.storage.StringConverter
...
...
connect.datalake.kcql=INSERT INTO lensesioazure:car_speed SELECT * FROM car_speed_events STOREAS `PARQUET`
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
...
...
connect.datalake.kcql=INSERT INTO lensesioazure:car_speed SELECT * FROM car_speed_events STOREAS `JSON` PROPERTIES('store.envelope'=true)
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
...
...
connect.datalake.kcql=INSERT INTO lensesioazure:car_speed SELECT * FROM car_speed_events STOREAS `AVRO` PROPERTIES('store.envelope'=true)
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
key.converter=org.apache.kafka.connect.storage.StringConverter
...
...
connect.datalake.kcql=INSERT INTO lensesioazure:car_speed SELECT * FROM car_speed_events STOREAS `AVRO` PROPERTIES('store.envelope'=true)
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
...
message1 -> schema1
message2 -> schema1
(No flush needed – same schema)
message3 -> schema2
(Flush occurs – new schema introduced)
message4 -> schema2
(No flush needed – same schema)
message5 -> schema1
Without optimization: would trigger a flush
With optimization: no flush – schema1 is backward-compatible with schema2
message6 -> schema2
message7 -> schema2
(No flush needed – same schema; a flush would still occur once the configured flush thresholds are reached)
The connector uses KCQL to map topics to GCP Storage buckets and paths. The full KCQL syntax is:
Please note that you can employ escaping within KCQL for the INSERT INTO, SELECT * FROM, and PARTITIONBY clauses when necessary. For example, an incoming Kafka message stored as JSON can use field names containing a dot (.):
In this case you can use the following KCQL statement:
Target Bucket and Path
The target bucket and path are specified in the INSERT INTO clause. The path is optional and if not specified, the connector will write to the root of the bucket and append the topic name to the path.
Here are a few examples:
SQL Projection
Currently, the connector does not offer support for SQL projection; consequently, anything other than a SELECT * query is disregarded. The connector will faithfully write all fields from Kafka exactly as they are.
Source Topic
To avoid runtime errors, make sure the topics or topics.regex setting matches your KCQL statements. If the connector receives data for a topic without matching KCQL, it will throw an error. When using a regex to select topics, follow this KCQL pattern:
In this case the topic name will be appended to the $target destination.
KCQL Properties
The PROPERTIES clause is optional and adds a layer of configurability to the connector. It enhances versatility by permitting the application of multiple configurations (delimited by ‘,’). The following properties are supported:
| Name | Description | Type | Available Values | Default Value |
| --- | --- | --- | --- | --- |
| padding.type | Specifies the type of padding to be applied. | LeftPad, RightPad, NoOp | LeftPad, RightPad, NoOp | LeftPad |
| padding.char | Defines the character used for padding. | Char | | '0' |
The sink connector optimizes performance by padding the output object names, a practice that proves beneficial when using the GCP Storage Source connector to restore data. This object name padding ensures that objects are ordered lexicographically, allowing the GCP Storage Source connector to skip the need for reading, sorting, and processing all objects, thereby enhancing efficiency.
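The effect of padding on ordering can be seen with a short, illustrative snippet (not connector code):

```python
# Plain lexicographic sorting of offsets breaks numeric order, because
# strings are compared character by character ("10" sorts before "9").
offsets = [1, 9, 10, 100]

unpadded = sorted(str(o) for o in offsets)
print(unpadded)  # ['1', '10', '100', '9'] -- numeric order lost

# Left-padding with '0' (the connector's default padding.char) restores it.
padded = sorted(str(o).zfill(12) for o in offsets)
print(padded)  # ['000000000001', '000000000009', '000000000010', '000000000100']
```

This is why padded object names let the Source connector list objects in offset order without sorting them first.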
Partitioning & Object Keys
The object key serves as the filename used to store data in GCP Storage. There are two options for configuring the object key:
Default: The object key is automatically generated by the connector and follows the Kafka topic-partition structure. The format is $bucket/[$prefix]/$topic/$partition/offset.extension. The extension is determined by the chosen storage format.
Custom: The object key is driven by the PARTITIONBY clause. The format is either $bucket/[$prefix]/$topic/customKey1=customValue1/customKey2=customValue2/topic(partition_offset).extension (a Hive-like partitioning naming style, as used by AWS Athena) or $bucket/[$prefix]/customValue/topic(partition_offset).ext. The extension is determined by the selected storage format.
The Connector automatically adds the topic name to the partition. There is no need to add it to the partition clause. If you want to explicitly add the topic or partition you can do so by using _topic and _partition.
The partition clause works on header, key and values fields of the Kafka message.
Custom keys and values can be extracted from the Kafka message key, message value, or message headers, as long as the headers are of types that can be converted to strings. There is no fixed limit to the number of elements that can form the object key, but you should be aware of GCP Storage key length restrictions.
To extract fields from the message values, simply use the field names in the PARTITIONBY clause. For example:
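A minimal sketch, using hypothetical bucket, topic and field names:

```sql
INSERT INTO testbucket:prefix SELECT * FROM topicA PARTITIONBY fieldA, fieldB
```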
However, note that the message fields must be of primitive types (e.g., string, int, long) to be used for partitioning.
You can also use the entire message key as long as it can be coerced into a primitive type:
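A sketch using the reserved _key reference (hypothetical bucket and topic names):

```sql
INSERT INTO testbucket:prefix SELECT * FROM topicA PARTITIONBY _key
```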
In cases where the Kafka message Key is not a primitive but a complex object, you can use individual fields within the message Key to create the GCP Storage object key name:
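A sketch referencing a field inside the message Key (hypothetical names):

```sql
INSERT INTO testbucket:prefix SELECT * FROM topicA PARTITIONBY _key.fieldA
```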
Kafka message headers can also be used in the GCP Storage object key definition, provided the header values are of primitive types easily convertible to strings:
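A sketch referencing a message header (hypothetical names):

```sql
INSERT INTO testbucket:prefix SELECT * FROM topicA PARTITIONBY _header.headerA
```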
Customizing the object key can leverage various components of the Kafka message. For example:
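A sketch combining a value field, a Key field and a header (hypothetical names):

```sql
INSERT INTO testbucket:prefix SELECT * FROM topicA PARTITIONBY fieldA, _key.fieldB, _header.headerC
```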
This flexibility allows you to tailor the object key to your specific needs, extracting meaningful information from Kafka messages to structure GCP Storage object keys effectively.
To enable Athena-like partitioning, use the following syntax:
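A sketch of such a statement; partition.include.keys is assumed here to be the KCQL property that switches on the key=value naming style, so verify it against the connector's KCQL properties reference:

```sql
INSERT INTO testbucket:prefix SELECT * FROM topicA PARTITIONBY fieldA, fieldB PROPERTIES('partition.include.keys'=true)
```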
Rolling Windows
Storing data in GCP Storage and partitioning it by time is a common practice in data management. For instance, you may want to organize your GCP Storage data in hourly intervals. This partitioning can be seamlessly achieved using the PARTITIONBY clause in combination with specifying the relevant time field. However, it’s worth noting that the time field typically doesn’t adjust automatically.
To address this, we offer a Kafka Connect Single Message Transformer (SMT) designed to streamline this process. You can find the transformer plugin and documentation here.
Let’s consider an example where you need the object key to include the wallclock time (the time when the message was processed) and create an hourly window based on a field called timestamp. Here’s the connector configuration to achieve this:
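A sketch of such a configuration, using hypothetical bucket and topic names; the SMT class and property names (io.lenses.connect.smt.header.*) should be verified against the SMT plugin documentation for your version:

```properties
connector.class=io.lenses.streamreactor.connect.gcp.storage.sink.GCPStorageSinkConnector
topics=sensor-data
connect.gcpstorage.kcql=INSERT INTO mybucket:sensors SELECT * FROM sensor-data PARTITIONBY timestamp, _header.wallclock STOREAS `AVRO`

# Convert the 'timestamp' value field (epoch millis) into an hourly string window
transforms=convertTs,insertWallclock
transforms.convertTs.type=io.lenses.connect.smt.header.TimestampConverter
transforms.convertTs.field=timestamp
transforms.convertTs.target.type=string
transforms.convertTs.format.to.pattern=yyyy-MM-dd-HH

# Add the processing-time wallclock as a header, formatted the same way
transforms.insertWallclock.type=io.lenses.connect.smt.header.InsertWallclock
transforms.insertWallclock.header.name=wallclock
transforms.insertWallclock.format=yyyy-MM-dd-HH
```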
In this example, the incoming Kafka message’s Value content includes a field called timestamp, represented as a long value indicating the epoch time in milliseconds. The TimestampConverter SMT will expertly convert this into a string value according to the format specified in the format.to.pattern property. Additionally, the insertWallclock SMT will incorporate the current wallclock time in the format you specify in the format property.
The PARTITIONBY clause then leverages both the timestamp field and the wallclock header to craft the object key, providing you with precise control over data partitioning.
Data Storage Format
While the STOREAS clause is optional, it plays a pivotal role in determining the storage format within GCP Storage. It’s crucial to understand that this format is entirely independent of the data format stored in Kafka. The connector maintains its neutrality towards the storage format at the topic level and relies on the key.converter and value.converter settings to interpret the data.
Supported storage formats encompass:
AVRO
Parquet
JSON
CSV (including headers)
Text
BYTES
Opting for BYTES ensures that each record is stored in its own separate object. This feature proves particularly valuable for scenarios involving the storage of images or other binary data in GCP Storage. For cases where you prefer to consolidate multiple records into a single binary object, AVRO or Parquet are the recommended choices.
By default, the connector exclusively stores the Kafka message value. However, you can expand storage to encompass the entire message, including the key, headers, and metadata, by configuring the store.envelope property as true. This property operates as a boolean switch, with the default value being false. When the envelope is enabled, the data structure follows this format:
The envelope format is not supported with a custom partition strategy.
Utilizing the envelope is particularly advantageous in scenarios such as backup and restore or replication, where comprehensive storage of the entire message in GCP Storage is desired.
Examples
Storing the message Value Avro data as Parquet in GCP Storage:
The converter also facilitates seamless JSON to AVRO/Parquet conversion, eliminating the need for an additional processing step before the data is stored in GCP Storage.
Enabling the full message stored as JSON in GCP Storage:
Enabling the full message stored as AVRO in GCP Storage:
If the restore (see the GCP Storage Source documentation) happens on the same cluster, then the most performant way is to use the ByteArrayConverter for both Key and Value and store as AVRO or Parquet:
Flush Options
The connector offers three distinct flush options for data management:
Flush by Count - triggers an object flush after a specified number of records have been written to it.
Flush by Size - initiates an object flush once a predetermined size (in bytes) has been attained.
Flush by Interval - enforces an object flush after a defined time interval (in seconds).
It’s worth noting that the interval flush is a continuous process that acts as a fail-safe mechanism, ensuring that objects are periodically flushed, even if the other flush options are not configured or haven’t reached their thresholds.
Consider a scenario where the flush size is set to 10MB, and only 9.8MB of data has been written to the object, with no new Kafka messages arriving for an extended period of 6 hours. To prevent undue delays, the interval flush guarantees that the object is flushed after the specified time interval has elapsed. This ensures the timely management of data even in situations where other flush conditions are not met.
The flush options are configured using the flush.count, flush.size, and flush.interval KCQL Properties (see the KCQL Properties section). The settings are optional and if not specified the defaults are:
flush.count = 50_000
flush.size = 500000000 (500MB)
flush.interval = 3600 (1 hour)
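For example, a sketch overriding all three thresholds (hypothetical bucket and topic names):

```sql
INSERT INTO testbucket:prefix SELECT * FROM topicA STOREAS `PARQUET` PROPERTIES('flush.count'=10000, 'flush.size'=100000000, 'flush.interval'=300)
```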
A connector instance can simultaneously operate on multiple topic partitions. When one partition triggers a flush, it will initiate a flush operation for all of them, even if the other partitions are not yet ready to flush.
When connect.gcpstorage.latest.schema.optimization.enabled is set to true, it reduces unnecessary data flushes when writing to Avro or Parquet formats. Specifically, it leverages schema compatibility to avoid flushing data when messages with older but backward-compatible schemas are encountered. Consider the following sequence of messages and their associated schemas:
Flushing By Interval
The next flush time is calculated based on the time the previous flush completed (the last modified time of the object written to GCP Storage). Therefore, by design, the sink connector’s behaviour will have a slight drift based on the time it takes to flush records and whether records are present or not. If Kafka Connect makes no calls to put records, the logic for flushing won't be executed. This ensures a more consistent number of records per object.
Compression
AVRO and Parquet offer the capability to compress files as they are written. The GCP Storage Sink connector provides advanced users with the flexibility to configure compression options.
Here are the available options for the connect.gcpstorage.compression.codec, along with indications of their support by Avro, Parquet and JSON writers:
| Compression | Avro Support | Avro (requires Level) | Parquet Support | JSON |
| --- | --- | --- | --- | --- |
| UNCOMPRESSED | ✅ | | ✅ | ✅ |
| SNAPPY | ✅ | | ✅ | |
Please note that not all compression libraries are bundled with the GCP Storage connector. Therefore, you may need to manually add certain libraries to the classpath to ensure they function correctly.
Authentication
The connector offers three distinct authentication modes:
Default: This mode relies on the default GCP authentication chain, simplifying the authentication process.
File: This mode uses a local (to the connect worker) path for a file containing GCP authentication credentials.
Credentials: In this mode, explicit configuration of a GCP Credentials string is required for authentication.
The simplest mode to configure is "Default", as it requires no additional configuration and, as its name suggests, is used by default.
When selecting the "Credentials" mode, it is essential to provide the necessary credentials. Alternatively, if you prefer not to configure these properties explicitly, the connector will follow the credentials retrieval order as described here.
Here's an example configuration for the "Credentials" mode:
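A sketch, assuming the connect.gcpstorage.gcp.* property names below match your connector version:

```properties
connect.gcpstorage.gcp.auth.mode=Credentials
# The service-account credentials JSON as a string; elided here deliberately
connect.gcpstorage.gcp.credentials=...
connect.gcpstorage.gcp.project.id=my-gcp-project
```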
And here is an example configuration using the "File" mode:
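A sketch, assuming connect.gcpstorage.gcp.file is the file-path property in your connector version:

```properties
connect.gcpstorage.gcp.auth.mode=File
connect.gcpstorage.gcp.file=/path/to/service-account.json
```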
Remember, when using file mode, the file must exist on every worker node in your Kafka Connect cluster and be readable by the Kafka Connect process.
For enhanced security and flexibility when using the “Credentials” mode, it is highly advisable to utilize Connect Secret Providers. You can find detailed information on how to use the Connect Secret Providers here. This approach ensures robust security practices while handling access credentials.
The connector uses the concept of index objects that it writes to in order to store information about the latest offsets for Kafka topics and partitions as they are being processed. This allows the connector to quickly resume from the correct position when restarting and provides flexibility in naming the index objects.
By default, the prefix for these index objects is named .indexes for all connectors. However, each connector will create and store its index objects within its own nested prefix inside this .indexes prefix.
You can configure the root prefix for these index objects using the property connect.gcpstorage.indexes.name. This property specifies the path from the root of the GCS bucket. Note that even if you configure this property, the connector will still create a nested prefix within the specified prefix.
Examples
| Index Name (connect.gcpstorage.indexes.name) | Resulting Indexes Prefix Structure | Description |
| --- | --- | --- |
| .indexes (default) | .indexes/<connector_name>/ | The default setup, where each connector uses its own nested prefix within .indexes. |
| custom-indexes | custom-indexes/<connector_name>/ | Custom prefix custom-indexes, with a nested prefix for each connector. |
| indexes/gcs-connector-logs | indexes/gcs-connector-logs/<connector_name>/ | Uses a custom nested prefix gcs-connector-logs within indexes, with a nested prefix for each connector. |
| logs/indexes | logs/indexes/<connector_name>/ | Indexes are stored under logs/indexes, with a nested prefix for each connector. |
Option Reference
Name
Description
Type
Available Values
Default Value
connect.gcpstorage.gcp.auth.mode
Specifies the authentication mode for connecting to GCP.
string
"Credentials", "File" or "Default"
"Default"
connect.gcpstorage.gcp.credentials
For "auth.mode" credentials: GCP Authentication credentials string.
This example writes to a bucket called demo, partitioning by a field called ts and storing the data as JSON.
KCQL Support
You can specify multiple KCQL statements separated by ; to have a connector sink multiple topics. The connector properties topics or topics.regex are required to be set to a value that matches the KCQL statements.
The connector uses KCQL to map topics to S3 buckets and paths. The full KCQL syntax is:
Please note that you can employ escaping within KCQL for the INSERT INTO, SELECT * FROM, and PARTITIONBY clauses when necessary. For example, an incoming Kafka message stored as JSON can use fields containing .:
In this case, you can use the following KCQL statement:
Target Bucket and Path
The target bucket and path are specified in the INSERT INTO clause. The path is optional and if not specified, the connector will write to the root of the bucket and append the topic name to the path.
Here are a few examples:
SQL Projection
Currently, the connector does not offer support for SQL projection; consequently, anything other than a SELECT * query is disregarded. The connector will faithfully write all fields from Kafka exactly as they are.
Source Topic
To avoid runtime errors, make sure the topics or topics.regex setting matches your KCQL statements. If the connector receives data for a topic without matching KCQL, it will throw an error. When using a regex to select topics, follow this KCQL pattern:
In this case the topic name will be appended to the $target destination.
Partitioning & Object Keys
The object key serves as the filename used to store data in S3. There are two options for configuring the object key:
Default: The object key is automatically generated by the connector and follows the Kafka topic-partition structure. The format is $bucket/[$prefix]/$topic/$partition/offset.extension. The extension is determined by the chosen storage format.
Custom: The object key is driven by the PARTITIONBY clause. The format is either $bucket/[$prefix]/$topic/customKey1=customValue1/customKey2=customValue2/topic(partition_offset).extension (AWS Athena naming style mimicking Hive-like data partitioning) or $bucket/[$prefix]/customValue/topic(partition_offset).ext. The extension is determined by the selected storage format.
Custom keys and values can be extracted from the Kafka message key, message value, or message headers, as long as the headers are of types that can be converted to strings. There is no fixed limit to the number of elements that can form the object key, but you should be aware of AWS S3 key length restrictions.
The Connector automatically adds the topic name to the partition. There is no need to add it to the partition clause. If you want to explicitly add the topic or partition you can do so by using _topic and _partition.
The partition clause works on header, key and values fields of the Kafka message.
To extract fields from the message values, simply use the field names in the PARTITIONBY clause. For example:
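A minimal sketch, using hypothetical bucket, topic and field names:

```sql
INSERT INTO mybucket:prefix SELECT * FROM topicA PARTITIONBY fieldA, fieldB
```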
However, note that the message fields must be of primitive types (e.g., string, int, long) to be used for partitioning.
You can also use the entire message key as long as it can be coerced into a primitive type:
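A sketch using the reserved _key reference (hypothetical bucket and topic names):

```sql
INSERT INTO mybucket:prefix SELECT * FROM topicA PARTITIONBY _key
```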
In cases where the Kafka message Key is not a primitive but a complex object, you can use individual fields within the message Key to create the S3 object key name:
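A sketch referencing a field inside the message Key (hypothetical names):

```sql
INSERT INTO mybucket:prefix SELECT * FROM topicA PARTITIONBY _key.fieldA
```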
Kafka message headers can also be used in the S3 object key definition, provided the header values are of primitive types easily convertible to strings:
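A sketch referencing a message header (hypothetical names):

```sql
INSERT INTO mybucket:prefix SELECT * FROM topicA PARTITIONBY _header.headerA
```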
Customizing the object key can leverage various components of the Kafka message. For example:
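A sketch combining a value field, a Key field and a header (hypothetical names):

```sql
INSERT INTO mybucket:prefix SELECT * FROM topicA PARTITIONBY fieldA, _key.fieldB, _header.headerC
```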
This flexibility allows you to tailor the object key to your specific needs, extracting meaningful information from Kafka messages to structure S3 object keys effectively.
To enable Athena-like partitioning, use the following syntax:
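A sketch of such a statement; partition.include.keys is assumed here to be the KCQL property that switches on the key=value naming style, so verify it against the connector's KCQL properties reference:

```sql
INSERT INTO mybucket:prefix SELECT * FROM topicA PARTITIONBY fieldA, fieldB PROPERTIES('partition.include.keys'=true)
```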
Rolling Windows
Storing data in Amazon S3 and partitioning it by time is a common practice in data management. For instance, you may want to organize your S3 data in hourly intervals. This partitioning can be seamlessly achieved using the PARTITIONBY clause in combination with specifying the relevant time field. However, it’s worth noting that the time field typically doesn’t adjust automatically.
To address this, we offer a Kafka Connect Single Message Transformer (SMT) designed to streamline this process.
Let’s consider an example where you need the object key to include the wallclock time (the time when the message was processed) and create an hourly window based on a field called timestamp. Here’s the connector configuration to achieve this:
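A sketch of such a configuration, using hypothetical bucket and topic names; the SMT class and property names (io.lenses.connect.smt.header.*) should be verified against the SMT plugin documentation for your version:

```properties
connector.class=io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector
topics=sensor-data
connect.s3.kcql=INSERT INTO mybucket:sensors SELECT * FROM sensor-data PARTITIONBY timestamp, _header.wallclock STOREAS `AVRO`

# Convert the 'timestamp' value field (epoch millis) into an hourly string window
transforms=convertTs,insertWallclock
transforms.convertTs.type=io.lenses.connect.smt.header.TimestampConverter
transforms.convertTs.field=timestamp
transforms.convertTs.target.type=string
transforms.convertTs.format.to.pattern=yyyy-MM-dd-HH

# Add the processing-time wallclock as a header, formatted the same way
transforms.insertWallclock.type=io.lenses.connect.smt.header.InsertWallclock
transforms.insertWallclock.header.name=wallclock
transforms.insertWallclock.format=yyyy-MM-dd-HH
```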
In this example, the incoming Kafka message’s Value content includes a field called timestamp, represented as a long value indicating the epoch time in milliseconds. The TimestampConverter SMT will expertly convert this into a string value according to the format specified in the format.to.pattern property. Additionally, the insertWallclock SMT will incorporate the current wallclock time in the format you specify in the format property.
The PARTITIONBY clause then leverages both the timestamp field and the wallclock header to craft the object key, providing you with precise control over data partitioning.
Data Storage Format
While the STOREAS clause is optional, it plays a pivotal role in determining the storage format within AWS S3. It’s crucial to understand that this format is entirely independent of the data format stored in Kafka. The connector maintains its neutrality towards the storage format at the topic level and relies on the key.converter and value.converter settings to interpret the data.
Supported storage formats encompass:
AVRO
Parquet
JSON
CSV (including headers)
Text
BYTES
Opting for BYTES ensures that each record is stored in its own separate file. This feature proves particularly valuable for scenarios involving the storage of images or other binary data in S3. For cases where you prefer to consolidate multiple records into a single binary file, AVRO or Parquet are the recommended choices.
By default, the connector exclusively stores the Kafka message value. However, you can expand storage to encompass the entire message, including the key, headers, and metadata, by configuring the store.envelope property as true. This property operates as a boolean switch, with the default value being false. When the envelope is enabled, the data structure follows this format:
The envelope format is not supported with a custom partition strategy.
Utilizing the envelope is particularly advantageous in scenarios such as backup and restore or replication, where comprehensive storage of the entire message in S3 is desired.
Examples
Storing the message Value Avro data as Parquet in S3:
The converter also facilitates seamless JSON to AVRO/Parquet conversion, eliminating the need for an additional processing step before the data is stored in S3.
Enabling the full message stored as JSON in S3:
Enabling the full message stored as AVRO in S3:
If the restore (see the S3 Source documentation) happens on the same cluster, then the most performant way is to use the ByteArrayConverter for both Key and Value and store as AVRO or Parquet:
Flush Options
The connector offers three distinct flush options for data management:
Flush by Count - triggers a file flush after a specified number of records have been written to it.
Flush by Size - initiates a file flush once a predetermined size (in bytes) has been attained.
Flush by Interval - enforces a file flush after a defined time interval (in seconds).
It’s worth noting that the interval flush is a continuous process that acts as a fail-safe mechanism, ensuring that files are periodically flushed, even if the other flush options are not configured or haven’t reached their thresholds.
Consider a scenario where the flush size is set to 10MB, and only 9.8MB of data has been written to the object, with no new Kafka messages arriving for an extended period of 6 hours. To prevent undue delays, the interval flush guarantees that the object is flushed after the specified time interval has elapsed. This ensures the timely management of data even in situations where other flush conditions are not met.
The flush options are configured using the flush.count, flush.size, and flush.interval properties. The settings are optional and if not specified the defaults are:
flush.count = 50_000
flush.size = 500000000 (500MB)
flush.interval = 3600 (1 hour)
A connector instance can simultaneously operate on multiple topic partitions. When one partition triggers a flush, it will initiate a flush operation for all of them, even if the other partitions are not yet ready to flush.
When connect.s3.latest.schema.optimization.enabled is set to true, it reduces unnecessary data flushes when writing to Avro or Parquet formats. Specifically, it leverages schema compatibility to avoid flushing data when messages with older but backward-compatible schemas are encountered. Consider the following sequence of messages and their associated schemas:
Flushing By Interval
The next flush time is calculated based on the time the previous flush completed (the last modified time of the object written to S3). Therefore, by design, the sink connector’s behaviour will have a slight drift based on the time it takes to flush records and whether records are present or not. If Kafka Connect makes no calls to put records, the logic for flushing won't be executed. This ensures a more consistent number of records per object.
Properties
The PROPERTIES clause is optional and adds a layer of configuration to the connector. It enhances versatility by permitting the application of multiple configurations (delimited by ‘,’). The following properties are supported:
| Name | Description | Type | Available Values | Default Value |
| --- | --- | --- | --- | --- |
| padding.type | Specifies the type of padding to be applied. | LeftPad, RightPad, NoOp | LeftPad, RightPad, NoOp | LeftPad |
| padding.char | Defines the character used for padding. | Char | | '0' |
The sink connector optimizes performance by padding the output objects. This proves beneficial when using the S3 Source connector to restore data. This object name padding ensures that objects are ordered lexicographically, allowing the S3 Source connector to skip the need for reading, sorting, and processing all objects, thereby enhancing efficiency.
Compression
AVRO and Parquet offer the capability to compress files as they are written. The S3 Sink connector provides advanced users with the flexibility to configure compression options.
Here are the available options for the connect.s3.compression.codec, along with indications of their support by Avro, Parquet and JSON writers:
| Compression | Avro Support | Avro (requires Level) | Parquet Support | JSON |
| --- | --- | --- | --- | --- |
| UNCOMPRESSED | ✅ | | ✅ | ✅ |
| SNAPPY | ✅ | | ✅ | |
Please note that not all compression libraries are bundled with the S3 connector. Therefore, you may need to manually add certain libraries to the classpath to ensure they function correctly.
Authentication
The connector offers two distinct authentication modes:
Default: This mode relies on the default AWS authentication chain, simplifying the authentication process.
Credentials: In this mode, explicit configuration of AWS Access Key and Secret Key is required for authentication.
Here’s an example configuration for the Credentials mode:
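A sketch, assuming the connect.s3.aws.* property names below match your connector version; the key values are elided deliberately:

```properties
connect.s3.aws.auth.mode=Credentials
connect.s3.aws.access.key=...
connect.s3.aws.secret.key=...
connect.s3.aws.region=eu-west-1
```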
For enhanced security and flexibility when using the Credentials mode, it is highly advisable to utilize Connect Secret Providers.
The connector can also be used against S3 API-compatible systems, provided they implement the following:
Indexes Directory
The connector uses the concept of index objects that it writes to in order to store information about the latest offsets for Kafka topics and partitions as they are being processed. This allows the connector to quickly resume from the correct position when restarting and provides flexibility in naming the index objects.
By default, the index objects are grouped within a prefix named .indexes for all connectors. However, each connector will create and store its index objects within its own nested prefix inside this .indexes prefix.
You can configure the prefix for these index objects using the property connect.s3.indexes.name. This property specifies the path from the root of the S3 bucket. Note that even if you configure this property, the connector will still place the indexes within a nested prefix of the specified prefix.
Examples
| Index Name (connect.s3.indexes.name) | Resulting Indexes Prefix Structure | Description |
| --- | --- | --- |
| .indexes (default) | .indexes/<connector_name>/ | The default setup, where each connector uses its own subdirectory within .indexes. |
| custom-indexes | custom-indexes/<connector_name>/ | Custom root directory custom-indexes, with a subdirectory for each connector. |
| indexes/s3-connector-logs | indexes/s3-connector-logs/<connector_name>/ | Uses a custom subdirectory s3-connector-logs within indexes, with a subdirectory for each connector. |
| logs/indexes | logs/indexes/<connector_name>/ | Indexes are stored under logs/indexes, with a subdirectory for each connector. |
Option Reference
Name
Description
Type
Available Values
Default Value
connect.s3.aws.auth.mode
Specifies the AWS authentication mode for connecting to S3.
INSERT INTO bucketAddress[:pathPrefix]
SELECT *
FROM kafka-topic
[[PARTITIONBY (partition[, partition] ...)] | NOPARTITION]
[STOREAS storage_format]
[PROPERTIES(
'property.1'=x,
'property.2'=x,
)]
{
...
"a.b": "value",
...
}
INSERT INTO `bucket-name`:`prefix` SELECT * FROM `kafka-topic` PARTITIONBY `a.b`
INSERT INTO testbucket:pathToWriteTo SELECT * FROM topicA;
INSERT INTO testbucket SELECT * FROM topicA;
INSERT INTO testbucket:path/To/Write/To SELECT * FROM topicA PARTITIONBY fieldA;
topics.regex = ^sensor_data_\d+$
connect.gcpstorage.kcql= INSERT INTO $target SELECT * FROM `*` ....
{
"key": <the message Key, which can be a primitive or a complex object>,
"value": <the message Key, which can be a primitive or a complex object>,
"headers": {
"header1": "value1",
"header2": "value2"
},
"metadata": {
"offset": 0,
"partition": 0,
"timestamp": 0,
"topic": "topic"
}
}
...
connect.gcpstorage.kcql=INSERT INTO lensesiogcpstorage:car_speed SELECT * FROM car_speed_events STOREAS `PARQUET`
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
key.converter=org.apache.kafka.connect.storage.StringConverter
...
...
connect.gcpstorage.kcql=INSERT INTO lensesiogcpstorage:car_speed SELECT * FROM car_speed_events STOREAS `PARQUET`
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
...
...
connect.gcpstorage.kcql=INSERT INTO lensesiogcpstorage:car_speed SELECT * FROM car_speed_events STOREAS `JSON` PROPERTIES('store.envelope'=true)
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
...
...
connect.gcpstorage.kcql=INSERT INTO lensesiogcpstorage:car_speed SELECT * FROM car_speed_events STOREAS `AVRO` PROPERTIES('store.envelope'=true)
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
key.converter=org.apache.kafka.connect.storage.StringConverter
...
...
connect.gcpstorage.kcql=INSERT INTO lensesiogcpstorage:car_speed SELECT * FROM car_speed_events STOREAS `AVRO` PROPERTIES('store.envelope'=true)
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
...
message1 -> schema1
message2 -> schema1
(No flush needed – same schema)
message3 -> schema2
(Flush occurs – new schema introduced)
message4 -> schema2
(No flush needed – same schema)
message5 -> schema1
Without optimization: would trigger a flush
With optimization: no flush – schema1 is backward-compatible with schema2
message6 -> schema2
message7 -> schema2
(No flush needed – same schema; a flush would still occur once the configured flush thresholds are reached)
connector.class=io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector
connect.s3.kcql=insert into lensesio:demo select * from demo PARTITIONBY _value.ts STOREAS `JSON` PROPERTIES ('flush.size'=1000000, 'flush.interval'=30, 'flush.count'=5000)
topics=demo
name=demo
INSERT INTO bucketAddress[:pathPrefix]
SELECT *
FROM kafka-topic
[[PARTITIONBY (partition[, partition] ...)] | NOPARTITION]
[STOREAS storage_format]
[PROPERTIES(
'property.1'=x,
'property.2'=x,
)]
{
...
"a.b": "value",
...
}
INSERT INTO `bucket-name`:`prefix` SELECT * FROM `kafka-topic` PARTITIONBY `a.b`
INSERT INTO testbucket:pathToWriteTo SELECT * FROM topicA;
INSERT INTO testbucket SELECT * FROM topicA;
INSERT INTO testbucket:path/To/Write/To SELECT * FROM topicA PARTITIONBY fieldA;
topics.regex = ^sensor_data_\d+$
connect.s3.kcql= INSERT INTO $target SELECT * FROM `*` ....
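Putting the two together, a sink can consume every matching topic through a single KCQL template. The sketch below mirrors the `$target` placeholder shown above; the topic naming is illustrative:

```properties
# Illustrative: consume all topics matching the regex; $target resolves per topic
connector.class=io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector
topics.regex=^sensor_data_\d+$
connect.s3.kcql=INSERT INTO $target SELECT * FROM `*` STOREAS `JSON`
```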
{
"key": <the message Key, which can be a primitive or a complex object>,
"value": <the message Value, which can be a primitive or a complex object>,
"headers": {
"header1": "value1",
"header2": "value2"
},
"metadata": {
"offset": 0,
"partition": 0,
"timestamp": 0,
"topic": "topic"
}
}
...
connect.s3.kcql=INSERT INTO lensesioaws:car_speed SELECT * FROM car_speed_events STOREAS `PARQUET`
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
key.converter=org.apache.kafka.connect.storage.StringConverter
...
...
connect.s3.kcql=INSERT INTO lensesioaws:car_speed SELECT * FROM car_speed_events STOREAS `PARQUET`
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
...
...
connect.s3.kcql=INSERT INTO lensesioaws:car_speed SELECT * FROM car_speed_events STOREAS `JSON` PROPERTIES('store.envelope'=true)
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
...
...
connect.s3.kcql=INSERT INTO lensesioaws:car_speed SELECT * FROM car_speed_events STOREAS `AVRO` PROPERTIES('store.envelope'=true)
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
key.converter=org.apache.kafka.connect.storage.StringConverter
...
...
connect.s3.kcql=INSERT INTO lensesioaws:car_speed SELECT * FROM car_speed_events STOREAS `AVRO` PROPERTIES('store.envelope'=true)
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
...
message1 -> schema1
message2 -> schema1
(No flush needed – same schema)
message3 -> schema2
(Flush occurs – new schema introduced)
message4 -> schema2
(No flush needed – same schema)
message5 -> schema1
Without optimization: would trigger a flush
With optimization: no flush – schema1 is backward-compatible with schema2
message6 -> schema2
message7 -> schema2
(No flush needed – same schema, it would happen based on the flush thresholds)
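The optimisation illustrated above is opt-in. For the S3 sink it is controlled by a single property:

```properties
# Opt in to schema-compatibility-based flush avoidance (default: false)
connect.s3.latest.schema.optimization.enabled=true
```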
connect.gcpstorage.compression.level
Sets the compression level when compression is enabled for data transfer to GCP Storage.
int
1-9
(Empty)
connect.gcpstorage.seek.max.files
Specifies the maximum threshold for the number of files the connector uses to ensure exactly-once processing of data.
int
5
connect.gcpstorage.indexes.name
Configure the indexes prefix for this connector.
string
".indexes"
connect.gcpstorage.exactly.once.enable
When set to 'false', disables exactly-once semantics, opting instead for Kafka Connect’s native at-least-once offset management.
boolean
true, false
true
connect.gcpstorage.schema.change.detector
Configure how the file will roll over upon receiving a record with a schema different from the accumulated ones. This property configures schema change detection with default (object equality), version (version field comparison), or compatibility (Avro compatibility checking).
string
default, version, compatibility
default
connect.gcpstorage.skip.null.values
Skip records with null values (a.k.a. tombstone records).
boolean
true, false
false
connect.gcpstorage.latest.schema.optimization.enabled
When set to true, reduces unnecessary data flushes when writing to Avro or Parquet formats. Specifically, it leverages schema compatibility to avoid flushing data when messages with older but backward-compatible schemas are encountered.
boolean
true, false
false
padding.length.partition
Sets the padding length for the partition.
Int
0
padding.length.offset
Sets the padding length for the offset.
Int
12
partition.include.keys
Specifies whether partition keys are included.
Boolean
false
Default (Custom Partitioning): true
store.envelope
Indicates whether to store the entire Kafka message
Boolean
store.envelope.fields.key
Indicates whether to store the envelope’s key.
Boolean
store.envelope.fields.headers
Indicates whether to store the envelope’s headers.
Boolean
store.envelope.fields.value
Indicates whether to store the envelope’s value.
Boolean
store.envelope.fields.metadata
Indicates whether to store the envelope’s metadata.
Boolean
flush.size
Specifies the size (in bytes) for the flush operation.
Long
500000000 (500MB)
flush.count
Specifies the number of records for the flush operation.
Int
50000
flush.interval
Specifies the interval (in seconds) for the flush operation
Long
3600 (1h)
key.suffix
When specified, appends the given value to the resulting object key before the extension (avro, json, etc.) is added.
String
<empty>
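As a sketch, the suffix is supplied through the KCQL PROPERTIES clause; the bucket, topic, and suffix value below are illustrative, and the exact separator handling is an assumption:

```properties
# 'key.suffix' value is hypothetical; it is appended before the .json extension
connect.s3.kcql=INSERT INTO mybucket:prefix SELECT * FROM myTopic STOREAS `JSON` PROPERTIES('key.suffix'='-clusterA')
```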
| Codec | Avro | Parquet |
| --- | --- | --- |
| GZIP | ✅ | ✅ |
| LZO | | ✅ |
| LZ4 | | ✅ |
| BROTLI | | ✅ |
| BZIP2 | ✅ | |
| ZSTD | ✅ ⚙️ | ✅ |
| DEFLATE | ✅ ⚙️ | |
| XZ | ✅ ⚙️ | |

⚙️ indicates the codec honours the configurable compression level (1-9).
connect.s3.aws.secret.key
The AWS Secret Key used for authentication.
string
(Empty)
connect.s3.aws.region
The AWS Region where the S3 bucket is located.
string
(Empty)
connect.s3.pool.max.connections
Specifies the maximum number of connections allowed in the AWS Client’s HTTP connection pool when interacting with S3.
int
-1 (undefined)
50
connect.s3.custom.endpoint
Allows for the specification of a custom S3 endpoint URL if needed.
string
(Empty)
[Deprecated]
connect.s3.vhost.bucket
Enables the use of Vhost Buckets for S3 connections. Always set to true when custom endpoints are used.
Deprecation: This setting maps directly to the AWS SDK’s pathStyleAccessEnabled() method. However, these two concepts are semantically opposite.
Use connect.s3.path.style.access, available since version 11.3.0.
boolean
true, false
false
connect.s3.path.style.access
When set to true it enables path-style access (matching AWS SDK semantics). When set to false it enables virtual-hosted style access.
boolean
true, false
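To illustrate the two styles, the same bucket and key resolve to different request URLs (endpoint, bucket, and key names are illustrative):

```
# path-style access (connect.s3.path.style.access=true)
https://s3.eu-west-1.amazonaws.com/mybucket/mykey

# virtual-hosted style access (connect.s3.path.style.access=false)
https://mybucket.s3.eu-west-1.amazonaws.com/mykey
```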
connect.s3.error.policy
Defines the error handling policy when errors occur during data transfer to or from S3.
string
NOOP, THROW, RETRY
THROW
connect.s3.max.retries
Sets the maximum number of retries the connector will attempt before reporting an error to the Connect Framework.
int
20
connect.s3.retry.interval
Specifies the interval (in milliseconds) between retry attempts by the connector.
int
60000
connect.s3.http.max.retries
Sets the maximum number of retries for the underlying HTTP client when interacting with S3.
long
5
connect.s3.http.retry.interval
Specifies the retry interval (in milliseconds) for the underlying HTTP client. An exponential backoff strategy is employed.
long
50
connect.s3.local.tmp.directory
Enables the use of a local folder as a staging area for data transfer operations.
string
(Empty)
connect.s3.kcql
A SQL-like configuration that defines the behavior of the connector. Refer to the KCQL section below for details.
string
(Empty)
connect.s3.compression.codec
Sets the Parquet compression codec to be used when writing data to S3.
string
(Empty)
connect.s3.compression.level
Sets the compression level when compression is enabled for data transfer to S3.
int
1-9
(Empty)
connect.s3.seek.max.files
Specifies the maximum threshold for the number of files the connector uses to ensure exactly-once processing of data.
int
5
connect.s3.indexes.name
Configure the indexes prefix for this connector.
string
".indexes"
connect.s3.exactly.once.enable
When set to 'false', disables exactly-once semantics, opting instead for Kafka Connect’s native at-least-once offset management.
boolean
true, false
true
connect.s3.schema.change.detector
Configure how the file will roll over upon receiving a record with a schema different from the accumulated ones. This property configures schema change detection with default (object equality), version (version field comparison), or compatibility (Avro compatibility checking).
string
default, version, compatibility
default
connect.s3.skip.null.values
Skip records with null values (a.k.a. tombstone records).
boolean
true, false
false
connect.s3.latest.schema.optimization.enabled
When set to true, reduces unnecessary data flushes when writing to Avro or Parquet formats. Specifically, it leverages schema compatibility to avoid flushing data when messages with older but backward-compatible schemas are encountered.
boolean
true,false
false
Stream Reactor
This page contains the release notes for the Stream Reactor.
When String and Enum are used as a field type, connectors may fail to transfer data to the target. The enhancement adds better Avro schema support, preserving package information and Enums during conversions between Avro and Connect schemas.
When v3 records arrived before v2 records, an ArrayIndexOutOfBoundsException occurred. This happened because ParquetWriter or AvroWriter attempted to write GenericRecords with mismatched Avro schema objects.
With at-least-once semantics enabled, the sink connector uploads the file directly to the destination. This contrasts with the three operations needed for exactly-once delivery. The result is improved latency and reduced cloud costs.
Fix: GCP Pub/Sub Source
Previously, each subscription was limited to a single task, restricting scalability. This update now allows subscriptions to be distributed across all tasks in the connector, enhancing capacity and performance.
11.3.0
Fix: S3 path vs virtual hosted URL style
This release addresses a semantic inconsistency in how S3 access styles were configured, specifically affecting users of S3-compatible storage solutions (e.g., MinIO, s3proxy) or specific AWS regions requiring path-style access.
The Issue
Previously, the configuration property connect.s3.vhost.bucket was mapped directly to the AWS SDK’s pathStyleAccessEnabled() method. However, these two concepts are semantically opposite:
Because the connector passed the boolean value directly without inversion, users were forced to use a counter-intuitive configuration (setting vhost.bucket=true to achieve path-style access) to support storage systems that do not support virtual hosting.
The Fix
We have introduced a new, unambiguous configuration property: connect.s3.path.style.access.
To maintain backward compatibility for existing deployments, the logic now prioritizes the new property while preserving the legacy behavior for the old property.
Configuration Priority Logic:
If connect.s3.path.style.access is set:
The connector uses this value.
connect.s3.vhost.bucket is ignored.
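As a migration sketch (values illustrative), a deployment that previously relied on the inverted legacy flag would move to the new property:

```properties
# Before (deprecated, counter-intuitive): 'true' actually enabled path-style access
connect.s3.vhost.bucket=true

# After (11.3.0+): semantics match the AWS SDK
connect.s3.path.style.access=true
```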
Migration Guide
DataLake sources late handling of data
In scenarios where users cannot ensure files are uploaded to cloud storage sequentially or by timestamp, late-arriving files—those older than the current watermark—are often skipped and never processed. Previously, the connector would only process files newer than this watermark. This update introduces the post.process.action.watermark.process.late.arrival KCQL property, enabling files uploaded out of order to be processed when the Move post-process action is used.
This approach periodically scans the data lake for files with timestamps older than the connector's watermark. It then copies them to themselves to update their last modified timestamp, which may incur additional costs. Users are encouraged to ensure their systems can produce files in sequence to avoid these costs and potential order discrepancies, as out-of-order processing affects record sequencing.
The scan interval is defined at the connector level:
For AWS S3: connect.s3.source.late.arrival.interval
For GCS: connect.gcpstorage.source.late.arrival.interval
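A hedged sketch of how these pieces combine for an S3 source; the bucket, topic, and interval value are illustrative, and the interval's units are an assumption:

```properties
# Process late-arriving files when the MOVE post-process action is used
connect.s3.kcql=INSERT INTO myTopic SELECT * FROM mybucket:prefix PROPERTIES('post.process.action'='MOVE', 'post.process.action.watermark.process.late.arrival'=true)
# Scan interval for files older than the watermark (value and units are assumptions)
connect.s3.source.late.arrival.interval=900000
```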
DataLake sink control the object key namer
Custom File Namer Factory
A custom FileNamerFactory can be created with an optional configuration parameter. The FileNamerFactory includes a method to generate a FileNamer based on existing system parameters for file naming.
The KCQL properties for data lake sinks now support setting store.file.namer to a specific class. Users must currently provide their own class and implementation, with potential enhancements anticipated in future updates.
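A sketch of the KCQL property for an S3 sink; the factory class name is hypothetical — you must supply your own implementation on the connector classpath:

```properties
connect.s3.kcql=INSERT INTO mybucket:prefix SELECT * FROM myTopic PROPERTIES('store.file.namer'='com.example.MyFileNamerFactory')
```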
11.2.0
Azure Data Lake Gen2 Sink Update
The patch release addresses several issues discovered in the Azure Data Lake Storage (ADLS) Gen2 sink:
412 ConditionNotMet Errors: Occurred during lock file updates.
Empty File Conflicts: Resulted from recursive directory marker creation.
Unexpected Whitespace or EOF Errors: Caused by an incorrect state, leading to the lock file's eTag being returned as the lock file's content.
11.1.0
DataLakes Sinks
This release resolves a gap in the S3, GCS, and Azure sinks when latest schema optimization is enabled (via connect.***.latest.schema.optimization.enabled). Previously, connectors would fail if an Avro field changed from an Enum to a Union of an Enum and String. This update fixes that problem.
11.0.0
This release upgrades the connectors to Apache Kafka 4.1. All connectors have been tested and verified against Kafka versions 4.0 and 4.1.
Compatibility Notice
Important: Kafka 3.x versions are no longer supported or tested with this release. While the connectors may function with Kafka 3.x environments, they are not recommended for production use without thorough testing and validation in your specific environment.
Organizations currently running Kafka 3.x should plan to upgrade their Kafka infrastructure to version 4.0 or later before deploying these connectors in production.
Connector Retirement
Effective with version 11.0.0, the following sink connectors have been retired and are no longer available:
Redis Sink Connector
InfluxDB Sink Connector
These connectors will not receive further updates, bug fixes, or security patches. Organizations currently using these connectors should evaluate alternative solutions and plan migration strategies accordingly.
For questions regarding migration paths or alternative connector options, please consult the product documentation or contact support.
10.0.3
Datalakes sinks (AWS S3, GCS and Azure DataLake Gen2)
The sink addresses a gap by committing the message offset as-is, rather than incrementing it by one. This behavior causes Kafka's underlying consumer to appear as if it's always one message behind each partition. This isn't an issue under exactly-once semantics, as the state store in the data lake ensures message processing integrity, ignoring any already processed messages. Under at-least-once semantics, replaying a message is considered acceptable.
10.0.2
Azure DataLake Gen 2
This patch release addresses two issues on the Azure DataLake sink
Fixed errors caused by malformed HTTP headers in ADLS Gen2 requests, such as:
Fixed failures when writing to nested directories in ADLS Gen2 with Hierarchical Namespace (HNS) enabled.
Thank you to the community contributors for their fixes.
10.0.1
DataLakes
This patch addresses several critical bugs and regressions introduced in version 9.0.0, affecting the Datalake connectors for S3, GCS, and Azure.
Connector Restart Logic: Fixed an issue that caused an error on restart if pending operations from a previous session had not completed.
S3 Connector: Corrected a bug where delete object requests incorrectly sent an empty versionId. This resolves API exceptions when using S3-compatible storage solutions, such as Pure Storage.
Addressed a regression where the connector name was omitted from the lock path for exactly-once processing. This fix prevents conflicts when multiple connectors read from the same Kafka topics and write to the same destination bucket.
10.0.0
New Apache Cassandra sink
This new connector is compatible with any Apache Cassandra compatible database from version 3 onwards. The previous connector is deprecated and will be removed in a future release.
Azure CosmosDB sink
Version 10.0.0 introduces breaking changes: the connector has been renamed from DocumentDB, uses the official CosmosDB SDK, and supports new bulk and key strategies.
The CosmosDB Sink Connector is a Kafka Connect sink connector designed to write data from Kafka topics into Azure CosmosDB. It was formerly known as the DocumentDB Sink Connector and has been fully renamed and refactored with the following key changes:
Renamed from DocumentDB to CosmosDB across all code and documentation.
Updated package names to reflect the new CosmosDB naming.
Replaced the legacy DocumentDB client with the official CosmosDB Java SDK.
New Features and Improvements
In non-envelope mode, DataLakes connector sinks now skip tombstone records, preventing connector failures.
HTTP Sink: Enable Import of All Headers in the Message Template
9.0.2
DataLake Sinks (S3, GCP Storage, Azure Data Lake)
The previous commit that addressed the Exactly Once semantics fix inadvertently omitted support for the connect.xxxxxxx.indexes.name configuration. As a result, custom index filenames were not being applied as expected.
This commit restores that functionality by ensuring the connect.xxxxxxx.indexes.name setting is properly respected during index file generation.
9.0.1
Google BigQuery Sink
The latest release of Stream Reactor includes a fork of Apache 2's Confluent BigQuery, originally sourced from WePay.
9.0.0
All Modules
Dependency upgrades.
Project now builds for Kafka 3.9.1.
Exclude Kafka dependencies and certain other things (eg log frameworks) from the final jar.
DataLake Sinks (S3, GCP Storage, Azure Data Lake)
Improved Exactly Once Semantics by introducing a per-task lock mechanism to prevent duplicate writes when Kafka Connect inadvertently runs multiple tasks for the same topic-partition.
Each task now creates a lock file per topic-partition (e.g., lockfile_topic_partition.yml) upon startup.
The lock file is used to ensure only one task (the most recent) can write to the target bucket, based on object eTag validation.
MQTT Connector
Refactored MqttWriter to improve payload handling and support dynamic topics, with JSON conversion for structured data and removal of legacy converters and projections (#232).
Leading Slashes Fixes:
Leading slash from MQTT topic should be removed when converting to Kafka topic target.
When String and Enum are used as a field type, connectors may fail to transfer data to the target. The enhancement adds better Avro schema support, preserving package information and Enums during conversions between Avro and Connect schemas.
When v3 records arrived before v2 records, an ArrayIndexOutOfBoundsException occurred. This happened because ParquetWriter or AvroWriter attempted to write GenericRecords with mismatched Avro schema objects.
8.1.33
DataLake Sinks (S3, GCP Storage, Azure Data Lake)
This update introduces an optimization that reduces unnecessary data flushes when writing to Avro or Parquet formats. Specifically, it leverages schema compatibility to avoid flushing data when messages with older but backward-compatible schemas are encountered.
How It Works
When the input message schema changes in a backward-compatible way, the sink tracks the latest schema and continues using it for serialization. This means that incoming records with an older schema won't automatically trigger a flush, as long as the older schema is compatible with the current one.
Example Flush Behavior
Consider the following sequence of messages and their associated schemas:
Reduces log entries, by moving the log entry confirming the object upload from INFO to DEBUG
8.1.31
DataLake Sinks
This release introduces improvements to Avro error handling, providing better diagnostics and insights into failures during data writing.
8.1.30
DataLake Sinks (S3, GCP Storage, Azure Data Lake)
Resolved dependency issues causing sink failures with Confluent 7.6.0. Confluent 7.6.0 introduced new Schema Registry rule modules that were force-loaded, even when unused, leading to the following error:
This update ensures compatibility by adjusting dependencies, preventing unexpected failures in affected sinks.
HTTP Sink
Adjusting the following log line from INFO level to TRACE level
8.1.29
DataLake Sinks (S3, GCP Storage, Azure Data Lake)
Support has been added for writing to the same bucket from two different connectors located in different Kafka clusters, with both reading from the same topic name. To differentiate the object keys generated and prevent data loss, a new KCQL syntax has been introduced:
HTTP Sink
This feature offers fixed interval retries in addition to the Exponential option. To enable it, use the following configuration settings
A bug affecting the P99, P90, and P50 metrics has been resolved. Previously, failed HTTP requests were not included in the metrics calculation.
8.1.28
DataLake Sinks (S3, GCP Storage, Azure Data Lake)
Enable Skipping of Null Records
Added the following configuration property to prevent possible NullPointerException situations in the S3 Sink, Azure Datalake Sink, and GCP Storage Sink connectors:
When set to true, the sinks will skip null value records instead of processing them. Defaults to false.
If you expect null or tombstone records and are not using envelope mode, enabling this setting is recommended to avoid errors.
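For example, on the S3 sink (the property prefix differs per connector):

```properties
# Skip tombstone (null-value) records instead of failing
connect.s3.skip.null.values=true
```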
HTTP Sink
Fix MBean Registration Issue
Fixed an issue where MBean registration failed when multiple tasks were used, causing exceptions due to duplicate instances. The MBean name now includes the task ID to ensure uniqueness, and MBeans are properly unregistered when tasks stop.
8.1.27
Reverted: Change to LastModified Ordering with Post-Process Actions (introduced in 8.1.23)
We have reverted the change that avoided filtering to the latest result when ordering by LastModified with a post-process action. The original change aimed to prevent inconsistencies due to varying LastModified timestamps.
However, this introduced a potential issue due to Kafka Connect’s commitRecord method being called asynchronously, meaning files may not be cleaned up before the next record is read. This could result in records being processed multiple times. The previous behaviour has been restored to ensure correctness.
8.1.26
Azure Service Bus source
The source watermark is no longer stored, since it is not used when the task restarts.
GCP PubSub source
A fix was made to enable attaching the headers to the resulting Kafka message.
8.1.25
All Modules
Align Confluent lib dependency with Kafka 3.8.
AWS S3 Source
Customers have noticed that when the source deletes files on S3, the entire path is deleted. Whilst this is due to the way S3 works, the connector can do something about it.
This adds a new boolean KCQL property to the S3 source: post.process.action.retain.dirs (default value: false)
If this is set to true, then when files are moved or deleted during source post-processing, a zero-byte object is first created to ensure the path is still represented on S3.
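A sketch for the S3 source (bucket and topic names are illustrative):

```properties
# Retain a zero-byte directory marker after deleting processed files
connect.s3.kcql=INSERT INTO myTopic SELECT * FROM mybucket:prefix PROPERTIES('post.process.action'='DELETE', 'post.process.action.retain.dirs'=true)
```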
8.1.24
HTTP
JMX Metrics Support
The HTTP Sink now publishes success counts and average request time, among other metrics, to JMX.
Extractor Fix
A bug was reported where referencing the entire record value raised an exception when the payload is a complex type (in this case a HashMap).
The change allows the extractor to return the full value when no field is specified for extraction.
This PR revamps the mechanisms governing schema change rollover, as they can lead to errors. New functionality is introduced to ensure improved compatibility.
Removed Hadoop Shaded Dependency
The hadoop-shaded-protobuf dependency has been removed as it was surplus to requirements and pulling in an old version of protobuf-java which introduced vulnerabilities to the project.
Removed Properties for Schema Change Rollover:
The properties $connectorPrefix.schema.change.rollover have been removed for the following connectors: connect.s3, connect.datalake, and connect.gcpstorage. This change eliminates potential errors and simplifies the schema change handling logic.
New Property for Schema Change Detection:
The property $connectorPrefix.schema.change.detector has been introduced for the following connectors: connect.s3, connect.datalake, and connect.gcpstorage.
This property allows users to configure the schema change detection behavior with the following possible values: • default: Schemas are compared using object equality. • version: Schemas are compared by their version field. • compatibility: A more advanced mode that ensures schemas are compatible using Avro compatibility features.
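For example, to switch an S3 sink to Avro compatibility checking:

```properties
# Roll files only when the new schema is genuinely incompatible
connect.s3.schema.change.detector=compatibility
```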
Packaging Changes
The META-INF/maven directory is now built by the assembly process. This will ensure it reflects what is in the jar and that this directory is not just built from our component jars. This mirrors an approach we took for the secret-provider.
8.1.23
DataLakes (S3, GCP) source fixes
Polling Backoff
The connector incurs high costs when there is no data available in the buckets, because it continuously polls the data lake in a tight loop, as controlled by Kafka Connect.
From this version, a backoff queue is used by default, introducing a standard method for backing off calls to the underlying cloud platform.
Avoid filtering by lastSeenFile where a post process action is configured
When ordering by LastModified and a post-process action is configured, avoid filtering to the latest result.
This change avoids bugs caused by inconsistent LastModified dates used for sorting. If LastModified sorting is used, ensure objects do not arrive late, or use a post-processing step to handle them.
Add a flag to populate kafka headers with the watermark partition/offset
This adds a connector property for the GCP Storage and S3 Sources: connect.s3.source.write.watermark.header and connect.gcpstorage.source.write.watermark.header.
If set to true then the headers in the source record produced will include details of the source and line number of the file.
If set to false (the default) then the headers won't be set.
Currently this does not apply when using the envelope mode.
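For example, to enable the watermark headers on an S3 source:

```properties
# Add source file and line-number details to each record's headers
connect.s3.source.write.watermark.header=true
```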
8.1.22
DataLakes (S3, GCP) source fixes
This release addresses two critical issues:
Corrupted connector state when DELETE/MOVE is used: The connector is designed to store the last processed document and its location within its state for every message sent to Kafka. This mechanism ensures that the connector can resume processing from the correct point in case of a restart. However, when the connector is configured with a post-operation to move or delete processed objects within the data lake, it stores the last processed object in its state. If the connector restarts and the referenced object has been moved or deleted externally, the state points to a non-existent object, causing the connector to fail. The current workaround requires manually cleaning the state and restarting the connector, which is inefficient and error-prone.
Incorrect Handling of Move Location Prefixes: When configuring the move location within the data lake, if the prefix ends with a forward slash (/), it results in malformed keys like a//b. Such incorrect paths can break compatibility with query engines like Athena, which may not handle double slashes properly.
8.1.21
Azure Service Bus source
Performance improvements in the source to handle a higher throughput. The code now leverages prefetch count, and disables the auto complete. The following connector configs were added
connect.servicebus.source.prefetch.count The number of messages to prefetch from ServiceBus
connect.servicebus.source.complete.retries.max The maximum number of retries to attempt while completing a message
A NullPointerException was thrown and the sinks lost the stack trace; this patch enhances the logging.
8.1.19
Prevent ElasticSearch from Skipping Records After Tombstone
Addresses a critical bug in ElasticSearch versions 6 (ES6) and 7 (ES7) where records following a tombstone are inadvertently skipped during the insertion process. The issue stemmed from an erroneous return statement that halted the processing of subsequent records.
8.1.18
🚀 New Features
Sequential Message Sending for Azure Service Bus
Introduced a new KCQL property: batch.enabled (default: true).
Users can now disable batching to send messages sequentially, addressing specific scenarios with large message sizes (e.g., >1 MB).
Why this matters: Batching improves performance but can fail for large messages. Sequential sending ensures reliability in such cases.
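A sketch for the Service Bus sink; the queue and topic names are illustrative, and the KCQL shape is assumed from the other connectors:

```properties
# Disable batching to send large messages sequentially
connect.servicebus.kcql=INSERT INTO my-queue SELECT * FROM myTopic PROPERTIES('batch.enabled'=false)
```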
Post-Processing for Datalake Cloud Source Connectors
Added post-processing capabilities for AWS S3 and GCP Storage source connectors ( Azure Datalake Gen 2 support coming soon).
New KCQL properties:
post.process.action: Defines the action (DELETE or MOVE).
Example 2: Move files to an archive bucket:
🛠 Dependency Updates
Updated Azure Service Bus Dependencies
azure-core updated to version 1.54.1.
azure-messaging-servicebus updated to version 7.17.6.
These updates ensure compatibility with the latest Azure SDKs and improve stability and performance.
Upgrade Notes
Review the new KCQL properties and configurations for Azure Service Bus and Datalake connectors.
Ensure compatibility with the updated Azure Service Bus dependencies if you use custom extensions or integrations.
Thank you to all contributors! 🎉
8.1.17
Improves ElasticSearch sinks by allowing the message key fields, or message headers, to be used as part of the document primary key. Reference _key[.key_field_path] or _header.header_name to set the document primary key.
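A hedged KCQL sketch; the index, topic, and field names are illustrative, and the PK clause shape is assumed from the connector's KCQL support:

```properties
# Build the document primary key from a key field and a header
connect.elastic.kcql=INSERT INTO my-index SELECT * FROM myTopic PK _key.userId, _header.tenant
```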
Fixes the data lake sinks error message on flush.interval
8.1.16
Improvements to the HTTP Sink
Queue Limiting: We've set a limit on the queue size per topic to reduce the chances of an Out-of-Memory (OOM) issue. Previously the queue was unbounded; when HTTP calls are slow and the sink receives more records than it clears, this would eventually lead to OOM.
Offering Timeout: The offering to the queue now includes a timeout. If there are records to be offered, but the timeout is exceeded, a retriable exception is thrown. Depending on the connector's retry settings, the operation will be attempted again. This helps avoid situations where the sink gets stuck processing a slow or unresponsive batch.
Duplicate Record Handling: To prevent the same records from being added to the queue multiple times, we've introduced a
8.1.15
HTTP sink ignores null records. Avoids a NPE if custom SMTs send null records to the pipeline
8.1.14
Improvements to the HTTP Sink reporter
8.1.13
8.1.12
Datalake sinks allow an optimisation configuration to avoid rolling the file on schema change. To be used only when the schema changes are backwards compatible
Fixes for the HTTP sink batch.
8.1.11
HTTP sink improvements
8.1.10
Dependency version upgrades.
Azure Service Bus Source
Fixed timestamp of the messages and some of the fields in the payload to correctly use UTC millis.
8.1.9
Azure Service Bus Source
Changed contentType in the message to be nullable (Optional String).
8.1.8
HTTP Sink
Possible exception fix
Store success/failure response codes in http for the reporter.
8.1.7
HTTP Sink
Fixes possible exception in Reporter code.
8.1.6
HTTP Sink
Fixes and overall stability improvements.
8.1.4
Google Cloud Platform Storage
Remove overriding transport options on GCS client.
8.1.3
HTTP Sink
Bugfix: HTTP Kafka Reporter ClientID is not unique.
8.1.2
HTTP Sink & GCS Sink
Improvements for handing HTTP sink null values & fix for NPE on GCS.
8.1.1
Removal of shading for Avro and Confluent jars.
8.1.0
HTTP Sink
Addition of HTTP reporter functionally to route HTTP success and errors to a configurable topic.
8.0.0
HTTP Sink
Breaking Configuration Change
Configuration for the HTTP Sink changes from Json to standard Kafka Connect properties. Please study the documentation if upgrading to 8.0.0.
Removed support for the JSON configuration format. Instead, please now configure your HTTP Sink connector using Kafka Connect properties.
Introduced SSL configuration.
Introduced OAuth support.
7.4.5
ElasticSearch (6 and 7)
Support for dynamic index names in KCQL.
Configurable tombstone behaviour using the KCQL property behavior.on.null.values.
SSL Support using standard Kafka Connect properties.
Http Sink
Adjust defaultUploadSyncPeriod to make the connector more performant by default.
Data Lake Sinks (AWS, Azure Datalake and GCP Storage)
Fix for issue causing sink to fail with NullPointerException due to invalid offset.
7.4.4
Azure Datalake & GCP Storage
Dependency version upgrades
Data Lake Sinks (AWS, Azure Datalake and GCP Storage)
Fixes a gap in the Avro/Parquet storage where Connect enums were converted to strings.
Adds support for explicit "no partition" specification to kcql, to enable topics to be written in the bucket and prefix without partitioning the data.
Syntax example: INSERT INTO foo SELECT * FROM bar NOPARTITION
7.4.3
All Connectors
Dependency version upgrades
Data Lake Sinks (AWS, Azure Datalake and GCP Storage)
This release introduces a new configuration option for three Kafka Connect Sink Connectors—S3, Data Lake, and GCP Storage—allowing users to disable exactly-once semantics. By default, exactly once is enabled, but with this update, users can choose to disable it, opting instead for Kafka Connect’s native at-least-once offset management.
S3 Sink Connector: connect.s3.exactly.once.enable
Data Lake Sink Connector: connect.datalake.exactly.once.enable
GCP Storage Sink Connector: connect.gcpstorage.exactly.once.enable
Default value: true
Indexing is enabled by default to maintain exactly-once semantics. This involves creating an .indexes directory at the root of your storage bucket, with subdirectories dedicated to tracking offsets, ensuring that records are not duplicated.
Users can now disable indexing by setting the relevant property to false. When disabled, the connector will utilise Kafka Connect’s built-in offset management, which provides at-least-once semantics instead of exactly-once.
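For example, on the S3 sink the switch is a single connector property (the other two connectors use their respective prefixes):

```properties
# Disable exactly-once indexing; the connector falls back to
# Kafka Connect's native at-least-once offset management.
connect.s3.exactly.once.enable=false
```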
7.4.2
All Connectors
Dependency version upgrades
AWS S3 Source and GCP Storage Source Connectors
File Extension Filtering
Introduced new properties to allow users to filter the files to be processed based on their extensions.
For AWS S3 Source Connector:
connect.s3.source.extension.excludes: A comma-separated list of file extensions to exclude from the source file search. If this property is not configured, all files are considered.
connect.s3.source.extension.includes: A comma-separated list of file extensions to include in the source file search. If this property is not configured, all files are considered.
For GCP Storage Source Connector:
connect.gcpstorage.source.extension.excludes: A comma-separated list of file extensions to exclude from the source file search. If this property is not configured, all files are considered.
connect.gcpstorage.source.extension.includes: A comma-separated list of file extensions to include in the source file search. If this property is not configured, all files are considered.
These properties provide more control over the files that the AWS S3 Source Connector and GCP Storage Source Connector process, improving efficiency and flexibility.
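A sketch of the S3 source filtering; the extension values are illustrative, and whether a leading dot is required is an assumption to verify against the connector docs:

```properties
# Only pick up .json and .csv files...
connect.s3.source.extension.includes=json,csv
# ...and never process temporary files.
connect.s3.source.extension.excludes=tmp
```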
GCP Storage Source and Sink Connectors
Increases the default HTTP retry timeout from a 250ms total timeout to 3 minutes. The default consumer max.poll.interval.ms is 5 minutes, so the new timeout stays within the boundary that avoids a consumer group rebalance.
7.4.1
GCP Storage Connector
Adding retry delay multiplier as a configurable parameter (with default value) to Google Cloud Storage Connector. Main changes revolve around RetryConfig class and its translation to gax HTTP client config.
Data Lake Sinks (AWS, Azure Datalake and GCP Storage)
Making indexes directory configurable for both source and sink.
Sinks
Use the below properties to customise the indexes root directory:
connect.datalake.indexes.name
connect.gcpstorage.indexes.name
connect.s3.indexes.name
See connector documentation for more information.
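For instance, an S3 sink could be pointed at a custom indexes directory (the value shown is hypothetical):

```properties
# Store offset-tracking indexes under .custom-indexes instead of .indexes
connect.s3.indexes.name=.custom-indexes
```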
Sources:
Use the below properties to exclude custom directories from being considered by the source.
Data Lake Sinks (AWS, Azure Datalake and GCP Storage)
Support added for writing Date, Time and Timestamp data types to AVRO.
7.3.1
GCP Storage Connector
Invalid Protocol Configuration Fix
7.3.0
NEW: Azure Service Bus Source Connector
Azure Service Bus Source Connector
NEW: GCP Pub/Sub Source Connector
GCP Pub/Sub Source Connector
Data Lake Sinks (AWS, Azure Datalake and GCP Storage)
To back up the topics, the KCQL statement is:
When * is used the envelope setting is ignored.
This change allows the * mapping to be used as a default when the given message topic is not found.
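A hedged sketch of such a catch-all backup mapping; the bucket and prefix names are illustrative, and the exact quoting of the wildcard is an assumption:

```sql
INSERT INTO my-bucket:backup SELECT * FROM `*` PROPERTIES('store.envelope'=true)
```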
HTTP Sink
Bug fix to ensure that, if specified as part of the template, the Content-Type header is correctly populated.
All Connectors:
Update of dependencies.
Upgrading from any version prior to 7.0.0, please see the release and upgrade notes for 7.0.0.
7.2.0
Enhancements
Automated Skip for Archived Objects:
The S3 source now seamlessly bypasses archived objects, including those stored in Glacier and Deep Archive. This improves efficiency by automatically excluding archived data from processing, which previously could crash the connector.
Enhanced Key Storage in Envelope Mode:
Changes have been implemented to the stored key when using envelope mode. These modifications lay the groundwork for future functionality, enabling seamless replay of Kafka data stored in data lakes (S3, GCS, Azure Data Lake) from any specified point in time.
7.1.0
Source Line-Start-End Functionality Enhancements
We’ve rolled out enhancements to tackle a common challenge faced by users of the S3 source functionality. Previously, when an external producer abruptly terminated a file without marking the end message, data loss occurred.
To address this, we’ve introduced a new feature: a property entry for KCQL to signal the handling of unterminated messages. Meet the latest addition, read.text.last.end.line.missing. When set to true, this property ensures that in-flight data is still recognized as a message even when EOF is reached but the end line marker is missing.
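As a sketch, the property is supplied in the source KCQL mapping (bucket, prefix and topic names are illustrative):

```sql
INSERT INTO my-topic SELECT * FROM my-bucket:prefix STOREAS `TEXT` PROPERTIES('read.text.last.end.line.missing'='true')
```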
Upgrading from any version prior to 7.0.0, please see the release and upgrade notes for 7.0.0.
7.0.0
Data-lakes Sink Connectors
This release brings substantial enhancements to the data-lakes sink connectors, elevating their functionality and flexibility. The focal point of these changes is the adoption of the new KCQL syntax, designed to improve usability and resolve limitations inherent in the previous syntax.
Key Changes
New KCQL Syntax: The data-lakes sink connectors now embrace the new KCQL syntax, offering users enhanced capabilities while addressing previous syntax constraints.
Data Lakes Sink Partition Name: This update ensures accurate preservation of partition names by avoiding the scraping of characters like \ and /. Consequently, SMTs can provide partition names as expected, leading to reduced configuration overhead and increased conciseness.
KCQL Keywords Replaced
Several keywords have been replaced with entries in the PROPERTIES section for improved clarity and consistency:
WITHPARTITIONER: Replaced by PROPERTIES ('partition.include.keys'=true/false). Setting the property to true (the equivalent of WITHPARTITIONER KeysAndValues) includes the partition keys in the partition path; otherwise only the partition values are included.
WITH_FLUSH_SIZE: Replaced by PROPERTIES ('flush.size'=$VALUE).
Benefits
The adoption of the new KCQL syntax enhances the flexibility of the data-lakes sink connectors, empowering users to tailor configurations more precisely to their requirements. By transitioning keywords to entries in the PROPERTIES section, potential misconfigurations stemming from keyword order discrepancies are mitigated, ensuring configurations are applied as intended.
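A before/after sketch of the migration; the pre-7.0.0 line reflects the old keyword style and is an approximation, and names and values are illustrative:

```sql
-- Before (pre-7.0.0 keyword syntax):
INSERT INTO my-bucket SELECT * FROM my-topic WITHPARTITIONER KeysAndValues WITH_FLUSH_SIZE = 500000000

-- After (7.0.0+ PROPERTIES syntax):
INSERT INTO my-bucket SELECT * FROM my-topic PROPERTIES('partition.include.keys'=true, 'flush.size'=500000000)
```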
Upgrades
Please note that the upgrades to the data-lakes sink connectors are not backward compatible with existing configurations. Users are required to update their configurations to align with the new KCQL syntax and PROPERTIES entries. This upgrade is necessary for any instances of the sink connector (S3, Azure, GCP) set up before version 7.0.0.
To upgrade to the new version, users must follow these steps:
Stop the sink connectors.
Update the connector to version 7.0.0.
Edit the sink connectors’ KCQL setting to translate to the new syntax and PROPERTIES entries.
6.3.1
This update specifically affects datalake sinks employing the JSON storage format. It serves as a remedy for users who have resorted to a less-than-ideal workaround: using a Single Message Transform (SMT) to return a Plain Old Java Object (POJO) to the sink. In such cases, instead of the Connect JsonConverter seamlessly translating the payload to JSON, the sink relies solely on Jackson.
However, it’s crucial to note that this adjustment is not indicative of a broader direction for future expansions. This is because relying on such SMT practices does not ensure an agnostic solution for storage formats (such as Avro, Parquet, or JSON).
6.3.0
NEW: HTTP Sink Connector
HTTP Sink Connector (Beta)
NEW: Azure Event Hubs Source Connector
Azure Event Hubs Source Connector.
All Connectors:
Update of dependencies, CVEs addressed.
Please note that the Elasticsearch 6 connector will be deprecated in the next major release.
6.2.0
NEW: GCP Storage Source Connector
GCP Storage Source Connector (Beta)
AWS Source Connector
Important: The AWS Source Partition Search properties have changed for consistency of configuration. The properties that have changed for 6.2.0 are:
connect.s3.partition.search.interval changes to connect.s3.source.partition.search.interval.
JMS Source and Sink Connector
Jakarta Dependency Migration: Switch to Jakarta EE dependencies in line with industry standards to ensure evolution under the Eclipse Foundation.
All Connectors:
There has been some small tidy up of dependencies, restructuring and removal of unused code, and a number of connectors have a slimmer file size without losing any functionality.
The configuration directory has been removed as these examples are not kept up-to-date. Please see the connector documentation instead.
Dependency upgrades.
6.1.0
All Connectors:
In this release, all connectors have been updated to address an issue related to conflicting Antlr jars that may arise in specific environments.
AWS S3 Source:
Byte Array Support: Resolved an issue where storing the Key/Value as an array of bytes caused compatibility problems due to the connector returning java.nio.ByteBuffer while the Connect framework’s ByteArrayConverter only works with byte[]. This update ensures seamless conversion to byte[] if the key/value is a ByteBuffer.
JMS Sink:
Fix for NullPointerException: Addressed an issue where the JMS sink connector encountered a NullPointerException when processing a message with a null JMSReplyTo header value.
JMS Source:
Fix for DataException: Resolved an issue where the JMS source connector encountered a DataException when processing a message with a JMSReplyTo header set to a queue.
Removed check preventing nested paths being used in the sink.
Avoid cast exception in GCP Storage connector when using Credentials mode.
6.0.0
New Connectors introduced in 6.0.0:
Azure Datalake Sink Connector (Beta)
GCP Storage Sink Connector (Beta)
Deprecated Connectors removed in 6.0.0:
Kudu
Hazelcast
Hbase
Hive
All Connectors:
Standardising package names. Connector class names and converters will need to be renamed in configuration.
Some clean up of unused dependencies.
Introducing cloud-common module to share code between cloud connectors.
5.0.1
New Connectors introduced in 5.0.1:
Consumer Group Offsets S3 Sink Connector
AWS S3 Connector - S3 Source & Sink
Enhancement: BigDecimal Support
Redis Sink Connector
Bug fix: Redis does not initialise the ErrorHandler
5.0.0
All Connectors
Test Fixes and E2E Test Clean-up: Improved testing with bug fixes and end-to-end test clean-up.
Code Optimization: Removed unused code and converted Java code and tests to Scala for enhanced stability and maintainability.
Ascii Art Loading Fix: Resolved issues related to ASCII art loading.
AWS S3 Connector - S3 Source & Sink
The source and sink have been the focus of this release.
Full message backup: the S3 sink and source now support full message backup. This is enabled by adding PROPERTIES('store.envelope'=true) to the KCQL.
Removed the Bytes_*** storage formats. For users leveraging them, there is migration information below. For storing raw Kafka messages the storage format should be AVRO/PARQUET/JSON (less ideal).
Introduced support for BYTES, storing a single message as raw binary; storing images or videos is the typical use case. This is enabled by adding STOREAS BYTES to the KCQL.
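Putting the envelope and storage format together, a backup sink mapping might look like this (bucket, prefix and topic names are illustrative):

```sql
-- Full message backup (key, value, headers, metadata) stored as Avro
INSERT INTO my-bucket:prefix SELECT * FROM my-topic STOREAS `AVRO` PROPERTIES('store.envelope'=true)
```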
Sink
Enhanced PARTITIONBY Support: expanded support for PARTITIONBY fields, now accommodating fields containing dots. For instance, you can use PARTITIONBY a, `field1.field2` for enhanced partitioning control.
Advanced Padding Strategy: a more advanced padding strategy configuration. By default, padding is now enforced, significantly improving compatibility with S3 Source.
Improved Error Messaging: Enhancements have been made to error messaging, providing clearer guidance, especially in scenarios with misaligned topic configurations (#978).
Source
Expanded Text Reader Support: new text readers to enhance data processing flexibility, including:
Regex-Driven Text Reader: Allows parsing based on regular expressions.
Multi-Line Text Reader: Handles multi-line data.
Kudu and Hive
The Kudu and Hive connectors are now deprecated and will be removed in a future release.
InfluxDB
Fixed a memory issue with the InfluxDB writer.
Upgraded to Influxdb2 client (note: doesn’t yet support Influxdb2 connections).
S3 upgrade notes
Upgrading from 5.0.0 (preview) to 5.0.0
For installations that have been using the preview version of the S3 connector and are upgrading to the release, there are a few important considerations:
Previously, default padding was enabled for both the "offset" and "partition" values starting in June.
However, in version 5.0, the decision was made to apply default padding to the "offset" value only, leaving the "partition" value without padding. This change was made to enhance compatibility with querying in Athena.
If you have been using a build from the master branch since June, your connectors might have been configured with a different default padding setting.
To maintain consistency and ensure your existing connector configuration remains valid, you will need to use KCQL configuration properties to customize the padding fields accordingly.
Upgrading from 4.x to 5.0.0
Starting with version 5.0.0, the following configuration keys have been replaced.
Field
Old Property
New Property
Upgrading from 4.1.* and 4.2.0
In version 4.1, padding options were available but were not enabled by default. At that time, the default padding length, if not specified, was set to 8 characters.
However, starting from version 5.0, padding is now enabled by default, and the default padding length has been increased to 12 characters.
Enabling padding has a notable advantage: it ensures that the files written are fully compatible with the Lenses Stream Reactor S3 Source, enhancing interoperability and data integration.
Sinks created with 4.2.0 and 4.2.1 should retain the padding behaviour and therefore should disable padding:
If padding was enabled in 4.1, then the padding length should be specified in the KCQL statement:
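A hedged sketch of both cases; the property names padding.type and padding.length.offset are assumptions to verify against the connector documentation:

```sql
-- Keep 4.2.x behaviour by disabling padding entirely
INSERT INTO my-bucket SELECT * FROM my-topic PROPERTIES('padding.type'='NoOp')

-- Or keep padding but pin the 4.1-era length of 8 characters
INSERT INTO my-bucket SELECT * FROM my-topic PROPERTIES('padding.length.offset'=8)
```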
Upgrading from 4.x to 5.0.0 only when STOREAS Bytes_*** is used
The Bytes_*** storage format has been removed. If you are using this storage format, you will need to install the 5.0.0-deprecated connector and upgrade the connector instances by changing the class name:
Source Before:
Source After:
Sink Before:
Sink After:
The deprecated connector won’t be developed any further and will be removed in a future release. If you want to talk to us about a migration plan, please get in touch with us at .
Upgrade a connector configuration
To migrate to the new configuration, please follow the following steps:
stop all running instances of the S3 connector
upgrade the connector to 5.0.0
update the configuration to use the new properties
4.2.0
All
Ensure connector version is retained by connectors
Lenses branding ASCII art updates
4.1.0
Note
From version 4.1.0, AWS S3 Connectors use the AWS client by default. You can revert to the jClouds version by setting the connect.s3.aws.client property.
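A sketch of the revert; the exact value accepted by the property is an assumption, so check the connector documentation before relying on it:

```properties
# Hypothetical value: fall back to the jClouds-based client
connect.s3.aws.client=jclouds
```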
All
Scala upgrade to 2.13.10
Dependency upgrades
4.0.0
All
Scala 2.13 Upgrade
Gradle to SBT Migration
3.0.1
All
Replace Log4j with Logback to overcome CVE-2021-44228
Bringing code from legacy dependencies inside of project
3.0.0
All
Move to KCQL 2.8.9
Change sys.errors to ConnectExceptions
2.1.3
Move to connect-common 2.0.5 that adds complex type support to KCQL
2.1.2
AWS S3 Sink Connector
Prevent a null pointer exception in converters when maps are presented with null values
Offset reader optimisation to reduce S3 load
2.1.0
AWS S3 Sink Connector
Elasticsearch 7 Support
2.0.1
Hive Source
Rename option connect.hive.hive.metastore to connect.hive.metastore
Rename option connect.hive.hive.metastore.uris to connect.hive.metastore.uris
2.0.0
Move to Scala 2.12
Move to Kafka 2.4.1 and Confluent 5.4
Deprecated:
If ONLY connect.s3.vhost.bucket is set:
The connector retains the "buggy" legacy behavior (passing the value directly to the SDK) to prevent breaking existing pipelines.
A deprecation warning will be logged.
Introduced Bulk Processing Mode to enhance write throughput.
Enhanced key population strategies for mapping Kafka record keys to CosmosDB document IDs.
If the lock file is modified by another task, the current task detects the change via eTag mismatch and exits to avoid conflicts.
If no leading slash exists in the MQTT topic, the first slash is not stripped; it is instead replaced with an underscore, as with the remaining slashes.
The minimum duration in milliseconds for the first backoff
connect.servicebus.source.sleep.on.empty.poll.ms: The duration in milliseconds to sleep when no records are returned from the poll. This avoids a tight loop in Connect.
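For example (the value is illustrative):

```properties
# Back off for 250 ms whenever a poll returns no records
connect.servicebus.source.sleep.on.empty.poll.ms=250
```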
How to use: Configure batch.enabled=false in the KCQL mapping to enable sequential sending.
) to perform on source files after successful processing.
post.process.action.bucket: Specifies the target bucket for the MOVE action (required for MOVE).
post.process.action.prefix: Specifies a new prefix for the file’s location when using the MOVE action (required for MOVE).
Use cases:
Free up storage space by deleting files.
Archive or organize processed files by moving them to a new location.
Example 1: Delete Files:
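A sketch of a delete configuration; topic, bucket and prefix names are illustrative, and the lowercase action value mirrors the move example elsewhere on this page, so treat it as an assumption:

```sql
INSERT INTO my-topic SELECT * FROM my-bucket:prefix STOREAS `JSON` PROPERTIES('post.process.action'='delete')
```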
Uses a Map[TopicPartition, Offset] to track the last processed offset for each topic-partition. This ensures that the sink does not attempt to process the same records repeatedly.
Batch Failure Handling: The changes also address a situation where an HTTP call fails due to a specific input, but the batch is not removed from the queue. This could have led to the batch being retried indefinitely, which is now prevented.
connect.s3.source.partition.search.excludes
Fix: Java class java.util.Date support in cloud sink maps
WITH_FLUSH_COUNT: Replaced by PROPERTIES ('flush.count'=$VALUE).
WITH_FLUSH_INTERVAL: Replaced by PROPERTIES ('flush.interval'=$VALUE).
Restart the sink connectors.
connect.s3.partition.search.continuous changes to connect.s3.source.partition.search.continuous.
connect.s3.partition.search.recurse.levels changes to connect.s3.source.partition.search.recurse.levels.
If you use any of these properties, when you upgrade to the new version then your source will halt and the log will display an error message prompting you to adjust these properties. Be sure to update these properties in your configuration to enable the new version to run.
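A before/after sketch of one of the renames (the interval value is illustrative):

```properties
# Before 6.2.0
connect.s3.partition.search.interval=300000
# 6.2.0 and later
connect.s3.source.partition.search.interval=300000
```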
Dependency upgrade of Hadoop libraries version to mitigate against CVE-2022-25168.
Pulsar
Cloud sinks (AWS S3, Azure Data Lake and GCP Storage) now support BigDecimal and handle nullable keys.
Build System Updates: Implemented build system updates and improvements.
Stream Reactor Integration: Integrated Kafka-connect-query-language inside of Stream Reactor for enhanced compatibility.
STOREAS Consistency: Ensured consistent handling of backticks with STOREAS.
Introduced support for PROPERTIES to supply additional settings that drive the connectors' behaviour. The KCQL looks like this: INSERT INTO ... SELECT ... FROM ... PROPERTIES(key=value, ...)
Commit Logging Refactoring: Refactored and simplified the CommitPolicy for more efficient commit logging (#964).
Comprehensive Testing: Added additional unit testing around configuration settings, removed redundancy from property names, and enhanced KCQL properties parsing to support Map structures.
Consolidated Naming Strategies: Merged naming strategies to reduce code complexity and ensure consistency. This effort ensures that both hierarchical and custom partition modes share similar code paths, addressing issues related to padding and the inclusion of keys and values within the partition name.
Optimized S3 API Calls: Switched from using deleteObjects to deleteObject for S3 API client calls (#957), enhancing performance and efficiency.
JClouds Removal: The update removes the use of JClouds, streamlining the codebase.
Legacy Offset Seek Removal: The release eliminates legacy offset seek operations, simplifying the code and enhancing overall efficiency.
Start-End Tag Text Reader: Processes data enclosed by start and end tags, suitable for XML content.
Improved Parallelization: enhancements enable parallelization based on the number of connector tasks and available data partitions, optimizing data handling.
Data Consistency: Resolved data loss and duplication issues when the connector is restarted, ensuring reliable data transfer.
Dynamic Partition Discovery: No more need to restart the connector when new partitions are added; runtime partition discovery streamlines operations.
Efficient Storage Handling: The connector now ignores the .indexes directory, allowing data storage in an S3 bucket without a prefix.
Increased Default Records per Poll: the default limit on the number of records returned per poll was changed from 1024 to 10000, improving data retrieval efficiency and throughput.
Ordered File Processing: Added the ability to process files in date order. This feature is especially useful when S3 files lack lexicographical sorting, and S3 API optimisation cannot be leveraged. Please note that it involves reading and sorting files in memory.
Parquet INT96 Compatibility: The connector now allows Parquet INT96 to be read as a fixed array, preventing runtime failures.
VHost Bucket
aws.vhost.bucket
connect.s3.vhost.bucket
resume the stopped connectors
AWS S3 Sink Connector
Improves the error message when the input to BytesFormatWriter is not binary
Support for ByteBuffer which may be presented by Connect as bytes
INSERT INTO topic SELECT * FROM bucket:prefix
STOREAS `JSON`
PROPERTIES(
'post.process.action'='move',
'post.process.action.bucket'='target-bucket',
'post.process.action.prefix'='processed',
'post.process.action.watermark.process.late.arrival'='true'
)
InvalidHeaderValue: The value for one of the HTTP headers is not in the correct format.
x-ms-range-get-content-md5: true
message1 -> schema1
message2 -> schema1
(No flush needed – same schema)
message3 -> schema2
(Flush occurs – new schema introduced)
message4 -> schema2
(No flush needed – same schema)
message5 -> schema1
Without optimization: would trigger a flush
With optimization: no flush – schema1 is backward-compatible with schema2
message6 -> schema2
message7 -> schema2
(No flush needed – same schema, it would happen based on the flush thresholds)
java.util.ServiceConfigurationError:
io.confluent.kafka.schemaregistry.rules.RuleExecutor:
io.confluent.kafka.schemaregistry.encryption.FieldEncryptionExecutor not a subtype
HttpWriterManager has no writers. Perhaps no records have been put to the sink yet.
INSERT INTO ..... PROPERTIES('key.suffix'='unique-id1')...