Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Coming soon!
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
This page describes the usage of the Stream Reactor Azure Service Bus Source Connector.
This Kafka connector is designed to effortlessly ingest records from Azure Service Bus into your Kafka cluster. It leverages Microsoft Azure API to seamlessly transfer data from Service Buses, allowing for their safe transition and safekeeping both payloads and metadata (see Payload support). It provides its user with AT-LEAST-ONCE guarantee as the data is committed (marked as read) in Service Bus once Connector verifies it was successfully committed to designated Kafka topic. Azure Service Bus Source Connector supports both types of Service Buses: Queues and Topics.
For more examples see the tutorials.
The following example presents all the mandatory configuration properties for the Service Bus connector. Please note there are also optional parameters listed (link to option reference??). Feel free to tweak the configuration to your requirements.
You can specify multiple KCQL statements separated by ;
to have the connector map between multiple topics.
The following KCQL is supported:
It allows you to map Service Bus of name <your-service-bus>
to Kafka topic of name <your-kafka-topic>
using the PROPERTIES specified.
The selection of fields from the Service Bus message is not supported.
Azure Service Bus Connector follows specific pattern (Schema) of messages. Please look below for the format of the data transferred to Kafka Topics specified in the KCQL config.
MessageId
String
The message identifier that uniquely identifies the message and its payload.
Field Name
Schema Type
Description
deliveryCount
int64
The number of the times this message was delivered to clients.
enqueuedTimeUtc
int64
The time at which this message was enqueued in Azure Service Bus.
contentType
Optional String
The content type of this message.
label
Optional String
The application specific message label.
correlationId
Optional String
The correlation identifier.
messageProperties
Optional String
The map of user application properties of this message.
partitionKey
Optional String
The partition key for sending a message to a partitioned entity.
replyTo
Optional String
The address of an entity to send replies to.
replyToSessionId
Optional String
The session identifier augmenting the ReplyTo address.
deadLetterSource
Optional String
The name of the queue or subscription that this message was enqueued on, before it was deadlettered.
timeToLive
int64
The duration before this message expires.
lockedUntilUtc
Optional int64
The time when the lock of this message expires.
sequenceNumber
Optional int64
The unique number assigned to a message by Azure Service Bus.
sessionId
Optional String
The session identifier for a session-aware entity.
lockToken
Optional String
The lock token for the current message.
messageBody
Optional bytes
The body of this message as a byte array.
getTo
Optional String
The “to” address.
You can connect to an Azure Service Bus by passing your connection string in configuration. The connection string can be found in the Shared access policies section of your Azure Portal.
Learn more about different methods of connecting to Service Bus on the Azure Website.
The Azure Service Bus Connector connects to Service Bus via Microsoft API. In order to smoothly configure your mappings you have to pay attention to PROPERTIES part of your KCQL mappings. There are two cases here: reading from Service Bus of type QUEUE and of type TOPIC. Please refer to the relevant sections below. In case of further questions check Azure Service Bus documentation to learn more about those mechanisms.
In order to be reading from the queue there's an additional parameter that you need to pass with your KCQL mapping in the PROPERTIES part. This parameter is servicebus.type
and it can take one of two values depending on the type of the service bus: QUEUE or TOPIC. Naturally for Queue we're interested in QUEUE
here and we need to pass it.
This is sufficient to enable you to create the mapping with your queue.
In order to be reading from the topic there are two additional parameters that you need to pass with your KCQL mapping in the PROPERTIES part:
Parameter servicebus.type
which can take one of two values depending on the type of the service bus: QUEUE or TOPIC. For topic we're interested in TOPIC
here and we need to pass it.
Parameter subscription.name
which takes the (case-sensitive) value of a subscription name that you've created for this topic for the connector to use. Please use Azure Portal to create one.
This is sufficient to enable you to create the mapping with your topic.
Please find below all the necessary KCQL properties:
servicebus.type
Specifies Service Bus type: QUEUE
or TOPIC
string
subscription.name
Specifies subscription name if Service Bus type is TOPIC
string
Please find below all the relevant configuration parameters:
connect.servicebus.connection.string
Specifies the Connection String to connect to Service Bus
string
connect.servicebus.kcql
Comma-separated output KCQL queries
string
connect.servicebus.source.task.records.queue.size
Specifies the Queue size between Service Bus Receivers and Kafka
int
20
connect.servicebus.source.sleep.on.empty.poll.ms
The duration in milliseconds to sleep when no records are returned from the poll. This avoids a tight loop in Connect.
long
250
connect.servicebus.source.complete.retries.max
The maximum number of retries to complete a message.
int
3
connect.servicebus.source.complete.retries.min.backoff.ms
The minimum duration in milliseconds for the first backoff
long
1000
connect.servicebus.source.prefetch.count
The number of messages to prefetch from the Azure Service Bus.
int
2000
Lenses Kafka Connectors are a collection of open-source, Apache 2.0 licensed, Kafka Connect Connectors. Maintained by Lenses.io to deliver open-source Kafka Connectors to the community.
This page details the configuration options for the Stream Reactor Kafka Connect source connectors.
Source connectors read data from external systems and write to Kafka.
This page describes the Stream Reactor connector plugins.
This page describes an overview of Kafka Connect.
Kafka Connect uses Kafka Producer API and Kafka Consumer API to load data into Apache Kafka and output data from Kafka to another storage engine. A Connector is an instantiation of a connector plugin defined by a configuration file.
Kafka Connect is a plugin framework. It exposes APIs for developers to implement standard recipes (known as Connector Plugins) for exchanging data between Kafka and other data systems. It provides the runtime environment to execute the plugins in the form of Connect Clusters made up of Workers.
The fundamental building blocks of Connect Clusters are the Workers. Workers manage the plumbing required to interact with Kafka, which serves to coordinate workload. They also handle the management of connector plugins and the creation and supervision of their instances known as connector instances.
Connect Clusters can be configured either as a standalone cluster (consisting of one Worker) or distributed (consisting of many Workers, ideally on different machines). The benefit of the distributed mode is fault tolerance and load balancing of workload across machines.
A Connector Plugin is a concrete implementation of the Plugin APIs exposed by Connect for a specific third-party system, for example, S3. If a Connector Plugin is extracting data from a system and writing to Kafka it’s called a Source Plugin. If it ships data from Kafka to another system it’s called a Sink Plugin.
The modular architecture of Connect, with Workers managing connector plugins and connector instances, abstracts away the complexities of data integration, enabling developers to concentrate on the core functionality of their applications.
The Community has been developing Connectors to connect to all sorts of systems for years. Here at Lenses, we are the biggest contributor to open-source connectors via our Stream Reactor project.
The Connector Plugins are code, deployed as jar files, and added to the classpath of each Worker in the Connect cluster.
Why does it need to be on each Worker? The reason lies in the distributed nature of Connect and its workload distribution mechanism. The Connect framework evenly distributes tasks among multiple Workers using the Kafka consumer group protocol. This ensures load balancing and fault tolerance within the Connect cluster. Each Worker is responsible for executing a subset of the assigned tasks.
To enable the execution of assigned workloads, each Worker needs to have access to the required plugins locally. By having the plugins available on each Worker's classpath, the Connect framework can dynamically load and utilize them whenever necessary. This approach eliminates the need for plugins to be distributed separately or retrieved from a centralized location during runtime. But it does mean when you install your Connect Cluster that you need to ensure each plugin is also installed.
Each Connector Plugin has to implement two interfaces:
Connector (Source or Sink)
Task (Source or Sink)
The Connector interface is responsible for defining and configuring the instance of the Connector, for example, validating configuration and then splitting that configuration up for the Connect APIs to distribute amongst the workers.
The actual work is done by the Task class. Connect sends out, via Kafka, a configuration, defined by the Connector class to each worker. Each assigned Worker picks up the task (it's listening to the configuration topic), and creates an instance of the task.
Connector instances are created on the Worker you submit the creation request to, task instances can also be on the same Worker but also other Workers. Connect distributes the tasks to Workers via Kafka, they have internal consumer groups listening to the system topics Connect uses. If you look into the default topic, connect-configs, you will see the split of Connector configs, for each task.
A Kafka message is made up of:
Headers
Key
Value
Headers are a map while the key and value are stored as byte arrays in Kafka. Typically these byte arrays represent data stored as JSON or AVRO, however, it could be anything.
To decouple from the format of data inside Kafka topics, Connect uses an internal format called Struct . Structs have fields and fields have types defined in a Schema.
Converters translate the byte arrays which could be AVRO or other formats to Struct for Sink connectors and vice versa for Source connectors.
For example, you use Avro as your data format in your cluster. The Avro converter allows you to build connectors and interact with Connect Struct only, you may decide to move to Protobuf later and don't want to reimplement your connector. This is where the converter comes in. It handles converting the Struct to Avro for Source connectors and Struct to external systems, e.g. Cassandra, for sinks. You can then swap this out for a different converter, you as a developer only deal with Structs.
The following converters are available with Apache Kafka:
org.apache.kafka.connect.converters.DoubleConverter
: Serializes to and deserializes from DOUBLE values. When converting from bytes to Connect format, the converter returns an optional FLOAT64 schema.
org.apache.kafka.connect.converters.FloatConverter
: Serializes to and deserializes from FLOAT values. When converting from bytes to Connect format, the converter returns an optional FLOAT32 schema.
org.apache.kafka.connect.converters.IntegerConverter
: Serializes to and deserializes from INTEGER values. When converting from bytes to Connect format, the converter returns an optional INT32 schema.
org.apache.kafka.connect.converters.LongConverter
: Serializes to and deserializes from LONG values. When converting from bytes to Connect format, the converter returns an optional INT64 schema.
org.apache.kafka.connect.converters.ShortConverter
: Serializes to and deserializes from SHORT values. When converting from bytes to Connect format, the converter returns an optional INT16 schema.
Confluent provides support for (Schema Registry required):
AvroConverter io.confluent.connect.avro.AvroConverter
ProtobufConverter io.confluent.connect.protobuf.ProtobufConverter
JsonSchemaConverter io.confluent.connect.json.JsonSchemaConverter
Kafka Connect supports an internal data type of Secret
. If a connector implements this type as part of its Config definition it will be masked in any logging, however, it will still be exposed in API calls to Kafka Connect Workers.
To solve this, Kafka Connect supports secret provider plugins. They allow for indirect references, resolved at the initialization of a Connector instance, to external secret providers.
Lenses.io provides Secret Providers for Azure, AWS, Hashicorp Vault and Environment variables here.
SMTs are plugins, that enable users to manipulate records, one at a time. For Source Connectors they manipulate records after the Task has handed them back to the Connect Framework and before they are written to Kafka. For Sink Connectors, they allow for manipulation of records before they as passed from the Connect Framework to the Sink Task.
Apache Kafka comes with a number of available SMTs, for which the documentation can be found here.
Lenses.io also provides a number of SMTs which can be found here.
This page describes the usage of the Stream Reactor AWS S3 Source Connector.
This connector is also available on the AWS Marketplace.
Objects that have been archived to AWS Glacier storage class are skipped, in order to load these objects you must manually restore the objects. Skipped objects are logged in the Connect workers log files.
For more examples see the tutorials.
You can specify multiple KCQL statements separated by ;
to have the connector sink into multiple topics.
The connector uses a SQL-like syntax to configure the connector behaviour. The full KCQL syntax is:
Please note that you can employ escaping within KCQL for the INSERT INTO, SELECT * FROM, and PARTITIONBY clauses when necessary. For example, if you need to use a topic name that contains a hyphen, you can escape it as follows:
The S3 source location is defined within the FROM clause. The connector will read all objects from the given location considering the data partitioning and ordering options. Each data partition will be read by a single connector task.
The FROM clause format is:
If your data in AWS was not written by the Lenses AWS sink set to traverse a folder hierarchy in a bucket and load based on the last modified timestamp of the objects in the bucket.
connect.s3.source.partition.extractor.regex=none
connect.s3.source.ordering.type=LastModified
To load in alpha numeric order set the ordering type to AlphaNumeric
.
The target Kafka topic is specified via the INSERT INTO clause. The connector will write all the records to the given topic:
The connector supports a range of storage formats, each with its own distinct functionality:
JSON: The connector will read objects containing JSON content, each line representing a distinct record.
Avro: The connector will read Avro-stored messages from S3 and translate them into Kafka’s native format.
Parquet: The connector will read Parquet-stored messages from S3 and translate them into Kafka’s native format.
Text: The connector will read objects containing lines of text, each line representing a distinct record.
CSV: The connector will read objects containing lines of text, each line representing a distinct record.
CSV_WithHeaders: The connector will read objects containing lines of text, each line representing a distinct record while skipping the header row.
Bytes: The connector will read objects containing bytes, each object is translated to a Kafka message.
Use the STOREAS
clause to configure the storage format. The following options are available:
When using Text storage, the connector provides additional configuration options to finely control how text content is processed.
In Regex mode, the connector applies a regular expression pattern, and only when a line matches the pattern is it considered a record. For example, to include only lines that start with a number, you can use the following configuration:
In Start-End Line mode, the connector reads text content between specified start and end lines, inclusive. This mode is useful when you need to extract records that fall within defined boundaries. For instance, to read records where the first line is ‘SSM’ and the last line is an empty line (’’), you can configure it as follows:
To trim the start and end lines, set the read.text.trim property to true:
In Start-End Tag mode, the connector reads text content between specified start and end tags, inclusive. This mode is particularly useful when a single line of text in S3 corresponds to multiple output Kafka messages. For example, to read XML records enclosed between ‘’ and ‘’, configure it as follows:
Depending on the storage format of Kafka topics’ messages, the need for replication to a different cluster, and the specific data analysis requirements, there exists a guideline on how to effectively utilize converters for both sink and source operations. This guidance aims to optimize performance and minimize unnecessary CPU and memory usage.
JSON
STRING
Same,Other
Yes, No
StringConverter
StringConverter
AVRO,Parquet
STRING
Same,Other
Yes
StringConverter
StringConverter
AVRO,Parquet
STRING
Same,Other
No
ByteArrayConverter
ByteArrayConverter
JSON
JSON
Same,Other
Yes
JsonConverter
StringConverter
JSON
JSON
Same,Other
No
StringConverter
StringConverter
AVRO,Parquet
JSON
Same,Other
Yes,No
JsonConverter
JsonConverter or Avro Converter( Glue, Confluent)
AVRO,Parquet, JSON
BYTES
Same,Other
Yes,No
ByteArrayConverter
ByteArrayConverter
AVRO,Parquet
AVRO
Same
Yes
Avro Converter( Glue, Confluent)
Avro Converter( Glue, Confluent)
AVRO,Parquet
AVRO
Same
No
ByteArrayConverter
ByteArrayConverter
AVRO,Parquet
AVRO
Other
Yes,No
Avro Converter( Glue, Confluent)
Avro Converter( Glue, Confluent)
AVRO,Parquet
Protobuf
Same
Yes
Protobuf Converter( Glue, Confluent)
Protobuf Converter( Glue, Confluent)
AVRO,Parquet
Protobuf
Same
No
ByteArrayConverter
ByteArrayConverter
AVRO,Parquet
Protobuf
Other
Yes,No
Protobuf Converter( Glue, Confluent)
Protobuf Converter( Glue, Confluent)
AVRO,Parquet, JSON
Other
Same, Other
Yes,No
ByteArrayConverter
ByteArrayConverter
Currently, the connector does not offer support for SQL projection; consequently, anything other than a SELECT * query is disregarded. The connector will faithfully write all the record fields to Kafka exactly as they are.
The S3 sink employs zero-padding in object names to ensure precise ordering, leveraging optimizations offered by the S3 API, guaranteeing the accurate sequence of object.
When using the S3 source alongside the S3 sink, the connector can adopt the same ordering method, ensuring data processing follows the correct chronological order. However, there are scenarios where S3 data is generated by applications that do not maintain lexical object key name order.
In such cases, to process object in the correct sequence, the source needs to list all objects in the bucket and sort them based on their last modified timestamp. To enable this behavior, set the connect.s3.source.ordering.type
to LastModified. This ensures that the source correctly arranges and processes the data based on the timestamps of the objects.
To limit the number of object keys the source reads from S3 in a single poll. The default value, if not specified, is 1000:
To limit the number of result rows returned from the source in a single poll operation, you can use the LIMIT clause. The default value, if not specified, is 10000.
The AWS S3 Source Connector allows you to filter the objects to be processed based on their extensions. This is controlled by two properties: connect.s3.source.extension.excludes
and connect.s3.source.extension.includes
.
The connect.s3.source.extension.excludes
property is a comma-separated list of object extensions to exclude from the source object search. If this property is not configured, all objects are considered. For example, to exclude .txt
and .csv
objects, you would set this property as follows:
The connect.s3.source.extension.includes
property is a comma-separated list of object extensions to include in the source object search. If this property is not configured, all objects are considered. For example, to include only .json
and .xml
objects, you would set this property as follows:
Note: If both connect.s3.source.extension.excludes
and connect.s3.source.extension.includes
are set, the connector first applies the exclusion filter and then the inclusion filter.
Post-processing options offer flexibility in managing how objects are handled after they have been processed. By configuring these options, users can automate tasks such as deleting objects to save storage space or moving files to an archive for compliance and data retention purposes. These features are crucial for efficient data lifecycle management, particularly in environments where storage considerations or regulatory requirements dictate the need for systematic handling of processed data.
Deleting Objects After Processing
For scenarios where freeing up storage is critical and reprocessing is not necessary, configure the connector to delete objects after they are processed. This option is particularly useful in environments with limited storage capacity or where processed data is redundantly stored elsewhere.
Example:
Result: Objects are permanently removed from the S3 bucket after processing, effectively reducing storage usage and preventing reprocessing.
Moving Objects to an Archive Bucket
To preserve processed objects for archiving or compliance reasons, set the connector to move them to a designated archive bucket. This use case applies to organizations needing data retention strategies or for regulatory adherence by keeping processed records accessible but not in active use.
Example:
Result: Objects are transferred to an archive-bucket, stored with an updated path that includes the processed/
prefix, maintaining an organized archive structure.
The PROPERTIES
clause is optional and adds a layer of configuration to the connector. It enhances versatility by permitting the application of multiple configurations (delimited by ‘,’). The following properties are supported:
read.text.mode
Controls how Text content is read
Enum
Regex, StartEndTag, StartEndLine
read.text.regex
Regular Expression for Text Reading (if applicable)
String
read.text.start.tag
Start Tag for Text Reading (if applicable)
String
read.text.end.tag
End Tag for Text Reading (if applicable)
String
read.text.buffer.size
Text Buffer Size (for optimization)
Int
read.text.start.line
Start Line for Text Reading (if applicable)
String
read.text.end.line
End Line for Text Reading (if applicable)
String
read.text.trim
Trim Text During Reading
Boolean
store.envelope
Messages are stored as “Envelope”
Boolean
post.process.action
Defines the action to perform on source objects after successful processing.
Enum
DELETE or MOVE
post.process.action.bucket
Specifies the target bucket for the MOVE
action (required for MOVE
).
String
post.process.action.prefix
Specifies a new prefix for the object’s location when using the MOVE
action (required for MOVE
).
String
The connector offers two distinct authentication modes:
Default: This mode relies on the default AWS authentication chain, simplifying the authentication process.
Credentials: In this mode, explicit configuration of AWS Access Key and Secret Key is required for authentication.
When selecting the “Credentials” mode, it is essential to provide the necessary access key and secret key properties. Alternatively, if you prefer not to configure these properties explicitly, the connector will follow the credentials retrieval order as described here.
Here’s an example configuration for the “Credentials” mode:
For enhanced security and flexibility when using the “Credentials” mode, it is highly advisable to utilize Connect Secret Providers. This approach ensures robust security practices while handling access credentials.
The connector can also be used against API compatible systems provided they implement the following:
connect.s3.aws.auth.mode
Specifies the AWS authentication mode for connecting to S3.
string
"Credentials," "Default"
"Default"
connect.s3.aws.access.key
Access Key for AWS S3 Credentials
string
connect.s3.aws.secret.key
Secret Key for AWS S3 Credentials
string
connect.s3.aws.region
AWS Region for S3 Bucket
string
connect.s3.pool.max.connections
Maximum Connections in the Connection Pool
int
-1 (undefined)
50
connect.s3.custom.endpoint
Custom Endpoint URL for S3 (if applicable)
string
connect.s3.kcql
Kafka Connect Query Language (KCQL) Configuration to control the connector behaviour
string
connect.s3.vhost.bucket
Enable Virtual Hosted-style Buckets for S3
boolean
true, false
false
connect.s3.source.extension.excludes
A comma-separated list of object extensions to exclude from the source object search.
string
[Object extension filtering]({{< relref "#object-extension-filtering" >}})
connect.s3.source.extension.includes
A comma-separated list of object extensions to include in the source object search.
string
[object extension filtering]({{< relref "#object-extension-filtering" >}})
connect.s3.source.partition.extractor.type
Type of Partition Extractor (Hierarchical or Regex)
string
hierarchical, regex
connect.s3.source.partition.extractor.regex
Regex Pattern for Partition Extraction (if applicable)
string
connect.s3.ordering.type
Type of ordering for the S3 object keys to ensure the processing order.
string
AlphaNumeric, LastModified
AlphaNumeric
connect.s3.source.partition.search.continuous
If set to true the connector will continuously search for new partitions.
boolean
true, false
true
connect.s3.source.partition.search.excludes
A comma-separated list of paths to exclude from the partition search.
string
".indexes"
connect.s3.source.partition.search.interval
The interval in milliseconds between searching for new partitions.
long
300000
connect.s3.source.partition.search.recurse.levels
Controls how many levels deep to recurse when searching for new partitions
int
0
This page describes the usage of the Stream Reactor GCP Storage Source Connector.
For more examples see the .
You can specify multiple KCQL statements separated by ;
to have the connector sink into multiple topics.
The connector uses a SQL-like syntax to configure the connector behaviour. The full KCQL syntax is:
Please note that you can employ escaping within KCQL for the INSERT INTO, SELECT * FROM, and PARTITIONBY clauses when necessary. For example, if you need to use a topic name that contains a hyphen, you can escape it as follows:
The GCP Storage source location is defined within the FROM clause. The connector will read all objects from the given location considering the data partitioning and ordering options. Each data partition will be read by a single connector task.
The FROM clause format is:
If your data in GCS was not written by the Lenses GCS sink set to traverse a folder hierarchy in a bucket and load based on the last modified timestamp of the objects in the bucket.
connect.gcpstorage.source.partition.extractor.regex=none
connect.gcpstorage.source.ordering.type=LastModified
To load in alpha numeric order set the ordering type to AlphaNumeric
.
The target Kafka topic is specified via the INSERT INTO clause. The connector will write all the records to the given topic:
The connector supports a range of storage formats, each with its own distinct functionality:
JSON: The connector will read objects containing JSON content, each line representing a distinct record.
Avro: The connector will read Avro-stored messages from GCP Storage and translate them into Kafka’s native format.
Parquet: The connector will read Parquet-stored messages from GCP Storage and translate them into Kafka’s native format.
Text: The connector will read objects containing lines of text, each line representing a distinct record.
CSV: The connector will read objects containing lines of text, each line representing a distinct record.
CSV_WithHeaders: The connector will read objects containing lines of text, each line representing a distinct record while skipping the header row.
Bytes: The connector will read objects containing bytes, each object is translated to a Kafka message.
Use the STOREAS
clause to configure the storage format. The following options are available:
When using Text storage, the connector provides additional configuration options to finely control how text content is processed.
In Regex mode, the connector applies a regular expression pattern, and only when a line matches the pattern is it considered a record. For example, to include only lines that start with a number, you can use the following configuration:
In Start-End Line mode, the connector reads text content between specified start and end lines, inclusive. This mode is useful when you need to extract records that fall within defined boundaries. For instance, to read records where the first line is ‘SSM’ and the last line is an empty line (’’), you can configure it as follows:
To trim the start and end lines, set the read.text.trim property to true:
In Start-End Tag mode, the connector reads text content between specified start and end tags, inclusive. This mode is particularly useful when a single line of text in S3 corresponds to multiple output Kafka messages. For example, to read XML records enclosed between ‘’ and ‘’, configure it as follows:
Depending on the storage format of Kafka topics’ messages, the need for replication to a different cluster, and the specific data analysis requirements, there exists a guideline on how to effectively utilize converters for both sink and source operations. This guidance aims to optimize performance and minimize unnecessary CPU and memory usage.
Currently, the connector does not offer support for SQL projection; consequently, anything other than a SELECT * query is disregarded. The connector will faithfully write all the record fields to Kafka exactly as they are.
When using the GCS source alongside the GCS sink, the connector can adopt the same ordering method, ensuring data processing follows the correct chronological order. However, there are scenarios where GCS data is generated by applications that do not maintain lexical object name order.
In such cases, to process objects in the correct sequence, the source needs to list all objects in the bucket and sort them based on their last modified timestamp. To enable this behavior, set the connect.gcpstorage.source.ordering.type
to LastModified
. This ensures that the source correctly arranges and processes the data based on the timestamps of the objects.
To limit the number of object names the source reads from GCS in a single poll. The default value, if not specified, is 1000:
To limit the number of result rows returned from the source in a single poll operation, you can use the LIMIT clause. The default value, if not specified, is 10000.
The GCP Storage Source Connector allows you to filter the objects to be processed based on their extensions. This is controlled by two properties: connect.gcpstorage.source.extension.excludes
and connect.gcpstorage.source.extension.includes
.
The connect.gcpstorage.source.extension.excludes
property is a comma-separated list of object extensions to exclude from the source object search. If this property is not configured, all objects are considered. For example, to exclude .txt
and .csv
objects, you would set this property as follows:
The connect.gcpstorage.source.extension.includes
property is a comma-separated list of object extensions to include in the source object search. If this property is not configured, all objects are considered. For example, to include only .json
and .xml
objects, you would set this property as follows:
Note: If both connect.gcpstorage.source.extension.excludes
and connect.gcpstorage.source.extension.includes
are set, the connector first applies the exclusion filter and then the inclusion filter.
Post-processing options offer flexibility in managing how objects are handled after they have been processed. By configuring these options, users can automate tasks such as deleting objects to save storage space or moving objects to an archive for compliance and data retention purposes. These features are crucial for efficient data lifecycle management, particularly in environments where storage considerations or regulatory requirements dictate the need for systematic handling of processed data.
Deleting objects After Processing
For scenarios where freeing up storage is critical and reprocessing is not necessary, configure the connector to delete objects after they are processed. This option is particularly useful in environments with limited storage capacity or where processed data is redundantly stored elsewhere.
Example:
Result: objects are permanently removed from the S3 bucket after processing, effectively reducing storage usage and preventing reprocessing.
Moving objects to an Archive Bucket
To preserve processed objects for archiving or compliance reasons, set the connector to move them to a designated archive bucket. This use case applies to organizations needing data retention strategies or for regulatory adherence by keeping processed records accessible but not in active use.
Example:
Result: objects are transferred to an archive-bucket, stored with an updated path that includes the processed/
prefix, maintaining an organized archive structure.
The PROPERTIES
clause is optional and adds a layer of configuration to the connector. It enhances versatility by permitting the application of multiple configurations (delimited by ‘,’). The following properties are supported:
The connector offers two distinct authentication modes:
Default: This mode relies on the default GCP authentication chain, simplifying the authentication process.
File: This mode uses a local (to the connect worker) path for a file containing GCP authentication credentials.
Credentials: In this mode, explicit configuration of a GCP Credentials string is required for authentication.
The simplest example to configure in the connector is the “Default” mode, as this requires no other configuration.
Here’s an example configuration for the “Credentials” mode:
And here is an example configuration using the “File” mode:
Remember when using file mode the file will need to exist on every worker node in your Kafka connect cluster and be readable by the Kafka Connect process.
For enhanced security and flexibility when using the “Credentials” mode, it is highly advisable to utilize Connect Secret Providers. This approach ensures robust security practices while handling access credentials.
When used in tandem with the GCP Storage Sink Connector, the GCP Storage Source Connector becomes a powerful tool for restoring Kafka topics from GCP Storage. To enable this behavior, you should set store.envelope to true. This configuration ensures that the source expects the following data structure in GCP Storage:
When the messages are sent to Kafka, the GCP Storage Source Connector ensures that it correctly maps the key, value, headers, and metadata fields (including timestamp and partition) to their corresponding Kafka message fields. Please note that the envelope functionality can only be used with data stored in GCP Storage as Avro, JSON, or Parquet formats.
When the envelope feature is not in use, and data restoration is required, the responsibility falls on the connector to establish the original topic partition value. To ensure that the source correctly conveys the original partitions back to Kafka Connect during reads from the source, a partition extractor can be configured to extract this information from the GCP Storage object key.
To configure the partition extractor, you can utilize the connect.gcpstorage.source.partition.extractor.type
property, which supports two options:
hierarchical: This option aligns with the default format used by the sink, topic/partition/offset.json.
regex: When selected, you can provide a custom regular expression to extract the partition information. Additionally, when using the regex option, you must also set the connect.gcpstorage.source.partition.extractor.regex
property. It’s important to note that only one lookup group is expected. For an example of a regular expression pattern, please refer to the pattern used for hierarchical, which is:
This page describes the usage of the Stream Reactor Azure Event Hubs Source Connector.
A Kafka Connect source connector to read events from Azure Event Hubs and push them to Kafka.
In order to leverage Kafka API in your Event Hubs it has to be at least on Standard Pricing Tier. More info .
For more examples see the .
Below example presents all the necessary parameters configuration in order to use Event Hubs connector. It contains all the necessary parameters (but nothing optional, so feel free to tweak it to your needs):
Connector allows for multiple KCQL commands.
The following KCQL is supported:
The selection of fields from the Event Hubs message is not supported.
As for now Azure Event Hubs Connector supports raw bytes passthrough from source Hub to Kafka Topic specified in the KCQL config.
You can connect to Azure EventHubs passing specific JAAS parameters in configuration.
The Azure Event Hubs Connector utilizes the Apache Kafka API implemented by Event Hubs. This also allows fine-tuning for user-specific needs because the Connector passes all of the properties with a specific prefix directly to the consumer. The prefix is connect.eventhubs.connection.settings
and when user specifies a property with it, it will be automatically passed to the Consumer.
User wants to fine-tune how much data records comes through the network at once. He specifies below property as part of his configuration for Azure Event Hubs Connector before starting it.
It means that internal Kafka Consumer will poll at most 100 records at time (as max.poll.records
is passed directly to it)
There are certain exceptions to this rule as couple of those are internally used in order to smoothly proceed with consumption. Those exceptions are listed below:
client.id
- Connector sets it by itself
group.id
- Connector sets it by itself
key.deserializer
- Connector transitions bytes 1-to-1
value.deserializer
- Connector transitions bytes 1-to-1
enable.auto.commit
- connector automatically sets it to false
and checks what offsets are committed in output topic instead
This page describes installing the Lenses Kafka Connectors.
If you do not use the plugin.path
and add the connectors directly to the CLASSPATH you may have dependency conflicts.
Download the release and unpack.
Within the unpacked directory you will find the following structure:
The libs directory contains all the Stream Reactor Connector jars. Edit your Connect worker properties add the path to the directory containing the connectors and restart your workers. Repeat this process for all the Connect workers in your cluster. The connectors must be available to all the workers.
Example:
s to ensure precise ordering, leveraging optimizations offered by the GCS API, guaranteeing the accurate sequence of objects.
When selecting the “Credentials” mode, it is essential to provide the necessary credentials. Alternatively, if you prefer not to configure these properties explicitly, the connector will follow the credentials retrieval order as described .
Learn more about different methods of connecting to Event Hubs on . The only caveat is to add connector-specific prefix like in example above. See for more info.
JSON
STRING
Same,Other
Yes, No
StringConverter
StringConverter
AVRO,Parquet
STRING
Same,Other
Yes
StringConverter
StringConverter
AVRO,Parquet
STRING
Same,Other
No
ByteArrayConverter
ByteArrayConverter
JSON
JSON
Same,Other
Yes
JsonConverter
StringConverter
JSON
JSON
Same,Other
No
StringConverter
StringConverter
AVRO,Parquet
JSON
Same,Other
Yes,No
JsonConverter
JsonConverter or Avro Converter( Glue, Confluent)
AVRO,Parquet, JSON
BYTES
Same,Other
Yes,No
ByteArrayConverter
ByteArrayConverter
AVRO,Parquet
AVRO
Same
Yes
Avro Converter( Glue, Confluent)
Avro Converter( Glue, Confluent)
AVRO,Parquet
AVRO
Same
No
ByteArrayConverter
ByteArrayConverter
AVRO,Parquet
AVRO
Other
Yes,No
Avro Converter( Glue, Confluent)
Avro Converter( Glue, Confluent)
AVRO,Parquet
Protobuf
Same
Yes
Protobuf Converter( Glue, Confluent)
Protobuf Converter( Glue, Confluent)
AVRO,Parquet
Protobuf
Same
No
ByteArrayConverter
ByteArrayConverter
AVRO,Parquet
Protobuf
Other
Yes,No
Protobuf Converter( Glue, Confluent)
Protobuf Converter( Glue, Confluent)
AVRO,Parquet, JSON
Other
Same, Other
Yes,No
ByteArrayConverter
ByteArrayConverter
read.text.mode
Controls how Text content is read
Enum
Regex, StartEndTag, StartEndLine
read.text.regex
Regular Expression for Text Reading (if applicable)
String
read.text.start.tag
Start Tag for Text Reading (if applicable)
String
read.text.end.tag
End Tag for Text Reading (if applicable)
String
read.text.buffer.size
Text Buffer Size (for optimization)
Int
read.text.start.line
Start Line for Text Reading (if applicable)
String
read.text.end.line
End Line for Text Reading (if applicable)
String
read.text.trim
Trim Text During Reading
Boolean
store.envelope
Messages are stored as “Envelope”
Boolean
post.process.action
Defines the action to perform on source objects after successful processing.
Enum
DELETE or MOVE
post.process.action.bucket
Specifies the target bucket for the MOVE
action (required for MOVE
).
String
post.process.action.prefix
Specifies a new prefix for the object’s location when using the MOVE
action (required for MOVE
).
String
connect.gcpstorage.gcp.auth.mode
Specifies the authentication mode for connecting to GCP.
string
"Credentials", "File" or "Default"
"Default"
connect.gcpstorage.gcp.credentials
For "auth.mode" credentials: GCP Authentication credentials string.
string
(Empty)
connect.gcpstorage.gcp.file
For "auth.mode" file: Local file path for file containing GCP authentication credentials.
string
(Empty)
connect.gcpstorage.gcp.project.id
GCP Project ID.
string
(Empty)
connect.gcpstorage.gcp.quota.project.id
GCP Quota Project ID.
string
(Empty)
connect.gcpstorage.endpoint
Endpoint for GCP Storage.
string
connect.gcpstorage.error.policy
Defines the error handling policy when errors occur during data transfer to or from GCP Storage.
string
"NOOP," "THROW," "RETRY"
"THROW"
connect.gcpstorage.max.retries
Sets the maximum number of retries the connector will attempt before reporting an error to the Connect Framework.
int
20
connect.gcpstorage.retry.interval
Specifies the interval (in milliseconds) between retry attempts by the connector.
int
60000
connect.gcpstorage.http.max.retries
Sets the maximum number of retries for the underlying HTTP client when interacting with GCP Storage.
long
5
connect.gcpstorage.http.retry.interval
Specifies the retry interval (in milliseconds) for the underlying HTTP client. An exponential backoff strategy is employed.
long
50
connect.gcpstorage.kcql
Kafka Connect Query Language (KCQL) Configuration to control the connector behaviour
string
[kcql configuration]({{< relref "#kcql-support" >}})
connect.gcpstorage.source.extension.excludes
A comma-separated list of object extensions to exclude from the source object search.
string
[object extension filtering]({{< relref "#object-extension-filtering" >}})
connect.gcpstorage.source.extension.includes
A comma-separated list of object extensions to include in the source object search.
string
[object extension filtering]({{< relref "#object-extension-filtering" >}})
connect.gcpstorage.source.partition.extractor.type
Type of Partition Extractor (Hierarchical or Regex)
string
hierarchical, regex
connect.gcpstorage.source.partition.extractor.regex
Regex Pattern for Partition Extraction (if applicable)
string
connect.gcpstorage.source.partition.search.continuous
If set to true the connector will continuously search for new partitions.
boolean
true, false
true
connect.gcpstorage.source.partition.search.interval
The interval in milliseconds between searching for new partitions.
long
300000
connect.gcpstorage.source.partition.search.excludes
A comma-separated list of paths to exclude from the partition search.
string
".indexes"
connect.gcpstorage.source.partition.search.recurse.levels
Controls how many levels deep to recurse when searching for new partitions
int
0
connect.gcpstorage.ordering,type
Type of ordering for the GCS object keys to ensure the processing order.
string
AlphaNumeric, LastModified
AlphaNumeric
connect.eventhubs.source.connection.settings.bootstrap.servers
Specifies the Event Hubs server location.
string
connect.eventhubs.source.close.timeout
Amount of time (in seconds) for Consumer to close.
int
30
connect.eventhubs.source.default.offset
Specifies whether by default we should consume from earliest (default) or latest offset.
string
earliest
connect.eventhubs.kcql
Comma-separated output KCQL queries
string
This page describes the usage of the Stream Reactor Cassandra Source Connector.
Kafka Connect Cassandra is a Source Connector for reading data from Cassandra and writing to Kafka.
For more examples see the tutorials.
You can specify multiple KCQL statements separated by ;
to have the connector sink into multiple topics.
The following KCQL is supported:
Examples:
The connector can write JSON to your Kafka topic using the WITHFORMAT JSON clause but the key and value converters must be set:
In order to facilitate scenarios like retaining the latest value for a given device identifier, or support Kafka Streams joins without having to re-map the topic data the connector supports WITHKEY in the KCQL syntax.
Multiple key fields are supported using a delimiter:
The resulting Kafka record key content will be the string concatenation for the values of the fields specified. Optionally the delimiter can be set via the KEYDELIMITER keyword.
Keying is only supported in conjunction with the WITHFORMAT JSON clause
This mode tracks new records added to a table. The columns to track are identified by the PK clause in the KCQL statement. Only one column can be used to track new records. The support Cassandra column data types are:
TIMESTAMP
TIMEUUID
TOKEN
DSESEARCHTIMESTAMP
If set to TOKEN this column value is wrapped inside Cassandra's token function which needs unwrapping with the WITHUNWRAP command.
You must use the Byte Order Partitioner for the TOKEN mode to work correctly or data will be missing from the Kafka topic. This is not recommended due to the creation of hotspots in Cassandra.
DSESEARCHTIMESTAMP will make a DSE Search queries using Solr instead of a native Cassandra query.
The connector constantly loads the entire table.
The connector can be configured to:
Start from a particular offset - connect.cassandra.initial.offset
Increase or decrease the poll interval - connect.cassandra.import.poll.interval
Set a slice duration to query for in milliseconds - connect.cassandra.slice.duration
For a more detailed explanation of how to use Cassandra to Kafka options.
The following CQL data types are supported:
TimeUUID
Optional String
UUID
Optional String
Inet
Optional String
Ascii
Optional String
Text
Optional String
Timestamp
Optional String
Date
Optional String
Tuple
Optional String
UDT
Optional String
Boolean
Optional Boolean
TinyInt
Optional Int8
SmallInt
Optional Int16
Int
Optional Int32
Decimal
Optional String
Float
Optional Float32
Counter
Optional Int64
BigInt
Optional Int64
VarInt
Optional Int64
Double
Optional Int64
Time
Optional Int64
Blob
Optional Bytes
Map
Optional [String -> MAP]
List
Optional [String -> ARRAY]
Set
Optional [String -> ARRAY]
connect.cassandra.contact.points
Initial contact point host for Cassandra including port.
string
localhost
connect.cassandra.port
Cassandra native port.
int
9042
connect.cassandra.key.space
Keyspace to write to.
string
connect.cassandra.username
Username to connect to Cassandra with.
string
connect.cassandra.password
Password for the username to connect to Cassandra with.
password
connect.cassandra.ssl.enabled
Secure Cassandra driver connection via SSL.
boolean
false
connect.cassandra.trust.store.path
Path to the client Trust Store.
string
connect.cassandra.trust.store.password
Password for the client Trust Store.
password
connect.cassandra.trust.store.type
Type of the Trust Store, defaults to JKS
string
JKS
connect.cassandra.key.store.type
Type of the Key Store, defauts to JKS
string
JKS
connect.cassandra.ssl.client.cert.auth
Enable client certification authentication by Cassandra. Requires KeyStore options to be set.
boolean
false
connect.cassandra.key.store.path
Path to the client Key Store.
string
connect.cassandra.key.store.password
Password for the client Key Store
password
connect.cassandra.consistency.level
Consistency refers to how up-to-date and synchronized a row of Cassandra data is on all of its replicas. Cassandra offers tunable consistency. For any given read or write operation, the client application decides how consistent the requested data must be.
string
connect.cassandra.fetch.size
The number of records the Cassandra driver will return at once.
int
5000
connect.cassandra.load.balancing.policy
Cassandra Load balancing policy. ROUND_ROBIN, TOKEN_AWARE, LATENCY_AWARE or DC_AWARE_ROUND_ROBIN. TOKEN_AWARE and LATENCY_AWARE use DC_AWARE_ROUND_ROBIN
string
TOKEN_AWARE
connect.cassandra.error.policy
Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP - the error is swallowed THROW - the error is allowed to propagate. RETRY - The exception causes the Connect framework to retry the message. The number of retries is set by connect.cassandra.max.retries. All errors will be logged automatically, even if the code swallows them.
string
THROW
connect.cassandra.max.retries
The maximum number of times to try the write again.
int
20
connect.cassandra.retry.interval
The time in milliseconds between retries.
int
60000
connect.cassandra.task.buffer.size
The size of the queue as read writes to.
int
10000
connect.cassandra.assigned.tables
The tables a task has been assigned.
string
connect.cassandra.batch.size
The number of records the source task should drain from the reader queue.
int
100
connect.cassandra.import.poll.interval
The polling interval between queries against tables for bulk mode.
long
1000
connect.cassandra.time.slice.ms
The range of time in milliseconds the source task the timestamp/timeuuid will use for query
long
10000
connect.cassandra.import.allow.filtering
Enable ALLOW FILTERING in incremental selects.
boolean
true
connect.cassandra.slice.duration
Duration to query for in target Cassandra table. Used to restrict query timestamp span
long
10000
connect.cassandra.slice.delay.ms
The delay between the current time and the time range of the query. Used to insure all of the data in the time slice is available
long
30000
connect.cassandra.initial.offset
The initial timestamp to start querying in Cassandra from (yyyy-MM-dd HH:mm:ss.SSS’Z’). Default 1900-01-01 00:00:00.0000000Z
string
1900-01-01 00:00:00.0000000Z
connect.cassandra.mapping.collection.to.json
Mapping columns with type Map, List and Set like json
boolean
true
connect.cassandra.kcql
KCQL expression describing field selection and routes.
string
\
This page describes the usage of the Stream Reactor FTP Source Connector.
Provide the remote directories and on specified intervals, the list of files in the directories is refreshed. Files are downloaded when they were not known before, or when their timestamp or size are changed. Only files with a timestamp younger than the specified maximum age are considered. Hashes of the files are maintained and used to check for content changes. Changed files are then fed into Kafka, either as a whole (update) or only the appended part (tail), depending on the configuration. Optionally, file bodies can be transformed through a pluggable system prior to putting them into Kafka.
For more examples see the tutorials.
Each Kafka record represents a file and has the following types.
The format of the keys is configurable through connect.ftp.keystyle=string|struct. It can be a string with the file name, or a FileInfo structure with the name: string and offset: long. The offset is always 0 for files that are updated as a whole, and hence only relevant for tailed files.
The values of the records contain the body of the file as bytes.
The following rules are used.
Tailed files are only allowed to grow. Bytes that have been appended to it since the last inspection are yielded. Preceding bytes are not allowed to change;
Updated files can grow, shrink and change anywhere. The entire contents are yielded.
Instead of dumping whole file bodies (and the danger of exceeding Kafka’s message.max.bytes), one might want to give an interpretation to the data contained in the files before putting it into Kafka. For example, if the files that are fetched from the FTP are comma-separated values (CSVs), one might prefer to have a stream of CSV records instead. To allow to do so, the connector provides a pluggable conversion of SourceRecords. Right before sending a SourceRecord to the Connect framework, it is run through an object that implements:
The default object that is used is a pass-through converter, an instance of:
To override it, create your own implementation of SourceRecordConverter and place the jar in the plugin.path
.
To learn more examples of using the FTP Kafka connector read this blog.
connect.ftp.address
host[:port] of the ftp server
string
connect.ftp.user
Username to connect with
string
connect.ftp.password
Password to connect with
string
connect.ftp.refresh
iso8601 duration that the server is polled
string
connect.ftp.file.maxage
iso8601 duration for how old files can be
string
connect.ftp.keystyle
SourceRecord keystyle, string or struct
string
connect.ftp.protocol
Protocol to use, FTP or FTPS
string
ftp
connect.ftp.timeout
Ftp connection timeout in milliseconds
int
30000
connect.ftp.filter
Regular expression to use when selecting files for processing
string
.*
connect.ftp.monitor.tail
Comma separated lists of path:destinationtopic; tail of file to tracked
string
connect.ftp.monitor.update
Comma separated lists of path:destinationtopic; whole file is tracked
string
connect.ftp.monitor.slicesize
File slice size in bytes
int
-1
connect.ftp.fileconverter
File converter class
string
com.datamountaineer.streamreactor.connect.ftp.source.SimpleFileConverter
connect.ftp.sourcerecordconverter
Source record converter class
string
com.datamountaineer.streamreactor.connect.ftp.source.NopSourceRecordConverter
connect.ftp.max.poll.records
Max number of records returned per poll
int
10000
This page describes the usage of the Stream Reactor Azure CosmosDB Sink Connector.
A Kafka Connect sink connector for writing records from Kafka to Azure CosmosDB using the SQL API.
For more examples see the tutorials.
You can specify multiple KCQL statements separated by**;
** to have a connector sink multiple topics. The connector properties topics or topics.regex are required to be set to a value that matches the KCQL statements.
The following KCQL is supported:
Examples:
Insert is the default write mode of the sink. It inserts messages from Kafka topics into DocumentDB.
The Sink supports DocumentDB upsert functionality which replaces the existing row if a match is found on the primary keys.
This mode works with at least once delivery semantics on Kafka as the order is guaranteed within partitions. If the same record is delivered twice to the sink, it results in an idempotent write. The existing record will be updated with the values of the second which are the same.
If records are delivered with the same field or group of fields that are used as the primary key on the target table, but different values, the existing record in the target table will be updated.
Since records are delivered in the order they were written per partition the write is idempotent on failure or restart. Redelivery produces the same result.
This sink supports the following Kafka payloads:
Schema.Struct and Struct (Avro)
Schema.Struct and JSON
No Schema and JSON
The connector supports Error policies.
connect.documentdb.endpoint
The Azure DocumentDb end point.
string
connect.documentdb.master.key
The connection master key
password
connect.documentdb.consistency.level
Determines the write visibility. There are four possible values: Strong,BoundedStaleness,Session or Eventual
string
Session
connect.documentdb.db
The Azure DocumentDb target database.
string
connect.documentdb.db.create
If set to true it will create the database if it doesn’t exist. If this is set to default(false) an exception will be raised.
boolean
false
connect.documentdb.proxy
Specifies the connection proxy details.
string
connect.documentdb.error.policy
Specifies the action to be taken if an error occurs while inserting the data There are two available options: NOOP - the error is swallowed THROW - the error is allowed to propagate. RETRY - The exception causes the Connect framework to retry the message. The number of retries is based on The error will be logged automatically
string
THROW
connect.documentdb.max.retries
The maximum number of times to try the write again.
int
20
connect.documentdb.retry.interval
The time in milliseconds between retries.
int
60000
connect.documentdb.kcql
KCQL expression describing field selection and data routing to the target DocumentDb.
string
connect.progress.enabled
Enables the output for how many records have been processed
boolean
false
This page describes the usage of the Stream Reactor MQTT Source Connector.
A Kafka Connect source connector to read events from MQTT and push them to Kafka.
For more examples see the tutorials.
You can specify multiple KCQL statements separated by ;
to have the connector sink into multiple topics.
The following KCQL is supported:
The selection of fields from the JMS message is not supported.
Examples:
To facilitate scenarios like retaining the latest value for a given device identifier, or support Kafka Streams joins without having to re-map the topic data the connector supports WITHKEY in the KCQL syntax.
Multiple key fields are supported using a delimiter:
The resulting Kafka record key content will be the string concatenation for the values of the fields specified. Optionally the delimiter can be set via the KEYDELIMITER keyword.
The connector supports both wildcard and shared subscriptions but the KCQL command must be placed inside single quotes.
The connector supports converters to handle different messages payload format in the source topic. See source record converters.
connect.mqtt.hosts
Contains the MQTT connection end points.
string
connect.mqtt.username
Contains the Mqtt connection user name
string
connect.mqtt.password
Contains the Mqtt connection password
password
connect.mqtt.service.quality
Specifies the Mqtt quality of service
int
connect.mqtt.timeout
Provides the time interval to establish the mqtt connection
int
3000
connect.mqtt.clean
connect.mqtt.clean
boolean
true
connect.mqtt.keep.alive
The keep alive functionality assures that the connection is still open and both broker and client are connected to the broker during the establishment of the connection. The interval is the longest possible period of time, which broker and client can endure without sending a message.
int
5000
connect.mqtt.client.id
Contains the Mqtt session client id
string
connect.mqtt.error.policy
Specifies the action to be taken if an error occurs while inserting the data. There are two available options: NOOP - the error is swallowed THROW - the error is allowed to propagate. RETRY - The exception causes the Connect framework to retry the message. The number of retries is based on The error will be logged automatically
string
THROW
connect.mqtt.retry.interval
The time in milliseconds between retries.
int
60000
connect.mqtt.max.retries
The maximum number of times to try the write again.
int
20
connect.mqtt.retained.messages
Specifies the Mqtt retained flag.
boolean
false
connect.mqtt.converter.throw.on.error
If set to false the conversion exception will be swallowed and everything carries on BUT the message is lost!!; true will throw the exception.Default is false.
boolean
false
connect.converter.avro.schemas
If the AvroConverter is used you need to provide an avro Schema to be able to read and translate the raw bytes to an avro record. The format is $MQTT_TOPIC=$PATH_TO_AVRO_SCHEMA_FILE in case of source converter, or $KAFKA_TOPIC=PATH_TO_AVRO_SCHEMA in case of sink converter
string
connect.mqtt.log.message
Logs received MQTT messages
boolean
false
connect.mqtt.kcql
Contains the Kafka Connect Query Language describing the sourced MQTT source and the target Kafka topics
string
connect.mqtt.polling.timeout
Provides the timeout to poll incoming messages
int
1000
connect.mqtt.share.replicate
Replicate the shared subscriptions to all tasks instead of distributing them
boolean
false
connect.progress.enabled
Enables the output for how many records have been processed
boolean
false
connect.mqtt.ssl.ca.cert
Provides the path to the CA certificate file to use with the Mqtt connection
string
connect.mqtt.ssl.cert
Provides the path to the certificate file to use with the Mqtt connection
string
connect.mqtt.ssl.key
Certificate private [config] key file path.
string
connect.mqtt.process.duplicates
Process duplicate messages
boolean
false
This page describes the usage of the Stream Reactor Elasticsearch Sink Connector.
For more examples see the tutorials.
You can specify multiple KCQL statements separated by ;
to have a connector sink multiple topics. The connector properties topics or topics.regex are required to be set to a value that matches the KCQL statements.
The following KCQL is supported:
Examples:
It is possible to configure how the Connector handles a null value payload (called Kafka tombstones). Please use the behavior.on.null.values
property in your KCQL with one of the possible values:
IGNORE
(ignores tombstones entirely)
FAIL
(throws Exception if tombstone happens)
DELETE
(deletes index with specified id)
Example:
The PK keyword allows you to specify fields that will be used to generate the key value in Elasticsearch. The values of the selected fields are concatenated and separated by a hyphen (-
).
If no fields are defined, the connector defaults to using the topic name, partition, and message offset to construct the key.
Field Prefixes
When defining fields, specific prefixes can be used to determine where the data should be extracted from:
_key
Prefix
Specifies that the value should be extracted from the message key.
If a path is provided after _key
, it identifies the location within the key where the field value resides.
If no path is provided, the entire message key is used as the value.
_value
Prefix
Specifies that the value should be extracted from the message value.
The remainder of the path identifies the specific location within the message value to extract the field.
_header
Prefix
Specifies that the value should be extracted from the message header.
The remainder of the path indicates the name of the header to be used for the field value.
INSERT writes new records to Elastic, replacing existing records with the same ID set by the PK (Primary Key) keyword. UPSERT replaces existing records if a matching record is found, nor insert a new one if none is found.
WITHDOCTYPE
allows you to associate a document type to the document inserted.
WITHINDEXSUFFIX allows you to specify a suffix to your index and we support date format.
Example:
To use a static index name, define the target index in the KCQL statement without any prefixes:
This will consistently create an index named index_name
for any messages consumed from topicA
.
To extract an index name from a message header, use the _header
prefix followed by the header name:
This statement extracts the value from the gate
header field and uses it as the index name.
For headers with names that include dots, enclose the entire target in backticks (```) and each segment which consists of a field name in single quotes ('
):
In this case, the value of the header named prefix.abc.suffix
is used to form the index name.
To use the full value of the message key as the index name, use the _key
prefix:
For example, if the message key is "freddie"
, the resulting index name will be freddie
.
To extract an index name from a field within the message value, use the _value
prefix followed by the field name:
This example uses the value of the name
field from the message's value. If the field contains "jason"
, the index name will be jason
.
Nested Fields in Values
To access nested fields within a value, specify the full path using dot notation:
If the firstName
field is nested within the name
structure, its value (e.g., "hans"
) will be used as the index name.
Fields with Dots in Their Names
For field names that include dots, enclose the entire target in backticks (```) and each segment which consists of a field name in single quotes ('
):
If the value structure contains:
The extracted index name will be hans
.
The Sink will automatically create missing indexes at startup.
Please note that this feature is not compatible with index names extracted from message headers/keys/values.
connect.elastic.protocol
URL protocol (http, https)
string
http
connect.elastic.hosts
List of hostnames for Elastic Search cluster node, not including protocol or port.
string
localhost
connect.elastic.port
Port on which Elastic Search node listens on
string
9300
connect.elastic.tableprefix
Table prefix (optional)
string
connect.elastic.cluster.name
Name of the elastic search cluster, used in local mode for setting the connection
string
elasticsearch
connect.elastic.write.timeout
The time to wait in millis. Default is 5 minutes.
int
300000
connect.elastic.batch.size
How many records to process at one time. As records are pulled from Kafka it can be 100k+ which will not be feasible to throw at Elastic search at once
int
4000
connect.elastic.use.http.username
Username if HTTP Basic Auth required default is null.
string
connect.elastic.use.http.password
Password if HTTP Basic Auth required default is null.
string
connect.elastic.error.policy
Specifies the action to be taken if an error occurs while inserting the data There are two available options: NOOP - the error is swallowed THROW - the error is allowed to propagate. RETRY - The exception causes the Connect framework to retry the message. The number of retries is based on The error will be logged automatically
string
THROW
connect.elastic.max.retries
The maximum number of times to try the write again.
int
20
connect.elastic.retry.interval
The time in milliseconds between retries.
int
60000
connect.elastic.kcql
KCQL expression describing field selection and routes.
string
connect.elastic.pk.separator
Separator used when have more that one field in PK
string
-
connect.progress.enabled
Enables the output for how many records have been processed
boolean
false
behavior.on.null.values
Specifies behavior on Kafka tombstones: IGNORE
, DELETE
or FAIL
String
IGNORE
Property Name
Description
ssl.truststore.location
Path to the truststore file containing the trusted CA certificates for verifying broker certificates.
ssl.truststore.password
Password for the truststore file to protect its integrity.
ssl.truststore.type
Type of the truststore (e.g., JKS
, PKCS12
). Default is JKS
.
ssl.keystore.location
Path to the keystore file containing the client’s private key and certificate chain for client authentication.
ssl.keystore.password
Password for the keystore to protect the private key.
ssl.keystore.type
Type of the keystore (e.g., JKS
, PKCS12
). Default is JKS
.
ssl.protocol
The SSL protocol used for secure connections (e.g., TLSv1.2
, TLSv1.3
). Default is TLS
.
ssl.trustmanager.algorithm
Algorithm used by the TrustManager to manage certificates. Default value is the key manager factory algorithm configured for the Java Virtual Machine.
ssl.keymanager.algorithm
Algorithm used by the KeyManager to manage certificates. Default value is the key manager factory algorithm configured for the Java Virtual Machine.
Enabling SSL connections between Kafka Connect and Elasticsearch ensures that the communication between these services is secure, protecting sensitive data from being intercepted or tampered with. SSL (or TLS) encrypts data in transit, verifying the identity of both parties and ensuring data integrity.
While newer versions of Elasticsearch have SSL enabled by default for internal communication, it’s still necessary to configure SSL for client connections, such as those from Kafka Connect. Even if Elasticsearch has SSL enabled by default, Kafka Connect still needs these configurations to establish a secure connection. By setting up SSL in Kafka Connect, you ensure:
Data encryption: Prevents unauthorized access to data being transferred.
Authentication: Confirms that Kafka Connect and Elasticsearch are communicating with trusted entities.
Compliance: Meets security standards for regulatory requirements (such as GDPR or HIPAA).
Truststore: Holds certificates to check if the node’s certificate is valid.
Keystore: Contains your client’s private key and certificate to prove your identity to the node.
SSL Protocol: Use TLSv1.2 or TLSv1.3 for up-to-date security.
Password Security: Protect passwords by encrypting them or using secure methods like environment variables or secret managers.
This page describes the usage of the Stream Reactor Google PubSub Sink Connector.
Coming soon!
This page describes the usage of the Stream Reactor Azure Service Bus Sink Connector.
Stream Reactor Azure Service Bus Sink Connector is designed to effortlessly translate Kafka records into your Azure Service Bus cluster. It leverages Microsoft Azure API to transfer data to Service Bus in a seamless manner, allowing for their safe transition and safekeeping both payloads and metadata (see Payload support). It supports both types of Service Buses: Queues and Topics. Azure Service Bus Source Connector provides its user with AT-LEAST-ONCE guarantee as the data is committed (marked as read) in Kafka topic (for assigned topic and partition) once Connector verifies it was successfully committed to designated Service Bus topic.
For more examples see the tutorials.
The following example presents all the mandatory configuration properties for the Service Bus connector. Please note there are also optional parameters listed in #storage-to-output-matrix. Feel free to tweak the configuration to your requirements.
You can specify multiple KCQL statements separated by ;
to have the connector map between multiple topics.
The following KCQL is supported:
It allows you to map Kafka topic of name <your-kafka-topic>
to Service Bus of name <your-service-bus>
using the PROPERTIES specified (please check #keyed-json-format for more info on necessary properties)
The selection of fields from the Service Bus message is not supported.
You can connect to an Azure Service Bus by passing your connection string in configuration. The connection string can be found in the Shared access policies section of your Azure Portal.
Learn more about different methods of connecting to Service Bus on the Azure Website.
The Azure Service Bus Connector connects to Service Bus via Microsoft API. In order to smoothly configure your mappings you have to pay attention to PROPERTIES part of your KCQL mappings. There are two cases here: reading from Service Bus of type QUEUE and of type TOPIC. Please refer to the relevant sections below. In case of further questions check Azure Service Bus documentation to learn more about those mechanisms.
In order to be writing to the queue there's an additional parameter that you need to pass with your KCQL mapping in the PROPERTIES part. This parameter is servicebus.type
and it can take one of two values depending on the type of the service bus: QUEUE or TOPIC. Naturally for Queue we're interested in QUEUE
here and we need to pass it.
This is sufficient to enable you to create the mapping with your queue.
In order to be writing to the topic there is an additional parameter that you need to pass with your KCQL mapping in the PROPERTIES part:
Parameter servicebus.type
which can take one of two values depending on the type of the service bus: QUEUE or TOPIC. For topic we're interested in TOPIC
here and we need to pass it.
This is sufficient to enable you to create the mapping with your topic.
If the Connector is supposed to transfer big messages (size of one megabyte and more), Service Bus may not want to accept a batch of such payloads, failing the Connector Task. In order to remediate that you may want to use batch.enabled
parameter, setting it to false
. This will sacrifice the ability to send the messages in batch (possibly doing it slower) but should enable user to transfer them safely.
For most of the usages, we recommend omitting it (it's set to true
by default).
This sink supports the following Kafka payloads:
String Schema Key and Binary payload (then MessageId
in Service Bus is set with Kafka Key)
any other key (or keyless) and Binary payload (this causes Service Bus messages to not have specified MessageId
)
No Schema and JSON
Azure Service Bus doesn't allow to send messages with null content (payload)
Null Payload (sometimes referred as Kafka Tombstone) is a known concept in Kafka messages world. However, because of Service Bus limitations around that matter, we aren't allowed to send messages with null payload and we have to drop them instead.
Please keep that in mind when using Service Bus and designing business logic around null payloads!
Please find below all the necessary KCQL properties:
servicebus.type
Specifies Service Bus type: QUEUE
or TOPIC
string
batch.enabled
boolean
true
Please find below all the relevant configuration parameters:
connect.servicebus.connection.string
Specifies the Connection String to connect to Service Bus
string
connect.servicebus.kcql
Comma-separated output KCQL queries
string
connect.servicebus.sink.retries.max
Number of retries if message has failed to be delivered to Service Bus
int
3
connect.servicebus.sink.retries.timeout
Timeout (in milliseconds) between retries if message has failed to be delivered to Service Bus
int
500
This page describes how to retrieve secrets from Hashicorp Vault for use in Kafka Connect.
Secure secrets in Hashicorp Vault and use them in Kafka Connect.
Secrets will only be reloaded if the Connector restarts.
From Version 2.2.0, the secret provider does not write secrets to files by default. If you require this behaviour (for trust stores, key stores or certs) you can enable this by adding the property file.write=true
.
Multiple authentication methods are supported:
approle
userpass
kubernetes
cert
token
ldap
gcp
awsiam
jwt
github
Example Worker Properties
To use this provider in a connector, reference the Hashicorp Vault containing the secret and the key name for the value of the connector property.
The indirect reference is in the form ${provider:path:key} where:
provider is the name of the provider in the worker property file set above
path is the path of the secret in Hashicorp Vault
key is the name of the secret key in secret to retrieve. Vault can store multiple keys under a path.
For example, if we store two secrets as keys:
my_username_key with the value lenses and
my_password_key with the value my-secret-password
in a secret called secret/my-vault-secret we would set:
This would resolve at runtime to:
The provider handles the following types:
utf_8
base64
The provider will look for keys prefixed with:
UTF8
UTF_FILE
BASE64
BASE64_FILE
The UTF8
means the value returned is the string retrieved for the secret key. The BASE64
means the value returned is the base64 decoded string retrieved for the secret key.
If the value for the tag is UTF8_FILE
the string contents are written to a file. The returned value from the connector configuration key will be the location of the file. The file location is determined by the file.dir configuration option is given to the provider via the Connect worker.properties
file.
If the value for the tag is BASE64_FILE
the string contents are based64 decoded and are written to a file. The returned value from the connector configuration key will be the location of the file. For example, if a connector needs a PEM file on disk set the prefix as BASE64_FILE
. The file location is determined by the file.dir configuration option is given to the provider via the Connect worker.properties
file.
If no prefix is found the contents of the secret string are returned.
This page details the configuration options for the Stream Reactor Kafka Connect sink connectors.
Sink connectors read data from Kafka and write to an external system.
Kafka topic retention policies determine how long a message is retained in a topic before it is deleted. If the retention period expires and the connector has not processed the messages, possibly due to not running or other issues, the unprocessed Kafka data will be deleted as per the retention policy. This can lead to significant data loss since the messages will no longer be available for the connector to sink to the target system.
Yes, the datalakes connectors natively support exactly-once guarantees.
Field names in Kafka message headers or values may contain dots (.
). To access these correctly, enclose the entire target in backticks (```) and each segment which consists of a field name in single quotes ('
):
For field names with spaces or special characters, use a similar escaping strategy:
Field name with a space: `_value.'full name'`
Field name with special characters: `_value.'$special_characters!'`
This ensures the connector correctly extracts the intended fields and avoids parsing errors.
This page describes the usage of the Stream Reactor HTTP Sink Connector.
A Kafka Connect sink connector for writing records from Kafka to HTTP endpoints.
Support for Json/Avro/String/Protobuf messages via Kafka Connect (in conjunction with converters for Schema-Registry based data storage).
URL, header and content templating ability give you full control of the HTTP request.
Configurable batching of messages, even allowing you to combine them into a single request selecting which data to send with your HTTP request.
For more examples see the .
The Lenses HTTP sink comes with multiple options for content templating of the HTTP request.
If you do not wish any part of the key, value, headers or other data to form a part of the message, you can use static templating:
When you are confident you will be generating a single HTTP request per Kafka message, then you can use the simpler templating.
In your configuration, in the content property of your config, you can define template substitutions like the following example:
(please note the XML is only an example, your template can consist of any text format that can be submitted in a http request)
To collapse multiple messages into a single HTTP request, you can use the multiple messaging template. This is automatic if the template has a messages
tag. See the below example:
Again, this is an XML example but your message body can consist of anything including plain text, json or yaml.
Your connector configuration will look like this:
The final result will be HTTP requests with bodies like this:
When using simple and multiple message templating, the following are available:
URL including protocol (eg. http://lenses.io
). Template variables can be used.
Currently, the HTTP Sink supports either no authentication, BASIC HTTP authentication and OAuth2 authentication.
By default, no authentication is set. This can be also done by providing a configuration like this:
BASIC auth can be configured by providing a configuration like this:
OAuth auth can be configured by providing a configuration like this:
To customise the headers sent with your HTTP request you can supply a Headers List.
Example:
The connector offers three distinct flush options for data management:
Flush by Count - triggers a file flush after a specified number of records have been written to it.
Flush by Size - initiates a file flush once a predetermined size (in bytes) has been attained.
Flush by Interval - enforces a file flush after a defined time interval (in seconds).
It's worth noting that the interval flush is a continuous process that acts as a fail-safe mechanism, ensuring that files are periodically flushed, even if the other flush options are not configured or haven't reached their thresholds.
Consider a scenario where the flush size is set to 10MB, and only 9.8MB of data has been written to the file, with no new Kafka messages arriving for an extended period of 6 hours. To prevent undue delays, the interval flush guarantees that the file is flushed after the specified time interval has elapsed. This ensures the timely management of data even in situations where other flush conditions are not met.
The flush options are configured using the batchCount
, batchSize
and `timeInterval properties. The settings are optional and if not specified the defaults are:
Some configuration examples follow on how to apply this connector to different message types.
These include converters, which are required to instruct Kafka Connect on how to read the source content.
In this case the converters are irrelevant as we are not using the message content to populate our message template.
The HTTP request body contains the value of the message, which is retained as a string value via the StringConverter.
Specific fields from the JSON message are substituted into the HTTP request body alongside some static content.
The entirety of the message value is substituted into a placeholder in the message body. The message is treated as a string via the StringConverter.
Fields from the AVRO message are substituted into the message body in the following example:
Starting from version 8.1 as pilot release we give our customers ability to use functionality called Reporter which (if enabled) writes Success and Error processing reports to specified Kafka topic. Reports don't have key and you can find details about status in the message headers and value.
In order to enable this functionality we have to enable one (or both if we want full reporting) of the properties below:
This is the most common scenario for on-premises Kafka Clusters used just for monitoring
Using SASL provides a secure and standardized method for authenticating connections to an external Kafka cluster. It is especially valuable when connecting to clusters that require secure communication, as it supports mechanisms like SCRAM, GSSAPI (Kerberos), and OAuth, ensuring that only authorized clients can access the cluster. Additionally, SASL can help safeguard credentials during transmission, reducing the risk of unauthorized access.
Using SSL ensures secure communication between clients and the Kafka cluster by encrypting data in transit. This prevents unauthorized parties from intercepting or tampering with sensitive information. SSL also supports mutual authentication, allowing both the client and server to verify each other’s identities, which enhances trust and security in the connection.
This sink connector supports the following options as part of its configuration:
This page describes the usage of the Stream Reactor Cassandra Sink Connector.
The connector converts the value of Kafka messages to JSON and uses the Cassandra JSON insert feature to write records.
For more examples see the .
You can specify multiple KCQL statements separated by**;
** to have a connector sink multiple topics. The connector properties topics or topics.regex are required to be set to a value that matches the KCQL statements.
The following KCQL is supported:
Examples:
Compacted topics in Kafka retain the last message per key. Deletion in Kafka occurs by tombstoning. If compaction is enabled on the topic and a message is sent with a null payload, Kafka flags this record for deletion and is compacted/removed from the topic.
Deletion in Cassandra is supported based on fields in the key of messages with an empty/null payload. A Cassandra delete statement must be provided which specifies the Cassandra CQL delete statement and with parameters to bind field values from the key to, for example, with the delete statement of:
If a message was received with an empty/null value and key fields key.id and key.product the final bound Cassandra statement would be:
Deletion will only occur if a message with an empty payload is received from Kafka.
Ensure your ordinal position of the connect.cassandra.delete.struct_flds
matches the binding order in the Cassandra delete statement!
This sink supports the following Kafka payloads:
Schema.Struct and Struct (Avro)
Schema.Struct and JSON
No Schema and JSON
This page describes the usage of the Stream Reactor GCP Storage Sink Connector.
You can specify multiple KCQL statements separated by ;
to have a connector sink multiple topics. The connector properties topics or topics.regex are required to be set to a value that matches the KCQL statements.
For more examples see the .
The connector uses KCQL to map topics to GCP Storage buckets and paths. The full KCQL syntax is:
Please note that you can employ escaping within KCQL for the INSERT INTO, SELECT * FROM, and PARTITIONBY clauses when necessary. For example, an incoming Kafka message stored as Json can use fields contaiing .
:
In this case you can use the following KCQL statement:
The target bucket and path are specified in the INSERT INTO clause. The path is optional and if not specified, the connector will write to the root of the bucket and append the topic name to the path.
Here are a few examples:
Currently, the connector does not offer support for SQL projection; consequently, anything other than a SELECT * query is disregarded. The connector will faithfully write all fields from Kafka exactly as they are.
The source topic is defined within the FROM clause. To avoid runtime errors, it’s crucial to configure either the topics
or topics.regex
property in the connector and ensure proper mapping to the KCQL statements.
Set the FROM clause to *. This will auto map the topic as a partition.
The PROPERTIES clause is optional and adds a layer of configurability to the connector. It enhances versatility by permitting the application of multiple configurations (delimited by ‘,’). The following properties are supported:
The sink connector optimizes performance by padding the output object names, a practice that proves beneficial when using the GCP Storage Source connector to restore data. This object name padding ensures that objects are ordered lexicographically, allowing the GCP Storage Source connector to skip the need for reading, sorting, and processing all objects, thereby enhancing efficiency.
The object key serves as the filename used to store data in GCP Storage. There are two options for configuring the object key:
Default: The object key is automatically generated by the connector and follows the Kafka topic-partition structure. The format is $container/[$prefix]/$topic/$partition/offset.extension. The extension is determined by the chosen storage format.
Custom: The object key is driven by the PARTITIONBY
clause. The format is either $container/[$prefix]/$topic/customKey1=customValue1/customKey2=customValue2/topic(partition_offset).extension
(GCP Athena naming style mimicking Hive-like data partitioning) or $container/[$prefix]/customValue/topic(partition_offset).ext.
The extension is determined by the selected storage format.
The Connector automatically adds the topic name to the partition. There is no need to add it to the partition clause. If you want to explicitly add the topic or partition you can do so by using _topic and _partition.
The partition clause works on header, key and values fields of the Kafka message.
Custom keys and values can be extracted from the Kafka message key, message value, or message headers, as long as the headers are of types that can be converted to strings. There is no fixed limit to the number of elements that can form the object key, but you should be aware of GCP Storage key length restrictions.
To extract fields from the message values, simply use the field names in the PARTITIONBY
clause. For example:
However, note that the message fields must be of primitive types (e.g., string, int, long) to be used for partitioning.
You can also use the entire message key as long as it can be coerced into a primitive type:
In cases where the Kafka message Key is not a primitive but a complex object, you can use individual fields within the message Key to create the GCP Storage object key name:
Kafka message headers can also be used in the GCP Storage object key definition, provided the header values are of primitive types easily convertible to strings:
Customizing the object key can leverage various components of the Kafka message. For example:
This flexibility allows you to tailor the object key to your specific needs, extracting meaningful information from Kafka messages to structure GCP Storage object keys effectively.
To enable Athena-like partitioning, use the following syntax:
Storing data in GCP Storage and partitioning it by time is a common practice in data management. For instance, you may want to organize your GCP Storage data in hourly intervals. This partitioning can be seamlessly achieved using the PARTITIONBY
clause in combination with specifying the relevant time field. However, it’s worth noting that the time field typically doesn’t adjust automatically.
Let’s consider an example where you need the object key to include the wallclock time (the time when the message was processed) and create an hourly window based on a field called timestamp
. Here’s the connector configuration to achieve this:
In this example, the incoming Kafka message’s Value content includes a field called timestamp, represented as a long value indicating the epoch time in milliseconds. The TimestampConverter SMT will expertly convert this into a string value according to the format specified in the format.to.pattern property. Additionally, the insertWallclock SMT will incorporate the current wallclock time in the format you specify in the format property.
The PARTITIONBY
clause then leverages both the timestamp field and the wallclock header to craft the object key, providing you with precise control over data partitioning.
While the STOREAS
clause is optional, it plays a pivotal role in determining the storage format within GCP Storage. It’s crucial to understand that this format is entirely independent of the data format stored in Kafka. The connector maintains its neutrality towards the storage format at the topic level and relies on the key.converter
and value.converter
settings to interpret the data.
Supported storage formats encompass:
AVRO
Parquet
JSON
CSV (including headers)
Text
BYTES
Opting for BYTES ensures that each record is stored in its own separate object. This feature proves particularly valuable for scenarios involving the storage of images or other binary data in GCP Storage. For cases where you prefer to consolidate multiple records into a single binary object, AVRO or Parquet are the recommended choices.
By default, the connector exclusively stores the Kafka message value. However, you can expand storage to encompass the entire message, including the key, headers, and metadata, by configuring the store.envelope
property as true. This property operates as a boolean switch, with the default value being false. When the envelope is enabled, the data structure follows this format:
Utilizing the envelope is particularly advantageous in scenarios such as backup and restore or replication, where comprehensive storage of the entire message in GCP Storage is desired.
Storing the message Value Avro data as Parquet in GCP Storage:
The converter also facilitates seamless JSON to AVRO/Parquet conversion, eliminating the need for an additional processing step before the data is stored in GCP Storage.
Enabling the full message stored as JSON in GCP Storage:
Enabling the full message stored as AVRO in GCP Storage:
If the restore (see the GCP Storage Source documentation) happens on the same cluster, then the most performant way is to use the ByteConverter for both Key and Value and store as AVRO or Parquet:
The connector offers three distinct flush options for data management:
Flush by Count - triggers a object flush after a specified number of records have been written to it.
Flush by Size - initiates a object flush once a predetermined size (in bytes) has been attained.
Flush by Interval - enforces a object flush after a defined time interval (in seconds).
It’s worth noting that the interval flush is a continuous process that acts as a fail-safe mechanism, ensuring that objects are periodically flushed, even if the other flush options are not configured or haven’t reached their thresholds.
Consider a scenario where the flush size is set to 10MB, and only 9.8MB of data has been written to the object, with no new Kafka messages arriving for an extended period of 6 hours. To prevent undue delays, the interval flush guarantees that the object is flushed after the specified time interval has elapsed. This ensures the timely management of data even in situations where other flush conditions are not met.
flush.count = 50_000
flush.size = 500000000 (500MB)
flush.interval = 3600 (1 hour)
A connector instance can simultaneously operate on multiple topic partitions. When one partition triggers a flush, it will initiate a flush operation for all of them, even if the other partitions are not yet ready to flush.
The next flush time is calculated based on the time the previous flush completed (the last modified time of the object written to GCP Storage). Therefore, by design, the sink connector’s behaviour will have a slight drift based on the time it takes to flush records and whether records are present or not. If Kafka Connect makes no calls to put records, the logic for flushing won't be executed. This ensures a more consistent number of records per object.
AVRO and Parquet offer the capability to compress files as they are written. The GCP Storage Sink connector provides advanced users with the flexibility to configure compression options.
Here are the available options for the connect.gcpstorage.compression.codec
, along with indications of their support by Avro, Parquet and JSON writers:
Please note that not all compression libraries are bundled with the GCP Storage connector. Therefore, you may need to manually add certain libraries to the classpath to ensure they function correctly.
The connector offers two distinct authentication modes:
Default: This mode relies on the default GCP authentication chain, simplifying the authentication process.
Connection String: This mode enables simpler configuration by relying on the connection string to authenticate with GCP.
Credentials: In this mode, explicit configuration of GCP Access Key and Secret Key is required for authentication.
When selecting the “Credentials” mode, it is essential to provide the necessary access key and secret key properties. Alternatively, if you prefer not to configure these properties explicitly, the connector will follow the credentials retrieval order as described here.
Here’s an example configuration for the “Credentials” mode:
And here is an example configuration using the “Connection String” mode:
The connector uses the concept of index objects that it writes to in order to store information about the latest offsets for Kafka topics and partitions as they are being processed. This allows the connector to quickly resume from the correct position when restarting and provides flexibility in naming the index objects.
By default, the prefix for these index objects is named .indexes for all connectors. However, each connector will create and store its index objects within its own nested prefix inside this .indexes
prefix.
You can configure the root prefix for these index objects using the property connect.gcpstorage.indexes.name
. This property specifies the path from the root of the GCS bucket. Note that even if you configure this property, the connector will still create a nested prefix within the specified prefix.
This page describes the usage of the Stream Reactor Azure Datalake Gen 2 Sink Connector.
This Kafka Connect sink connector facilitates the seamless transfer of records from Kafka to Azure Data Lake Buckets. It offers robust support for various data formats, including AVRO, Parquet, JSON, CSV, and Text, making it a versatile choice for data storage. Additionally, it ensures the reliability of data transfer with built-in support for exactly-once semantics.
For more examples see the .
You can specify multiple KCQL statements separated by ; to have a connector sink multiple topics. The connector properties topics or topics.regex are required to be set to a value that matches the KCQL statements.
The connector uses KCQL to map topics to Datalake buckets and paths. The full KCQL syntax is:
Please note that you can employ escaping within KCQL for the INSERT INTO, SELECT * FROM, and PARTITIONBY clauses when necessary. For example, an incoming Kafka message stored as JSON can use fields containing .
:
In this case, you can use the following KCQL statement:
The target bucket and path are specified in the INSERT INTO clause. The path is optional and if not specified, the connector will write to the root of the bucket and append the topic name to the path.
Here are a few examples:
Currently, the connector does not offer support for SQL projection; consequently, anything other than a SELECT * query is disregarded. The connector will faithfully write all fields from Kafka exactly as they are.
The source topic is defined within the FROM clause. To avoid runtime errors, it’s crucial to configure either the topics
or topics.regex
property in the connector and ensure proper mapping to the KCQL statements.
Set the FROM clause to *. This will auto map the topic as a partition.
The PROPERTIES clause is optional and adds a layer of configuration to the connector. It enhances versatility by permitting the application of multiple configurations (delimited by ‘,’). The following properties are supported:
The sink connector optimizes performance by padding the output files, a practice that proves beneficial when using the Datalake Source connector to restore data. This file padding ensures that files are ordered lexicographically, allowing the Datalake Source connector to skip the need for reading, sorting, and processing all files, thereby enhancing efficiency.
The object key serves as the filename used to store data in Datalake. There are two options for configuring the object key:
Default: The object key is automatically generated by the connector and follows the Kafka topic-partition structure. The format is $container/[$prefix]/$topic/$partition/offset.extension. The extension is determined by the chosen storage format.
Custom: The object key is driven by the PARTITIONBY
clause. The format is either $container/[$prefix]/$topic/customKey1=customValue1/customKey2=customValue2/topic(partition_offset).extension
(naming style mimicking Hive-like data partitioning) or $container/[$prefix]/customValue/topic(partition_offset).ext
. The extension is determined by the selected storage format.
The Connector automatically adds the topic name to the partition. There is no need to add it to the partition clause. If you want to explicitly add the topic or partition you can do so by using _topic and _partition.
The partition clause works on Header, Key and Values fields of the Kafka message.
Custom keys and values can be extracted from the Kafka message key, message value, or message headers, as long as the headers are of types that can be converted to strings. There is no fixed limit to the number of elements that can form the object key, but you should be aware of Azure Datalake key length restrictions.
To extract fields from the message values, simply use the field names in the PARTITIONBY
clause. For example:
However, note that the message fields must be of primitive types (e.g., string, int, long) to be used for partitioning.
You can also use the entire message key as long as it can be coerced into a primitive type:
In cases where the Kafka message Key is not a primitive but a complex object, you can use individual fields within the message Key to create the Datalake object key name:
Kafka message headers can also be used in the Datalake object key definition, provided the header values are of primitive types easily convertible to strings:
Customizing the object key can leverage various components of the Kafka message. For example:
This flexibility allows you to tailor the object key to your specific needs, extracting meaningful information from Kafka messages to structure Datalake object keys effectively.
To enable Athena-like partitioning, use the following syn
Storing data in Azure Datalake and partitioning it by time is a common practice in data management. For instance, you may want to organize your Datalake data in hourly intervals. This partitioning can be seamlessly achieved using the PARTITIONBY
clause in combination with specifying the relevant time field. However, it’s worth noting that the time field typically doesn’t adjust automatically.
Let’s consider an example where you need the object key to include the wallclock time (the time when the message was processed) and create an hourly window based on a field called timestamp
. Here’s the connector configuration to achieve this:
In this example, the incoming Kafka message’s Value content includes a field called timestamp, represented as a long value indicating the epoch time in milliseconds. The TimestampConverter SMT will expertly convert this into a string value according to the format specified in the format.to.pattern property. Additionally, the insertWallclock SMT will incorporate the current wallclock time in the format you specify in the format property.
The PARTITIONBY
clause then leverages both the timestamp field and the wallclock header to craft the object key, providing you with precise control over data partitioning.
While the STOREAS
clause is optional, it plays a pivotal role in determining the storage format within Azure Datalake. It’s crucial to understand that this format is entirely independent of the data format stored in Kafka. The connector maintains its neutrality towards the storage format at the topic level and relies on the key.converter
and value.converter
settings to interpret the data.
Supported storage formats encompass:
AVRO
Parquet
JSON
CSV (including headers)
Text
BYTES
Opting for BYTES ensures that each record is stored in its own separate file. This feature proves particularly valuable for scenarios involving the storage of images or other binary data in Datalake. For cases where you prefer to consolidate multiple records into a single binary file, AVRO or Parquet are the recommended choices.
By default, the connector exclusively stores the Kafka message value. However, you can expand storage to encompass the entire message, including the key, headers, and metadata, by configuring the store.envelope
property as true. This property operates as a boolean switch, with the default value being false. When the envelope is enabled, the data structure follows this format:
Utilizing the envelope is particularly advantageous in scenarios such as backup and restore or replication, where comprehensive storage of the entire message in Datalake is desired.
Storing the message Value Avro data as Parquet in Datalake:
The converter also facilitates seamless JSON to AVRO/Parquet conversion, eliminating the need for an additional processing step before the data is stored in Datalake.
Enabling the full message stored as JSON in Datalake:
Enabling the full message stored as AVRO in Datalake:
If the restore (see the Datalake Source documentation) happens on the same cluster, then the most performant way is to use the ByteConverter for both Key and Value and store as AVRO or Parquet:
The connector offers three distinct flush options for data management:
Flush by Count - triggers a file flush after a specified number of records have been written to it.
Flush by Size - initiates a file flush once a predetermined size (in bytes) has been attained.
Flush by Interval - enforces a file flush after a defined time interval (in seconds).
It’s worth noting that the interval flush is a continuous process that acts as a fail-safe mechanism, ensuring that files are periodically flushed, even if the other flush options are not configured or haven’t reached their thresholds.
Consider a scenario where the flush size is set to 10MB, and only 9.8MB of data has been written to the file, with no new Kafka messages arriving for an extended period of 6 hours. To prevent undue delays, the interval flush guarantees that the file is flushed after the specified time interval has elapsed. This ensures the timely management of data even in situations where other flush conditions are not met.
flush.count = 50_000
flush.size = 500000000 (500MB)
flush.interval = 3600 (1 hour)
A connector instance can simultaneously operate on multiple topic partitions. When one partition triggers a flush, it will initiate a flush operation for all of them, even if the other partitions are not yet ready to flush.
The next flush time is calculated based on the time the previous flush completed (the last modified time of the file written to Data Lake). Therefore, by design, the sink connector’s behaviour will have a slight drift based on the time it takes to flush records and whether records are present or not. If Kafka Connect makes no calls to put records, the logic for flushing won't be executed. This ensures a more consistent number of records per file.
AVRO and Parquet offer the capability to compress files as they are written. The GCP Storage Sink connector provides advanced users with the flexibility to configure compression options.
Here are the available options for the connect.gcpstorage.compression.codec
, along with indications of their support by Avro, Parquet and JSON writers:
Please note that not all compression libraries are bundled with the Datalake connector. Therefore, you may need to manually add certain libraries to the classpath to ensure they function correctly.
The connector offers two distinct authentication modes:
Default: This mode relies on the default Azure authentication chain, simplifying the authentication process.
Connection String: This mode enables simpler configuration by relying on the connection string to authenticate with Azure.
Credentials: In this mode, explicit configuration of Azure Access Key and Secret Key is required for authentication.
When selecting the “Credentials” mode, it is essential to provide the necessary access key and secret key properties. Alternatively, if you prefer not to configure these properties explicitly, the connector will follow the credentials retrieval order as described here.
Here’s an example configuration for the “Credentials” mode:
And here is an example configuration using the “Connection String” mode:
For enhanced security and flexibility when using either the “Credentials” or “Connection String” modes, it is highly advisable to utilize Connect Secret Providers.
The connector uses the concept of index files that it writes to in order to store information about the latest offsets for Kafka topics and partitions as they are being processed. This allows the connector to quickly resume from the correct position when restarting and provides flexibility in naming the index files.
By default, the root directory for these index files is named .indexes for all connectors. However, each connector will create and store its index files within its own subdirectory inside this .indexes
directory.
You can configure the root directory for these index files using the property connect.datalake.indexes.name
. This property specifies the path from the root of the data lake filesystem. Note that even if you configure this property, the connector will still create a subdirectory within the specified root directory.
This page describes the usage of the Stream Reactor MQTT Sink Connector.
For more examples see the .
You can specify multiple KCQL statements separated by ;
to have a connector sink multiple topics. The connector properties topics or topics.regex are required to be set to a value that matches the KCQL statements.
The following KCQL is supported:
Examples:
The connector can dynamically write to MQTT topics determined by a field in the Kafka message value by using the WITHTARGET target clause and specifying $field
as the target field to extract.
This sink supports the following Kafka payloads:
Schema.Struct and Struct (Avro)
Schema.Struct and JSON
No Schema and JSON
This page describes the usage of the Stream Reactor MongoDB Sink Connector.
For more examples see the .
You can specify multiple KCQL statements separated by ;
to have a connector sink multiple topics. The connector properties topics or topics.regex are required to be set to a value that matches the KCQL statements.
The following KCQL is supported:
Examples:
Insert is the default write mode of the sink.
The connector supports Kudu upserts which replaces the existing row if a match is found on the primary keys. If records are delivered with the same field or group of fields that are used as the primary key on the target table, but different values, the existing record in the target table will be updated.
The BATCH clause controls the batching of writes to MongoDB.
TLS/SSL is supported by setting ?ssl=true in the connect.mongo.connection
option. The MongoDB driver will then attempt to load the truststore and keystore using the JVM system properties.
You need to set JVM system properties to ensure that the client is able to validate the SSL certificate presented by the server:
javax.net.ssl.trustStore: the path to a trust store containing the certificate of the signing authority
javax.net.ssl.trustStorePassword: the password to access this trust store
javax.net.ssl.keyStore: the path to a key store containing the client’s SSL certificates
javax.net.ssl.keyStorePassword: the password to access this key store
All authentication methods are supported, X.509, LDAP Plain, Kerberos (GSSAPI), MongoDB-CR and SCRAM-SHA-1. The default as of MongoDB version 3.0 SCRAM-SHA-1. To set the authentication mechanism set the authMechanism
in the connect.mongo.connection
option.
The mechanism can either be set in the connection string but this requires the password to be in plain text in the connection string or via the connect.mongo.auth.mechanism
option.
If the username is set it overrides the username/password set in the connection string and the connect.mongo.auth.mechanism
has precedence.
e.g.
List of fields that should be converted to ISO Date on MongoDB insertion (comma-separated field names), for JSON topics only. Field values may be an epoch time or an ISO8601 datetime string with an offset (offset or ‘Z’ required). If the string does not parse to ISO, it will be written as a string instead.
Subdocument fields can be referred to in the following examples:
topLevelFieldName
topLevelSubDocument.FieldName
topLevelParent.subDocument.subDocument2.FieldName
If a field is converted to ISODate and that same field is named as a PK, then the PK field is also written as an ISODate.
This is controlled via the connect.mongo.json_datetime_fields
option.
This sink supports the following Kafka payloads:
Schema.Struct and Struct (Avro)
Schema.Struct and JSON
No Schema and JSON
This page describes the usage of the Stream Reactor InfluxDB Sink Connector.
For more examples see the .
You can specify multiple KCQL statements separated by ;
to have a connector sink multiple topics. The connector properties topics or topics.regex are required to be set to a value that matches the KCQL statements.
The following KCQL is supported:
Examples:
InfluxDB allows via the client API to provide a set of tags (key-value) to each point added. The current connector version allows you to provide them via the KCQL.
Only applicable to value fields. No support for nested fields, keys or topic metadata.
This sink supports the following Kafka payloads:
Schema.Struct and Struct (Avro)
Schema.Struct and JSON
No Schema and JSON
This page describes the usage of the Stream Reactor JMS Sink Connector.
For more examples see the .
You can specify multiple KCQL statements separated by ;
to have a connector sink multiple topics. The connector properties topics or topics.regex are required to be set to a value that matches the KCQL statements.
The following KCQL is supported:
Examples:
The sink can write to either topics or queues, specified by the WITHTYPE clause.
When a message is sent to a JMS target it can be one of the following:
JSON - Send a TextMessage
AVRO - Send a BytesMessage
MAP - Send a MapMessage
OBJECT - Send an ObjectMessage
This is set by the WITHFORMAT keyword.
This sink supports the following Kafka payloads:
Schema.Struct and Struct (Avro)
Schema.Struct and JSON
No Schema and JSON
This page describes install the Lenses Secret Providers for Kafka Connect.
Add the plugin to the worker classloader
isolation via the plugin.path
option:
For Azure do not use the classloader isolation (plugin.path
) that Kafka Connect provides. The Azure SDK uses the default classloader and will not find the HTTP client.
To allow for secret rotation add config.action.reload
to your Connect workers properties files.
This property accepts one of two options:
none - No action happens at a connector failure (e.g. it can no longer connect to an external system)
restart - The work will schedule a restart of the connector
Secrets will only be reloaded if the Connector restarts.
This page describes how to retrieve secrets from Azure KeyVault for use in Kafka Connect.
Secure secrets in Azure KeyVault and use them in Kafka Connect.
Secrets will only be reloaded if the Connector restarts.
Two authentication methods are supported:
credentials. When using this configuration the client-id, tenant-id and secret-id for an Azure service principal that has access to key vaults must be provided
default. This method uses the default credential provider chain from Azure. The default credential first checks environment variables for configuration. If the environment configuration is incomplete, it will try to use managed identities.
Example worker properties file:
To use this provider in a connector, reference the keyvault containing the secret and the key name for the value of the connector property.
The indirect reference is in the form ${provider:path:key} where:
provider is the name of the provider in the worker property file set above
path is the URL of the Azure KeyVault. DO NOT provide the https:// protocol for the in the keyvault name as the Connect worker will not parse it correctly
key is the name of the secret key in the Azure KeyVault
For example, if we store two secrets:
my_username with the value lenses and
my_password with the value my-secret-password
in a Keyvault called my-azure-key-vault we would set:
This would resolve at runtime to:
The provider handles the following types:
utf_8
base64
The provider will look for a tag attached to the secret called file-encoding. The value for this tag can be:
UTF8
UTF_FILE
BASE64
BASE64_FILE
The UTF8
means the value returned is the string retrieved for the secret key. The BASE64
means the value returned is the base64 decoded string retrieved for the secret key.
If the value for the tag is UTF8_FILE
the string contents as are written to a file. The returned value from the connector configuration key will be the location of the file. The file location is determined by the file.dir configuration option is given to the provider via the Connect worker.properties
file.
If the value for the tag is BASE64_FILE
the string contents are based64 decoded and are written to a file. The returned value from the connector configuration key will be the location of the file. For example, if a connector needs a PEM file on disk, set the prefix as BASE64_FILE
. The file location is determined by the file.dir configuration option is given to the provider via the Connect worker.properties
file.
If no tag is found the contents of the secret string are returned.
This page describes an overview of the Lenses SMTs for Kafka Connect.
Lenses provides several SMTs designed for use with Stream Reactor connectors, you can also use them with other connectors or your own.
These SMTs are designed to be used with the framework. The SMTs create record headers. The advantage of using headers is that they reduce the memory and CPU cycles required to change the payload. See for example the Kafka Connect . Furthermore, they support sink partitioners, for scenarios like:
Partitioning by the system clock (e.g. using the system clock as a partition key with a yyyy-MM-dd-HH format)
Partitioning by a rolling window (e.g. every 15 minutes, or one hour)
Partitioning by a custom timestamp (e.g. a timestamp field in the payload, record Key or Value)
Partitioning by a custom timestamp with a rolling window (e.g. a timestamp field in the payload, every 15 minutes, or one hour)
Add the plugin to the worker classloader
isolation via the plugin.path
option:
For MSK connect you need to bundle your SMT with the connector you need to use and deploy as one plugin.
This zip (containing both jars at the same level) must be uploaded as a plugin in MSK connect.
Specifies if the Connector can send messages in batch, see
The URL is also a so can contain substitutions from the message key/value/headers etc. If you are batching multiple kafka messages into a single request, then the first message will be used for the substitution of the URL.
Each header key and value is also a so can contain substitutions from the message key/value/headers etc. If you are batching multiple kafka messages into a single request, then the first message will be used for the substitution of the headers.
Enabling SSL connections between Kafka Connect and HTTP Endpoint ensures that the communication between these services is secure, protecting sensitive data from being intercepted or tampered with. SSL (or TLS) encrypts data in transit, verifying the identity of both parties and ensuring data integrity. Please check out section in order to set it up.
These settings configure the Kafka producer for success and error reports. Full configuration options are available in the and sections. Three examples follow:
The error reporter can also be configured with SSL Properties. See the section . In this case all properties should be prefixed with connect.reporting.error.config
to ensure they apply to the error reporter.
The error reporter can also be configured with SSL Properties. See the section . In this case all properties should be prefixed with connect.reporting.success.config
to ensure they apply to the success reporter.
The connector supports .
To address this, we offer a Kafka Connect Single Message Transformer (SMT) designed to streamline this process. You can find the transformer plugin and documentation .
The flush options are configured using the flush.count, flush.size, and flush.interval KCQL Properties (see section). The settings are optional and if not specified the defaults are:
For enhanced security and flexibility when using either the “Credentials” or “Connection String” modes, it is highly advisable to utilize Connect Secret Providers. You can find detailed information on how to use the Connect Secret Providers . This approach ensures robust security practices while handling access credentials.
The connector supports .
To address this, we offer a Kafka Connect Single Message Transformer (SMT) designed to streamline this process. You can find the transformer plugin and documentation .
The flush options are configured using the flush.count, flush.size, and flush.interval KCQL Properties (see section). The settings are optional and if not specified the defaults are:
The connector supports .
The connector supports .
The connector supports .
The connector supports .
The connector supports .
To do so, download your connector to get the jar (unzip if needed), download (unzip it). Put both jar in the same folder, and zip them altogether.
Header
{{header.correlation-id}}
Value
{{value}}
{{value.product.id}}
Key
{{key}}
{{key.customer.number}}
Topic
{{topic}}
Partition
{{partition}}
Offset
{{offset}}
Timestamp
{{timestamp}}
batchCount
50_000 records
batchSize
500000000 (500MB)
timeInterval
3_600 seconds (1 hour)
connect.http.method
HttpMethod
Yes
POST, PUT, PATCH
connect.http.endpoint
String
Yes
connect.http.request.content
String
Yes
connect.http.authentication.type
Authentication
No
Authentication Options (none)
connect.http.request.headers
List[String]
No
connect.http.batch.count
Int
No
The number of records to batch before sending the request, see Batch Configuration
connect.http.batch.size
Int
No
The size of the batch in bytes before sending the request, see Batch Configuration
connect.http.time.interval
Int
No
The time interval in milliseconds to wait before sending the request
connect.http.upload.sync.period
Int
No
Upload Sync Period (100) - polling time period for uploads in milliseconds
connect.http.error.threshold
Int
No
The number of errors to tolerate before failing the sink (5)
connect.http.retries.on.status.codes
List[String]
No
The status codes to retry on (408,429,500,502,5003,504)
connect.http.retries.max.retries
Int
No
The maximum number of retries to attempt (5)
connect.http.retries.max.timeout.ms
Int
No
The maximum time in milliseconds to retry a request. Backoff is used to increase the time between retries, up to this maximum (30000)
connect.http.connection.timeout.ms
Int
No
The HTTP connection timeout in milliseconds (10000)
int
No
For each processed topic, the connector maintains an internal queue. This value specifies the maximum number of entries allowed in the queue before the enqueue operation blocks. The default is 100,000.
int
No
The maximum time window, specified in milliseconds, to wait for the internal queue to accept new records. The default is 2 minutes
Property Name
Description
ssl.truststore.location
Path to the truststore file containing the trusted CA certificates for verifying broker certificates.
ssl.truststore.password
Password for the truststore file to protect its integrity.
ssl.truststore.type
Type of the truststore (e.g., JKS
, PKCS12
). Default is JKS
.
ssl.keystore.location
Path to the keystore file containing the client’s private key and certificate chain for client authentication.
ssl.keystore.password
Password for the keystore to protect the private key.
ssl.keystore.type
Type of the keystore (e.g., JKS
, PKCS12
). Default is JKS
.
ssl.protocol
The SSL protocol used for secure connections (e.g., TLSv1.2
, TLSv1.3
). Default is TLSv1.3
.
ssl.keymanager.algorithm
Algorithm used by the KeyManager to manage certificates. Default value is the key manager factory algorithm configured for the Java Virtual Machine.
ssl.trustmanager.algorithm
Algorithm used by the TrustManager to manage certificates. Default value is the key manager factory algorithm configured for the Java Virtual Machine.
Property Name
Description
connect.reporting.error.config.enabled
Specifies whether the reporter is enabled. false
by default.
connect.reporting.error.config.bootstrap.servers
A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. Required if reporter is enabled.
connect.reporting.error.config.topic
Specifies the topic for Reporter to write to.
connect.reporting.error.config.location
SASL Mechanism used when connecting.
connect.reporting.error.config.sasl.jaas.config
JAAS login context parameters for SASL connections in the format used by JAAS configuration files.
connect.reporting.error.config.sasl.mechanism
SASL mechanism used for client connections. This may be any mechanism for which a security provider is available.
Property Name
Description
connect.reporting.success.config.enabled
Specifies whether the reporter is enabled. false
by default.
connect.reporting.success.config.bootstrap.servers
A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. Required if reporter is enabled.
connect.reporting.success.config.topic
Specifies the topic for Reporter to write to.
connect.reporting.success.config.location
SASL Mechanism used when connecting.
connect.reporting.success.config.sasl.jaas.config
JAAS login context parameters for SASL connections in the format used by JAAS configuration files.
connect.reporting.success.config.sasl.mechanism
SASL mechanism used for client connections. This may be any mechanism for which a security provider is available.
connect.cassandra.contact.points
Initial contact point host for Cassandra including port.
string
localhost
connect.cassandra.port
Cassandra native port.
int
9042
connect.cassandra.key.space
Keyspace to write to.
string
connect.cassandra.username
Username to connect to Cassandra with.
string
connect.cassandra.password
Password for the username to connect to Cassandra with.
password
connect.cassandra.ssl.enabled
Secure Cassandra driver connection via SSL.
boolean
false
connect.cassandra.trust.store.path
Path to the client Trust Store.
string
connect.cassandra.trust.store.password
Password for the client Trust Store.
password
connect.cassandra.trust.store.type
Type of the Trust Store, defaults to JKS
string
JKS
connect.cassandra.key.store.type
Type of the Key Store, defauts to JKS
string
JKS
connect.cassandra.ssl.client.cert.auth
Enable client certification authentication by Cassandra. Requires KeyStore options to be set.
boolean
false
connect.cassandra.key.store.path
Path to the client Key Store.
string
connect.cassandra.key.store.password
Password for the client Key Store
password
connect.cassandra.consistency.level
Consistency refers to how up-to-date and synchronized a row of Cassandra data is on all of its replicas. Cassandra offers tunable consistency. For any given read or write operation, the client application decides how consistent the requested data must be.
string
connect.cassandra.fetch.size
The number of records the Cassandra driver will return at once.
int
5000
connect.cassandra.load.balancing.policy
Cassandra Load balancing policy. ROUND_ROBIN, TOKEN_AWARE, LATENCY_AWARE or DC_AWARE_ROUND_ROBIN. TOKEN_AWARE and LATENCY_AWARE use DC_AWARE_ROUND_ROBIN
string
TOKEN_AWARE
connect.cassandra.error.policy
Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP - the error is swallowed THROW - the error is allowed to propagate. RETRY - The exception causes the Connect framework to retry the message. The number of retries is set by connect.cassandra.max.retries. All errors will be logged automatically, even if the code swallows them.
string
THROW
connect.cassandra.max.retries
The maximum number of times to try the write again.
int
20
connect.cassandra.retry.interval
The time in milliseconds between retries.
int
60000
connect.cassandra.threadpool.size
The sink inserts all the data concurrently. To fail fast in case of an error, the sink has its own thread pool. Set the value to zero and the threadpool will default to 4* NO_OF_CPUs. Set a value greater than 0 and that would be the size of this threadpool.
int
0
connect.cassandra.delete.struct_flds
Fields in the key struct data type used in there delete statement. Comma-separated in the order they are found in connect.cassandra.delete.statement. Keep default value to use the record key as a primitive type.
list
[]
connect.cassandra.delete.statement
Delete statement for cassandra
string
connect.cassandra.kcql
KCQL expression describing field selection and routes.
string
connect.cassandra.default.value
By default a column omitted from the JSON map will be set to NULL. Alternatively, if set UNSET, pre-existing value will be preserved.
string
connect.cassandra.delete.enabled
Enables row deletion from cassandra
boolean
false
connect.progress.enabled
Enables the output for how many records have been processed
boolean
false
padding.type
Specifies the type of padding to be applied.
LeftPad, RightPad, NoOp
LeftPad, RightPad, NoOp
LeftPad
padding.char
Defines the character used for padding.
Char
‘0’
padding.length.partition
Sets the padding length for the partition.
Int
0
padding.length.offset
Sets the padding length for the offset.
Int
12
partition.include.keys
Specifies whether partition keys are included.
Boolean
false Default (Custom Partitioning): true
store.envelope
Indicates whether to store the entire Kafka message
Boolean
store.envelope.fields.key
Indicates whether to store the envelope’s key.
Boolean
store.envelope.fields.headers
Indicates whether to store the envelope’s headers.
Boolean
store.envelope.fiels.value
Indicates whether to store the envelope’s value.
Boolean
store.envelope.fields.metadata
Indicates whether to store the envelope’s metadata.
Boolean
flush.size
Specifies the size (in bytes) for the flush operation.
Long
500000000 (500MB)
flush.count
Specifies the number of records for the flush operation.
Int
50000
flush.interval
Specifies the interval (in seconds) for the flush operation.
Long
3600 (1 hour)
UNCOMPRESSED
✅
✅
✅
SNAPPY
✅
✅
GZIP
✅
✅
LZ0
✅
LZ4
✅
BROTLI
✅
BZIP2
✅
ZSTD
✅
⚙️
✅
DEFLATE
✅
⚙️
XZ
✅
⚙️
Index Name (connect.gcpstorage.indexes.name
)
Resulting Indexes Prefix Structure
Description
.indexes
(default)
.indexes/<connector_name>/
The default setup, where each connector uses its own nested prefix within .indexes
.
custom-indexes
custom-indexes/<connector_name>/
Custom prefix custom-indexes
, with a nested prefix for each connector.
indexes/gcs-connector-logs
indexes/gcs-connector-logs/<connector_name>/
Uses a custom nested prefix gcs-connector-logs
within indexes
, with a nested prefix for each connector.
logs/indexes
logs/indexes/<connector_name>/
Indexes are stored under logs/indexes
, with a nested prefix for each connector.
connect.gcpstorage.gcp.auth.mode
Specifies the authentication mode for connecting to GCP.
string
"Credentials", "File" or "Default"
"Default"
connect.gcpstorage.gcp.credentials
For "auth.mode" credentials: GCP Authentication credentials string.
string
(Empty)
connect.gcpstorage.gcp.file
For "auth.mode" file: Local file path for file containing GCP authentication credentials.
string
(Empty)
connect.gcpstorage.gcp.project.id
GCP Project ID.
string
(Empty)
connect.gcpstorage.gcp.quota.project.id
GCP Quota Project ID.
string
(Empty)
connect.gcpstorage.endpoint
Endpoint for GCP Storage.
string
(Empty)
connect.gcpstorage.error.policy
Defines the error handling policy when errors occur during data transfer to or from GCP Storage.
string
"NOOP", "THROW", "RETRY"
"THROW"
connect.gcpstorage.max.retries
Sets the maximum number of retries the connector will attempt before reporting an error.
int
20
connect.gcpstorage.retry.interval
Specifies the interval (in milliseconds) between retry attempts by the connector.
int
60000
connect.gcpstorage.http.max.retries
Sets the maximum number of retries for the underlying HTTP client when interacting with GCP.
long
5
connect.gcpstorage.http.retry.interval
Specifies the retry interval (in milliseconds) for the underlying HTTP client.
long
50
connect.gcpstorage.http.retry.timeout.multiplier
Specifies the change in delay before the next retry or poll
double
3.0
connect.gcpstorage.local.tmp.directory
Enables the use of a local folder as a staging area for data transfer operations.
string
(Empty)
connect.gcpstorage.kcql
A SQL-like configuration that defines the behavior of the connector.
string
(Empty)
connect.gcpstorage.compression.codec
Sets the Parquet compression codec to be used when writing data to GCP Storage.
string
"UNCOMPRESSED", "SNAPPY", "GZIP", "LZ0", "LZ4", "BROTLI", "BZIP2", "ZSTD", "DEFLATE", "XZ"
"UNCOMPRESSED"
connect.gcpstorage.compression.level
Sets the compression level when compression is enabled for data transfer to GCP Storage.
int
1-9
(Empty)
connect.gcpstorage.seek.max.files
Specifies the maximum threshold for the number of files the connector uses.
int
5
connect.gcpstorage.indexes.name
Configure the indexes prefix for this connector.
string
".indexes"
connect.gcpstorage.exactly.once.enable
By setting to 'false', disable exactly-once semantics, opting instead for Kafka Connect’s native at-least-once offset management
boolean
true, false
true
connect.gcpstorage.schema.change.rollover
When set to false
, the file will not roll over upon receiving a record with a schema different from the accumulated ones. This is a performance optimization, intended only for cases where schema changes are backward-compatible.
boolean
true,false
true
padding.type
Specifies the type of padding to be applied.
LeftPad, RightPad, NoOp
LeftPad, RightPad, NoOp
LeftPad
padding.char
Defines the character used for padding.
Char
‘0’
padding.length.partition
Sets the padding length for the partition.
Int
0
padding.length.offset
Sets the padding length for the offset.
Int
12
partition.include.keys
Specifies whether partition keys are included.
Boolean
false Default (Custom Partitioning): true
store.envelope
Indicates whether to store the entire Kafka message
Boolean
store.envelope.fields.key
Indicates whether to store the envelope’s key.
Boolean
store.envelope.fields.headers
Indicates whether to store the envelope’s headers.
Boolean
store.envelope.fields.value
Indicates whether to store the envelope’s value.
Boolean
store.envelope.fields..metadata
Indicates whether to store the envelope’s metadata.
Boolean
flush.size
Specifies the size (in bytes) for the flush operation.
Long
500000000 (500MB)
flush.count
Specifies the number of records for the flush operation.
Int
50000
flush.interval
Specifies the interval (in seconds) for the flush operation.
Long
3600 (1 hour)
UNCOMPRESSED
✅
✅
✅
SNAPPY
✅
✅
GZIP
✅
✅
LZ0
✅
LZ4
✅
BROTLI
✅
BZIP2
✅
ZSTD
✅
⚙️
✅
DEFLATE
✅
⚙️
XZ
✅
⚙️
Index Name (connect.datalake.indexes.name
)
Resulting Indexes Directory Structure
Description
.indexes
(default)
.indexes/<connector_name>/
The default setup, where each connector uses its own subdirectory within .indexes
.
custom-indexes
custom-indexes/<connector_name>/
Custom root directory custom-indexes
, with a subdirectory for each connector.
indexes/datalake-connector-logs
indexes/datalake-connector-logs/<connector_name>/
Uses a custom subdirectory datalake-connector-logs
within indexes
, with a subdirectory for each connector.
logs/indexes
logs/indexes/<connector_name>/
Indexes are stored under logs/indexes
, with a subdirectory for each connector.
connect.datalake.azure.auth.mode
Specifies the Azure authentication mode for connecting to Datalake.
string
“Credentials”, “ConnectionString” or “Default”
“Default”
connect.datalake.azure.account.key
The Azure Account Key used for authentication.
string
(Empty)
connect.datalake.azure.account.name
The Azure Account Name used for authentication.
string
(Empty)
connect.datalake.pool.max.connections
Specifies the maximum number of connections allowed in the Azure Client’s HTTP connection pool when interacting with Datalake.
int
-1 (undefined)
50
connect.datalake.endpoint
Datalake endpoint URL.
string
(Empty)
connect.datalake.error.policy
Defines the error handling policy when errors occur during data transfer to or from Datalake.
string
“NOOP,” “THROW,” “RETRY”
“THROW”
connect.datalake.max.retries
Sets the maximum number of retries the connector will attempt before reporting an error to the Connect Framework.
int
20
connect.datalake.retry.interval
Specifies the interval (in milliseconds) between retry attempts by the connector.
int
60000
connect.datalake.http.max.retries
Sets the maximum number of retries for the underlying HTTP client when interacting with Datalake.
long
5
connect.datalake.http.retry.interval
Specifies the retry interval (in milliseconds) for the underlying HTTP client. An exponential backoff strategy is employed.
long
50
connect.datalake.local.tmp.directory
Enables the use of a local folder as a staging area for data transfer operations.
string
(Empty)
connect.datalake.kcql
A SQL-like configuration that defines the behavior of the connector. Refer to the KCQL section below for details.
string
(Empty)
connect.datalake.compression.codec
Sets the Parquet compression codec to be used when writing data to Datalake.
string
“UNCOMPRESSED,” “SNAPPY,” “GZIP,” “LZ0,” “LZ4,” “BROTLI,” “BZIP2,” “ZSTD,” “DEFLATE,” “XZ”
“UNCOMPRESSED”
connect.datalake.compression.level
Sets the compression level when compression is enabled for data transfer to Datalake.
int
1-9
(Empty)
connect.datalake.seek.max.files
Specifies the maximum threshold for the number of files the connector uses to ensure exactly-once processing of data.
int
5
connect.datalake.indexes.name
Configure the indexes root directory for this connector.
string
".indexes"
connect.datalake.exactly.once.enable
By setting to 'false', disable exactly-once semantics, opting instead for Kafka Connect’s native at-least-once offset management
boolean
true, false
true
connect.datalake.schema.change.rollover
When set to false
, the file will not roll over upon receiving a record with a schema different from the accumulated ones. This is a performance optimization, intended only for cases where schema changes are backward-compatible.
boolean
true,false
true
connect.mqtt.hosts
Contains the MQTT connection end points.
string
connect.mqtt.username
Contains the Mqtt connection user name
string
connect.mqtt.password
Contains the Mqtt connection password
password
connect.mqtt.service.quality
Specifies the Mqtt quality of service
int
connect.mqtt.timeout
Provides the time interval to establish the mqtt connection
int
3000
connect.mqtt.clean
connect.mqtt.clean
boolean
true
connect.mqtt.keep.alive
The keep alive functionality assures that the connection is still open and both broker and client are connected to the broker during the establishment of the connection. The interval is the longest possible period of time, which broker and client can endure without sending a message.
int
5000
connect.mqtt.client.id
Contains the Mqtt session client id
string
connect.mqtt.error.policy
Specifies the action to be taken if an error occurs while inserting the data. There are two available options: NOOP - the error is swallowed THROW - the error is allowed to propagate. RETRY - The exception causes the Connect framework to retry the message. The number of retries is based on The error will be logged automatically
string
THROW
connect.mqtt.retry.interval
The time in milliseconds between retries.
int
60000
connect.mqtt.max.retries
The maximum number of times to try the write again.
int
20
connect.mqtt.retained.messages
Specifies the Mqtt retained flag.
boolean
false
connect.mqtt.converter.throw.on.error
If set to false the conversion exception will be swallowed and everything carries on BUT the message is lost!!; true will throw the exception.Default is false.
boolean
false
connect.converter.avro.schemas
If the AvroConverter is used you need to provide an avro Schema to be able to read and translate the raw bytes to an avro record. The format is $MQTT_TOPIC=$PATH_TO_AVRO_SCHEMA_FILE in case of source converter, or $KAFKA_TOPIC=PATH_TO_AVRO_SCHEMA in case of sink converter
string
connect.mqtt.kcql
Contains the Kafka Connect Query Language describing the sourced MQTT source and the target Kafka topics
string
connect.progress.enabled
Enables the output for how many records have been processed
boolean
false
connect.mqtt.ssl.ca.cert
Provides the path to the CA certificate file to use with the Mqtt connection
string
connect.mqtt.ssl.cert
Provides the path to the certificate file to use with the Mqtt connection
string
connect.mqtt.ssl.key
Certificate private [config] key file path.
string
ssl.cipher.suites
A list of cipher suites. This is a named combination of authentication, encryption, MAC and key exchange algorithm used to negotiate the security settings for a network connection using TLS or SSL network protocol. By default all the available cipher suites are supported.
list
ssl.enabled.protocols
The list of protocols enabled for SSL connections.
list
[TLSv1.2, TLSv1.1, TLSv1]
ssl.keystore.password
The store password for the key store file. This is optional for client and only needed if ssl.keystore.location is configured.
password
ssl.key.password
The password of the private key in the key store file. This is optional for client.
password
ssl.keystore.type
The file format of the key store file. This is optional for client.
string
JKS
ssl.truststore.location
The location of the trust store file.
string
ssl.endpoint.identification.algorithm
The endpoint identification algorithm to validate server hostname using server certificate.
string
https
ssl.protocol
The SSL protocol used to generate the SSLContext. Default setting is TLS, which is fine for most cases. Allowed values in recent JVMs are TLS, TLSv1.1 and TLSv1.2. SSL, SSLv2 and SSLv3 may be supported in older JVMs, but their usage is discouraged due to known security vulnerabilities.
string
TLS
ssl.trustmanager.algorithm
The algorithm used by trust manager factory for SSL connections. Default value is the trust manager factory algorithm configured for the Java Virtual Machine.
string
PKIX
ssl.secure.random.implementation
The SecureRandom PRNG implementation to use for SSL cryptography operations.
string
ssl.truststore.type
The file format of the trust store file.
string
JKS
ssl.keymanager.algorithm
The algorithm used by key manager factory for SSL connections. Default value is the key manager factory algorithm configured for the Java Virtual Machine.
string
SunX509
ssl.provider
The name of the security provider used for SSL connections. Default value is the default security provider of the JVM.
string
ssl.keystore.location
The location of the key store file. This is optional for client and can be used for two-way authentication for client.
string
ssl.truststore.password
The password for the trust store file. If a password is not set access to the truststore is still available, but integrity checking is disabled.
password
connect.mongo.connection
The mongodb connection in the format mongodb://[username:password@]host1[:port1],host2[:port2],…[,hostN[:portN]]][/[database][?options]].
string
connect.mongo.db
The mongodb target database.
string
connect.mongo.username
The username to use when authenticating
string
connect.mongo.password
The password for the use when authenticating
password
connect.mongo.auth.mechanism
String
string
SCRAM-SHA-1
connect.mongo.error.policy
Specifies the action to be taken if an error occurs while inserting the data. There are two available options: NOOP - the error is swallowed THROW - the error is allowed to propagate. RETRY - The exception causes the Connect framework to retry the message. The number of retries is based on The error will be logged automatically
string
THROW
connect.mongo.max.retries
The maximum number of times to try the write again.
int
20
connect.mongo.retry.interval
The time in milliseconds between retries.
int
60000
connect.mongo.kcql
KCQL expression describing field selection and data routing to the target mongo db.
string
connect.mongo.json_datetime_fields
List of fields that should be converted to ISODate on Mongodb insertion (comma-separated field names). For JSON topics only. Field values may be an integral epoch time or an ISO8601 datetime string with an offset (offset or ‘Z’ required). If string does not parse to ISO, it will be written as a string instead. Subdocument fields can be referred to as in the following examples: “topLevelFieldName”, “topLevelSubDocument.FieldName”, “topLevelParent.subDocument.subDocument2.FieldName”, (etc.) If a field is converted to ISODate and that same field is named as a PK, then the PK field is also written as an ISODate.
list
[]
connect.progress.enabled
Enables the output for how many records have been processed
boolean
false
connect.influx.url
The InfluxDB database url.
string
connect.influx.db
The database to store the values to.
string
connect.influx.username
The user to connect to the influx database
string
connect.influx.password
The password for the influxdb user.
password
connect.influx.kcql
KCQL expression describing field selection and target measurements.
string
connect.progress.enabled
Enables the output for how many records have been processed by the connector
boolean
false
connect.influx.error.policy
Specifies the action to be taken if an error occurs while inserting the data. There are two available options: NOOP - the error is swallowed THROW - the error is allowed to propagate. RETRY - The exception causes the Connect framework to retry the message. The number of retries is based on The error will be logged automatically
string
THROW
connect.influx.retry.interval
The time in milliseconds between retries.
int
60000
connect.influx.max.retries
The maximum number of times to try the write again.
int
20
connect.influx.retention.policy
Determines how long InfluxDB keeps the data - the options for specifying the duration of the retention policy are listed below. Note that the minimum retention period is one hour. DURATION determines how long InfluxDB keeps the data - the options for specifying the duration of the retention policy are listed below. Note that the minimum retention period is one hour. m minutes h hours d days w weeks INF infinite Default retention is autogen
from 1.0 onwards or default
for any previous version
string
autogen
connect.influx.consistency.level
Specifies the write consistency. If any write operations do not meet the configured consistency guarantees, an error will occur and the data will not be indexed. The default consistency-level is ALL.
string
ALL
connect.jms.url
Provides the JMS broker url
string
connect.jms.initial.context.factory
Initial Context Factory, e.g: org.apache.activemq.jndi.ActiveMQInitialContextFactory
string
connect.jms.connection.factory
Provides the full class name for the ConnectionFactory compile to use, e.gorg.apache.activemq.ActiveMQConnectionFactory
string
ConnectionFactory
connect.jms.kcql
connect.jms.kcql
string
connect.jms.subscription.name
subscription name to use when subscribing to a topic, specifying this makes a durable subscription for topics
string
connect.jms.password
Provides the password for the JMS connection
password
connect.jms.username
Provides the user for the JMS connection
string
connect.jms.error.policy
Specifies the action to be taken if an error occurs while inserting the data. There are two available options: NOOP - the error is swallowed THROW - the error is allowed to propagate. RETRY - The exception causes the Connect framework to retry the message. The number of retries is based on The error will be logged automatically
string
THROW
connect.jms.retry.interval
The time in milliseconds between retries.
int
60000
connect.jms.max.retries
The maximum number of times to try the write again.
int
20
connect.jms.destination.selector
Selector to use for destination lookup. Either CDI or JNDI.
string
CDI
connect.jms.initial.context.extra.params
List (comma-separated) of extra properties as key/value pairs with a colon delimiter to supply to the initial context e.g. SOLACE_JMS_VPN:my_solace_vp
list
[]
connect.jms.batch.size
The number of records to poll for on the target JMS destination in each Connect poll.
int
100
connect.jms.polling.timeout
Provides the timeout to poll incoming messages
long
1000
connect.jms.source.default.converter
Contains a canonical class name for the default converter of a raw JMS message bytes to a SourceRecord. Overrides to the default can be done by using connect.jms.source.converters still. i.e. com.datamountaineer.streamreactor.connect.source.converters.AvroConverter
string
connect.jms.converter.throw.on.error
If set to false the conversion exception will be swallowed and everything carries on BUT the message is lost!!; true will throw the exception.Default is false.
boolean
false
connect.converter.avro.schemas
If the AvroConverter is used you need to provide an avro Schema to be able to read and translate the raw bytes to an avro record. The format is $MQTT_TOPIC=$PATH_TO_AVRO_SCHEMA_FILE
string
connect.jms.headers
Contains collection of static JMS headers included in every SinkRecord The format is connect.jms.headers="$MQTT_TOPIC=rmq.jms.message.type:TextMessage,rmq.jms.message.priority:2;$MQTT_TOPIC2=rmq.jms.message.type:JSONMessage"
string
connect.progress.enabled
Enables the output for how many records have been processed
boolean
false
connect.jms.evict.interval.minutes
Removes the uncommitted messages from the internal cache. Each JMS message is linked to the Kafka record to be published. Failure to publish a record to Kafka will mean the JMS message will not be acknowledged.
int
10
connect.jms.evict.threshold.minutes
The number of minutes after which an uncommitted entry becomes evictable from the connector cache.
int
10
connect.jms.scale.type
How the connector tasks parallelization is decided. Available values are kcql and default. If kcql is provided it will be based on the number of KCQL statements written; otherwise it will be driven based on the connector tasks.max
file.dir
The base location for any files to be stored
file.write
Writes secrets to file on path. Required for Java trust stores, key stores, certs that need to be loaded from file. For ease of use for the secret provider, this is disabled by default.
vault.auth.method
Available values are approle
, userpass
, kubernetes
, cert
, token
, ldap
, gcp
, awsiam
, jwt
, github
, token
vault.addr
Address of the Vault server
vault.token
Use when ‘vault.auth.method’ is ‘token’ to specify the token value
vault.namespace
Set a global namespace to the Vault server instance. Requires Vault Enterprise Pro
vault.pem
File containing the Vault Server certificate content as string
vault.client.pem
File containing the Client certificate string content as string
vault.engine.version
KV Secrets Engine version of the Vault server instance. Default is 2
vault.ssl.truststore.location
The location of the trust store file
vault.ssl.keystore.location
The location of the key store file
vault.ssl.keystore.password
The password for the key store file
secret.default.ttl
If no TTL is configured in Vault, apply a default TTL.
app.role.id
Use when vault.auth.method is approle
or kubernetes
to specify the Vault App role id
app.role.secret.id
Use when vault.auth.method is approle
tp specify the Vault App role name secret id
app.role.path
Use when vault.auth.method is approle
to specify the Vault App role path
username
Use when vault.auth.method is userpass
to specify the username to connect to Vault
password
Use when vault.auth.method is userpass
to specify the password to connect to Vault
mount
Use when vault.auth.method is userpass
to specify the mount name of the userpass authentication back end
ldap.username
Use when vault.auth.method is ldap
to specify the LDAP username to connect to Vault with
ldap.password
Use when vault.auth.method is ldap
to specify the LDAP password to connect to Vault with
mount
Use when vault.auth.method is ldap
to specify the mount name of the ldap authentication back end
jwt.role
Use when vault.auth.method is jwt
to specify the role the JWT token belongs to
jwt.provider
Use when vault.auth.method is jwt
to specify the provider of the JWT token
jwt
Use when vault.auth.method is jwt
to specify the JWT token
gcp.role
Use when vault.auth.method is gcp
to specify the gcp role used for authentication
gcp.jwt
Use when vault.auth.method is gcp
to specify the JWT token
cert.mount
Use when vault.auth.method is cert
to specify the mount name of the cert authentication back end
github.token
Use when vault.auth.method is github
to specify the github app-id to use for authentication
github.mount
Use when vault.auth.method is github
to specify the mount name of the github authentication back end
kubernetes.role
Use when vault.auth.method is kubernetes
to specify the kubernetes role for authentication
kubernetes.token.path
Use when vault.auth.method is kubernetes
to specify the path to the service account token . Default is /var/run/secrets/kubernetes.io/serviceaccount/token
kubernetes.auth.path
Use when vault.auth.method is kubernetes
to specify a custom mount path
aws.role
Use when vault.auth.method is awsiam
.
Name of the role to login. If role is not specified, the login endpoint uses the role bearing the name of the AMI ID of the EC2 instance or if using the ec2 auth method the friendly name (i.e., role name or username) of the IAM authenticated principal
aws.request.url
Use when vault.auth.method is awsiam
.
PKCS7 signature of the identity document with all n characters removed. Base64-encoded HTTP URL used in the signed request (i.e. base64-encoding of https://sts.amazonaws.com
) as most requests will probably use POST with an empty URI
aws.request.body
Use when vault.auth.method is awsiam
.
Base64-encoded body of the signed request i.e. base64 of Action=GetCallerIdentity&Version=2011-06-15
aws.request.headers
Use when vault.auth.method is awsiam
to specify any request headers
aws.mount
Use when vault.auth.method is awsiam
. The AWS auth mount. Default is “aws”
azure.auth.method
Azure authenticate method. ‘credentials’ to use the provided credentials or ‘default’ for the standard Azure provider chain
credentials
azure.client.id
Azure client id for the service principal. Valid is auth.method is ‘credentials’
azure.tenant.id
Azure tenant id for the service principal. Valid is auth.method is ‘credentials’
azure.secret.id
Azure secret id for the service principal. Valid is auth.method is ‘credentials’
file.dir
The base location for any files to stored
Decodes values encoded with AES-256 to enable passing encrypted values to connectors.
Secrets will only be reloaded if the Connector restarts.
Add the plugin to the worker classloader isolation via the plugin.path option:
The provider gets AES-256 encrypted value as a key and simply decrypts it to get the value (instead of e.g. looking up for the value somewhere).
The AES-256 encryption used for the value needs to be prefixed with base64 encoded initialisation vector and a space character, the encrypted value is also base64 encoded. So to corretly encrypt value1
I need to follow following steps:
encrypted-bytes
= aes-256 encrypted value1
encrypted-base64
= base64 encrypted-bytes
initialisation-vector
= random bytes
iv-base64
= base64 initialisation-vector
encrypted-value
= iv-base64
+ + encrypted-base64
The plugin needs to be configured with secret key that will be used for decoding. The key is a string and needs to have size of 32 bytes (UTF-8 encoded).
aes256.key
Secret key used for encrypting and decrypting the value. String of 32 bytes.
Example worker properties file:
To use this provider in a connector, reference the keyvault containing the secret and the key name for the value of the connector property.
The indirect reference is in the form ${provider:path:key} where:
provider is the name of the provider in the worker property file set above
path used to provide encoding of the value: utf8, utf8_file, base64, base64_file
key is the AES-256 encrypted value to be decrypted by the plugin
For example, if hello
aes-256 encrypted using some key equals to xyxyxy
- then if I configure connector to use ${aes256::xyxyxy}
for a parameter value, the value should be substituted with “hello” string:
This would resolve at runtime to:
path
belonging to key reference is used to specify encoding used to pass the value. The provider supports following encodings:
base64: base-64 encoding of the textual value
base64_file: base-64 encoding of the value that when decrypted should be stored in the file
utf8_file: utf-8 encoding of the value that when decrypted should be stored in the file
utf8: utf-8 encoding of textual value
The UTF8 means the value returned is the decrypted value of the encrypted value (key). The BASE64 means the value returned is the base64 decoded decrypted value of the encrypted value (key).
If the value for the encoding is UTF8_FILE the string contents are written to a file. The name of the file will be randomply generated. The file location is determined by the file.dir configuration option given to the provider via the Connect worker.properties file.
If the value for the encoding is BASE64_FILE the string contents are based64 decoded and written to a file. The name of the file will be randomply generated. For example, if a connector needs a PEM file on disk, set this as the path as BASE64_FILE. The file location is determined by the file.dir configuration option given to the provider via the Connect worker.properties file.
If the key reference path is not set or is set to unknown value - utf8 encoding is used as default.
For example, if we want to save hi there !
to the file, and aes-256 encrypted content equals xyxyxy
- then if I configure connector to use ${aes256:utf8_file:xyxyxy}
for a parameter value, the provider will create new file with random name (abc-def-ghi
) and store hi there !
to the file. If configured store directory is /store-root
, he value will be substituted with /store-root/secrets/abc-def-ghi
string:
resolves to
Inserts the datetime as a message header from a value field.
This Kafka Connect Single Message Transform (SMT) facilitates the insertion of date and time components (year, month, day, hour, minute, second) as headers into Kafka messages using a timestamp field within the message payload. The timestamp field can be in various valid formats, including long integers, strings, or date objects. The timestamp field can originate from either the record Key or the record Value. When extracting from the record Key, prefix the field with _key.
; otherwise, extract from the record Value by default or explicitly using the field without prefixing. For string-formatted fields, specify a format.from.pattern
parameter to define the parsing pattern. Long integer fields are assumed to be Unix timestamps; the desired Unix precision can be specified using the unix.precision
parameter.
The headers inserted are of type STRING. By using this SMT, you can partition the data by yyyy-MM-dd/HH
or yyyy/MM/dd/HH
, for example, and only use one SMT.
The list of headers inserted are:
date
year
month
day
hour
minute
second
All headers can be prefixed with a custom prefix. For example, if the prefix is wallclock_
, then the headers will be:
wallclock_date
wallclock_year
wallclock_month
wallclock_day
wallclock_hour
wallclock_minute
wallclock_second
When used with the Lenses connectors for S3, GCS or Azure data lake, the headers can be used to partition the data. Considering the headers have been prefixed by _
, here are a few KCQL examples:
field
The field name. If the key is part of the record Key, prefix with _key
; otherwise _value
. If _value
or _key
is not used, it defaults to the record Value to resolve the field.
String
format.from.pattern
Optional DateTimeFormatter-compatible format for the timestamp. Used to parse the input if the input is a string. Multiple (fallback) patterns can be added, comma-separated.
String
unix.precision
Optional. The desired Unix precision for the timestamp: seconds, milliseconds, microseconds, or nanoseconds. Used to parse the input if the input is a Long.
String
milliseconds
header.prefix.name
Optional header prefix.
String
date.format
Optional Java date time formatter.
String
yyyy-MM-dd
year.format
Optional Java date time formatter for the year component.
String
yyyy
month.format
Optional Java date time formatter for the month component.
String
MM
day.format
Optional Java date time formatter for the day component.
String
dd
hour.format
Optional Java date time formatter for the hour component.
String
HH
minute.format
Optional Java date time formatter for the minute component.
String
mm
second.format
Optional Java date time formatter for the second component.
String
ss
timezone
Optional. Sets the timezone. It can be any valid Java timezone.
String
UTC
locale
Optional. Sets the locale. It can be any valid Java locale.
String
en
To use the record Value field named created_at
as the unix timestamp, use the following:
To use the record Key field named created_at
as the unix timestamp, use the following:
To prefix the headers with wallclock_
, use the following:
To change the date format, use the following:
To use the timezone Asia/Kolkoata
, use the following:
To facilitate S3, GCS, or Azure Data Lake partitioning using a Hive-like partition name format, such as date=yyyy-MM-dd / hour=HH
, employ the following SMT configuration for a partition strategy.
and in the KCQL setting utilise the headers as partitioning keys:
format.from.pattern
Configuring multiple format.from.pattern
items requires careful thought as to ordering and may indicate that your Kafka topics or data processing techniques are not aligning with best practices. Ideally, each topic should have a single, consistent message format to ensure data integrity and simplify processing.
The format.from.pattern
field supports multiple DateTimeFormatter patterns in a comma-separated list to handle various timestamp formats. Patterns containing commas should be enclosed in double quotes. For example:
While this flexibility can be useful, it is generally not recommended due to potential complexity and inconsistency. Ideally, a topic should have a single message format to align with Kafka best practices, ensuring consistency and simplifying data processing.
The order of patterns in format.from.pattern
matters. Less granular formats should follow more specific ones to avoid data loss. For example, place yyyy-MM-dd
after yyyy-MM-dd'T'HH:mm:ss
to ensure detailed timestamp information is preserved.
This page describes how to retrieve secrets from AWS Secret Manager for use in Kafka Connect.
Secure secrets in AWS Secret Manager and use them in Kafka Connect.
Secrets will only be reloaded if the Connector restarts.
Two authentication methods are supported:
credentails. When using this configuration the access-key and secret-key are used.
default. This method uses the default credential provider chain from AWS. The default credential first checks environment variables for configuration. If the environment configuration is incomplete, Java props, then the profile file and finally it will try managed identity.
aws.auth.method
AWS authenticate method. ‘credentials’ to use the provided credentials or ‘default’ for the standard AWS provider chain
credentials
aws.access.key
AWS client key. Valid is auth.method is ‘credentials’
aws.secret.key
AWS secret key. Valid is auth.method is ‘credentials’
aws.region
AWS region the for the Secrets manager
file.dir
The base location for any files to stored
file.write
Writes secrets to file on path. Required for Java trust stores, key stores, certs that need to be loaded from file. For ease of use for the secret provider, this is disabled by default.
false
secret.default.ttl
If no TTL is configured in AWS Secrets Manager, apply a default TTL (in milliseconds).
(not enabled)
aws.endpoint.override
Specify the secret provider endpoint.
(not enabled)
secret.type
Specify the type of secrets stored in Secret Manager. Defaults to JSON, to enable String secret values set this property as STRING.
JSON
Example Worker Properties
To use this provider in a connector, reference the SecretManager containing the secret and the key name for the value of the connector property.
The indirect reference is in the form ${provider:path:key} where:
provider is the name of the provider in the worker property file set above
path is the name of the secret
key is the name of the secret key in secret to retrieve. AWS can store multiple keys under a path.
For example, if we store two secrets as keys:
my_username_key with the value lenses and
my_password_key with the value my-secret-password
in a secret called my-aws-secret we would set:
This would resolve at runtime to:
AWS SecretManager BinaryString (API only), is not supported. The secrets must be stored under the secret name in key, value pair format. The provider checks the SecretString API and expects a JSON string to be returned.
For example for an RDS Postgre secret, the following is returned by AWS Secret Manager:
The provider handles the following types:
utf_8
base64
The provider will look for keys prefixed with:
UTF8
UTF_FILE
BASE64
BASE64_FILE
The UTF8
means the value returned is the string retrieved for the secret key. The BASE64
means the value returned is the base64 decoded string retrieved for the secret key.
If the value for the tag is UTF8_FILE the string contents are written to a file. The returned value from the connector configuration key will be the location of the file. The file location is determined by the file.dir configuration option is given to the provider via the Connect worker.properties
file.
If the value for the tag is BASE64_FILE
the string contents are based64 decoded and written to a file. The returned value from the connector configuration key will be the location of the file. For example, if a connector needs a PEM file on disk, set the prefix as BASE64_FILE
. The file location is determined by the file.dir configuration option is given to the provider via the Connect worker.properties
file.
If no prefix is found the contents of the secret string are returned.
Kafka SMT that inserts date, year, month, day, hour, minute and second headers using the system timestamp and a rolling time window configuration.
The headers inserted are of type STRING. By using this SMT, you can partition the data by yyyy-MM-dd/HH
or yyyy/MM/dd/HH
, for example, and only use one SMT.
The list of headers inserted are:
date
year
month
day
hour
minute
second
All headers can be prefixed with a custom prefix. For example, if the prefix is wallclock_
, then the headers will be:
wallclock_date
wallclock_year
wallclock_month
wallclock_day
wallclock_hour
wallclock_minute
wallclock_second
When used with the Lenses connectors for S3, GCS or Azure data lake, the headers can be used to partition the data. Considering the headers have been prefixed by _
, here are a few KCQL examples:
header.prefix.name
Optional header prefix.
String
Low
date.format
Optional Java date time formatter.
String
yyyy-MM-dd
Low
year.format
Optional Java date time formatter for the year component.
String
yyyy
Low
month.format
Optional Java date time formatter for the month component.
String
MM
Low
day.format
Optional Java date time formatter for the day component.
String
dd
Low
hour.format
Optional Java date time formatter for the hour component.
String
HH
Low
minute.format
Optional Java date time formatter for the minute component.
String
mm
Low
second.format
Optional Java date time formatter for the second component.
String
ss
Low
timezone
Optional. Sets the timezone. It can be any valid Java timezone.
String
UTC
Low
locale
Optional. Sets the locale. It can be any valid Java locale.
String
en
Low
rolling.window.type
Sets the window type. It can be fixed or rolling.
String
minutes
hours, minutes, seconds
rolling.window.size
Sets the window size. It can be any positive integer, and depending on the window.type
it has an upper bound, 60 for seconds and minutes, and 24 for hours.
Int
15
To store the epoch value, use the following configuration:
To prefix the headers with wallclock_
, use the following:
To change the date format, use the following:
To use the timezone Asia/Kolkoata
, use the following:
To facilitate S3, GCS, or Azure Data Lake partitioning using a Hive-like partition name format, such as date=yyyy-MM-dd / hour=HH
, employ the following SMT configuration for a partition strategy.
and in the KCQL setting utilise the headers as partitioning keys:
This page describes how to retrieve secrets from Environment variables for use in Kafka Connect.
Use Environment variables to hold secrets and use them in Kafka Connect.
Secrets will only be reloaded if the Connector restarts.
Example Worker Properties:
To use this provider in a connector, reference the ENVSecretProvider environment variable providing the value of the connector property.
The indirect reference is in the form ${provider::key} where:
provider is the name of the provider in the worker property file set above
key is the name of the environment variable holding the secret.
For example, if we store two secrets as environment variables:
MY_ENV_VAR_USERNAME with the value lenses and
MY_ENV_VAR_PASSWORD with the value my-secret-password
we would set:
This would resolve at runtime to:
This provider inspects the value of the environment to determine how to process the value. The value can optionally provide value metadata to support base64 decoding and writing values to files.
To provide metadata the following patterns are expected:
where value is the actual payload and metadata can be one of the following:
ENV-base64 - the provider will attempt to base64 decode the value string
ENV-mounted-base64 - the provider will attempt to base64 decode the value string and write to a file
ENV-mounted - the provider will write the value to a file
if no metadata is found the value of the environment variable is returned.
This page describes how to used SMTs your Kafka Connect Clusters.
Inserts date, year, month, day, hour, minute and second headers using the record timestamp and a rolling time window configuration. If the record timestamp is null, the SMT uses the system time.
The headers inserted are of type STRING. By using this SMT, you can partition the data by yyyy-MM-dd/HH
or yyyy/MM/dd/HH
, for example, and only use one SMT.
The list of headers inserted are:
date
year
month
day
hour
minute
second
All headers can be prefixed with a custom prefix. For example, if the prefix is wallclock_
, then the headers will be:
wallclock_date
wallclock_year
wallclock_month
wallclock_day
wallclock_hour
wallclock_minute
wallclock_second
When used with the Lenses connectors for S3, GCS or Azure data lake, the headers can be used to partition the data. Considering the headers have been prefixed by _
, here are a few KCQL examples:
header.prefix.name
Optional header prefix.
String
Low
date.format
Optional Java date time formatter.
String
yyyy-MM-dd
Low
year.format
Optional Java date time formatter for the year component.
String
yyyy
Low
month.format
Optional Java date time formatter for the month component.
String
MM
Low
day.format
Optional Java date time formatter for the day component.
String
dd
Low
hour.format
Optional Java date time formatter for the hour component.
String
HH
Low
minute.format
Optional Java date time formatter for the minute component.
String
mm
Low
second.format
Optional Java date time formatter for the second component.
String
ss
Low
timezone
Optional. Sets the timezone. It can be any valid Java timezone.
String
UTC
Low
locale
Optional. Sets the locale. It can be any valid Java locale.
String
en
Low
rolling.window.type
Sets the window type. It can be fixed or rolling.
String
minutes
hours, minutes, seconds
rolling.window.size
Sets the window size. It can be any positive integer, and depending on the window.type
it has an upper bound, 60 for seconds and minutes, and 24 for hours.
Int
15
To store the epoch value, use the following configuration:
To prefix the headers with wallclock_
, use the following:
To change the date format, use the following:
To use the timezone Asia/Kolkoata
, use the following:
To facilitate S3, GCS, or Azure Data Lake partitioning using a Hive-like partition name format, such as date=yyyy-MM-dd / hour=HH
, employ the following SMT configuration for a partition strategy.
and in the KCQL setting utilise the headers as partitioning keys:
SMT that inserts the system clock value as a message header, a value adapted to a specified time window boundary, for example every 15 minutes, or one hour.
The value inserted is stored as a STRING, and it holds either a string representation of the date and time epoch value, or a string representation of the date and time in the format specified.
header.name
The name of the header to insert the timestamp into.
String
High
value.type
Sets the header value inserted. It can be epoch or string. If string is used, then the 'format' setting is required."
String
format
epoch,format
High
format
Sets the format of the header value inserted if the type was set to string. It can be any valid java date format.
String
High
rolling.window.type
Sets the window type. It can be fixed or rolling.
String
minutes
hours, minutes, seconds
High
rolling.window.size
Sets the window size. It can be any positive integer, and depending on the window.type
it has an upper bound, 60 for seconds and minutes, and 24 for hours.
Int
15
High
timezone
Sets the timezone. It can be any valid java timezone. Overwrite it when value.type
is set to format
, otherwise it will raise an exception.
String
UTC
High
To store the epoch value, use the following configuration:
To store a string representation of the date and time in the format yyyy-MM-dd HH:mm:ss.SSS
, use the following:
To use the timezone Asia/Kolkoata
, use the following:
Inserts the datetime as a message header from a value field and a rolling window configuration.
A Kafka Connect Single Message Transform (SMT) that inserts date, year, month,day, hour, minute and second headers using a timestamp field from the record payload and a rolling time window configuration. The timestamp field can be in various valid formats, including long integers, strings, or date objects. The timestamp field can originate from either the record Key or the record Value. When extracting from the record Key, prefix the field with _key.
; otherwise, extract from the record Value by default or explicitly using the field without prefixing. For string-formatted fields, specify a format.from.pattern
parameter to define the parsing pattern. Long integer fields are assumed to be Unix timestamps; the desired Unix precision can be specified using the unix.precision
parameter.
The headers inserted are of type STRING. By using this SMT, you can partition the data by yyyy-MM-dd/HH
or yyyy/MM/dd/HH
, for example, and only use one SMT.
The list of headers inserted are:
date
year
month
day
hour
minute
second
All headers can be prefixed with a custom prefix. For example, if the prefix is wallclock_
, then the headers will be:
wallclock_date
wallclock_year
wallclock_month
wallclock_day
wallclock_hour
wallclock_minute
wallclock_second
When used with the Lenses connectors for S3, GCS or Azure data lake, the headers can be used to partition the data. Considering the headers have been prefixed by _
, here are a few KCQL examples:
field
The field name. If the key is part of the record Key, prefix with _key
; otherwise _value
. If _value
or _key
is not used, it defaults to the record Value to resolve the field.
String
format.from.pattern
Optional DateTimeFormatter-compatible format for the timestamp. Used to parse the input if the input is a string. Multiple (fallback) patterns can be added, comma-separated.
String
unix.precision
Optional. The desired Unix precision for the timestamp: seconds, milliseconds, microseconds, or nanoseconds. Used to parse the input if the input is a Long.
String
milliseconds
header.prefix.name
Optional header prefix.
String
date.format
Optional Java date time formatter.
String
yyyy-MM-dd
year.format
Optional Java date time formatter for the year component.
String
yyyy
month.format
Optional Java date time formatter for the month component.
String
MM
day.format
Optional Java date time formatter for the day component.
String
dd
hour.format
Optional Java date time formatter for the hour component.
String
HH
minute.format
Optional Java date time formatter for the minute component.
String
mm
second.format
Optional Java date time formatter for the second component.
String
ss
timezone
Optional. Sets the timezone. It can be any valid Java timezone.
String
UTC
locale
Optional. Sets the locale. It can be any valid Java locale.
String
en
rolling.window.type
Sets the window type. It can be fixed or rolling.
String
minutes
rolling.window.size
Sets the window size. It can be any positive integer, and depending on the window.type
it has an upper bound, 60 for seconds and minutes, and 24 for hours.
Int
15
## Example
To store the epoch value, use the following configuration:
To prefix the headers with wallclock_
, use the following:
To change the date format, use the following:
To use the timezone Asia/Kolkoata
, use the following:
To facilitate S3, GCS, or Azure Data Lake partitioning using a Hive-like partition name format, such as date=yyyy-MM-dd / hour=HH
, employ the following SMT configuration for a partition strategy.
and in the KCQL setting utilise the headers as partitioning keys:
format.from.pattern
Configuring multiple format.from.pattern
items requires careful thought as to ordering and may indicate that your Kafka topics or data processing techniques are not aligning with best practices. Ideally, each topic should have a single, consistent message format to ensure data integrity and simplify processing.
The format.from.pattern
field supports multiple DateTimeFormatter patterns in a comma-separated list to handle various timestamp formats. Patterns containing commas should be enclosed in double quotes. For example:
While this flexibility can be useful, it is generally not recommended due to potential complexity and inconsistency. Ideally, a topic should have a single message format to align with Kafka best practices, ensuring consistency and simplifying data processing.
The order of patterns in format.from.pattern
matters. Less granular formats should follow more specific ones to avoid data loss. For example, place yyyy-MM-dd
after yyyy-MM-dd'T'HH:mm:ss
to ensure detailed timestamp information is preserved.
Inserts the system clock as a message header.
Use InsertWallclockHeaders SMT if you want to use more than one date time part. This avoids multiple SMTs and is more efficient.
For example, if you want to partition the data by yyyy-MM-dd/HH
, then you can use InsertWallclockHeaders
which inserts multiple headers: date, year, month, day, hour, minute, second.
Inserts the system clock as a message header, with a value of type STRING. The value can be either a string representation of the date and time epoch value, or a string representation of the date and time in the format specified for example yyyy-MM-dd HH:mm:ss.SSS
.
header.name
The name of the header to insert the timestamp into.
String
High
value.type
Sets the header value inserted. It can be epoch or string. If string is used, then the 'format' setting is required."
String
format
epoch,format
High
format
Sets the format of the header value inserted if the type was set to string. It can be any valid java date format.
String
High
timezone
Sets the timezone. It can be any valid java timezone. Overwrite it when value.type
is set to format
, otherwise it will raise an exception.
String
UTC
High
To store the epoch value, use the following configuration:
To store a string representation of the date and time in the format yyyy-MM-dd HH:mm:ss.SSS
, use the following:
To use the timezone Asia/Kolkoata
, use the following:
SMT that allows the user to specify the format of the timestamp inserted as a header. It also avoids the synchronization block requirement for converting to a string representation of the timestamp.
An adapted version of the TimestampConverter SMT. The SMT adds a few more features to the original:
allows nested fields resolution (e.g. a.b.c
)
uses _key or _value as prefix to understand the field to convert is part of the record Key or Value
allows conversion from one string representation to another (e.g. yyyy-MM-dd HH:mm:ss
to yyyy-MM-dd
)
allows conversion using a rolling window boundary (e.g. every 15 minutes, or one hour)
header.name
The name of the header to insert the timestamp into.
String
field
The field path containing the timestamp, or empty if the entire value is a timestamp. Prefix the path with the literal string _key
, _value
or _timestamp
, to specify the record Key, Value or Timestamp is used as source. If not specified _value
is implied.
String
target.type
Sets the desired timestamp representation.
String
string,unix,date,time,timestamp
format.from.pattern
Sets the format of the timestamp when the input is a string. The format requires a Java DateTimeFormatter-compatible pattern. Multiple (fallback) patterns can be added, comma-separated.
String
format.to.pattern
Sets the format of the timestamp when the output is a string. The format requires a Java DateTimeFormatter-compatible pattern.
String
rolling.window.type
An optional parameter for the rolling time window type. When set it will adjust the output value according to the time window boundary.
String
none
none, hours, minutes, seconds
rolling.window.size
An optional positive integer parameter for the rolling time window size. When rolling.window.type
is defined this setting is required. The value is bound by the rolling.window.type
configuration. If type is minutes
or seconds
then the value cannot bigger than 60, and if the type is hours
then the max value is 24.
Int
15
unix.precision
The desired Unix precision for the timestamp. Used to generate the output when type=unix or used to parse the input if the input is a Long. This SMT will cause precision loss during conversions from, and to, values with sub-millisecond components.
String
milliseconds
seconds, milliseconds, microseconds, nanoseconds
timezone
Sets the timezone. It can be any valid java timezone. Overwrite it when target.type
is set to date, time, or string
, otherwise it will raise an exception.
String
UTC
To convert to and from a string representation of the date and time in the format yyyy-MM-dd HH:mm:ss.SSS
, use the following configuration:
To convert to and from a string representation while applying an hourly rolling window:
To convert to and from a string representation while applying an hourly rolling window and timezone:
To convert to and from a string representation while applying a 15 minutes rolling window:
To convert to and from a Unix timestamp, use the following:
Here is an example using the record timestamp field:
format.from.pattern
Configuring multiple format.from.pattern
items requires careful thought as to ordering and may indicate that your Kafka topics or data processing techniques are not aligning with best practices. Ideally, each topic should have a single, consistent message format to ensure data integrity and simplify processing.
The format.from.pattern
field supports multiple DateTimeFormatter patterns in a comma-separated list to handle various timestamp formats. Patterns containing commas should be enclosed in double quotes. For example:
While this flexibility can be useful, it is generally not recommended due to potential complexity and inconsistency. Ideally, a topic should have a single message format to align with Kafka best practices, ensuring consistency and simplifying data processing.
The order of patterns in format.from.pattern
matters. Less granular formats should follow more specific ones to avoid data loss. For example, place yyyy-MM-dd
after yyyy-MM-dd'T'HH:mm:ss
to ensure detailed timestamp information is preserved.
This page contains tutorials for common Kafka Connect use cases.
A Kafka Connect Single Message Transform (SMT) that inserts date, year, month,day, hour, minute and second headers using the system clock as a message header.
The headers inserted are of type STRING. By using this SMT, you can partition the data by yyyy-MM-dd/HH
or yyyy/MM/dd/HH
, for example, and only use one SMT.
The list of headers inserted are:
date
year
month
day
hour
minute
second
All headers can be prefixed with a custom prefix. For example, if the prefix is wallclock_
, then the headers will be:
wallclock_date
wallclock_year
wallclock_month
wallclock_day
wallclock_hour
wallclock_minute
wallclock_second
When used with the Lenses connectors for S3, GCS or Azure data lake, the headers can be used to partition the data. Considering the headers have been prefixed by _
, here are a few KCQL examples:
header.prefix.name
Optional header prefix.
String
Low
date.format
Optional Java date time formatter.
String
yyyy-MM-dd
Low
year.format
Optional Java date time formatter for the year component.
String
yyyy
Low
month.format
Optional Java date time formatter for the month component.
String
MM
Low
day.format
Optional Java date time formatter for the day component.
String
dd
Low
hour.format
Optional Java date time formatter for the hour component.
String
HH
Low
minute.format
Optional Java date time formatter for the minute component.
String
mm
Low
second.format
Optional Java date time formatter for the second component.
String
ss
Low
timezone
Optional. Sets the timezone. It can be any valid Java timezone.
String
UTC
Low
locale
Optional. Sets the locale. It can be any valid Java locale.
String
en
Low
To store the epoch value, use the following configuration:
To prefix the headers with wallclock_
, use the following:
To change the date format, use the following:
To use the timezone Asia/Kolkoata
, use the following:
To facilitate S3, GCS, or Azure Data Lake partitioning using a Hive-like partition name format, such as date=yyyy-MM-dd / hour=HH
, employ the following SMT configuration for a partition strategy.
and in the KCQL setting utilise the headers as partitioning keys:
A Kafka Connect Single Message Transform (SMT) that inserts the system clock year, month, day, minute, or seconds as a message header, with a value of type STRING.
Use InsertWallclockHeaders SMT if you want to use more than one date time part. This avoids multiple SMTs and is more efficient.
For example, if you want to partition the data by yyyy-MM-dd/HH
, then you can use InsertWallclockHeaders
which inserts multiple headers: date, year, month, day, hour, minute, second.
header.name
The name of the header to insert the timestamp into.
String
High
date.time.part
The date time part to insert.
String
year, month, day, hour,minute, second
High
timezone
Sets the timezone. It can be any valid java timezone.
String
UTC
High
To store the year, use the following configuration:
To store the month, use the following configuration:
To store the day, use the following configuration:
To store the hour, use the following configuration:
To store the hour, and apply a timezone, use the following configuration:
To store the minute, use the following configuration:
To store the second, use the following configuration:
The InsertSourcePartitionOrOffsetValue transformation in Kafka Connect allows you to insert headers into SourceRecords based on partition or offset values.
The InsertSourcePartitionOrOffsetValue
transformation in Kafka Connect allows you to insert headers into SourceRecords based on partition or offset values. This is useful for adding metadata to your data records before they are sent to destinations like AWS S3, Azure Datalake, or GCP Storage.
This SMT only works with source connectors.
offset.fields
Comma-separated list of fields to retrieve from the offset
Optional
Empty list
offset.prefix
Optional prefix for offset keys
Optional
"offset."
partition.fields
Comma-separated list of fields to retrieve from the partition
Required
Empty list
partition.prefix
Optional prefix for partition keys
Optional
"partition."
Default Value: Specifies the default value assigned if no value is explicitly provided in the configuration.
These properties allow you to customize which fields from the offset and partition of a SourceRecord are added as headers, along with specifying optional prefixes for the header keys. Adjust these configurations based on your specific use case and data requirements.
transforms
: This property lists the transformations to be applied to the records.
transforms.InsertSourcePartitionOrOffsetValue.type
: Specifies the class implementing the transformation (InsertSourcePartitionOrOffsetValue
in this case).
transforms.InsertSourcePartitionOrOffsetValue.offset.fields
: Defines the fields from the offset to be inserted as headers in the SourceRecord. Replace path,line,ts
with the actual field names you want to extract from the offset.
transforms.InsertSourcePartitionOrOffsetValue.partition.fields
: Defines the fields from the partition to be inserted as headers in the SourceRecord. Replace container,prefix
with the actual field names you want to extract from the partition.
When using this transformation with AWS S3, you can configure your Kafka Connect connector as follows:
To customise the header prefix you can also set the header values:
Replace path,line,ts
and container,prefix
with the actual field names you are interested in extracting from the partition or offset.
By using InsertSourcePartitionOrOffsetValue
transformation, you can enrich your data records with additional metadata headers based on partition or offset values before they are delivered to your cloud storage destinations.
The prefix feature in InsertSourcePartitionOrOffsetValue
allows you to prepend a consistent identifier to each header key added based on partition or offset values from SourceRecords.
Configure the transformation in your Kafka Connect connector properties:
offset.prefix
: Specifies the prefix for headers derived from offset values. Default is "offset."
.
partition.prefix
: Specifies the prefix for headers derived from partition values. Default is "partition."
.
By setting offset.prefix=offset.
and partition.prefix=partition.
, headers added based on offset and partition fields will have keys prefixed accordingly in the SourceRecord headers.
This configuration ensures clarity and organization when inserting metadata headers into your Kafka records, distinguishing them based on their source (offset or partition). Adjust prefixes (offset.prefix
and partition.prefix
) as per your naming conventions or requirements.
Examples for GCP Source Kafka Connector.
This connector configuration is designed for ingesting data from , into Apache Kafka.
Connector Name: gcp-storageSourceConnectorParquet (This can be customized as needed)
Connector Class: io.lenses.streamreactor.connect.gcp.storage.source.GCPStorageSourceConnector
Maximum Tasks: 1 (Number of tasks to execute in parallel)
KCQL Statement:
Inserts data from the specified cloud storage bucket into a Kafka topic.
Syntax: insert into $TOPIC_NAME select * from $BUCKET_NAME:$PREFIX_NAME STOREAS 'parquet'
$TOPIC_NAME
: Name of the Kafka topic where data will be inserted.
$BUCKET_NAME
: Name of the GCP Storage storage bucket.
$PREFIX_NAME
: Prefix or directory within the bucket.
Value Converter:
AvroConverter (Assuming data is serialized in Avro format)
Schema Registry URL:
http://localhost:8089 (URL of the schema registry for Avro serialization)
Authentication Properties:
(These properties depend on the authentication mechanism used for accessing the cloud storage service. Replace placeholders with actual authentication properties for the specific cloud platform.)
This configuration serves as a template and can be customized according to the requirements and specifics of your data.
This configuration example is particularly useful when you need to restore data from a GCP Storage, into Apache Kafka while maintaining all data including headers, key and value for each record. The envelope structure encapsulates the actual data payload along with metadata into files on your source bucket, providing a way to manage and process data with additional context.
Data Restoration with Envelope Structure: If you’re restoring data from GCP Storage into Kafka and want to preserve metadata, this configuration is suitable. Envelopes can include metadata like timestamps, data provenance, or other contextual information, which can be valuable for downstream processing.
Batch Processing: The configuration supports batch processing by specifying a batch size (BATCH=2000
) and a limit on the number of records (LIMIT 10000
). This is beneficial when dealing with large datasets, allowing for efficient processing in chunks.
Error Handling: Error handling is configured to throw exceptions (connect.gcpstorage.error.policy=THROW
) in case of errors during data ingestion, ensuring that any issues are immediately surfaced for resolution.
Hierarchical Partition Extraction: The source partition extractor type is set to hierarchical (connect.gcpstorage.source.partition.extractor.type=hierarchical
), which is suitable for extracting hierarchical partitioning structures from the source data location.
Continuous Partition Search: Continuous partition search is enabled (connect.partition.search.continuous=true
), which helps in continuously searching for new partitions to process, ensuring that newly added data is ingested promptly.
This configuration ensures efficient and reliable data ingestion from cloud storage into Kafka while preserving the envelope structure and providing robust error handling mechanisms.
Similar to the above, this is another configuration for envelope format
Single Task Processing: Unlike the previous configuration which allowed for multiple tasks (tasks.max=4
), this variant is configured for single-task processing (tasks.max=1
). This setup may be preferable in scenarios where the workload is relatively lighter or where processing tasks in parallel is not necessary.
AVRO Format for Data Serialization: Data is serialized in the AVRO format (STOREAS 'AVRO'
), leveraging the AvroConverter (value.converter=io.confluent.connect.avro.AvroConverter
). This is suitable for environments where Avro is the preferred serialization format or where compatibility with Avro-based systems is required.
Schema Registry Configuration: The configuration includes the URL of the schema registry (value.converter.schema.registry.url=http://localhost:8081
), facilitating schema management and compatibility checks for Avro serialized data.
Hierarchical Partition Extraction: Similar to the previous configuration, hierarchical partition extraction is employed (connect.gcpstorage.source.partition.extractor.type=hierarchical
), enabling extraction of partitioning structures from the source data location.
Coming soon!
Coming soon!
Coming soon!
Coming soon!
This page describes how to contribute to the Stream Reactor.
The Stream Reactor is an open-source project and we welcome feedback on any issues, even more, we enjoy and encourage contributions, either fixes or new connectors! Don't be shy.
What you can contribute? Everything, even if you aren't a JVM program you can help with docs and tutorials, everything is open.
If you have an issue raise it on the GitHub issue page and supply as much information as you can. If you have a fix, raise a Pull request and ask for a review.
Please make sure to:
Test your changes, we won't accept changes without unit and integration testing
Make sure it builds!
Rebase your issues if they become stale
Update docs!
The Stream Reactor is under an Apache 2.0 license so others can use your hard work for free without restriction.
Coming soon!
This page describes the release notes for the Stream Reactor.
This page contains the release notes for Connect Secret Providers.
Security: Write maven Descriptors on packaging to avoid incorrect dependencies being identified by security scanner tools. (Fixes CVE-2023-1370).
Security: Add dependency checking as part of the build process.
Security: Change AES256 key to PASSWORD type to avoid logging secrets.
New property : file.write
Writes secrets to file on path. Required for Java trust stores, key stores, certs that need to be loaded from file. For ease of use for the secret provider, this is disabled by default.
New property : secret.default.ttl
If no TTL is configured in AWS Secrets Manager, apply a default TTL (in milliseconds).
New property : aws.endpoint.override
Add override for non-standard or compatible AWS endpoints.
Enhancement : Ensuring secrets are cached within their TTL (same as Vault).
Enhancement : Upgraded dependencies to use AWS V2 Client.
Enhancement : Added AWS STS dependency to avoid the requirement of additional libraries for default authentication (eg. EKS).
Security: Don’t expose secret values in exception messages on JsonParseException.
New property : secret.type
Specify the type of secrets stored in Secret Manager. Defaults to JSON, to enable String secret values to change to STRING.
Bugfix: enable accessKey and secretKey to remain blank if using DEFAULT auth mode.
Bugfix: Recompute TTL values on each get so the timestamp of reschedule shrinks until TTL is reached.
Bugfix: Fix so that UTF-8 encodings in Azure are correctly mapped to the UTF8 encoding in the secret provider.
Bugfix: Files will be written to the correct directory.
New property: app.role.path
Support vault approle custom mount path.
New property: kubernetes.auth.path
Support vault custom auth path (with default value to be auth/kubernetes).
Security: vault-java-driver
was no longer maintained, switched to use a community fork io.github.jopenlibs
Add support for the Vault Database credential engine
This page contains the release notes for the Stream Reactor.
This release addresses two critical issues:
Corrupted connector state when DELETE/MOVE is used: The connector is designed to store the last processed document and its location within its state for every message sent to Kafka. This mechanism ensures that the connector can resume processing from the correct point in case of a restart. However, when the connector is configured with a post-operation to move or delete processed objects within the data lake, it stores the last processed object in its state. If the connector restarts and the referenced object has been moved or deleted externally, the state points to a non-existent object, causing the connector to fail. The current workaround requires manually cleaning the state and restarting the connector, which is inefficient and error-prone.
Incorrect Handling of Move Location Prefixes: When configuring the move location within the data lake, if the prefix ends with a forward slash (/), it results in malformed keys like a//b. Such incorrect paths can break compatibility with query engines like Athena, which may not handle double slashes properly.
Performance improvements in the source to handle a higher throughput. The code now leverages prefetch count, and disables the auto complete. The following connector configs were added
connect.servicebus.source.prefetch.count
The number of messages to prefetch from ServiceBus
connect.servicebus.source.complete.retries.max
The maximum number of retries to attempt while completing a message
connect.servicebus.source.complete.retries.min.backoff.ms
The minimum duration in milliseconds for the first backoff
connect.servicebus.source.sleep.on.empty.poll.ms
The duration in milliseconds to sleep when no records are returned from the poll. This avoids a tight loop in Connect.
A NullPointerException is thrown and the sinks lose the stacktrace. It is a patch to enhance the logging
Addresses a critical bug in ElasticSearch versions 6 (ES6) and 7 (ES7) where records following a tombstone are inadvertently skipped during the insertion process. The issue stemmed from an erroneous return statement that halted the processing of subsequent records.
🚀 New Features
Introduced a new KCQL property: batch.enabled
(default: true
).
Users can now disable batching to send messages sequentially, addressing specific scenarios with large message sizes (e.g., >1 MB).
Why this matters: Batching improves performance but can fail for large messages. Sequential sending ensures reliability in such cases.
How to use: Configure batch.enabled=false
in the KCQL mapping to enable sequential sending.
Added post-processing capabilities for AWS S3 and GCP Storage source connectors ( Azure Datalake Gen 2 support coming soon).
New KCQL properties:
post.process.action
: Defines the action (DELETE
or MOVE
) to perform on source files after successful processing.
post.process.action.bucket
: Specifies the target bucket for the MOVE
action (required for MOVE
).
post.process.action.prefix
: Specifies a new prefix for the file’s location when using the MOVE
action (required for MOVE
).
Use cases:
Free up storage space by deleting files.
Archive or organize processed files by moving them to a new location.
Example 1 : Delete Files:
Example 2: Move files to an archive bucket:
Updated Azure Service Bus Dependencies
azure-core
updated to version 1.54.1.
azure-messaging-servicebus
updated to version 7.17.6.
These updates ensure compatibility with the latest Azure SDKs and improve stability and performance.
Review the new KCQL properties and configurations for Azure Service Bus and Datalake connectors.
Ensure compatibility with the updated Azure Service Bus dependencies if you use custom extensions or integrations.
Thank you to all contributors! 🎉
Improves ElasticSearch sinks by allowing the message key fields, or message headers to be used as part of the document primary key. Reference _key[.key_field_path
or _header.header_name to set the document primary key
Fixes the data lake sinks error message on flush.interval
Improvements to the HTTP Sink
Queue Limiting: We've set a limit on the queue size per topic to reduce the chances of an Out-of-Memory (OOM) issue. Previously the queue was unbounded and in a scenario where the http calls are slow and the sink gets more records than it clears, it would eventually lead to OOM.
Offering Timeout: The offering to the queue now includes a timeout. If there are records to be offered, but the timeout is exceeded, a retriable exception is thrown. Depending on the connector's retry settings, the operation will be attempted again. This helps avoid situations where the sink gets stuck processing a slow or unresponsive batch.
Duplicate Record Handling: To prevent the same records from being added to the queue multiple times, we've introduced a Map[TopicPartition, Offset]
to track the last processed offset for each topic-partition. This ensures that the sink does not attempt to process the same records repeatedly.
Batch Failure Handling: The changes also address a situation where an HTTP call fails due to a specific input, but the batch is not removed from the queue. This could have led to the batch being retried indefinitely, which is now prevented.
HTTP sink ignores null records. Avoids a NPE if custom SMTs send null records to the pipeline
Improvements to the HTTP Sink reporter
Datalake sinks allow an optimisation configuration to avoid rolling the file on schema change. To be used only when the schema changes are backwards compatible
Fixes for the HTTP sink batch.
HTTP sink improvements
HTTP sink improvements
Dependency version upgrades.
Fixed timestamp
of the messages and some of the fields in the payload to correctly use UTC millis.
Changed contentType
in the message to be nullable (Optional String).
Possible exception fix
Store success/failure response codes in http for the reporter.
Fixes possible exception in Reporter code.
Fixes and overall stability improvements.
Remove overriding transport options on GCS client.
Bugfix: HTTP Kafka Reporter ClientID is not unique.
Improvements for handing HTTP sink null values & fix for NPE on GCS.
Removal of shading for Avro and Confluent jars.
Addition of HTTP reporter functionally to route HTTP success and errors to a configurable topic.
Breaking Configuration Change
Configuration for the HTTP Sink changes from Json to standard Kafka Connect properties. Please study the documentation if upgrading to 8.0.0.
Adjust defaultUploadSyncPeriod
to make connector more performant by default.
Fix for issue causing sink to fail with NullPointerException due to invalid offset.
Dependency version upgrades
Fixes a gap in the avro/parquet storage where enums where converted from Connect enums to string.
Adds support for explicit "no partition" specification to kcql, to enable topics to be written in the bucket and prefix without partitioning the data.
Syntax Example: INSERT INTO foo SELECT * FROM bar NOPARTITION
Dependency version upgrades
This release introduces a new configuration option for three Kafka Connect Sink Connectors—S3, Data Lake, and GCP Storage—allowing users to disable exactly-once semantics. By default, exactly once is enabled, but with this update, users can choose to disable it, opting instead for Kafka Connect’s native at-least-once offset management.
S3 Sink Connector: connect.s3.exactly.once.enable
Data Lake Sink Connector: connect.datalake.exactly.once.enable
GCP Storage Sink Connector: connect.gcpstorage.exactly.once.enable
Default Value: true
Indexing is enabled by default to maintain exactly-once semantics. This involves creating an .indexes directory at the root of your storage bucket, with subdirectories dedicated to tracking offsets, ensuring that records are not duplicated.
Users can now disable indexing by setting the relevant property to false
. When disabled, the connector will utilise Kafka Connect’s built-in offset management, which provides at-least-once semantics instead of exactly-once.
Dependency version upgrades
Introduced new properties to allow users to filter the files to be processed based on their extensions.
For AWS S3 Source Connector:
connect.s3.source.extension.excludes
: A comma-separated list of file extensions to exclude from the source file search. If this property is not configured, all files are considered.
connect.s3.source.extension.includes
: A comma-separated list of file extensions to include in the source file search. If this property is not configured, all files are considered.
For GCP Storage Source Connector:
connect.gcpstorage.source.extension.excludes
: A comma-separated list of file extensions to exclude from the source file search. If this property is not configured, all files are considered.
connect.gcpstorage.source.extension.includes
: A comma-separated list of file extensions to include in the source file search. If this property is not configured, all files are considered.
These properties provide more control over the files that the AWS S3 Source Connector and GCP Storage Source Connector process, improving efficiency and flexibility.
Increases the default HTTP Retry timeout from 250ms total timeout to 3 minutes as default. The default consumer group max.poll.timeout
is 5 minutes, so it’s within the boundaries to avoid a group rebalance.
Adding retry delay multiplier as a configurable parameter (with default value) to Google Cloud Storage Connector. Main changes revolve around RetryConfig class and its translation to gax HTTP client config.
Making indexes directory configurable for both source and sink.
Use the below properties to customise the indexes root directory:
connect.datalake.indexes.name
connect.gcpstorage.indexes.name
connect.s3.indexes.name
See connector documentation for more information.
Use the below properties to exclude custom directories from being considered by the source.
connect.datalake.source.partition.search.excludes
connect.gcpstorage.source.partition.search.excludes
connect.s3.source.partition.search.excludes
Azure Service Bus Sink Connector.
Exception / Either Tweaks
Bump java dependencies (assertJ and azure-core)
Add logging on flushing
Fix: Java class java.util.Date support in cloud sink maps
Support added for writing Date, Time and Timestamp data types to AVRO.
Invalid Protocol Configuration Fix
To back the topics up the KCQL statement is
When *
is used the envelope setting is ignored.
This change allows for the *
to be taken into account as a default if the given message topic is not found.
Bug fix to ensure that, if specified as part of the template, the Content-Type header is correctly populated.
Update of dependencies.
Upgrading from any version prior to 7.0.0, please see the release and upgrade notes for 7.0.0.
Automated Skip for Archived Objects:
The S3 source now seamlessly bypasses archived objects, including those stored in Glacier and Deep Archive. This enhancement improves efficiency by automatically excluding archived data from processing, avoiding the connector crashing otherwise
Enhanced Key Storage in Envelope Mode:
Changes have been implemented to the stored key when using envelope mode. These modifications lay the groundwork for future functionality, enabling seamless replay of Kafka data stored in data lakes (S3, GCS, Azure Data Lake) from any specified point in time.
We’ve rolled out enhancements to tackle a common challenge faced by users of the S3 source functionality. Previously, when an external producer abruptly terminated a file without marking the end message, data loss occurred.
To address this, we’ve introduced a new feature: a property entry for KCQL to signal the handling of unterminated messages. Meet the latest addition, read.text.last.end.line.missing. When set to true, this property ensures that in-flight data is still recognized as a message even when EOF is reached but the end line marker is missing.
Upgrading from any version prior to 7.0.0, please see the release and upgrade notes for 7.0.0.
This release brings substantial enhancements to the data-lakes sink connectors, elevating their functionality and flexibility. The focal point of these changes is the adoption of the new KCQL syntax, designed to improve usability and resolve limitations inherent in the previous syntax.
New KCQL Syntax: The data-lakes sink connectors now embrace the new KCQL syntax, offering users enhanced capabilities while addressing previous syntax constraints.
Data Lakes Sink Partition Name: This update ensures accurate preservation of partition names by avoiding the scraping of characters like \
and /
. Consequently, SMTs can provide partition names as expected, leading to reduced configuration overhead and increased conciseness.
Several keywords have been replaced with entries in the PROPERTIES section for improved clarity and consistency:
WITHPARTITIONER: Replaced by PROPERTIES ('partition.include.keys'=true/false)
. When WITHPARTITIONER KeysAndValue
is set to true
, the partition keys are included in the partition path. Otherwise, only the partition values are included.
WITH_FLUSH_SIZE: Replaced by PROPERTIES ('flush.size'=$VALUE)
.
WITH_FLUSH_COUNT: Replaced by PROPERTIES ('flush.count'=$VALUE)
.
WITH_FLUSH_INTERVAL: Replaced by PROPERTIES ('flush.interval'=$VALUE)
.
The adoption of the new KCQL syntax enhances the flexibility of the data-lakes sink connectors, empowering users to tailor configurations more precisely to their requirements. By transitioning keywords to entries in the PROPERTIES section, potential misconfigurations stemming from keyword order discrepancies are mitigated, ensuring configurations are applied as intended.
Please note that the upgrades to the data-lakes sink connectors are not backward compatible with existing configurations. Users are required to update their configurations to align with the new KCQL syntax and PROPERTIES entries. This upgrade is necessary for any instances of the sink connector (S3, Azure, GCP) set up before version 7.0.0.
To upgrade to the new version, users must follow these steps:
Stop the sink connectors.
Update the connector to version 7.0.0.
Edit the sink connectors’ KCQL setting to translate to the new syntax and PROPERTIES entries.
Restart the sink connectors.
This update specifically affects datalake sinks employing the JSON storage format. It serves as a remedy for users who have resorted to a less-than-ideal workaround: employing a Single Message Transform (SMT) to return a Plain Old Java Object (POJO) to the sink. In such cases, instead of utilizing the Connect JsonConverter to seamlessly translate the payload to JSON, reliance is placed solely on Jackson.
However, it’s crucial to note that this adjustment is not indicative of a broader direction for future expansions. This is because relying on such SMT practices does not ensure an agnostic solution for storage formats (such as Avro, Parquet, or JSON).
HTTP Sink Connector (Beta)
Azure Event Hubs Source Connector.
Update of dependencies, CVEs addressed.
Please note that the Elasticsearch 6 connector will be deprecated in the next major release.
Important: The AWS Source Partition Search properties have changed for consistency of configuration. The properties that have changed for 6.2.0 are:
connect.s3.partition.search.interval
changes to connect.s3.source.partition.search.interval
.
connect.s3.partition.search.continuous
changes to connect.s3.source.partition.search.continuous
.
connect.s3.partition.search.recurse.levels
changes to connect.s3.source.partition.search.recurse.levels
.
If you use any of these properties, when you upgrade to the new version then your source will halt and the log will display an error message prompting you to adjust these properties. Be sure to update these properties in your configuration to enable the new version to run.
Dependency upgrade of Hadoop libraries version to mitigate against CVE-2022-25168.
Jakarta Dependency Migration: Switch to Jakarta EE dependencies in line with industry standards to ensure evolution under the Eclipse Foundation.
There has been some small tidy up of dependencies, restructuring and removal of unused code, and a number of connectors have a slimmer file size without losing any functionality.
The configuration directory has been removed as these examples are not kept up-to-date. Please see the connector documentation instead.
Dependency upgrades.
In this release, all connectors have been updated to address an issue related to conflicting Antlr jars that may arise in specific environments.
Byte Array Support: Resolved an issue where storing the Key/Value as an array of bytes caused compatibility problems due to the connector returning java.nio.ByteBuffer while the Connect framework’s ByteArrayConverter only works with byte[]. This update ensures seamless conversion to byte[] if the key/value is a ByteBuffer.
Fix for NullPointerException: Addressed an issue where the JMS sink connector encountered a NullPointerException when processing a message with a null JMSReplyTo header value.
Fix for DataException: Resolved an issue where the JMS source connector encountered a DataException when processing a message with a JMSReplyTo header set to a queue.
GZIP Support for JSON Writing: Added support for GZIP compression when writing JSON data to AWS S3, GCP Storage, and Azure Datalake sinks.
Improve suppport for handling GCP naming conventions
Removed check preventing nested paths being used in the sink.
Avoid cast exception in GCP Storage connector when using Credentials mode.
Azure Datalake Sink Connector (Beta)
GCP Storage Sink Connector (Beta)
Kudu
Hazelcast
Hbase
Hive
Pulsar
Standardising package names. Connector class names and converters will need to be renamed in configuration.
Some clean up of unused dependencies.
Introducing cloud-common
module to share code between cloud connectors.
Cloud sinks (AWS S3, Azure Data Lake and GCP Storage) now support BigDecimal and handle nullable keys.
Consumer Group Offsets S3 Sink Connector
Enhancement: BigDecimal Support
Bug fix: Redis does not initialise the ErrorHandler
Test Fixes and E2E Test Clean-up: Improved testing with bug fixes and end-to-end test clean-up.
Code Optimization: Removed unused code and converted Java code and tests to Scala for enhanced stability and maintainability.
Ascii Art Loading Fix: Resolved issues related to ASCII art loading.
Build System Updates: Implemented build system updates and improvements.
Stream Reactor Integration: Integrated Kafka-connect-query-language inside of Stream Reactor for enhanced compatibility.
STOREAS Consistency: Ensured consistent handling of backticks with STOREAS.
The source and sink has been the focus of this release.
Full message backup. The S3 sink and source now supports full message backup. This is enabled by adding in the KCQL PROPERTIES('store.envelope'=true)
Removed Bytes_*** storage format. For those users leveraging them there is a migration information below. Storing raw Kafka message the storage format should be AVRO/PARQUET/JSON(less ideal).
Introduced support for BYTES storing single message as raw binary. Typically, storing images or videos are the use case for this. This is enabled by adding in the KCQL STOREAS BYTES
Introduced support for PROPERTIES
to drive new settings required to drive the connectors’ behaviour. The KCQL looks like this: INSERT INTO ... SELECT ... FROM ... PROPERTIES(property=key, ...)
Enhanced PARTITIONBY Support: expanded support for PARTITIONBY fields, now accommodating fields containing dots. For instance, you can use PARTITIONBY a, `field1.field2`
for enhanced partitioning control.
Advanced Padding Strategy: a more advanced padding strategy configuration. By default, padding is now enforced, significantly improving compatibility with S3 Source.
Improved Error Messaging: Enhancements have been made to error messaging, providing clearer guidance, especially in scenarios with misaligned topic configurations (#978).
Commit Logging Refactoring: Refactored and simplified the CommitPolicy for more efficient commit logging (#964).
Comprehensive Testing: Added additional unit testing around configuration settings, removed redundancy from property names, and enhanced KCQL properties parsing to support Map structures.
Consolidated Naming Strategies: Merged naming strategies to reduce code complexity and ensure consistency. This effort ensures that both hierarchical and custom partition modes share similar code paths, addressing issues related to padding and the inclusion of keys and values within the partition name.
Optimized S3 API Calls: Switched from using deleteObjects to deleteObject for S3 API client calls (#957), enhancing performance and efficiency.
JClouds Removal: The update removes the use of JClouds, streamlining the codebase.
Legacy Offset Seek Removal: The release eliminates legacy offset seek operations, simplifying the code and enhancing overall efficiency
Expanded Text Reader Support: new text readers to enhance data processing flexibility, including:
Regex-Driven Text Reader: Allows parsing based on regular expressions.
Multi-Line Text Reader: Handles multi-line data.
Start-End Tag Text Reader: Processes data enclosed by start and end tags, suitable for XML content.
Improved Parallelization: enhancements enable parallelization based on the number of connector tasks and available data partitions, optimizing data handling.
Data Consistency: Resolved data loss and duplication issues when the connector is restarted, ensuring reliable data transfer.
Dynamic Partition Discovery: No more need to restart the connector when new partitions are added; runtime partition discovery streamlines operations.
Efficient Storage Handling: The connector now ignores the .indexes directory, allowing data storage in an S3 bucket without a prefix.
Increased Default Records per Poll: the default limit on the number of records returned per poll was changed from 1024 to 10000, improving data retrieval efficiency and throughput.
Ordered File Processing: Added the ability to process files in date order. This feature is especially useful when S3 files lack lexicographical sorting, and S3 API optimisation cannot be leveraged. Please note that it involves reading and sorting files in memory.
Parquet INT96 Compatibility: The connector now allows Parquet INT96 to be read as a fixed array, preventing runtime failures.
The Kudu and Hive connectors are now deprecated and will be removed in a future release.
Fixed a memory issue with the InfluxDB writer.
Upgraded to Influxdb2 client (note: doesn’t yet support Influxdb2 connections).
For installations that have been using the preview version of the S3 connector and are upgrading to the release, there are a few important considerations:
Previously, default padding was enabled for both “offset” and “partition” values starting in June.
However, in version 5.0, the decision to apply default padding to the “offset” value only, leaving the " partition" value without padding. This change was made to enhance compatibility with querying in Athena.
If you have been using a build from the master branch since June, your connectors might have been configured with a different default padding setting.
To maintain consistency and ensure your existing connector configuration remains valid, you will need to use KCQL configuration properties to customize the padding fields accordingly.
Starting with version 5.0.0, the following configuration keys have been replaced.
In version 4.1, padding options were available but were not enabled by default. At that time, the default padding length, if not specified, was set to 8 characters.
However, starting from version 5.0, padding is now enabled by default, and the default padding length has been increased to 12 characters.
Enabling padding has a notable advantage: it ensures that the files written are fully compatible with the Lenses Stream Reactor S3 Source, enhancing interoperability and data integration.
Sinks created with 4.2.0 and 4.2.1 should retain the padding behaviour, and, therefore should disable padding:
If padding was enabled in 4.1, then the padding length should be specified in the KCQL statement:
STOREAS Bytes_***
is usedThe Bytes_*** storage format has been removed. If you are using this storage format, you will need to install the 5.0.0-deprecated connector and upgrade the connector instances by changing the class name:
Source Before:
Source After:
Sink Before:
Sink After:
To migrate to the new configuration, please follow the following steps:
stop all running instances of the S3 connector
upgrade the connector to 5.0.0
update the configuration to use the new properties
resume the stopped connectors
All
Ensure connector version is retained by connectors
Lenses branding ASCII art updates
AWS S3 Sink Connector
Improves the error in case the input on BytesFormatWriter is not binary
Support for ByteBuffer which may be presented by Connect as bytes
Note
From Version 4.10, AWS S3 Connectors will use the AWS Client by default. You can revert to the jClouds version by setting the connect.s3.aws.client
property.
All
Scala upgrade to 2.13.10
Dependency upgrades
Upgrade to Kafka 3.3.0
SimpleJsonConverter - Fixes mismatching schema error.
AWS S3 Sink Connector
Add connection pool config
Add Short type support
Support null values
Enabling Compression Codecs for Avro and Parquet
Switch to AWS client by default
Add option to add a padding when writing files, so that files can be restored in order by the source
Enable wildcard syntax to support multiple topics without additional configuration.
AWS S3 Source Connector
Add connection pool config
Retain partitions from filename or regex
Switch to AWS client by default
MQTT Source Connector
Allow toggling the skipping of MQTT Duplicates
MQTT Sink Connector
Functionality to ensure unique MQTT Client ID is used for MQTT sink
Elastic6 & Elastic7 Sink Connectors
Fixing issue with missing null values
All
Scala 2.13 Upgrade
Gradle to SBT Migration
Producing multiple artifacts supporting both Kafka 2.8 and Kafka 3.1.
Upgrade to newer dependencies to reduce CVE count
Switch e2e tests from Java to Scala.
AWS S3 Sink Connector
Optimal seek algorithm
Parquet data size flushing fixes.
Adding date partitioning capability
Adding switch to use official AWS library
Add AWS STS dependency to ensure correct operation when assuming roles with a web identity token.
Provide better debugging in case of exceptions.
FTP Source Connector
Fixes to slice mode support.
Hazelcast Sink Connector
Upgrade to HazelCast 4.2.4. The configuration model has changed and now uses clusters instead of username and password configuration.
Hive Sink Connector
Update of parquet functionality to ensure operation with Parquet 1.12.2.
Support for Hive 3.1.3.
JMS Connector
Enable protobuf support.
Pulsar
Upgrade to Pulsar 2.10 and associated refactor to support new client API.
All
Replace Log4j with Logback to overcome CVE-2021-44228
Bringing code from legacy dependencies inside of project
Cassandra Sink Connector
Ensuring the table name is logged on encountering an InvalidQueryException
HBase Sink Connector
Alleviate possible race condition
All
Move to KCQL 2.8.9
Change sys.errors to ConnectExceptions
Additional testing with TestContainers
Licence scan report and status
AWS S3 Sink Connector
S3 Source Offset Fix
Fix JSON & Text newline detection when running in certain Docker images
Byte handling fixes
Partitioning of nested data
Error handling and retry logic
Handle preCommit with null currentOffsets
Remove bucket validation on startup
Enabled simpler management of default flush values.
Local write mode - build locally, then ship
Deprecating old properties, however rewriting them to the new properties to ensure backwards compatibility.
Adding the capability to specify properties in yaml configuration
Rework exception handling. Refactoring errors to use Either[X,Y] return types where possible instead of throwing exceptions.
Ensuring task can be stopped gracefully if it has not been started yet
ContextReader testing and refactor
Adding a simple state model to the S3Writer to ensure that states and transitions are kept consistent. This can be improved in time.
AWS S3 Source Connector
Change order of match to avoid scala.MatchError
S3 Source rewritten to be more efficient and use the natural ordering of S3 keys
Region is necessary when using the AWS client
Cassandra Sink & Source Connectors
Add connection and read client timeout
FTP Connector
Support for Secure File Transfer Protocol
Hive Sink Connector
Array Support
Kerberos debug flag added
Influx DB Sink
Bump influxdb-java from version 2.9 to 2.29
Added array handling support
MongoDB Sink Connector
Nested Fields Support
Redis Sink Connector
Fix Redis Pubsub Writer
Add support for json and json with schema
Move to connect-common 2.0.5 that adds complex type support to KCQL
AWS S3 Sink Connector
Prevent null pointer exception in converters when maps are presented will null values
Offset reader optimisation to reduce S3 load
Ensuring that commit only occurs after the preconfigured time interval when using WITH_FLUSH_INTERVAL
AWS S3 Source Connector (New Connector)
Cassandra Source Connector
Add Bucket Timeseries Mode
Reduction of logging noise
Proper handling of uninitialized connections on task stop()
Elasticsearch Sink Connector
Update default port
Hive Sink
Improve Orc format handling
Fixing issues with partitioning by non-string keys
Hive Source
Ensuring newly written files can be read by the hive connector by introduction of a refresh frequency configuration option.
Redis Sink
Correct Redis writer initialisation
AWS S3 Sink Connector
Elasticsearch 7 Support
Hive Source
Rename option connect.hive.hive.metastore
to connect.hive.metastore
Rename option connect.hive.hive.metastore.uris
to connect.hive.metastore.uris
Fix Elastic start up NPE
Fix to correct batch size extraction from KCQL on Pulsar
Move to Scala 2.12
Move to Kafka 2.4.1 and Confluent 5.4
Deprecated:
Druid Sink (not scala 2.12 compatible)
Elastic Sink (not scala 2.12 compatible)
Elastic5 Sink(not scala 2.12 compatible)
Redis
Add support for Redis Streams
Cassandra
Add support for setting the LoadBalancer policy on the Cassandra Sink
ReThinkDB
Use SSL connection on Rethink initialize tables is ssl set
FTP Source
Respect connect.ftp.max.poll.records when reading slices
MQTT Source
Allow lookup of avro schema files with wildcard subscriptions
This page describes how to perform a backup and restore of data in your Kafka cluster.
The following external storage for backing up to and restoring from are:
AWS S3
Backup and restore are achieved using the standard connectors but enabling the Message Envelope to ensure the correct metadata is persisted.
A Kafka message includes keys, values, headers, and metadata (topic, partition, offset, timestamp).
The connector wraps these messages in an "envelope", streamlining backup and restoration without relying on complex Kafka Connect transformations.
Here's how the envelope is structured:
In this format, all parts of the Kafka message are retained. This is beneficial for backup, restoration, and to run analytical queries.
The Source connector uses this format to rebuild the original Kafka message and send it to the specified topic.
Anything can be stored in S3, and the connector does its best to support the major formats, offering support for:
AVRO
Parquet
JSON
CSV (including headers)
Text
BYTES
This format is decoupled from the format in Kafka. The translation from Kafka to Connect happens via the key.converter
and value.converter
connector properties.
Partitioning is crucial for organizing data and improving query speeds. In S3, this is achieved using the Object Key. By default, the connector reflects the structure of the Kafka topic it is sending to. For instance, a three-partition topic would use this configuration:
This would result in:
The connector allows for customised partitioning, which has its perks:
Better performance in subsequent data queries due to organized partitions.
Easy data management through time intervals, like year or month.
Keeping sensitive data in distinct partitions for tighter access controls.
To adjust partitioning, use the PARTITIONBY clause in the KCQL configuration. This can use the Kafka message's key, value, or headers for partitioning.
For instance, for a "sales" Kafka topic with transaction messages, the KCQL can partition data by transaction year, product type, and customer region.
The Kafka Connect S3 Sink Connector will create custom object keys in your S3 bucket that incorporate the customer ID, transaction year, product category, and customer region, resulting in a coarser partitioning strategy. For instance, an object key might look like this:
To achieve more structured object key naming, similar to Athena Hive-like key names where field names are part of the object key, modify the KCQL syntax as follows:
This will result in object keys like:
Organizing data into time-based intervals within custom object keys can be highly beneficial. To achieve time-based intervals with a custom object key naming, the connector supports a complementary Kafka Connect Single Message Transformer (SMT) plugin designed to streamline this process. You can find the transformer plugin and documentation here.
Consider an example where you need the object key to include the wallclock time (the time when the message was processed) and create an hourly window based on a field called `timestamp`. Here's the connector configuration to achieve this:
In this configuration:
The TimestampConverter SMT is used to convert the timestamp field in the Kafka message's value into a string value based on the specified format pattern (yyyy-MM-dd-HH). This allows us to format the timestamp to represent an hourly interval.
The InsertWallclock SMT incorporates the current wallclock time in the specified format (yyyy-MM-dd-HH).
The PARTITIONBY clause leverages the timestamp field and the wallclock header to craft the object key, providing precise control over data partitioning.
This page describes configuring sink converters for Kafka Connect.
You can configure the converters either at the Connect worker level or at the Connector instance level.
If you follow the best practice while producing the events, each message should carry its schema information. The best option is to send AVRO.
This requires the SchemaRegistry.
Sometimes the producer would find it easier to just send a message with Schema.String
and a JSON string. In this case, your connector configuration should be set to value.converter=org.apache.kafka.connect.json.JsonConverter
. This doesn’t require the SchemaRegistry.
Many existing systems publish JSON over Kafka and bringing them in line with best practices is quite a challenge, hence we added the support. To enable this support you must change the converters in the connector configuration.
The page provides examples of HTTP Sink templating.
In this case the converters are irrelevant as we are not using the message content to populate our message template.
The HTTP request body contains the value of the message, which is retained as a string value via the StringConverter.
Specific fields from the JSON message are substituted into the HTTP request body alongside some static content.
The entirety of the message value is substituted into a placeholder in the message body. The message is treated as a string via the StringConverter.
Fields from the AVRO message are substituted into the message body in the following example:
This page describes how to use converters with source systems sending JSON and Avro.
Source converters depend on the source system you are reading data from. The Connect SourceTask class requires you to supply a List of SourceRecords. Those records can have a schema but how the schema is translated from the source system to a Connect Struct depends on the connector.
We provide four converters out of the box but you can plug in your own. The WITHCONVERTER
keyword supports this option. These can be used when source systems send records as JSON or AVRO, for example, MQTT or JMS.
Not all Connectors support the source converters. Check the option reference for your connector.
Before records are passed back to connect, they go through the converter if specified.
The payload is an Avro message. In this case, you need to provide a path for the Avro schema file to be able to decode it.
The incoming payload is JSON, the resulting Kafka message value will be of type string and the contents will be the incoming JSON.
The payload is a JSON message. This converter will parse the JSON and create an Avro record for it which will be sent to Kafka.
An experimental converter for translating JSON messages to Avro. The Avro schema is fully compatible as new fields are added as the JSON payload evolves.
This page describes the enterprise support available for the Stream Reactor Kafka Connect connectors.
Lenses offers support for our connectors as a paid subscription, The list of connectors is always growing and . Please reach out to for for any inquiries.
varies depending on connectors. The most recent connectors are considered "Next-Gen" and are available with a premium plus SLA to accommodate the most demanding requirements.
This page describes how to use Error policies in Stream Reactor sink connectors.
In addition to the dead letter queues provided by Kafka Connect, the Stream Reactor sink connectors support Error Policies to handle failure scenarios.
The sinks have three error policies that determine how failed writes to the target database are handled. These error policies allow you to control the behaviour of the sink if it encounters an error when writing records to the target system. Since Kafka retains the records, subject to the configured retention policy of the topic, the sink can ignore the error, fail the connector or attempt redelivery.
Any error in writing to the target system will be propagated up and processing is stopped. This is the default behaviour.
Any error in writing to the target database is ignored and processing continues.
Keep in mind This can lead to missed errors if you don’t have adequate monitoring. Data is not lost as it’s still in Kafka subject to Kafka’s retention policy. The sink currently does not distinguish between integrity constraint violations and or other exceptions thrown by any drivers or target systems.
Any error in writing to the target system causes the RetryIterable exception to be thrown. This causes the Kafka Connect framework to pause and replay the message. Offsets are not committed. For example, if the table is offline it will cause a write failure, and the message can be replayed. With the Retry policy, the issue can be fixed without stopping the sink.
Examples for GCP Sink Kafka Connector time based partitioning.
This scenario partitions data by date and time, employing record timestamp headers to enable partitioning based on these time components.
Data is partitioned by data date and hour, utilizing record timestamp headers for partitioning based on these time components.
The default Confluent partitioning scheme follows the structure <prefix>/<topic>/<encodedPartition>/<topic>+<kafkaPartition>+<startOffset>.<format>
. This provides a default partitioning mechanism for Kafka topics.
Similar to the previous scenario, this partitions data by year, month, and day. It utilizes record timestamp headers for partitioning based on these time components.
Extending the previous scenarios, this one partitions data by year, month, day, hour, and minute, allowing for more granular time-based partitioning.
This scenario partitions data by year, month, day, and hour. It utilizes a transformation process to insert record timestamp headers, enabling partitioning based on these time components.
This scenario partitions data by date and hour, using record timestamp headers for partitioning based on these time components.
This scenario partitions data based on the created at timestamp, utilizing record timestamp headers for partitioning.
Data is partitioned based on the raw creation date, employing record timestamp headers for this partitioning scheme.
Data is partitioned based on the creation timestamp, utilizing record timestamp headers for this partitioning scheme.
This scenario partitions data by the created at date, employing record timestamp headers for partitioning.
Similar to the previous scenario, this partitions data by the created at date, utilizing record timestamp headers for partitioning.
Data is partitioned based on the creation date, employing record timestamp headers for this partitioning scheme.
This scenario partitions data by the data date, utilizing record timestamp headers for partitioning.
Data is partitioned based on the date and hour, employing record timestamp headers for this partitioning scheme.
This section describes how to contribute a new connector to the Stream Reactor.
SBT, Java 11, and Scala 2.13 are required.
The Stream Reactor is built using SBT. Each connector is defined in a submodule in the root project.
Add the new directory called kafka-connect-[your-connector].
Under this add the standard path /src/main/
Use io.lenses.streamreactor.connector as the parent package. The following convention is used but each connector is different and can have more sub packages:
config - configuration and settings
sink - sink connectors, tasks and writers
source - source connectors, task and readers
Dependencies are declared in project/Dependencies.scala. Add the dependencies for you connector as a new field in the version object and the maven coordinates, for example:
Next, in the Dependencies trait add a sequence to hold you dependencies:
Next, declare the submodule in Build.sbt.
Add the project to the subproject list:
Defined the dependencies for you module. In this example kafkaConnectAzureServiceBusDeps holds the dependencies defined earlier.
GitHub Release downloads for Stream Reactor Connectors, Secret Providers and SMTs.
This page describes managing a basic connector instance in your Connect cluster.
To deploy a connector into the Kafka Connect Cluster, you must follow the steps below:
You need to have the Jars in your Kafka Connect Cluster plugin.path
Each connector has mandatory configurations that must be deployed and validated; other configurations are optional. Always read the connector documentation first.
You can deploy the connector using Kafka Connect API or Lenses to manage it for you.
Sample of Connector, AWS S3 Sink Connector from Stream Rector:
Let's drill down this connector configuration and what will happen when I deploy:
connector.class
is the plugin we will use.
task.max
is how many tasks will be executed in the Kafka Connect Cluster. In this example, we will have 1 task; my topic has 9 partitions, so in this case, in the consumer group of this connector, we will have 1 instance. This one task will consume 9 partitions. To scale is just to increase the number of tasks.
name
of the connector on the Kafka Connect Cluster must be a unique name for each Kafka Connect Cluster.
topics
the topic name will be consumed, and all data will be written in AWS S3, as our example describes.
value.converter
is the format type used to deserialize or serialize the data in value. In this case, our value will be in json
format.
key.converter
is the format type used to deserialize or serialize the data in key. In this case, our key will be in string
format.
key.converter.schemas.enable
is a field where we tell Kafka Connect if we want to include the value schema in the message. in our example, false
we don't want to include the value schema.
value.converter.schemas.enable
is a field where we tell Kafka Connect if we want to include the key schema in the message. in our example, false
we don't want to include the key schema.
connect.s3.aws.auth.mode
is what is the type of authentication we will use to connect to the AWS S3 bucket.
connect.s3.aws.access.ke
y
is the access key to authenticate into the AWS S3 bucket.
connect.s3.aws.secret.key
is the secret key to authenticate into the AWS S3 bucket.
connect.s3.aws.region
which region the AWS S3 bucket is deployed.
connect.s3.kcql
We use the Kafka Connect Query for configuration, bucket name, folder, which format will be stored, and frequency of adding new files into the bucket.
After deploying your Kafka Connector into your Kafka Connect Cluster, it will be managed by the Kafka Connect Cluster.
To better show how Kafka Connect manages your connectors, we will use Lenses UI.
The image below is the Lenses Connectors list:
In the following image, we will delve into the Kafka Connector details.
Consumer Group
, when we use Lenses, we can see which consumer group is reading and consuming that topic.
Connector tasks
, we can see which tasks are open, the status, and how many records are in and out of that topic.
Removed support for JSON configuration format. Instead please now configure your HTTP Sink connector using kafka connect properties. .
Introduced SSL Configuration. .
Introduced OAuth Support. .
Support for dynamic index names in KCQL.
Configurable tombstone behaviour using KCQL property. behavior.on.null.values
SSL Support using standard Kafka Connect properties.
Full Changelog:
Azure Service Bus Source Connector
GCP Pub/Sub Source Connector
Full Changelog:
GCP Storage Source Connector (Beta)
The deprecated connector won’t be developed any further and will be removed in a future release. If you want to talk to us about a migration plan, please get in touch with us at .
please refer to
For deep configuration of AWS S3 Sink connect, click
4.1.0+
Supported
2.8-3.5 (Confluent 6.2 -> 7.5)
4.0.0 (kafka 3.1 Build)
Supported
3.1 (Confluent 7.1)
4.0.0 (kafka 2.8 Build)
Unsupported
2.8 (Confluent 6.2)
2.0.0+
Deprecated
2.5 (Confluent 5.5)
1.2.7
Deprecated
2.0-2.4 (Confluent 5.4)
AWS Secret Key
aws.secret.key
connect.s3.aws.secret.key
Access Key
aws.access.key
connect.s3.aws.access.key
Auth Mode
aws.auth.mode
connect.s3.aws.auth.mode
Custom Endpoint
aws.custom.endpoint
connect.s3.custom.endpoint
VHost Bucket
aws.vhost.bucket
connect.s3.vhost.bucket
[connector-prefix].error.policy
Specifies the action to be taken if an error occurs while inserting the data. There are three available options, NOOP, the error is swallowed, THROW, the error is allowed to propagate and retry. For RETRY the Kafka message is redelivered up to a maximum number of times specified by the [connector-prefix].max.retries option
THROW
[connector-prefix].max.retries
The maximum number of times a message is retried. Only valid when the [connector-prefix].error.policy is set to RETRY
10
[connector-prefix].retry.interval
The interval, in milliseconds between retries, if the sink is using [connector-prefix].error.policy set to RETRY
60000
AWS S3 Source and Sink
X
X
X
Azure Data Lake Sink
X
X
X
Google Cloud Storage Source and Sink
X
X
X
HTTP Sink
X
X
X
Azure Event Hub source and sink
X
X
X
Azure service Bus Source and Sink
X
X
X
MQTT
X
X
JMS Sink and Source
X
X
Elastic 6 and 7 Sink
X
X
X
FTP Source
X
X
Cassandra source
X
X
Azure DocumentDB Sink
X
X
MongoDB Sink
X
X
Redis Sink
X
X
This page describes the usage of the Stream Reactor Azure Data Lake Gen2 Source Connector.
Coming soon!
This page describes the usage of the Stream Reactor Google PubSub Source Connector.
The Kafka connector is designed to seamlessly ingest records from GCP Pub/Sub topics and queues into your Kafka cluster. This makes it useful for backing up or streaming data from Pub/Sub to your Kafka infrastructure. This connector provides robust support for at least once semantics (this connector ensures that each record reaches the Kafka topic at least once).
For more examples see the .
The connector uses a SQL-like syntax to configure the connector behaviour. The full KCQL syntax is:
Please note that you can employ escaping within KCQL for the INSERT INTO and SELECT * FROM clauses when necessary. For example, if you need to use a topic name that contains a hyphen, you can escape it as follows:
The source and target of the data are specified via the INSERT INTO... SELECT * FROM
clause. The connector will write all the records to the given topic, from the given subscription:
The PROPERTIES
clause is optional and adds a layer of configurability to the connector. It enhances versatility by permitting the application of multiple configurations (delimited by ',').
Properties can be defined in any order.
The following properties are supported:
The connector offers three distinct authentication modes:
Default: This mode relies on the default GCP authentication chain, simplifying the authentication process.
File: This mode uses a local (to the connect worker) path for a file containing GCP authentication credentials.
Credentials: In this mode, explicit configuration of a GCP Credentials string is required for authentication.
The simplest example to configure in the connector is the "Default" mode, as this requires no other configuration.
Here's an example configuration for the "Credentials" mode:
And here is an example configuration using the "File" mode:
Remember when using file mode the file will need to exist on every worker node in your Kafka connect cluster and be readable by the Kafka Connect process.
For enhanced security and flexibility when using either the "Credentials" mode, it is highly advisable to utilize Connect Secret Providers.
Two modes are available: Default Mode and Compatibility Mode.
Compatibility Mode is intended to ensure compatibility with existing tools, while Default Mode offers a simpler modern redesign of the functionality.
You can choose whichever suits your requirements.
Each Pub/Sub message is transformed into a single Kafka record, structured as follows:
Kafka Key: A String of the Pub/Sub MessageID.
Kafka Value: The Pub/Sub message value as BYTES.
Kafka Headers: Includes the "PublishTimestamp" (in seconds) and all Pub/Sub message attributes mapped as separate headers.
The Kafka Key is mapped from the Pub/Sub MessageID, a unique ID for a Pub/Sub message.
The Kafka Value is mapped from the body of the Pub/Sub message.
The Kafka Headers include:
PublishTimestamp: Long value representing the time when the Pub/Sub message was published, in seconds.
GCPProjectID: The GCP Project
PubSubTopicID: The Pub/Sub Topic ID.
PubSubSubscriptionID: The Pub/Sub Subscription ID.
All Pub/Sub message attributes: Each attribute from the Pub/Sub message is mapped as a separate header.
Each Pub/Sub message is transformed into a single Kafka record, structured as follows:
Kafka Key: Comprises the project ID, message ID, and subscription ID of the Pub/Sub message.
Kafka Value: Contains the message data and attributes from the Pub/Sub message.
The Key is a structure with these fields:
The Value is a structure with these fields:
When selecting the "Credentials" mode, it is essential to provide the necessary credentials. Alternatively, if you prefer not to configure these properties explicitly, the connector will follow the credentials retrieval order as described .
batch.size
The maximum number of messages the connector will retrieve and process at one time per polling request (per KCQL mapping).
int
1000
cache.ttl
The maximum amount of time (in milliseconds) to store message data to allow acknowledgement of a message.
long
1 hour
queue.max
Data is loaded into a queue asynchronously so that it stands ready when the poll
call is activated. Control the maximum number of records to hold in the queue per KCQL mapping.
int
10000
ProjectId
String
The Pub/Sub project containing the topic from which messages are polled.
TopicId
String
The Pub/Sub topic containing the messages.
SubscriptionId
String
The Pub/Sub subscription of the Pub/Sub topic.
MessageId
String
A unique ID for a Pub/Sub message
MessageData
Optional String
The body of the Pub/Sub message.
AttributeMap
Optional String
The attribute map associated with the Pub/Sub message.
connect.pubsub.gcp.auth.mode
Specifies the authentication mode for connecting to GCP.
string
Credentials, File or Default
Default
connect.pubsub.gcp.credentials
For “auth.mode” credentials: GCP Authentication credentials string.
string
(Empty)
connect.pubsub.gcp.file
For “auth.mode” file: Local file path for file containing GCP authentication credentials.
string
(Empty)
connect.pubsub.gcp.project.id
GCP Project ID.
string
(Empty)
connect.pubsub.kcql
Kafka Connect Query Language (KCQL) Configuration to control the connector behaviour
string
connect.pubsub.output.mode
Output mode. Please see Output Modes documentation below.
Default or Compatibility
Default
This page describes the usage of the Stream Reactor Azure Event Hubs Sink Connector.
Coming soon!
This page describes the usage of the Stream Reactor JMS Source Connector.
A Kafka Connect JMS source connector to subscribe to messages on JMS queues and topics and write them to a Kafka topic.
The connector uses the standard JMS protocols and has been tested against ActiveMQ.
The connector allows for the JMS initial.context.factory
and connection.factory
to be set according to your JMS provider. The appropriate implementation jars must be added to the CLASSPATH of the connect workers or placed in the plugin.path
of the connector.
Each JMS message is committed only when it has been written to Kafka. If a failure happens when writing to Kafka, i.e. the message is too large, then the JMS message will not be acknowledged. It will stay in the queue so it can be actioned upon.
The schema of the messages is fixed and can be found under Data Types unless a converter is used.
You must provide the JMS implementation jars for your JMS service.
For more examples see the tutorials.
You can specify multiple KCQL statements separated by ;
to have the connector sink into multiple topics.
The following KCQL is supported:
The selection of fields from the JMS message is not supported.
Examples:
The connector supports both TOPICS and QUEUES, controlled by the WITHTYPE KCQL clause.
The connector supports converters to handle different message payload formats in the source topic or queue.
If no converter is provided the JMS message is converter to a Kafka Struct representation.
message_timestamp
Optional int64
correlation_id
Optional string
redelivered
Optional boolean
reply_to
Optional string
destination
Optional string
message_id
Optional string
mode
Optional int32
type
Optional string
priority
Optional int32
bytes_payload
Optional bytes
properties
Map of string
connect.jms.url
Provides the JMS broker url
string
connect.jms.initial.context.factory
Initial Context Factory, e.g: org.apache.activemq.jndi.ActiveMQInitialContextFactory
string
connect.jms.connection.factory
Provides the full class name for the ConnectionFactory compile to use, e.gorg.apache.activemq.ActiveMQConnectionFactory
string
ConnectionFactory
connect.jms.kcql
connect.jms.kcql
string
connect.jms.subscription.name
subscription name to use when subscribing to a topic, specifying this makes a durable subscription for topics
string
connect.jms.password
Provides the password for the JMS connection
password
connect.jms.username
Provides the user for the JMS connection
string
connect.jms.error.policy
Specifies the action to be taken if an error occurs while inserting the data. There are two available options: NOOP - the error is swallowed THROW - the error is allowed to propagate. RETRY - The exception causes the Connect framework to retry the message. The number of retries is based on The error will be logged automatically
string
THROW
connect.jms.retry.interval
The time in milliseconds between retries.
int
60000
connect.jms.max.retries
The maximum number of times to try the write again.
int
20
connect.jms.destination.selector
Selector to use for destination lookup. Either CDI or JNDI.
string
CDI
connect.jms.initial.context.extra.params
List (comma-separated) of extra properties as key/value pairs with a colon delimiter to supply to the initial context e.g. SOLACE_JMS_VPN:my_solace_vp
list
[]
connect.jms.batch.size
The number of records to poll for on the target JMS destination in each Connect poll.
int
100
connect.jms.polling.timeout
Provides the timeout to poll incoming messages
long
1000
connect.jms.source.default.converter
Contains a canonical class name for the default converter of a raw JMS message bytes to a SourceRecord. Overrides to the default can be done by using connect.jms.source.converters still. i.e. com.datamountaineer.streamreactor.connect.source.converters.AvroConverter
string
connect.jms.converter.throw.on.error
If set to false the conversion exception will be swallowed and everything carries on BUT the message is lost!!; true will throw the exception.Default is false.
boolean
false
connect.converter.avro.schemas
If the AvroConverter is used you need to provide an avro Schema to be able to read and translate the raw bytes to an avro record. The format is $MQTT_TOPIC=$PATH_TO_AVRO_SCHEMA_FILE
string
connect.jms.headers
Contains collection of static JMS headers included in every SinkRecord The format is connect.jms.headers="$MQTT_TOPIC=rmq.jms.message.type:TextMessage,rmq.jms.message.priority:2;$MQTT_TOPIC2=rmq.jms.message.type:JSONMessage"
string
connect.progress.enabled
Enables the output for how many records have been processed
boolean
false
connect.jms.evict.interval.minutes
Removes the uncommitted messages from the internal cache. Each JMS message is linked to the Kafka record to be published. Failure to publish a record to Kafka will mean the JMS message will not be acknowledged.
int
10
connect.jms.evict.threshold.minutes
The number of minutes after which an uncommitted entry becomes evictable from the connector cache.
int
10
connect.jms.scale.type
How the connector tasks parallelization is decided. Available values are kcql and default. If kcql is provided it will be based on the number of KCQL statements written; otherwise it will be driven based on the connector tasks.max
This page describes the usage of the Stream Reactor AWS S3 Sink Connector.
This Kafka Connect sink connector facilitates the seamless transfer of records from Kafka to AWS S3 Buckets. It offers robust support for various data formats, including AVRO, Parquet, JSON, CSV, and Text, making it a versatile choice for data storage. Additionally, it ensures the reliability of data transfer with built-in support for exactly-once semantics.
For more examples see the tutorials.
This example writes to a bucket called demo, partitioning by a field called ts
, store as JSON.
You can specify multiple KCQL statements separated by ; to have a connector sink multiple topics. The connector properties topics or topics.regex are required to be set to a value that matches the KCQL statements.
The connector uses KCQL to map topics to S3 buckets and paths. The full KCQL syntax is:
Please note that you can employ escaping within KCQL for the INSERT INTO, SELECT * FROM, and PARTITIONBY clauses when necessary. For example, an incoming Kafka message stored as JSON can use fields containing .
:
In this case, you can use the following KCQL statement:
The target bucket and path are specified in the INSERT INTO clause. The path is optional and if not specified, the connector will write to the root of the bucket and append the topic name to the path.
Here are a few examples:
Currently, the connector does not offer support for SQL projection; consequently, anything other than a SELECT * query is disregarded. The connector will faithfully write all fields from Kafka exactly as they are.
The source topic is defined within the FROM clause. To avoid runtime errors, it’s crucial to configure either the topics
or topics.regex
property in the connector and ensure proper mapping to the KCQL statements.
Set the FROM clause to *. This will auto map the topic as a partition.
The object key serves as the filename used to store data in S3. There are two options for configuring the object key:
Default: The object key is automatically generated by the connector and follows the Kafka topic-partition structure. The format is $bucket/[$prefix]/$topic/$partition/offset.extension. The extension is determined by the chosen storage format.
Custom: The object key is driven by the PARTITIONBY
clause. The format is either $bucket/[$prefix]/$topic/customKey1=customValue1/customKey2=customValue2/topic(partition_offset).extension
(AWS Athena naming style mimicking Hive-like data partitioning) or $bucket/[$prefix]/customValue/topic(partition_offset).ext
. The extension is determined by the selected storage format.
Custom keys and values can be extracted from the Kafka message key, message value, or message headers, as long as the headers are of types that can be converted to strings. There is no fixed limit to the number of elements that can form the object key, but you should be aware of AWS S3 key length restrictions.
The Connector automatically adds the topic name to the partition. There is no need to add it to the partition clause. If you want to explicitly add the topic or partition you can do so by using _topic and _partition.
The partition clause works on header, key and values fields of the Kafka message.
To extract fields from the message values, simply use the field names in the PARTITIONBY
clause. For example:
However, note that the message fields must be of primitive types (e.g., string, int, long) to be used for partitioning.
You can also use the entire message key as long as it can be coerced into a primitive type:
In cases where the Kafka message Key is not a primitive but a complex object, you can use individual fields within the message Key to create the S3 object key name:
Kafka message headers can also be used in the S3 object key definition, provided the header values are of primitive types easily convertible to strings:
Customizing the object key can leverage various components of the Kafka message. For example:
This flexibility allows you to tailor the object key to your specific needs, extracting meaningful information from Kafka messages to structure S3 object keys effectively.
To enable Athena-like partitioning, use the following syntax:
Storing data in Amazon S3 and partitioning it by time is a common practice in data management. For instance, you may want to organize your S3 data in hourly intervals. This partitioning can be seamlessly achieved using the PARTITIONBY
clause in combination with specifying the relevant time field. However, it’s worth noting that the time field typically doesn’t adjust automatically.
To address this, we offer a Kafka Connect Single Message Transformer (SMT) designed to streamline this process.
Let’s consider an example where you need the object key to include the wallclock time (the time when the message was processed) and create an hourly window based on a field called timestamp
. Here’s the connector configuration to achieve this:
In this example, the incoming Kafka message’s Value content includes a field called timestamp, represented as a long value indicating the epoch time in milliseconds. The TimestampConverter SMT will expertly convert this into a string value according to the format specified in the format.to.pattern property. Additionally, the insertWallclock SMT will incorporate the current wallclock time in the format you specify in the format property.
The PARTITIONBY
clause then leverages both the timestamp field and the wallclock header to craft the object key, providing you with precise control over data partitioning.
While the STOREAS
clause is optional, it plays a pivotal role in determining the storage format within AWS S3. It’s crucial to understand that this format is entirely independent of the data format stored in Kafka. The connector maintains its neutrality towards the storage format at the topic level and relies on the key.converter
and value.converter
settings to interpret the data.
Supported storage formats encompass:
AVRO
Parquet
JSON
CSV (including headers)
Text
BYTES
Opting for BYTES ensures that each record is stored in its own separate file. This feature proves particularly valuable for scenarios involving the storage of images or other binary data in S3. For cases where you prefer to consolidate multiple records into a single binary file, AVRO or Parquet are the recommended choices.
By default, the connector exclusively stores the Kafka message value. However, you can expand storage to encompass the entire message, including the key, headers, and metadata, by configuring the store.envelope
property as true. This property operates as a boolean switch, with the default value being false. When the envelope is enabled, the data structure follows this format:
Utilizing the envelope is particularly advantageous in scenarios such as backup and restore or replication, where comprehensive storage of the entire message in S3 is desired.
Storing the message Value Avro data as Parquet in S3:
The converter also facilitates seamless JSON to AVRO/Parquet conversion, eliminating the need for an additional processing step before the data is stored in S3.
Enabling the full message stored as JSON in S3:
Enabling the full message stored as AVRO in S3:
If the restore (see the S3 Source documentation) happens on the same cluster, then the most performant way is to use the ByteConverter for both Key and Value and store as AVRO or Parquet:
The connector offers three distinct flush options for data management:
Flush by Count - triggers a file flush after a specified number of records have been written to it.
Flush by Size - initiates a file flush once a predetermined size (in bytes) has been attained.
Flush by Interval - enforces a file flush after a defined time interval (in seconds).
It’s worth noting that the interval flush is a continuous process that acts as a fail-safe mechanism, ensuring that files are periodically flushed, even if the other flush options are not configured or haven’t reached their thresholds.
Consider a scenario where the flush size is set to 10MB, and only 9.8MB of data has been written to the object, with no new Kafka messages arriving for an extended period of 6 hours. To prevent undue delays, the interval flush guarantees that the object is flushed after the specified time interval has elapsed. This ensures the timely management of data even in situations where other flush conditions are not met.
The flush options are configured using the flush.count, flush.size, and flush.interval properties. The settings are optional and if not specified the defaults are:
flush.count = 50_000
flush.size = 500000000 (500MB)
flush.interval = 3_600 (1 hour)
A connector instance can simultaneously operate on multiple topic partitions. When one partition triggers a flush, it will initiate a flush operation for all of them, even if the other partitions are not yet ready to flush.
The next flush time is calculated based on the time the previous flush completed (the last modified time of the object written to S3). Therefore, by design, the sink connector’s behaviour will have a slight drift based on the time it takes to flush records and whether records are present or not. If Kafka Connect makes no calls to put records, the logic for flushing won't be executed. This ensures a more consistent number of records per object.
The PROPERTIES clause is optional and adds a layer of configuration to the connector. It enhances versatility by permitting the application of multiple configurations (delimited by ‘,’). The following properties are supported:
padding.type
Specifies the type of padding to be applied.
LeftPad, RightPad, NoOp
LeftPad, RightPad, NoOp
LeftPad
padding.char
Defines the character used for padding.
Char
‘0’
padding.length.partition
Sets the padding length for the partition.
Int
0
padding.length.offset
Sets the padding length for the offset.
Int
12
partition.include.keys
Specifies whether partition keys are included.
Boolean
false Default (Custom Partitioning): true
store.envelope
Indicates whether to store the entire Kafka message
Boolean
store.envelope.fields.key
Indicates whether to store the envelope’s key.
Boolean
store.envelope.fields.headers
Indicates whether to store the envelope’s headers.
Boolean
store.envelope.fields.value
Indicates whether to store the envelope’s value.
Boolean
store.envelope.fields.metadata
Indicates whether to store the envelope’s metadata.
Boolean
The sink connector optimizes performance by padding the output objects. This proves beneficial when using the S3 Source connector to restore data. This object name padding ensures that objects are ordered lexicographically, allowing the S3 Source connector to skip the need for reading, sorting, and processing all objects, thereby enhancing efficiency.
AVRO and Parquet offer the capability to compress files as they are written. The GCP Storage Sink connector provides advanced users with the flexibility to configure compression options.
Here are the available options for the connect.gcpstorage.compression.codec
, along with indications of their support by Avro, Parquet and JSON writers:
UNCOMPRESSED
✅
✅
✅
SNAPPY
✅
✅
GZIP
✅
✅
LZ0
✅
LZ4
✅
BROTLI
✅
BZIP2
✅
ZSTD
✅
⚙️
✅
DEFLATE
✅
⚙️
XZ
✅
⚙️
Please note that not all compression libraries are bundled with the S3 connector. Therefore, you may need to manually add certain libraries to the classpath to ensure they function correctly.
The connector offers two distinct authentication modes:
Default: This mode relies on the default AWS authentication chain, simplifying the authentication process.
Credentials: In this mode, explicit configuration of AWS Access Key and Secret Key is required for authentication.
Here’s an example configuration for the Credentials mode:
For enhanced security and flexibility when using the Credentials mode, it is highly advisable to utilize Connect Secret Providers.
The connector supports Error policies.
The connector can also be used against API compatible systems provided they implement the following:
The connector uses the concept of index objects that it writes to in order to store information about the latest offsets for Kafka topics and partitions as they are being processed. This allows the connector to quickly resume from the correct position when restarting and provides flexibility in naming the index objects.
By default, the index objects are grouped within a prefix named .indexes
for all connectors. However, each connector will create and store its index objects within its own nested prefix inside this .indexes
prefix.
You can configure the prefix for these index objects using the property connect.s3.indexes.name. This property specifies the path from the root of the S3 bucket. Note that even if you configure this property, the connector will still place the indexes within a nested prefix of the specified prefix.
Index Name (connect.s3.indexes.name
)
Resulting Indexes Prefix Structure
Description
.indexes
(default)
.indexes/<connector_name>/
The default setup, where each connector uses its own subdirectory within .indexes
.
custom-indexes
custom-indexes/<connector_name>/
Custom root directory custom-indexes
, with a subdirectory for each connector.
indexes/s3-connector-logs
indexes/s3-connector-logs/<connector_name>/
Uses a custom subdirectory s3-connector-logs
within indexes
, with a subdirectory for each connector.
logs/indexes
logs/indexes/<connector_name>/
Indexes are stored under logs/indexes
, with a subdirectory for each connector.
connect.s3.aws.auth.mode
Specifies the AWS authentication mode for connecting to S3.
string
“Credentials,” “Default”
“Default”
connect.s3.aws.access.key
The AWS Access Key used for authentication.
string
(Empty)
connect.s3.aws.secret.key
The AWS Secret Key used for authentication.
string
(Empty)
connect.s3.aws.region
The AWS Region where the S3 bucket is located.
string
(Empty)
connect.s3.pool.max.connections
Specifies the maximum number of connections allowed in the AWS Client’s HTTP connection pool when interacting with S3.
int
-1 (undefined)
50
connect.s3.custom.endpoint
Allows for the specification of a custom S3 endpoint URL if needed.
string
(Empty)
connect.s3.vhost.bucket
Enables the use of Vhost Buckets for S3 connections. Always set to true when custom endpoints are used.
boolean
true, false
false
connect.s3.error.policy
Defines the error handling policy when errors occur during data transfer to or from S3.
string
“NOOP,” “THROW,” “RETRY”
“THROW”
connect.s3.max.retries
Sets the maximum number of retries the connector will attempt before reporting an error to the Connect Framework.
int
20
connect.s3.retry.interval
Specifies the interval (in milliseconds) between retry attempts by the connector.
int
60000
connect.s3.http.max.retries
Sets the maximum number of retries for the underlying HTTP client when interacting with S3.
long
5
connect.s3.http.retry.interval
Specifies the retry interval (in milliseconds) for the underlying HTTP client. An exponential backoff strategy is employed.
long
50
connect.s3.local.tmp.directory
Enables the use of a local folder as a staging area for data transfer operations.
string
(Empty)
connect.s3.kcql
A SQL-like configuration that defines the behavior of the connector. Refer to the KCQL section below for details.
string
(Empty)
connect.s3.compression.codec
Sets the Parquet compression codec to be used when writing data to S3.
string
“UNCOMPRESSED,” “SNAPPY,” “GZIP,” “LZ0,” “LZ4,” “BROTLI,” “BZIP2,” “ZSTD,” “DEFLATE,” “XZ”
“UNCOMPRESSED”
connect.s3.compression.level
Sets the compression level when compression is enabled for data transfer to S3.
int
1-9
(Empty)
connect.s3.seek.max.files
Specifies the maximum threshold for the number of files the connector uses to ensure exactly-once processing of data.
int
5
connect.s3.indexes.name
Configure the indexes prefix for this connector.
string
".indexes"
connect.s3.exactly.once.enable
By setting to 'false', disable exactly-once semantics, opting instead for Kafka Connect’s native at-least-once offset management
boolean
true, false
true
connect.s3.schema.change.rollover
When set to false
, the file will not roll over upon receiving a record with a schema different from the accumulated ones. This is a performance optimization, intended only for cases where schema changes are backward-compatible.
boolean
true, false
true
SMT that inserts date, year, month, day, hour, minute and second headers using the record timestamp. If the record timestamp is null, the SMT uses the current system time.
The headers inserted are of type STRING. By using this SMT, you can partition the data by yyyy-MM-dd/HH
or yyyy/MM/dd/HH
, for example, and only use one SMT.
The list of headers inserted are:
date
year
month
day
hour
minute
second
All headers can be prefixed with a custom prefix. For example, if the prefix is wallclock_
, then the headers will be:
wallclock_date
wallclock_year
wallclock_month
wallclock_day
wallclock_hour
wallclock_minute
wallclock_second
When used with the Lenses connectors for S3, GCS or Azure data lake, the headers can be used to partition the data. Considering the headers have been prefixed by _
, here are a few KCQL examples:
To store the epoch value, use the following configuration:
To prefix the headers with wallclock_
, use the following:
To change the date format, use the following:
To use the timezone Asia/Kolkoata
, use the following:
To facilitate S3, GCS, or Azure Data Lake partitioning using a Hive-like partition name format, such as date=yyyy-MM-dd / hour=HH
, employ the following SMT configuration for a partition strategy.
and in the KCQL setting utilise the headers as partitioning keys:
This page describes the usage of the Stream Reactor Redis Sink Connector.
For more examples see the .
You can specify multiple KCQL statements separated by ;
to have a connector sink multiple topics. The connector properties topics or topics.regex are required to be set to a value that matches the KCQL statements.
The following KCQL is supported:
The purpose of this mode is to cache in Redis [Key-Value] pairs. Imagine a Kafka topic with currency foreign exchange rate messages:
You may want to store in Redis: the symbol as the Key and the price as the Value. This will effectively make Redis a caching system, which multiple other applications can access to get the (latest) value. To achieve that using this particular Kafka Redis Sink Connector, you need to specify the KCQL as:
This will update the keys USDGBP , EURGBP with the relevant price using the (default) JSON format:
Composite keys are supported with the PK clause, a delimiter can be set with the optional configuration property connect.redis.pk.delimiter.
To insert messages from a Kafka topic into 1 Sorted Set use the following KCQL syntax:
This will create and add entries to the (sorted set) named cpu_stats. The entries will be ordered in the Redis set based on the score that we define it to be the value of the timestamp field of the AVRO message from Kafka. In the above example, we are selecting and storing all the fields of the Kafka message.
The TTL statement allows setting a time to live on the sorted set. If not specified TTL is set.
The connector can create multiple sorted sets by promoting each value of one field from the Kafka message into one Sorted Set and selecting which values to store in the sorted-sets. Set KCQL clause to define the filed using PK (primary key)
Notice we have dropped the INSERT clause.
The connector can also prefix the name of the Key using the INSERT statement for Multiple SortedSets:
This will create a key with names FX-USDGBP , FX-EURGBP etc.
The TTL statement allows setting a time to live on the sorted set. If not specified TTL is set.
To insert messages from a Kafka topic with GEOADD use the following KCQL syntax:
To insert messages from a Kafka topic to a Redis Stream use the following KCQL syntax:
To insert a message from a Kafka topic to a Redis PubSub use the following KCQL syntax:
The channel to write to in Redis is determined by a field in the payload of the Kafka message set in the KCQL statement, in this case, a field called myfield
.
This sink supports the following Kafka payloads:
Schema.Struct and Struct (Avro)
Schema.Struct and JSON
No Schema and JSON
The connector supports .
header.prefix.name
Optional header prefix.
String
Low
date.format
Optional Java date time formatter.
String
yyyy-MM-dd
Low
year.format
Optional Java date time formatter for the year component.
String
yyyy
Low
month.format
Optional Java date time formatter for the month component.
String
MM
Low
day.format
Optional Java date time formatter for the day component.
String
dd
Low
hour.format
Optional Java date time formatter for the hour component.
String
HH
Low
minute.format
Optional Java date time formatter for the minute component.
String
mm
Low
second.format
Optional Java date time formatter for the second component.
String
ss
Low
timezone
Optional. Sets the timezone. It can be any valid Java timezone.
String
UTC
Low
locale
Optional. Sets the locale. It can be any valid Java locale.
String
en
Low
connect.redis.pk.delimiter
Specifies the redis primary key delimiter
string
.
ssl.provider
The name of the security provider used for SSL connections. Default value is the default security provider of the JVM.
string
ssl.protocol
The SSL protocol used to generate the SSLContext. Default setting is TLS, which is fine for most cases. Allowed values in recent JVMs are TLS, TLSv1.1 and TLSv1.2. SSL, SSLv2 and SSLv3 may be supported in older JVMs, but their usage is discouraged due to known security vulnerabilities.
string
TLS
ssl.truststore.location
The location of the trust store file.
string
ssl.keystore.password
The store password for the key store file. This is optional for client and only needed if ssl.keystore.location is configured.
password
ssl.keystore.location
The location of the key store file. This is optional for client and can be used for two-way authentication for client.
string
ssl.truststore.password
The password for the trust store file. If a password is not set access to the truststore is still available, but integrity checking is disabled.
password
ssl.keymanager.algorithm
The algorithm used by key manager factory for SSL connections. Default value is the key manager factory algorithm configured for the Java Virtual Machine.
string
SunX509
ssl.trustmanager.algorithm
The algorithm used by trust manager factory for SSL connections. Default value is the trust manager factory algorithm configured for the Java Virtual Machine.
string
PKIX
ssl.keystore.type
The file format of the key store file. This is optional for client.
string
JKS
ssl.cipher.suites
A list of cipher suites. This is a named combination of authentication, encryption, MAC and key exchange algorithm used to negotiate the security settings for a network connection using TLS or SSL network protocol. By default all the available cipher suites are supported.
list
ssl.endpoint.identification.algorithm
The endpoint identification algorithm to validate server hostname using server certificate.
string
https
ssl.truststore.type
The file format of the trust store file.
string
JKS
ssl.enabled.protocols
The list of protocols enabled for SSL connections.
list
[TLSv1.2, TLSv1.1, TLSv1]
ssl.key.password
The password of the private key in the key store file. This is optional for client.
password
ssl.secure.random.implementation
The SecureRandom PRNG implementation to use for SSL cryptography operations.
string
connect.redis.kcql
KCQL expression describing field selection and routes.
string
connect.redis.host
Specifies the redis server
string
connect.redis.port
Specifies the redis connection port
int
connect.redis.password
Provides the password for the redis connection.
password
connect.redis.ssl.enabled
Enables ssl for the redis connection
boolean
false
connect.redis.error.policy
Specifies the action to be taken if an error occurs while inserting the data. There are two available options: NOOP - the error is swallowed THROW - the error is allowed to propagate. RETRY - The exception causes the Connect framework to retry the message. The number of retries is based on The error will be logged automatically
string
THROW
connect.redis.retry.interval
The time in milliseconds between retries.
int
60000
connect.redis.max.retries
The maximum number of times to try the write again.
int
20
connect.progress.enabled
Enables the output for how many records have been processed
boolean
false
Examples for AWS S3 Source Kafka Connector.
This connector configuration is designed for ingesting data from , into Apache Kafka.
Connector Name: aws-s3SourceConnectorParquet (This can be customized as needed)
Connector Class: io.lenses.streamreactor.connect.aws.s3.source.S3SourceConnector
Maximum Tasks: 1 (Number of tasks to execute in parallel)
KCQL Statement:
Inserts data from the specified cloud storage bucket into a Kafka topic.
Syntax: insert into $TOPIC_NAME select * from $BUCKET_NAME:$PREFIX_NAME STOREAS 'parquet'
$TOPIC_NAME
: Name of the Kafka topic where data will be inserted.
$BUCKET_NAME
: Name of the AWS S3 storage bucket.
$PREFIX_NAME
: Prefix or directory within the bucket.
Value Converter:
AvroConverter (Assuming data is serialized in Avro format)
Schema Registry URL:
http://localhost:8089 (URL of the schema registry for Avro serialization)
Authentication Properties:
(These properties depend on the authentication mechanism used for accessing the cloud storage service. Replace placeholders with actual authentication properties for the specific cloud platform.)
This configuration serves as a template and can be customized according to the requirements and specifics of your data.
This configuration example is particularly useful when you need to restore data from a AWS S3, into Apache Kafka while maintaining all data including headers, key and value for each record. The envelope structure encapsulates the actual data payload along with metadata into files on your source bucket, providing a way to manage and process data with additional context.
Data Restoration with Envelope Structure: If you’re restoring data from AWS S3 into Kafka and want to preserve metadata, this configuration is suitable. Envelopes can include metadata like timestamps, data provenance, or other contextual information, which can be valuable for downstream processing.
Batch Processing: The configuration supports batch processing by specifying a batch size (BATCH=2000
) and a limit on the number of records (LIMIT 10000
). This is beneficial when dealing with large datasets, allowing for efficient processing in chunks.
Error Handling: Error handling is configured to throw exceptions (connect.s3.error.policy=THROW
) in case of errors during data ingestion, ensuring that any issues are immediately surfaced for resolution.
Hierarchical Partition Extraction: The source partition extractor type is set to hierarchical (connect.s3.source.partition.extractor.type=hierarchical
), which is suitable for extracting hierarchical partitioning structures from the source data location.
Continuous Partition Search: Continuous partition search is enabled (connect.partition.search.continuous=true
), which helps in continuously searching for new partitions to process, ensuring that newly added data is ingested promptly.
This configuration ensures efficient and reliable data ingestion from cloud storage into Kafka while preserving the envelope structure and providing robust error handling mechanisms.
Similar to the above, this is another configuration for envelope format
Single Task Processing: Unlike the previous configuration which allowed for multiple tasks (tasks.max=4
), this variant is configured for single-task processing (tasks.max=1
). This setup may be preferable in scenarios where the workload is relatively lighter or where processing tasks in parallel is not necessary.
AVRO Format for Data Serialization: Data is serialized in the AVRO format (STOREAS 'AVRO'
), leveraging the AvroConverter (value.converter=io.confluent.connect.avro.AvroConverter
). This is suitable for environments where Avro is the preferred serialization format or where compatibility with Avro-based systems is required.
Schema Registry Configuration: The configuration includes the URL of the schema registry (value.converter.schema.registry.url=http://localhost:8081
), facilitating schema management and compatibility checks for Avro serialized data.
Hierarchical Partition Extraction: Similar to the previous configuration, hierarchical partition extraction is employed (connect.s3.source.partition.extractor.type=hierarchical
), enabling extraction of partitioning structures from the source data location.
Examples for AWS S3 Sink Kafka Connector time based partitioning.
This scenario partitions data by date and time, employing record timestamp headers to enable partitioning based on these time components.
Data is partitioned by data date and hour, utilizing record timestamp headers for partitioning based on these time components.
The default Confluent partitioning scheme follows the structure <prefix>/<topic>/<encodedPartition>/<topic>+<kafkaPartition>+<startOffset>.<format>
. This provides a default partitioning mechanism for Kafka topics.
Similar to the previous scenario, this partitions data by year, month, and day. It utilizes record timestamp headers for partitioning based on these time components.
Extending the previous scenarios, this one partitions data by year, month, day, hour, and minute, allowing for more granular time-based partitioning.
This scenario partitions data by year, month, day, and hour. It utilizes a transformation process to insert record timestamp headers, enabling partitioning based on these time components.
This scenario partitions data by date and hour, using record timestamp headers for partitioning based on these time components.
This scenario partitions data based on the created at timestamp, utilizing record timestamp headers for partitioning.
Data is partitioned based on the raw creation date, employing record timestamp headers for this partitioning scheme.
Data is partitioned based on the creation timestamp, utilizing record timestamp headers for this partitioning scheme.
This scenario partitions data by the created at date, employing record timestamp headers for partitioning.
Similar to the previous scenario, this partitions data by the created at date, utilizing record timestamp headers for partitioning.
Data is partitioned based on the creation date, employing record timestamp headers for this partitioning scheme.
This scenario partitions data by the data date, utilizing record timestamp headers for partitioning.
Data is partitioned based on the date and hour, employing record timestamp headers for this partitioning scheme.
This page contains the release notes for Single Message Transforms.
Adds support for multiple "from" patterns.
This converts the format.from.pattern
field in the following SMTs:
InsertFieldTimestampHeaders
InsertRollingFieldTimestampHeaders
TimestampConverter
into a List (comma separated) so that these SMTs can support multiple (fallback) DateTimeFormatter patterns should multiple timestamps be in use.
When updating your configuration, if format.from.pattern
contains commas, enclose the pattern in double quotes.
Configurations should be backwards-compatible with previous versions of the SMT, the exception is if commas are used in the format.from.pattern
string.
To update the configuration of format.from.pattern
ensure you enclose any pattern which contains commas in double quotes.
Multiple format.from.pattern
can now be configured, each pattern containing a comma can be enclosed in double quotes:
When configuring format.from.pattern
, the order matters; less granular formats should follow more specific ones to avoid data loss. For example, place yyyy-MM-dd
after yyyy-MM-dd'T'HH:mm:ss
to ensure detailed timestamp information isn't truncated.
Increase error information for debugging.
Adds support for adding metadata to kafka connect headers (for a source connector).
Workaround for Connect runtime failing with unexplained exception where it looks like the static fields of parent class is not resolved prop.
Adds support for inserting time based headers using a Kafka message payload field.
Fix public visibility of rolling timestamp headers.
Don't make CTOR protected.
Introducing four new Single Message Transforms (SMTs) aimed at simplifying and streamlining the management of system or record timestamps, along with support for rolling windows. These SMTs are designed to significantly reduce the complexity associated with partitioning data in S3/Azure/GCS Sink based on time, offering a more efficient and intuitive approach to data organization. By leveraging these SMTs, users can seamlessly handle timestamp-based partitioning tasks, including optional rolling window functionality, paving the way for smoother data management workflows.
MQTT
Sink data from Kafka to MQTT.
AES256
Secure Kafka Connect secrets with AES256 encryption.