This page details the configuration options for the Stream Reactor Kafka Connect source connectors.
Source connectors read data from external systems and write to Kafka.
This page describes the Stream Reactor connector plugins.
This page describes the usage of the Stream Reactor Azure Data Lake Gen2 Source Connector.
Coming soon!
This page describes the usage of the Stream Reactor Azure Service Bus Source Connector.
This Kafka connector ingests records from Azure Service Bus into your Kafka cluster. It uses the Microsoft Azure API to transfer data from Service Bus, preserving both payloads and metadata (see Payload support). It provides an AT-LEAST-ONCE guarantee: data is committed (marked as read) in Service Bus only once the connector verifies it was successfully written to the designated Kafka topic. The Azure Service Bus Source Connector supports both Service Bus entity types: Queues and Topics.
For more examples see the tutorials.
The following example presents all the mandatory configuration properties for the Service Bus connector. Please note there are also optional parameters (see the option reference below). Feel free to tweak the configuration to your requirements.
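A minimal sketch of such a configuration is shown below. The connector class and property names here are assumptions based on the Stream Reactor naming convention, so verify them against the option reference:

```properties
name=azure-servicebus-source
# Assumed connector class name; check your installed plugin
connector.class=io.lenses.streamreactor.connect.azure.servicebus.source.AzureServiceBusSourceConnector
tasks.max=1
# Shared access policy connection string from the Azure Portal
connect.servicebus.connection.string=Endpoint=sb://<your-namespace>.servicebus.windows.net/;SharedAccessKeyName=<policy>;SharedAccessKey=<key>
# KCQL mapping from Service Bus to Kafka topic
connect.servicebus.kcql=INSERT INTO <your-kafka-topic> SELECT * FROM <your-service-bus> PROPERTIES('servicebus.type'='QUEUE')
```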
You can specify multiple KCQL statements separated by ; to have the connector map between multiple topics.
The following KCQL is supported:
It allows you to map a Service Bus named <your-service-bus> to a Kafka topic named <your-kafka-topic> using the PROPERTIES specified.
The selection of fields from the Service Bus message is not supported.
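As a sketch, a mapping of that shape could look like the following (all names are placeholders):

```sql
INSERT INTO <your-kafka-topic>
SELECT * FROM <your-service-bus>
PROPERTIES('servicebus.type'='QUEUE');
```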
The Azure Service Bus Connector follows a specific message schema. See below for the format of the data transferred to the Kafka topics specified in the KCQL configuration.
You can connect to an Azure Service Bus by passing your connection string in configuration. The connection string can be found in the Shared access policies section of your Azure Portal.
Learn more about different methods of connecting to Service Bus on the Azure Website.
The Azure Service Bus Connector connects to Service Bus via the Microsoft API. To configure your mappings correctly, pay attention to the PROPERTIES part of your KCQL mappings. There are two cases: reading from a Service Bus of type QUEUE and of type TOPIC. Please refer to the relevant sections below. For further questions, check the Azure Service Bus documentation to learn more about those mechanisms.
To read from a queue, there is an additional parameter that you need to pass with your KCQL mapping in the PROPERTIES part. This parameter is servicebus.type, and it can take one of two values depending on the type of the Service Bus: QUEUE or TOPIC. For a queue, set it to QUEUE.
This is sufficient to create the mapping with your queue.
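For example, a queue mapping might look like this (topic and queue names are illustrative):

```sql
INSERT INTO orders
SELECT * FROM orders-queue
PROPERTIES('servicebus.type'='QUEUE');
```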
To read from a topic, there are two additional parameters that you need to pass with your KCQL mapping in the PROPERTIES part:
Parameter servicebus.type, which can take one of two values depending on the type of the Service Bus: QUEUE or TOPIC. For a topic, set it to TOPIC.
Parameter subscription.name, which takes the (case-sensitive) name of a subscription that you have created for this topic for the connector to use. Please use the Azure Portal to create one.
This is sufficient to create the mapping with your topic.
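For example, a topic mapping might look like this (topic and subscription names are illustrative):

```sql
INSERT INTO telemetry
SELECT * FROM telemetry-topic
PROPERTIES('servicebus.type'='TOPIC', 'subscription.name'='kafka-connector-sub');
```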
Please find below all the necessary KCQL properties:
Please find below all the relevant configuration parameters:
This page describes the usage of the Stream Reactor Google PubSub Source Connector.
The Kafka connector is designed to seamlessly ingest records from GCP Pub/Sub into your Kafka cluster, making it useful for backing up or streaming data from Pub/Sub to your Kafka infrastructure. It provides robust at-least-once semantics: the connector ensures that each record reaches the Kafka topic at least once.
For more examples see the tutorials.
The connector uses a SQL-like syntax to configure the connector behaviour. The full KCQL syntax is:
Please note that you can employ escaping within KCQL for the INSERT INTO and SELECT * FROM clauses when necessary. For example, if you need to use a topic name that contains a hyphen, you can escape it as follows:
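For instance, a hyphenated name can be wrapped in backticks (the topic and subscription names are illustrative):

```sql
INSERT INTO `my-kafka-topic`
SELECT * FROM `my-pubsub-subscription`;
```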
The source and target of the data are specified via the INSERT INTO ... SELECT * FROM clause. The connector will write all the records to the given topic, from the given subscription:
The PROPERTIES clause is optional and adds a layer of configurability to the connector. It enhances versatility by permitting the application of multiple configurations (delimited by ',').
Properties can be defined in any order.
The following properties are supported:
The connector offers three distinct authentication modes:
Default: This mode relies on the default GCP authentication chain, simplifying the authentication process.
File: This mode uses a local (to the connect worker) path for a file containing GCP authentication credentials.
Credentials: In this mode, explicit configuration of a GCP Credentials string is required for authentication.
The simplest example to configure in the connector is the "Default" mode, as this requires no other configuration.
Here's an example configuration for the "Credentials" mode:
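A sketch of the "Credentials" mode follows; the connect.pubsub.* property names are assumptions based on the Stream Reactor convention, so verify them against the configuration reference:

```properties
connect.pubsub.gcp.auth.mode=Credentials
# GCP service account credentials as a JSON string
# (ideally supplied via a Connect Secret Provider)
connect.pubsub.gcp.credentials=<your-gcp-credentials-json>
connect.pubsub.gcp.project.id=<your-project-id>
```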
And here is an example configuration using the "File" mode:
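A sketch of the "File" mode, under the same naming assumptions as above:

```properties
connect.pubsub.gcp.auth.mode=File
# Path, local to each Connect worker, of the service account key file
connect.pubsub.gcp.file=/etc/gcp/service-account.json
connect.pubsub.gcp.project.id=<your-project-id>
```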
Remember that when using file mode, the file must exist on every worker node in your Kafka Connect cluster and be readable by the Kafka Connect process.
For enhanced security and flexibility when using the "Credentials" mode, it is highly advisable to utilize Connect Secret Providers.
Two modes are available: Default Mode and Compatibility Mode.
Compatibility Mode is intended to ensure compatibility with existing tools, while Default Mode offers a simpler modern redesign of the functionality.
You can choose whichever suits your requirements.
Each Pub/Sub message is transformed into a single Kafka record, structured as follows:
Kafka Key: A String of the Pub/Sub MessageID.
Kafka Value: The Pub/Sub message value as BYTES.
Kafka Headers: Includes the "PublishTimestamp" (in seconds) and all Pub/Sub message attributes mapped as separate headers.
The Kafka Key is mapped from the Pub/Sub MessageID, a unique ID for a Pub/Sub message.
The Kafka Value is mapped from the body of the Pub/Sub message.
The Kafka Headers include:
PublishTimestamp: Long value representing the time when the Pub/Sub message was published, in seconds.
GCPProjectID: The GCP Project ID.
PubSubTopicID: The Pub/Sub Topic ID.
PubSubSubscriptionID: The Pub/Sub Subscription ID.
All Pub/Sub message attributes: Each attribute from the Pub/Sub message is mapped as a separate header.
Each Pub/Sub message is transformed into a single Kafka record, structured as follows:
Kafka Key: Comprises the project ID, message ID, and subscription ID of the Pub/Sub message.
Kafka Value: Contains the message data and attributes from the Pub/Sub message.
The Key is a structure with these fields:
The Value is a structure with these fields:
This page describes the usage of the Stream Reactor Cassandra Source Connector.
Kafka Connect Cassandra is a Source Connector for reading data from Cassandra and writing to Kafka.
For more examples see the tutorials.
You can specify multiple KCQL statements separated by ; to have the connector write to multiple topics.
The following KCQL is supported:
Examples:
The connector can write JSON to your Kafka topic using the WITHFORMAT JSON clause but the key and value converters must be set:
In order to facilitate scenarios like retaining the latest value for a given device identifier, or supporting Kafka Streams joins without having to re-map the topic data, the connector supports WITHKEY in the KCQL syntax.
Multiple key fields are supported using a delimiter:
The resulting Kafka record key content will be the string concatenation of the values of the fields specified. Optionally the delimiter can be set via the KEYDELIMITER keyword.
Keying is only supported in conjunction with the WITHFORMAT JSON clause
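As a sketch, combining WITHFORMAT JSON, WITHKEY and KEYDELIMITER might look like the following; the table, topic and field names are illustrative and the clause ordering should be checked against the KCQL reference:

```sql
INSERT INTO device_events
SELECT * FROM device_table
PK created
WITHFORMAT JSON
WITHKEY(device_id, location)
KEYDELIMITER='|'
INCREMENTALMODE=TIMESTAMP
```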
This mode tracks new records added to a table. The columns to track are identified by the PK clause in the KCQL statement. Only one column can be used to track new records. The supported Cassandra column data types are:
TIMESTAMP
TIMEUUID
TOKEN
DSESEARCHTIMESTAMP
If set to TOKEN this column value is wrapped inside Cassandra's token function which needs unwrapping with the WITHUNWRAP command.
You must use the Byte Order Partitioner for the TOKEN mode to work correctly or data will be missing from the Kafka topic. This is not recommended due to the creation of hotspots in Cassandra.
DSESEARCHTIMESTAMP will make DSE Search queries using Solr instead of a native Cassandra query.
The connector constantly loads the entire table.
The connector can be configured to:
Start from a particular offset - connect.cassandra.initial.offset
Increase or decrease the poll interval - connect.cassandra.import.poll.interval
Set a slice duration to query for in milliseconds - connect.cassandra.slice.duration
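A sketch combining these options (the values and the timestamp format are illustrative):

```properties
# Start importing from this point in time
connect.cassandra.initial.offset=2018-01-22 12:00:00.0000000Z
# Poll the table every 10 seconds
connect.cassandra.import.poll.interval=10000
# Query 120-second slices of data at a time
connect.cassandra.slice.duration=120000
```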
The following CQL data types are supported:
This page describes the usage of the Stream Reactor FTP Source Connector.
Provide the remote directories, and on specified intervals the list of files in the directories is refreshed. Files are downloaded when they were not known before, or when their timestamp or size has changed. Only files with a timestamp younger than the specified maximum age are considered. Hashes of the files are maintained and used to check for content changes. Changed files are then fed into Kafka, either as a whole (update) or only the appended part (tail), depending on the configuration. Optionally, file bodies can be transformed through a pluggable system prior to putting them into Kafka.
For more examples see the tutorials.
Each Kafka record represents a file and has the following structure.
The format of the keys is configurable through connect.ftp.keystyle=string|struct. It can be a string with the file name, or a FileInfo structure with the name: string and offset: long. The offset is always 0 for files that are updated as a whole, and hence only relevant for tailed files.
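For example, to emit FileInfo struct keys (so tailed files carry an offset), set:

```properties
connect.ftp.keystyle=struct
```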
The values of the records contain the body of the file as bytes.
The following rules are used.
Tailed files are only allowed to grow. Bytes that have been appended to it since the last inspection are yielded. Preceding bytes are not allowed to change;
Updated files can grow, shrink and change anywhere. The entire contents are yielded.
Instead of dumping whole file bodies (and risking exceeding Kafka’s message.max.bytes), one might want to give an interpretation to the data contained in the files before putting it into Kafka. For example, if the files fetched from the FTP are comma-separated values (CSVs), one might prefer to have a stream of CSV records instead. To allow this, the connector provides a pluggable conversion of SourceRecords. Right before sending a SourceRecord to the Connect framework, it is run through an object that implements:
The default object that is used is a pass-through converter, an instance of:
To override it, create your own implementation of SourceRecordConverter and place the jar in the plugin.path.
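Assuming the converter is wired up via a connect.ftp.sourcerecordconverter property (an assumption based on the connector's configuration convention), overriding it might look like:

```properties
# Fully-qualified class name of your SourceRecordConverter implementation;
# its jar must be on the plugin.path of the Connect workers
connect.ftp.sourcerecordconverter=com.example.MyCsvSourceRecordConverter
```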
This page describes the usage of the Stream Reactor MQTT Source Connector.
A Kafka Connect source connector to read events from MQTT and push them to Kafka.
For more examples see the tutorials.
You can specify multiple KCQL statements separated by ; to have the connector write to multiple topics.
The following KCQL is supported:
The selection of fields from the MQTT message is not supported.
Examples:
To facilitate scenarios like retaining the latest value for a given device identifier, or supporting Kafka Streams joins without having to re-map the topic data, the connector supports WITHKEY in the KCQL syntax.
Multiple key fields are supported using a delimiter:
The resulting Kafka record key content will be the string concatenation of the values of the fields specified. Optionally the delimiter can be set via the KEYDELIMITER keyword.
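As a sketch, keying on two fields with a custom delimiter might look like this (topic, MQTT path and field names are illustrative):

```sql
INSERT INTO sensor_events
SELECT * FROM /sensors/temperature
WITHKEY(deviceId, timestamp)
KEYDELIMITER='|'
```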
The connector supports both wildcard and shared subscriptions but the KCQL command must be placed inside single quotes.
This page describes the usage of the Stream Reactor AWS S3 Source Connector.
This connector is also available on the AWS Marketplace.
Files that have been archived to the AWS Glacier storage class are skipped; to load these files you must manually restore them. Skipped files are logged in the Connect workers' log files.
You can specify multiple KCQL statements separated by ; to have the connector write to multiple topics.
The connector uses a SQL-like syntax to configure the connector behaviour. The full KCQL syntax is:
Please note that you can employ escaping within KCQL for the INSERT INTO, SELECT * FROM, and PARTITIONBY clauses when necessary. For example, if you need to use a topic name that contains a hyphen, you can escape it as follows:
The S3 source location is defined within the FROM clause. The connector will read all files from the given location considering the data partitioning and ordering options. Each data partition will be read by a single connector task.
The FROM clause format is:
If your data in AWS was not written by the Lenses AWS sink, set the connector to traverse a folder hierarchy in a bucket and load files based on their last modified timestamp:
connect.s3.source.partition.extractor.regex=none
connect.s3.source.ordering.type=LastModified
To load in alphanumeric order, set the ordering type to AlphaNumeric.
The target Kafka topic is specified via the INSERT INTO clause. The connector will write all the records to the given topic:
The connector supports a range of storage formats, each with its own distinct functionality:
JSON: The connector will read files containing JSON content, each line representing a distinct record.
Avro: The connector will read Avro-stored messages from S3 and translate them into Kafka’s native format.
Parquet: The connector will read Parquet-stored messages from S3 and translate them into Kafka’s native format.
Text: The connector will read files containing lines of text, each line representing a distinct record.
CSV: The connector will read files containing comma-separated values, each line representing a distinct record.
CSV_WithHeaders: The connector will read files containing lines of text, each line representing a distinct record while skipping the header row.
Bytes: The connector will read files containing bytes, each file is translated to a Kafka message.
Use the STOREAS clause to configure the storage format. The following options are available:
When using Text storage, the connector provides additional configuration options to finely control how text content is processed.
In Regex mode, the connector applies a regular expression pattern, and only when a line matches the pattern is it considered a record. For example, to include only lines that start with a number, you can use the following configuration:
In Start-End Line mode, the connector reads text content between specified start and end lines, inclusive. This mode is useful when you need to extract records that fall within defined boundaries. For instance, to read records where the first line is ‘SSM’ and the last line is an empty line (’’), you can configure it as follows:
To trim the start and end lines, set the read.text.trim property to true:
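A sketch of Start-End Line mode using the read.text.* properties in the KCQL PROPERTIES clause; the exact property names for the start and end lines are assumptions, so check the properties table:

```sql
INSERT INTO my-topic
SELECT * FROM my-bucket:my-prefix
STOREAS `Text`
PROPERTIES(
  'read.text.mode'='StartEndLine',
  'read.text.start.line'='SSM',
  'read.text.end.line'='',
  'read.text.trim'='true'
)
```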
In Start-End Tag mode, the connector reads text content between specified start and end tags, inclusive. This mode is particularly useful when a single line of text in S3 corresponds to multiple output Kafka messages. For example, to read XML records enclosed between ‘’ and ‘’, configure it as follows:
Depending on the storage format of Kafka topics’ messages, the need for replication to a different cluster, and the specific data analysis requirements, there exists a guideline on how to effectively utilize converters for both sink and source operations. This guidance aims to optimize performance and minimize unnecessary CPU and memory usage.
Currently, the connector does not offer support for SQL projection; consequently, anything other than a SELECT * query is disregarded. The connector will faithfully write all the record fields to Kafka exactly as they are.
The S3 sink employs zero-padding in file names to ensure precise ordering, leveraging optimizations offered by the S3 API, guaranteeing the accurate sequence of files.
When using the S3 source alongside the S3 sink, the connector can adopt the same ordering method, ensuring data processing follows the correct chronological order. However, there are scenarios where S3 data is generated by applications that do not maintain lexical file name order.
In such cases, to process files in the correct sequence, the source needs to list all files in the bucket and sort them based on their last modified timestamp. To enable this behavior, set the connect.s3.source.ordering.type to LastModified. This ensures that the source correctly arranges and processes the data based on the timestamps of the files.
The number of file names the source reads from S3 in a single poll can be limited. The default value, if not specified, is 1000:
To limit the number of result rows returned from the source in a single poll operation, you can use the LIMIT clause. The default value, if not specified, is 10000.
The AWS S3 Source Connector allows you to filter the files to be processed based on their extensions. This is controlled by two properties: connect.s3.source.extension.excludes and connect.s3.source.extension.includes.
The connect.s3.source.extension.excludes property is a comma-separated list of file extensions to exclude from the source file search. If this property is not configured, all files are considered. For example, to exclude .txt and .csv files, you would set this property as follows:
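A sketch of that setting (whether a leading dot is required is an assumption, so verify against the option reference):

```properties
connect.s3.source.extension.excludes=txt,csv
```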
The connect.s3.source.extension.includes property is a comma-separated list of file extensions to include in the source file search. If this property is not configured, all files are considered. For example, to include only .json and .xml files, you would set this property as follows:
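A sketch of that setting, under the same assumption about the extension format as the exclusion example:

```properties
connect.s3.source.extension.includes=json,xml
```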
Note: If both connect.s3.source.extension.excludes and connect.s3.source.extension.includes are set, the connector first applies the exclusion filter and then the inclusion filter.
The PROPERTIES clause is optional and adds a layer of configuration to the connector. It enhances versatility by permitting the application of multiple configurations (delimited by ','). The following properties are supported:
The connector offers two distinct authentication modes:
Default: This mode relies on the default AWS authentication chain, simplifying the authentication process.
Credentials: In this mode, explicit configuration of AWS Access Key and Secret Key is required for authentication.
When selecting the “Credentials” mode, it is essential to provide the necessary access key and secret key properties. Alternatively, if you prefer not to configure these properties explicitly, the connector will follow the credentials retrieval order as described here.
Here’s an example configuration for the “Credentials” mode:
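A sketch of the "Credentials" mode; the connect.s3.aws.* property names are assumptions based on the Stream Reactor convention, so verify them against the configuration reference:

```properties
connect.s3.aws.auth.mode=Credentials
# Ideally supplied via a Connect Secret Provider rather than in plain text
connect.s3.aws.access.key=<your-access-key>
connect.s3.aws.secret.key=<your-secret-key>
connect.s3.aws.region=eu-west-1
```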
For enhanced security and flexibility when using the “Credentials” mode, it is highly advisable to utilize Connect Secret Providers. This approach ensures robust security practices while handling access credentials.
The connector can also be used against API compatible systems provided they implement the following:
This page describes the usage of the Stream Reactor GCP Storage Source Connector.
For more examples see the tutorials.
You can specify multiple KCQL statements separated by ; to have the connector write to multiple topics.
The connector uses a SQL-like syntax to configure the connector behaviour. The full KCQL syntax is:
Please note that you can employ escaping within KCQL for the INSERT INTO, SELECT * FROM, and PARTITIONBY clauses when necessary. For example, if you need to use a topic name that contains a hyphen, you can escape it as follows:
The GCP Storage source location is defined within the FROM clause. The connector will read all files from the given location considering the data partitioning and ordering options. Each data partition will be read by a single connector task.
The FROM clause format is:
If your data in GCS was not written by the Lenses GCS sink, set the connector to traverse a folder hierarchy in a bucket and load files based on their last modified timestamp:
connect.gcpstorage.source.partition.extractor.regex=none
connect.gcpstorage.source.ordering.type=LastModified
To load in alphanumeric order, set the ordering type to AlphaNumeric.
The target Kafka topic is specified via the INSERT INTO clause. The connector will write all the records to the given topic:
The connector supports a range of storage formats, each with its own distinct functionality:
JSON: The connector will read files containing JSON content, each line representing a distinct record.
Avro: The connector will read Avro-stored messages from GCP Storage and translate them into Kafka’s native format.
Parquet: The connector will read Parquet-stored messages from GCP Storage and translate them into Kafka’s native format.
Text: The connector will read files containing lines of text, each line representing a distinct record.
CSV: The connector will read files containing comma-separated values, each line representing a distinct record.
CSV_WithHeaders: The connector will read files containing lines of text, each line representing a distinct record while skipping the header row.
Bytes: The connector will read files containing bytes, each file is translated to a Kafka message.
Use the STOREAS clause to configure the storage format. The following options are available:
When using Text storage, the connector provides additional configuration options to finely control how text content is processed.
In Regex mode, the connector applies a regular expression pattern, and only when a line matches the pattern is it considered a record. For example, to include only lines that start with a number, you can use the following configuration:
In Start-End Line mode, the connector reads text content between specified start and end lines, inclusive. This mode is useful when you need to extract records that fall within defined boundaries. For instance, to read records where the first line is ‘SSM’ and the last line is an empty line (’’), you can configure it as follows:
To trim the start and end lines, set the read.text.trim property to true:
In Start-End Tag mode, the connector reads text content between specified start and end tags, inclusive. This mode is particularly useful when a single line of text in GCP Storage corresponds to multiple output Kafka messages. For example, to read XML records enclosed between ‘’ and ‘’, configure it as follows:
Depending on the storage format of Kafka topics’ messages, the need for replication to a different cluster, and the specific data analysis requirements, there exists a guideline on how to effectively utilize converters for both sink and source operations. This guidance aims to optimize performance and minimize unnecessary CPU and memory usage.
Currently, the connector does not offer support for SQL projection; consequently, anything other than a SELECT * query is disregarded. The connector will faithfully write all the record fields to Kafka exactly as they are.
When using the GCS source alongside the GCS sink, the connector can adopt the same ordering method, ensuring data processing follows the correct chronological order. However, there are scenarios where GCS data is generated by applications that do not maintain lexical file name order.
In such cases, to process files in the correct sequence, the source needs to list all files in the bucket and sort them based on their last modified timestamp. To enable this behavior, set the connect.gcpstorage.source.ordering.type to LastModified. This ensures that the source correctly arranges and processes the data based on the timestamps of the files.
The number of file names the source reads from GCS in a single poll can be limited. The default value, if not specified, is 1000:
To limit the number of result rows returned from the source in a single poll operation, you can use the LIMIT clause. The default value, if not specified, is 10000.
The GCP Storage Source Connector allows you to filter the files to be processed based on their extensions. This is controlled by two properties: connect.gcpstorage.source.extension.excludes and connect.gcpstorage.source.extension.includes.
The connect.gcpstorage.source.extension.excludes property is a comma-separated list of file extensions to exclude from the source file search. If this property is not configured, all files are considered. For example, to exclude .txt and .csv files, you would set this property as follows:
The connect.gcpstorage.source.extension.includes property is a comma-separated list of file extensions to include in the source file search. If this property is not configured, all files are considered. For example, to include only .json and .xml files, you would set this property as follows:
Note: If both connect.gcpstorage.source.extension.excludes and connect.gcpstorage.source.extension.includes are set, the connector first applies the exclusion filter and then the inclusion filter.
The PROPERTIES clause is optional and adds a layer of configuration to the connector. It enhances versatility by permitting the application of multiple configurations (delimited by ','). The following properties are supported:
The connector offers two distinct authentication modes:
Default: This mode relies on the default GCP authentication chain, simplifying the authentication process.
File: This mode uses a local (to the connect worker) path for a file containing GCP authentication credentials.
Credentials: In this mode, explicit configuration of a GCP Credentials string is required for authentication.
The simplest example to configure in the connector is the “Default” mode, as this requires no other configuration.
Here’s an example configuration for the “Credentials” mode:
And here is an example configuration using the “File” mode:
Remember that when using file mode, the file must exist on every worker node in your Kafka Connect cluster and be readable by the Kafka Connect process.
For enhanced security and flexibility when using the “Credentials” mode, it is highly advisable to utilize Connect Secret Providers. This approach ensures robust security practices while handling access credentials.
When used in tandem with the GCP Storage Sink Connector, the GCP Storage Source Connector becomes a powerful tool for restoring Kafka topics from GCP Storage. To enable this behavior, you should set store.envelope to true. This configuration ensures that the source expects the following data structure in GCP Storage:
When the messages are sent to Kafka, the GCP Storage Source Connector ensures that it correctly maps the key, value, headers, and metadata fields (including timestamp and partition) to their corresponding Kafka message fields. Please note that the envelope functionality can only be used with data stored in GCP Storage as Avro, JSON, or Parquet formats.
When the envelope feature is not in use, and data restoration is required, the responsibility falls on the connector to establish the original topic partition value. To ensure that the source correctly conveys the original partitions back to Kafka Connect during reads from the source, a partition extractor can be configured to extract this information from the GCP Storage object key.
To configure the partition extractor, you can utilize the connect.gcpstorage.source.partition.extractor.type property, which supports two options:
hierarchical: This option aligns with the default format used by the sink, topic/partition/offset.json.
regex: When selected, you can provide a custom regular expression to extract the partition information. Additionally, when using the regex option, you must also set the connect.gcpstorage.source.partition.extractor.regex property. It's important to note that only one lookup group is expected. For an example of a regular expression pattern, please refer to the pattern used for hierarchical, which is:
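As a sketch, a regex extractor that pulls the partition number out of a topic/partition/offset.ext object key might look like this; the exact pattern is an assumption, and note the single capture group around the partition segment:

```properties
connect.gcpstorage.source.partition.extractor.type=regex
connect.gcpstorage.source.partition.extractor.regex=(?i)^(?:.*)\/([0-9]*)\/(?:[0-9]*)[.](?:Json|Avro|Parquet|Text|Csv|Bytes)$
```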
This page describes the usage of the Stream Reactor JMS Source Connector.
A Kafka Connect JMS source connector to subscribe to messages on JMS queues and topics and write them to a Kafka topic.
The connector uses the standard JMS protocols and has been tested against ActiveMQ.
The connector allows the JMS initial.context.factory and connection.factory to be set according to your JMS provider. The appropriate implementation jars must be added to the CLASSPATH of the Connect workers or placed in the plugin.path of the connector.
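For example, against ActiveMQ those settings might look like the following; the connect.jms.* property names are assumptions based on the connector's convention, and the broker URL is illustrative:

```properties
connect.jms.initial.context.factory=org.apache.activemq.jndi.ActiveMQInitialContextFactory
connect.jms.connection.factory=ConnectionFactory
connect.jms.url=tcp://activemq-broker:61616
```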
Each JMS message is committed only when it has been written to Kafka. If a failure happens when writing to Kafka, e.g. the message is too large, then the JMS message will not be acknowledged. It will stay in the queue so it can be handled later.
The schema of the messages is fixed and can be found under Data Types unless a converter is used.
You must provide the JMS implementation jars for your JMS service.
For more examples see the tutorials.
You can specify multiple KCQL statements separated by ; to have the connector write to multiple topics.
The following KCQL is supported:
The selection of fields from the JMS message is not supported.
Examples:
The connector supports both TOPICS and QUEUES, controlled by the WITHTYPE KCQL clause.
The connector supports converters to handle different message payload formats in the source topic or queue.
If no converter is provided, the JMS message is converted to a Kafka Struct representation.
This page describes installing the Lenses Kafka Connectors.
If you do not use the plugin.path and add the connectors directly to the CLASSPATH, you may have dependency conflicts.
Download the release and unpack.
Within the unpacked directory you will find the following structure:
The libs directory contains all the Stream Reactor Connector jars. Edit your Connect worker properties, add the path to the directory containing the connectors, and restart your workers. Repeat this process for all the Connect workers in your cluster. The connectors must be available to all the workers.
Example:
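A sketch of the worker properties change (the unpack location is illustrative):

```properties
# In connect-distributed.properties (or connect-standalone.properties)
plugin.path=/opt/stream-reactor/libs
```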
This page describes the usage of the Stream Reactor Azure Event Hubs Source Connector.
A Kafka Connect source connector to read events from Azure Event Hubs and push them to Kafka.
In order to leverage the Kafka API in your Event Hubs, it has to be at least on the Standard pricing tier. More info can be found in the Azure Event Hubs documentation.
For more examples see the tutorials.
The example below presents all the necessary configuration parameters for the Event Hubs connector. It contains nothing optional, so feel free to tweak it to your needs:
The connector allows multiple KCQL commands.
The following KCQL is supported:
The selection of fields from the Event Hubs message is not supported.
Currently, the Azure Event Hubs Connector supports raw bytes passthrough from the source hub to the Kafka topic specified in the KCQL config.
You can connect to Azure Event Hubs by passing specific JAAS parameters in the configuration.
The Azure Event Hubs Connector utilizes the Apache Kafka API implemented by Event Hubs. This also allows fine-tuning for user-specific needs, because the connector passes all properties with a specific prefix directly to the consumer. The prefix is connect.eventhubs.connection.settings, and when a user specifies a property with it, it is automatically passed to the consumer.
User wants to fine-tune how much data records comes through the network at once. He specifies below property as part of his configuration for Azure Event Hubs Connector before starting it.
It means that internal Kafka Consumer will poll at most 100 records at time (as max.poll.records
is passed directly to it)
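In the connector configuration, the property would look like this:

```properties
connect.eventhubs.connection.settings.max.poll.records=100
```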
There are certain exceptions to this rule, as a couple of properties are used internally to keep consumption running smoothly. These exceptions are listed below:
- client.id - the connector sets it by itself
- group.id - the connector sets it by itself
- key.deserializer - the connector passes the bytes through 1-to-1
- value.deserializer - the connector passes the bytes through 1-to-1
- enable.auto.commit - the connector automatically sets it to false and instead checks which offsets are committed in the output topic
When selecting the "Credentials" mode, it is essential to provide the necessary credentials. Alternatively, if you prefer not to configure these properties explicitly, the connector will follow the default credentials retrieval order.
For a more detailed explanation of how to use these options, see the KCQL support documentation.
For more examples of using the FTP Kafka connector, see the tutorials.
The connector supports converters to handle different message payload formats in the source topic.
For more examples see the tutorials.
The connector orders GCS file names to ensure precise processing order, leveraging optimizations offered by the GCS API and guaranteeing the accurate sequence of files.
When selecting the "Credentials" mode, it is essential to provide the necessary credentials. Alternatively, if you prefer not to configure these properties explicitly, the connector will follow the default credentials retrieval order.
Learn more about the different methods of connecting to Event Hubs in the Azure documentation. The only caveat is to add the connector-specific prefix, as in the example above.
Field Name | Schema Type | Description |
---|---|---|
MessageId | String | The message identifier that uniquely identifies the message and its payload. |
deliveryCount | int64 | The number of times this message was delivered to clients. |
enqueuedTimeUtc | int64 | The time at which this message was enqueued in Azure Service Bus. |
contentType | Optional String | The content type of this message. |
label | Optional String | The application-specific message label. |
correlationId | Optional String | The correlation identifier. |
messageProperties | Optional String | The map of user application properties of this message. |
partitionKey | Optional String | The partition key for sending a message to a partitioned entity. |
replyTo | Optional String | The address of an entity to send replies to. |
replyToSessionId | Optional String | The session identifier augmenting the ReplyTo address. |
deadLetterSource | Optional String | The name of the queue or subscription that this message was enqueued on, before it was dead-lettered. |
timeToLive | int64 | The duration before this message expires. |
lockedUntilUtc | Optional int64 | The time when the lock of this message expires. |
sequenceNumber | Optional int64 | The unique number assigned to a message by Azure Service Bus. |
sessionId | Optional String | The session identifier for a session-aware entity. |
lockToken | Optional String | The lock token for the current message. |
messageBody | Optional bytes | The body of this message as a byte array. |
getTo | Optional String | The "to" address. |
Name | Description | Type |
---|---|---|
servicebus.type | Specifies the Service Bus type: QUEUE or TOPIC | string |
subscription.name | Specifies the subscription name if the Service Bus type is TOPIC | string |

Name | Description | Type | Default Value |
---|---|---|---|
connect.servicebus.connection.string | Specifies the Connection String to connect to Service Bus | string | |
connect.servicebus.kcql | Comma-separated output KCQL queries | string | |
connect.servicebus.source.task.records.queue.size | Specifies the queue size between Service Bus receivers and Kafka | int | 20 |
Name | Description | Type | Default Value |
---|---|---|---|
batch.size | The maximum number of messages the connector will retrieve and process at one time per polling request (per KCQL mapping). | int | 1000 |
cache.ttl | The maximum amount of time (in milliseconds) to store message data to allow acknowledgement of a message. | long | 1 hour |
queue.max | Data is loaded into a queue asynchronously so that it stands ready when the connector polls. | int | 10000 |

Field Name | Schema Type | Description |
---|---|---|
ProjectId | String | The Pub/Sub project containing the topic from which messages are polled. |
TopicId | String | The Pub/Sub topic containing the messages. |
SubscriptionId | String | The Pub/Sub subscription of the Pub/Sub topic. |
MessageId | String | A unique ID for a Pub/Sub message |
MessageData | Optional String | The body of the Pub/Sub message. |
AttributeMap | Optional String | The attribute map associated with the Pub/Sub message. |

CQL Type | Connect Data Type |
---|---|
TimeUUID | Optional String |
UUID | Optional String |
Inet | Optional String |
Ascii | Optional String |
Text | Optional String |
Timestamp | Optional String |
Date | Optional String |
Tuple | Optional String |
UDT | Optional String |
Boolean | Optional Boolean |
TinyInt | Optional Int8 |
SmallInt | Optional Int16 |
Int | Optional Int32 |
Decimal | Optional String |
Float | Optional Float32 |
Counter | Optional Int64 |
BigInt | Optional Int64 |
VarInt | Optional Int64 |
Double | Optional Int64 |
Time | Optional Int64 |
Blob | Optional Bytes |
Map | Optional [String -> MAP] |
List | Optional [String -> ARRAY] |
Set | Optional [String -> ARRAY] |

Name | Description | Type | Default Value |
---|---|---|---|
connect.cassandra.contact.points | Initial contact point host for Cassandra including port. | string | localhost |
connect.cassandra.port | Cassandra native port. | int | 9042 |
connect.cassandra.key.space | Keyspace to write to. | string |
connect.cassandra.username | Username to connect to Cassandra with. | string |
connect.cassandra.password | Password for the username to connect to Cassandra with. | password |
connect.cassandra.ssl.enabled | Secure Cassandra driver connection via SSL. | boolean | false |
connect.cassandra.trust.store.path | Path to the client Trust Store. | string |
connect.cassandra.trust.store.password | Password for the client Trust Store. | password |
connect.cassandra.trust.store.type | Type of the Trust Store, defaults to JKS | string | JKS |
connect.cassandra.key.store.type | Type of the Key Store, defaults to JKS | string | JKS |
connect.cassandra.ssl.client.cert.auth | Enable client certification authentication by Cassandra. Requires KeyStore options to be set. | boolean | false |
connect.cassandra.key.store.path | Path to the client Key Store. | string |
connect.cassandra.key.store.password | Password for the client Key Store | password |
connect.cassandra.consistency.level | Consistency refers to how up-to-date and synchronized a row of Cassandra data is on all of its replicas. Cassandra offers tunable consistency. For any given read or write operation, the client application decides how consistent the requested data must be. | string |
connect.cassandra.fetch.size | The number of records the Cassandra driver will return at once. | int | 5000 |
connect.cassandra.load.balancing.policy | Cassandra Load balancing policy. ROUND_ROBIN, TOKEN_AWARE, LATENCY_AWARE or DC_AWARE_ROUND_ROBIN. TOKEN_AWARE and LATENCY_AWARE use DC_AWARE_ROUND_ROBIN | string | TOKEN_AWARE |
connect.cassandra.error.policy | Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP - the error is swallowed THROW - the error is allowed to propagate. RETRY - The exception causes the Connect framework to retry the message. The number of retries is set by connect.cassandra.max.retries. All errors will be logged automatically, even if the code swallows them. | string | THROW |
connect.cassandra.max.retries | The maximum number of times to try the write again. | int | 20 |
connect.cassandra.retry.interval | The time in milliseconds between retries. | int | 60000 |
connect.cassandra.task.buffer.size | The size of the queue the reader writes to. | int | 10000 |
connect.cassandra.assigned.tables | The tables a task has been assigned. | string |
connect.cassandra.batch.size | The number of records the source task should drain from the reader queue. | int | 100 |
connect.cassandra.import.poll.interval | The polling interval between queries against tables for bulk mode. | long | 1000 |
connect.cassandra.time.slice.ms | The range of time in milliseconds that the source task's timestamp/timeuuid query will span. | long | 10000 |
connect.cassandra.import.allow.filtering | Enable ALLOW FILTERING in incremental selects. | boolean | true |
connect.cassandra.slice.duration | Duration to query for in target Cassandra table. Used to restrict query timestamp span | long | 10000 |
connect.cassandra.slice.delay.ms | The delay between the current time and the time range of the query. Used to ensure all of the data in the time slice is available. | long | 30000 |
connect.cassandra.initial.offset | The initial timestamp to start querying in Cassandra from (yyyy-MM-dd HH:mm:ss.SSS’Z’). Default 1900-01-01 00:00:00.0000000Z | string | 1900-01-01 00:00:00.0000000Z |
connect.cassandra.mapping.collection.to.json | Mapping columns with type Map, List and Set like json | boolean | true |
connect.cassandra.kcql | KCQL expression describing field selection and routes. | string |

Name | Description | Type | Default Value |
---|---|---|---|
connect.ftp.address | host[:port] of the ftp server | string |
connect.ftp.user | Username to connect with | string |
connect.ftp.password | Password to connect with | string |
connect.ftp.refresh | ISO8601 duration specifying how often the server is polled | string |
connect.ftp.file.maxage | ISO8601 duration specifying the maximum age of files to process | string |
connect.ftp.keystyle | SourceRecord keystyle, string or struct | string |
connect.ftp.protocol | Protocol to use, FTP or FTPS | string | ftp |
connect.ftp.timeout | Ftp connection timeout in milliseconds | int | 30000 |
connect.ftp.filter | Regular expression to use when selecting files for processing | string | .* |
connect.ftp.monitor.tail | Comma-separated list of path:destinationtopic pairs; the tail of the file is tracked | string |
connect.ftp.monitor.update | Comma-separated list of path:destinationtopic pairs; the whole file is tracked | string |
connect.ftp.monitor.slicesize | File slice size in bytes | int | -1 |
connect.ftp.fileconverter | File converter class | string | com.datamountaineer.streamreactor.connect.ftp.source.SimpleFileConverter |
connect.ftp.sourcerecordconverter | Source record converter class | string | com.datamountaineer.streamreactor.connect.ftp.source.NopSourceRecordConverter |
connect.ftp.max.poll.records | Max number of records returned per poll | int | 10000 |

Name | Description | Type | Default Value |
---|---|---|---|
connect.mqtt.hosts | Contains the MQTT connection end points. | string |
connect.mqtt.username | Contains the Mqtt connection user name | string |
connect.mqtt.password | Contains the Mqtt connection password | password |
connect.mqtt.service.quality | Specifies the Mqtt quality of service | int |
connect.mqtt.timeout | Provides the time interval to establish the mqtt connection | int | 3000 |
connect.mqtt.clean | Specifies the MQTT clean session flag | boolean | true |
connect.mqtt.keep.alive | The keep alive functionality assures that the connection is still open and both broker and client are connected to the broker during the establishment of the connection. The interval is the longest possible period of time, which broker and client can endure without sending a message. | int | 5000 |
connect.mqtt.client.id | Contains the Mqtt session client id | string |
connect.mqtt.error.policy | Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP - the error is swallowed; THROW - the error is allowed to propagate; RETRY - the exception causes the Connect framework to retry the message. The number of retries is set by connect.mqtt.max.retries. All errors will be logged automatically. | string | THROW |
connect.mqtt.retry.interval | The time in milliseconds between retries. | int | 60000 |
connect.mqtt.max.retries | The maximum number of times to try the write again. | int | 20 |
connect.mqtt.retained.messages | Specifies the Mqtt retained flag. | boolean | false |
connect.mqtt.converter.throw.on.error | If set to false, the conversion exception is swallowed and processing carries on, but the message is lost; if set to true, the exception is thrown. Default is false. | boolean | false |
connect.converter.avro.schemas | If the AvroConverter is used you need to provide an avro Schema to be able to read and translate the raw bytes to an avro record. The format is $MQTT_TOPIC=$PATH_TO_AVRO_SCHEMA_FILE in case of source converter, or $KAFKA_TOPIC=PATH_TO_AVRO_SCHEMA in case of sink converter | string |
connect.mqtt.log.message | Logs received MQTT messages | boolean | false |
connect.mqtt.kcql | Contains the Kafka Connect Query Language describing the sourced MQTT source and the target Kafka topics | string |
connect.mqtt.polling.timeout | Provides the timeout to poll incoming messages | int | 1000 |
connect.mqtt.share.replicate | Replicate the shared subscriptions to all tasks instead of distributing them | boolean | false |
connect.progress.enabled | Enables the output for how many records have been processed | boolean | false |
connect.mqtt.ssl.ca.cert | Provides the path to the CA certificate file to use with the Mqtt connection | string |
connect.mqtt.ssl.cert | Provides the path to the certificate file to use with the Mqtt connection | string |
connect.mqtt.ssl.key | Certificate private [config] key file path. | string |
connect.mqtt.process.duplicates | Process duplicate messages | boolean | false |

S3 Storage Format | Kafka Output Format | Restore or replicate cluster | Analytics | Sink Converter | Source Converter |
---|---|---|---|---|---|
JSON | STRING | Same,Other | Yes, No | StringConverter | StringConverter |
AVRO,Parquet | STRING | Same,Other | Yes | StringConverter | StringConverter |
AVRO,Parquet | STRING | Same,Other | No | ByteArrayConverter | ByteArrayConverter |
JSON | JSON | Same,Other | Yes | JsonConverter | StringConverter |
JSON | JSON | Same,Other | No | StringConverter | StringConverter |
AVRO,Parquet | JSON | Same,Other | Yes,No | JsonConverter | JsonConverter or Avro Converter( Glue, Confluent) |
AVRO,Parquet, JSON | BYTES | Same,Other | Yes,No | ByteArrayConverter | ByteArrayConverter |
AVRO,Parquet | AVRO | Same | Yes | Avro Converter( Glue, Confluent) | Avro Converter( Glue, Confluent) |
AVRO,Parquet | AVRO | Same | No | ByteArrayConverter | ByteArrayConverter |
AVRO,Parquet | AVRO | Other | Yes,No | Avro Converter( Glue, Confluent) | Avro Converter( Glue, Confluent) |
AVRO,Parquet | Protobuf | Same | Yes | Protobuf Converter( Glue, Confluent) | Protobuf Converter( Glue, Confluent) |
AVRO,Parquet | Protobuf | Same | No | ByteArrayConverter | ByteArrayConverter |
AVRO,Parquet | Protobuf | Other | Yes,No | Protobuf Converter( Glue, Confluent) | Protobuf Converter( Glue, Confluent) |
AVRO,Parquet, JSON | Other | Same, Other | Yes,No | ByteArrayConverter | ByteArrayConverter |

Name | Description | Type | Available Values |
---|---|---|---|
read.text.mode | Controls how Text content is read | Enum | Regex, StartEndTag, StartEndLine |
read.text.regex | Regular Expression for Text Reading (if applicable) | String |
read.text.start.tag | Start Tag for Text Reading (if applicable) | String |
read.text.end.tag | End Tag for Text Reading (if applicable) | String |
read.text.buffer.size | Text Buffer Size (for optimization) | Int |
read.text.start.line | Start Line for Text Reading (if applicable) | String |
read.text.end.line | End Line for Text Reading (if applicable) | String |
read.text.trim | Trim Text During Reading | Boolean |
store.envelope | Messages are stored as “Envelope” | Boolean |

S3 Storage Format | Kafka Output Format | Restore or replicate cluster | Analytics | Sink Converter | Source Converter |
---|---|---|---|---|---|
JSON | STRING | Same,Other | Yes, No | StringConverter | StringConverter |
AVRO,Parquet | STRING | Same,Other | Yes | StringConverter | StringConverter |
AVRO,Parquet | STRING | Same,Other | No | ByteArrayConverter | ByteArrayConverter |
JSON | JSON | Same,Other | Yes | JsonConverter | StringConverter |
JSON | JSON | Same,Other | No | StringConverter | StringConverter |
AVRO,Parquet | JSON | Same,Other | Yes,No | JsonConverter | JsonConverter or Avro Converter( Glue, Confluent) |
AVRO,Parquet, JSON | BYTES | Same,Other | Yes,No | ByteArrayConverter | ByteArrayConverter |
AVRO,Parquet | AVRO | Same | Yes | Avro Converter( Glue, Confluent) | Avro Converter( Glue, Confluent) |
AVRO,Parquet | AVRO | Same | No | ByteArrayConverter | ByteArrayConverter |
AVRO,Parquet | AVRO | Other | Yes,No | Avro Converter( Glue, Confluent) | Avro Converter( Glue, Confluent) |
AVRO,Parquet | Protobuf | Same | Yes | Protobuf Converter( Glue, Confluent) | Protobuf Converter( Glue, Confluent) |
AVRO,Parquet | Protobuf | Same | No | ByteArrayConverter | ByteArrayConverter |
AVRO,Parquet | Protobuf | Other | Yes,No | Protobuf Converter( Glue, Confluent) | Protobuf Converter( Glue, Confluent) |
AVRO,Parquet, JSON | Other | Same, Other | Yes,No | ByteArrayConverter | ByteArrayConverter |

Name | Description | Type | Available Values |
---|---|---|---|
read.text.mode | Controls how Text content is read | Enum | Regex, StartEndTag, StartEndLine |
read.text.regex | Regular Expression for Text Reading (if applicable) | String |
read.text.start.tag | Start Tag for Text Reading (if applicable) | String |
read.text.end.tag | End Tag for Text Reading (if applicable) | String |
read.text.buffer.size | Text Buffer Size (for optimization) | Int |
read.text.start.line | Start Line for Text Reading (if applicable) | String |
read.text.end.line | End Line for Text Reading (if applicable) | String |
read.text.trim | Trim Text During Reading | Boolean |
store.envelope | Messages are stored as “Envelope” | Boolean |

Name | Description | Type | Available Values | Default Value |
---|---|---|---|---|
connect.gcpstorage.gcp.auth.mode | Specifies the authentication mode for connecting to GCP. | string | "Credentials", "File" or "Default" | "Default" |
connect.gcpstorage.gcp.credentials | For "auth.mode" credentials: GCP Authentication credentials string. | string | (Empty) |
connect.gcpstorage.gcp.file | For "auth.mode" file: Local file path for file containing GCP authentication credentials. | string | (Empty) |
connect.gcpstorage.gcp.project.id | GCP Project ID. | string | (Empty) |
connect.gcpstorage.gcp.quota.project.id | GCP Quota Project ID. | string | (Empty) |
connect.gcpstorage.endpoint | Endpoint for GCP Storage. | string |
connect.gcpstorage.error.policy | Defines the error handling policy when errors occur during data transfer to or from GCP Storage. | string | "NOOP," "THROW," "RETRY" | "THROW" |
connect.gcpstorage.max.retries | Sets the maximum number of retries the connector will attempt before reporting an error to the Connect Framework. | int | 20 |
connect.gcpstorage.retry.interval | Specifies the interval (in milliseconds) between retry attempts by the connector. | int | 60000 |
connect.gcpstorage.http.max.retries | Sets the maximum number of retries for the underlying HTTP client when interacting with GCP Storage. | long | 5 |
connect.gcpstorage.http.retry.interval | Specifies the retry interval (in milliseconds) for the underlying HTTP client. An exponential backoff strategy is employed. | long | 50 |
connect.gcpstorage.kcql | Kafka Connect Query Language (KCQL) Configuration to control the connector behaviour | string | [kcql configuration]({{< relref "#kcql-support" >}}) |
connect.gcpstorage.source.extension.excludes | A comma-separated list of file extensions to exclude from the source file search. | string | [file extension filtering]({{< relref "#file-extension-filtering" >}}) |
connect.gcpstorage.source.extension.includes | A comma-separated list of file extensions to include in the source file search. | string | [file extension filtering]({{< relref "#file-extension-filtering" >}}) |
connect.gcpstorage.source.partition.extractor.type | Type of Partition Extractor (Hierarchical or Regex) | string | hierarchical, regex |
connect.gcpstorage.source.partition.extractor.regex | Regex Pattern for Partition Extraction (if applicable) | string |
connect.gcpstorage.source.partition.search.continuous | If set to true the connector will continuously search for new partitions. | boolean | true, false | true |
connect.gcpstorage.source.partition.search.interval | The interval in milliseconds between searching for new partitions. | long | 300000 |
connect.gcpstorage.source.partition.search.excludes | A comma-separated list of paths to exclude from the partition search. | string | ".indexes" |
connect.gcpstorage.source.partition.search.recurse.levels | Controls how many levels deep to recurse when searching for new partitions | int | 0 |
connect.gcpstorage.ordering.type | Type of ordering for the GCS file names to ensure the processing order. | string | AlphaNumeric, LastModified | AlphaNumeric |

Name | Type |
---|---|
message_timestamp | Optional int64 |
correlation_id | Optional string |
redelivered | Optional boolean |
reply_to | Optional string |
destination | Optional string |
message_id | Optional string |
mode | Optional int32 |
type | Optional string |
priority | Optional int32 |
bytes_payload | Optional bytes |
properties | Map of string |

Name | Description | Type | Default Value |
---|---|---|---|
connect.jms.url | Provides the JMS broker url | string |
connect.jms.initial.context.factory | Initial Context Factory, e.g: org.apache.activemq.jndi.ActiveMQInitialContextFactory | string |
connect.jms.connection.factory | Provides the full class name for the ConnectionFactory implementation to use, e.g. org.apache.activemq.ActiveMQConnectionFactory | string | ConnectionFactory |
connect.jms.kcql | KCQL expression describing field selection and routes from JMS to Kafka. | string |
connect.jms.subscription.name | subscription name to use when subscribing to a topic, specifying this makes a durable subscription for topics | string |
connect.jms.password | Provides the password for the JMS connection | password |
connect.jms.username | Provides the user for the JMS connection | string |
connect.jms.error.policy | Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP - the error is swallowed; THROW - the error is allowed to propagate; RETRY - the exception causes the Connect framework to retry the message. The number of retries is set by connect.jms.max.retries. All errors will be logged automatically. | string | THROW |
connect.jms.retry.interval | The time in milliseconds between retries. | int | 60000 |
connect.jms.max.retries | The maximum number of times to try the write again. | int | 20 |
connect.jms.destination.selector | Selector to use for destination lookup. Either CDI or JNDI. | string | CDI |
connect.jms.initial.context.extra.params | List (comma-separated) of extra properties as key/value pairs with a colon delimiter to supply to the initial context e.g. SOLACE_JMS_VPN:my_solace_vp | list | [] |
connect.jms.batch.size | The number of records to poll for on the target JMS destination in each Connect poll. | int | 100 |
connect.jms.polling.timeout | Provides the timeout to poll incoming messages | long | 1000 |
connect.jms.source.default.converter | Contains a canonical class name for the default converter of a raw JMS message bytes to a SourceRecord. Overrides to the default can be done by using connect.jms.source.converters still. i.e. com.datamountaineer.streamreactor.connect.source.converters.AvroConverter | string |
connect.jms.converter.throw.on.error | If set to false, the conversion exception is swallowed and processing carries on, but the message is lost; if set to true, the exception is thrown. Default is false. | boolean | false |
connect.converter.avro.schemas | If the AvroConverter is used you need to provide an avro Schema to be able to read and translate the raw bytes to an avro record. The format is $MQTT_TOPIC=$PATH_TO_AVRO_SCHEMA_FILE | string |
connect.jms.headers | Contains collection of static JMS headers included in every SinkRecord The format is connect.jms.headers="$MQTT_TOPIC=rmq.jms.message.type:TextMessage,rmq.jms.message.priority:2;$MQTT_TOPIC2=rmq.jms.message.type:JSONMessage" | string |
connect.progress.enabled | Enables the output for how many records have been processed | boolean | false |
connect.jms.evict.interval.minutes | Removes the uncommitted messages from the internal cache. Each JMS message is linked to the Kafka record to be published. Failure to publish a record to Kafka will mean the JMS message will not be acknowledged. | int | 10 |
connect.jms.evict.threshold.minutes | The number of minutes after which an uncommitted entry becomes evictable from the connector cache. | int | 10 |
connect.jms.scale.type | How the connector tasks parallelization is decided. Available values are kcql and default. If kcql is provided it will be based on the number of KCQL statements written; otherwise it will be driven based on the connector tasks.max |

Name | Description | Type | Default Value |
---|---|---|---|
connect.eventhubs.source.connection.settings.bootstrap.servers | Specifies the Event Hubs server location. | string |
connect.eventhubs.source.close.timeout | Amount of time (in seconds) for Consumer to close. | int | 30 |
connect.eventhubs.source.default.offset | Specifies whether by default we should consume from earliest (default) or latest offset. | string | earliest |
connect.eventhubs.kcql | Comma-separated output KCQL queries | string |
This page describes the usage of the Stream Reactor Azure Event Hubs Sink Connector.
Coming soon!
This page describes the usage of the Stream Reactor Google PubSub Sink Connector.
Coming soon!

Name | Description | Type | Available Values | Default Value |
---|---|---|---|---|
connect.pubsub.gcp.auth.mode | Specifies the authentication mode for connecting to GCP. | string | Credentials, File or Default | Default |
connect.pubsub.gcp.credentials | For “auth.mode” credentials: GCP Authentication credentials string. | string | (Empty) |
connect.pubsub.gcp.file | For “auth.mode” file: Local file path for file containing GCP authentication credentials. | string | (Empty) |
connect.pubsub.gcp.project.id | GCP Project ID. | string | (Empty) |
connect.pubsub.kcql | Kafka Connect Query Language (KCQL) Configuration to control the connector behaviour | string |
connect.pubsub.output.mode | Specifies the output mode. | string | Default, Compatibility | Default |

Name | Description | Type | Available Values | Default Value |
---|---|---|---|---|
connect.s3.aws.auth.mode | Specifies the AWS authentication mode for connecting to S3. | string | "Credentials," "Default" | "Default" |
connect.s3.aws.access.key | Access Key for AWS S3 Credentials | string |
connect.s3.aws.secret.key | Secret Key for AWS S3 Credentials | string |
connect.s3.aws.region | AWS Region for S3 Bucket | string |
connect.s3.pool.max.connections | Maximum Connections in the Connection Pool | int | -1 (undefined) | 50 |
connect.s3.custom.endpoint | Custom Endpoint URL for S3 (if applicable) | string |
connect.s3.kcql | Kafka Connect Query Language (KCQL) Configuration to control the connector behaviour | string |
connect.s3.vhost.bucket | Enable Virtual Hosted-style Buckets for S3 | boolean | true, false | false |
connect.s3.source.extension.excludes | A comma-separated list of file extensions to exclude from the source file search. | string | [file extension filtering]({{< relref "#file-extension-filtering" >}}) |
connect.s3.source.extension.includes | A comma-separated list of file extensions to include in the source file search. | string | [file extension filtering]({{< relref "#file-extension-filtering" >}}) |
connect.s3.source.partition.extractor.type | Type of Partition Extractor (Hierarchical or Regex) | string | hierarchical, regex |
connect.s3.source.partition.extractor.regex | Regex Pattern for Partition Extraction (if applicable) | string |
connect.s3.ordering.type | Type of ordering for the S3 file names to ensure the processing order. | string | AlphaNumeric, LastModified | AlphaNumeric |
connect.s3.source.partition.search.continuous | If set to true the connector will continuously search for new partitions. | boolean | true, false | true |
connect.s3.source.partition.search.excludes | A comma-separated list of paths to exclude from the partition search. | string | ".indexes" |
connect.s3.source.partition.search.interval | The interval in milliseconds between searching for new partitions. | long | 300000 |
connect.s3.source.partition.search.recurse.levels | Controls how many levels deep to recurse when searching for new partitions | int | 0 |
This page describes the usage of the Stream Reactor Azure Service Bus Sink Connector.
The Stream Reactor Azure Service Bus Sink Connector is designed to effortlessly translate Kafka records into your Azure Service Bus cluster. It leverages the Microsoft Azure API to transfer data to Service Bus seamlessly, safekeeping both payloads and metadata (see Payload support). It supports both types of Service Bus: Queues and Topics. The Azure Service Bus Sink Connector provides an AT-LEAST-ONCE guarantee, as the data is committed (marked as read) in the Kafka topic (for the assigned topic and partition) once the connector verifies it was successfully committed to the designated Service Bus topic.
For more examples see the tutorials.
The following example presents all the mandatory configuration properties for the Service Bus connector. Please note there are also optional parameters listed in #storage-to-output-matrix. Feel free to tweak the configuration to your requirements.
You can specify multiple KCQL statements separated by ; to have the connector map between multiple topics.
The following KCQL is supported:
It allows you to map a Kafka topic of name <your-kafka-topic> to a Service Bus of name <your-service-bus> using the PROPERTIES specified (please check #keyed-json-format for more info on necessary properties).
The selection of fields from the Service Bus message is not supported.
You can connect to an Azure Service Bus by passing your connection string in configuration. The connection string can be found in the Shared access policies section of your Azure Portal.
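As a sketch, the connection string is supplied via the connector configuration; all values below are placeholders:

```properties
connect.servicebus.connection.string=Endpoint=sb://MYNAMESPACE.servicebus.windows.net/;SharedAccessKeyName=KEY_NAME;SharedAccessKey=KEY
```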
Learn more about different methods of connecting to Service Bus on the Azure Website.
The Azure Service Bus Connector connects to Service Bus via the Microsoft API. In order to configure your mappings smoothly, you have to pay attention to the PROPERTIES part of your KCQL mappings. There are two cases here: writing to a Service Bus of type QUEUE and of type TOPIC. Please refer to the relevant sections below. In case of further questions, check the Azure Service Bus documentation to learn more about those mechanisms.
In order to write to a queue, there's an additional parameter that you need to pass with your KCQL mapping in the PROPERTIES part. This parameter is servicebus.type and it can take one of two values depending on the type of the Service Bus: QUEUE or TOPIC. For a queue, set it to QUEUE.
This is sufficient to enable you to create the mapping with your queue.
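A sketch of such a mapping; the queue and topic names are placeholders:

```sql
INSERT INTO my-queue SELECT * FROM my-kafka-topic PROPERTIES('servicebus.type'='QUEUE');
```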
In order to write to a topic, there is an additional parameter that you need to pass with your KCQL mapping in the PROPERTIES part: servicebus.type, which can take one of two values depending on the type of the Service Bus: QUEUE or TOPIC. For a topic, set it to TOPIC.
This is sufficient to enable you to create the mapping with your topic.
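A sketch of such a mapping; the names are placeholders:

```sql
INSERT INTO my-servicebus-topic SELECT * FROM my-kafka-topic PROPERTIES('servicebus.type'='TOPIC');
```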
This sink supports the following Kafka payloads:
- String Schema key and Binary payload (the MessageId in Service Bus is set from the Kafka key)
- any other key (or keyless) and Binary payload (the Service Bus messages will not have a MessageId specified)
- No Schema and JSON
Azure Service Bus doesn't allow sending messages with null content (payload). A null payload (sometimes referred to as a Kafka tombstone) is a known concept in the Kafka world. However, because of this Service Bus limitation, the connector cannot send messages with a null payload and has to drop them instead. Please keep that in mind when using Service Bus and designing business logic around null payloads!
Please find below all the necessary KCQL properties:
Please find below all the relevant configuration parameters:
This page describes the usage of the Stream Reactor AWS S3 Sink Connector.
This Kafka Connect sink connector facilitates the seamless transfer of records from Kafka to AWS S3 Buckets. It offers robust support for various data formats, including AVRO, Parquet, JSON, CSV, and Text, making it a versatile choice for data storage. Additionally, it ensures the reliability of data transfer with built-in support for exactly-once semantics.
For more examples see the tutorials.
This example writes to a bucket called demo, partitioning by a field called `ts`, and stores the data as JSON.
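A sketch of what such a KCQL statement could look like (the source topic name `orders` is a placeholder):

```sql
INSERT INTO demo SELECT * FROM orders PARTITIONBY ts STOREAS `JSON`
```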
You can specify multiple KCQL statements separated by ; to have a connector sink multiple topics. The connector properties topics or topics.regex are required to be set to a value that matches the KCQL statements.
The connector uses KCQL to map topics to S3 buckets and paths. The full KCQL syntax is:
Please note that you can employ escaping within KCQL for the INSERT INTO, SELECT * FROM, and PARTITIONBY clauses when necessary. For example, an incoming Kafka message stored as JSON can use fields containing `.`:
In this case, you can use the following KCQL statement:
The target bucket and path are specified in the INSERT INTO clause. The path is optional and if not specified, the connector will write to the root of the bucket and append the topic name to the path.
Here are a few examples:
Currently, the connector does not offer support for SQL projection; consequently, anything other than a SELECT * query is disregarded. The connector will faithfully write all fields from Kafka exactly as they are.
The source topic is defined within the FROM clause. To avoid runtime errors, it’s crucial to configure either the `topics` or `topics.regex` property in the connector and ensure proper mapping to the KCQL statements.
Set the FROM clause to *. This will auto map the topic as a partition.
The object key serves as the filename used to store data in S3. There are two options for configuring the object key:
Default: The object key is automatically generated by the connector and follows the Kafka topic-partition structure. The format is $bucket/[$prefix]/$topic/$partition/offset.extension. The extension is determined by the chosen storage format.
Custom: The object key is driven by the `PARTITIONBY` clause. The format is either $bucket/[$prefix]/$topic/customKey1=customValue1/customKey2=customValue2/topic(partition_offset).extension (AWS Athena naming style mimicking Hive-like data partitioning) or $bucket/[$prefix]/customValue/topic(partition_offset).ext. The extension is determined by the selected storage format.
Custom keys and values can be extracted from the Kafka message key, message value, or message headers, as long as the headers are of types that can be converted to strings. There is no fixed limit to the number of elements that can form the object key, but you should be aware of AWS S3 key length restrictions.
The Connector automatically adds the topic name to the partition. There is no need to add it to the partition clause. If you want to explicitly add the topic or partition you can do so by using __topic and __partition.
The partition clause works on header, key and values fields of the Kafka message.
To extract fields from the message values, simply use the field names in the `PARTITIONBY` clause. For example:
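A sketch, assuming the message value contains primitive fields named `region` and `department` (both hypothetical):

```sql
INSERT INTO demo SELECT * FROM orders PARTITIONBY region, department STOREAS `JSON`
```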
However, note that the message fields must be of primitive types (e.g., string, int, long) to be used for partitioning.
You can also use the entire message key as long as it can be coerced into a primitive type:
In cases where the Kafka message Key is not a primitive but a complex object, you can use individual fields within the message Key to create the S3 object key name:
Kafka message headers can also be used in the S3 object key definition, provided the header values are of primitive types easily convertible to strings:
Customizing the object key can leverage various components of the Kafka message. For example:
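A sketch combining a value field, a key field, and a header in one clause (all field and header names here are hypothetical):

```sql
INSERT INTO demo SELECT * FROM orders PARTITIONBY region, _key.customerId, _header.tenant STOREAS `AVRO`
```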
This flexibility allows you to tailor the object key to your specific needs, extracting meaningful information from Kafka messages to structure S3 object keys effectively.
To enable Athena-like partitioning, use the following syntax:
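As an illustration only: in some Stream Reactor versions the Hive/Athena-style `key=value` path segments are toggled via a KCQL property; the `partition.include.keys` property name below is an assumption to verify against the option reference.

```sql
INSERT INTO demo SELECT * FROM orders PARTITIONBY region STOREAS `PARQUET` PROPERTIES('partition.include.keys'=true)
```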
Storing data in Amazon S3 and partitioning it by time is a common practice in data management. For instance, you may want to organize your S3 data in hourly intervals. This partitioning can be seamlessly achieved using the `PARTITIONBY` clause in combination with specifying the relevant time field. However, it’s worth noting that the time field typically doesn’t adjust automatically.
To address this, we offer a Kafka Connect Single Message Transformer (SMT) designed to streamline this process.
Let’s consider an example where you need the object key to include the wallclock time (the time when the message was processed) and create an hourly window based on a field called `timestamp`. Here’s the connector configuration to achieve this:
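A sketch of the SMT portion of such a configuration, shown as connector properties. The SMT class names and property keys below are assumptions drawn from the Lenses kafka-connect-smt project and should be verified against the SMT documentation:

```properties
transforms=convertTs,insertWallclock
# Convert the epoch-millis 'timestamp' field into an hourly-window string
transforms.convertTs.type=io.lenses.connect.smt.header.TimestampConverter
transforms.convertTs.field=timestamp
transforms.convertTs.format.to.pattern=yyyy-MM-dd-HH
# Add the current wallclock time as a message header
transforms.insertWallclock.type=io.lenses.connect.smt.header.InsertWallclock
transforms.insertWallclock.format=yyyy-MM-dd-HH
```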
In this example, the incoming Kafka message’s Value content includes a field called timestamp, represented as a long value indicating the epoch time in milliseconds. The TimestampConverter SMT will expertly convert this into a string value according to the format specified in the format.to.pattern property. Additionally, the insertWallclock SMT will incorporate the current wallclock time in the format you specify in the format property.
The `PARTITIONBY` clause then leverages both the timestamp field and the wallclock header to craft the object key, providing you with precise control over data partitioning.
While the `STOREAS` clause is optional, it plays a pivotal role in determining the storage format within AWS S3. It’s crucial to understand that this format is entirely independent of the data format stored in Kafka. The connector maintains its neutrality towards the storage format at the topic level and relies on the `key.converter` and `value.converter` settings to interpret the data.
Supported storage formats encompass:
AVRO
Parquet
JSON
CSV (including headers)
Text
BYTES
Opting for BYTES ensures that each record is stored in its own separate file. This feature proves particularly valuable for scenarios involving the storage of images or other binary data in S3. For cases where you prefer to consolidate multiple records into a single binary file, AVRO or Parquet are the recommended choices.
By default, the connector exclusively stores the Kafka message value. However, you can expand storage to encompass the entire message, including the key, headers, and metadata, by configuring the `store.envelope` property as true. This property operates as a boolean switch, with the default value being false. When the envelope is enabled, the data structure follows this format:
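A sketch of the envelope shape, shown as JSON for illustration; the exact field set is best confirmed against the connector documentation:

```json
{
  "key": "<message key>",
  "value": "<message value>",
  "headers": { "header1": "value1" },
  "metadata": {
    "timestamp": 1700000000000,
    "topic": "orders",
    "partition": 0,
    "offset": 42
  }
}
```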
Utilizing the envelope is particularly advantageous in scenarios such as backup and restore or replication, where comprehensive storage of the entire message in S3 is desired.
Storing the message Value Avro data as Parquet in S3:
The converter also facilitates seamless JSON to AVRO/Parquet conversion, eliminating the need for an additional processing step before the data is stored in S3.
Enabling the full message stored as JSON in S3:
Enabling the full message stored as AVRO in S3:
If the restore (see the S3 Source documentation) happens on the same cluster, then the most performant way is to use the ByteConverter for both Key and Value and store as AVRO or Parquet:
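A sketch of the converter settings for this setup; `org.apache.kafka.connect.converters.ByteArrayConverter` is the standard Kafka Connect byte-array converter:

```properties
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
```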
The connector offers three distinct flush options for data management:
Flush by Count - triggers a file flush after a specified number of records have been written to it.
Flush by Size - initiates a file flush once a predetermined size (in bytes) has been attained.
Flush by Interval - enforces a file flush after a defined time interval (in seconds).
It’s worth noting that the interval flush is a continuous process that acts as a fail-safe mechanism, ensuring that files are periodically flushed, even if the other flush options are not configured or haven’t reached their thresholds.
Consider a scenario where the flush size is set to 10MB, and only 9.8MB of data has been written to the file, with no new Kafka messages arriving for an extended period of 6 hours. To prevent undue delays, the interval flush guarantees that the file is flushed after the specified time interval has elapsed. This ensures the timely management of data even in situations where other flush conditions are not met.
The flush options are configured using the flush.count, flush.size, and flush.interval properties. The settings are optional and if not specified the defaults are:
flush.count = 50_000
flush.size = 500000000 (500MB)
flush.interval = 3_600 (1 hour)
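As an illustration, the thresholds can be tuned per mapping; the values below are arbitrary examples, not recommendations:

```sql
INSERT INTO demo SELECT * FROM orders STOREAS `JSON` PROPERTIES('flush.count'=1000, 'flush.size'=10000000, 'flush.interval'=600)
```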
A connector instance can simultaneously operate on multiple topic partitions. When one partition triggers a flush, it will initiate a flush operation for all of them, even if the other partitions are not yet ready to flush.
The next flush time is calculated based on the time the previous flush completed (the last modified time of the file written to S3). Therefore, by design, the sink connector’s behaviour will have a slight drift based on the time it takes to flush records and whether records are present or not. If Kafka Connect makes no calls to put records, the logic for flushing won't be executed. This ensures a more consistent number of records per file.
The PROPERTIES clause is optional and adds a layer of configuration to the connector. It enhances versatility by permitting the application of multiple configurations (delimited by ‘,’). The following properties are supported:
The sink connector optimizes performance by padding the output files. This proves beneficial when using the S3 Source connector to restore data. This file padding ensures that files are ordered lexicographically, allowing the S3 Source connector to skip the need for reading, sorting, and processing all files, thereby enhancing efficiency.
AVRO and Parquet offer the capability to compress files as they are written. The S3 Sink connector provides users with the flexibility to configure compression options. Here are the available options for the `connect.s3.compression.codec`, along with indications of their support by Avro and Parquet writers:
Please note that not all compression libraries are bundled with the S3 connector. Therefore, you may need to manually add certain libraries to the classpath to ensure they function correctly.
The connector offers two distinct authentication modes:
Default: This mode relies on the default AWS authentication chain, simplifying the authentication process.
Credentials: In this mode, explicit configuration of AWS Access Key and Secret Key is required for authentication.
Here’s an example configuration for the Credentials mode:
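A sketch of what this could look like; the property names follow the `connect.s3.*` convention used elsewhere on this page but should be verified against the option reference, and real keys should come from a secret provider rather than plain text:

```properties
connect.s3.aws.auth.mode=Credentials
connect.s3.aws.access.key=<your-access-key>
connect.s3.aws.secret.key=<your-secret-key>
connect.s3.aws.region=eu-west-1
```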
For enhanced security and flexibility when using the Credentials mode, it is highly advisable to utilize Connect Secret Providers.
The connector supports Error policies.
The connector can also be used against API compatible systems provided they implement the following:
The connector uses the concept of index files that it writes to in order to store information about the latest offsets for Kafka topics and partitions as they are being processed. This allows the connector to quickly resume from the correct position when restarting and provides flexibility in naming the index files.
By default, the root directory for these index files is named .indexes for all connectors. However, each connector will create and store its index files within its own subdirectory inside this `.indexes` directory.
You can configure the root directory for these index files using the property connect.s3.indexes.name. This property specifies the path from the root of the S3 bucket. Note that even if you configure this property, the connector will still create a subdirectory within the specified root directory.
This page describes the usage of the Stream Reactor Azure CosmosDB Sink Connector.
A Kafka Connect sink connector for writing records from Kafka to Azure CosmosDB using the SQL API.
For more examples see the tutorials.
You can specify multiple KCQL statements separated by `;` to have a connector sink multiple topics. The connector properties topics or topics.regex are required to be set to a value that matches the KCQL statements.
The following KCQL is supported:
Examples:
Insert is the default write mode of the sink. It inserts messages from Kafka topics into DocumentDB.
The Sink supports DocumentDB upsert functionality which replaces the existing row if a match is found on the primary keys.
This mode works with at-least-once delivery semantics on Kafka, as the order is guaranteed within partitions. If the same record is delivered twice to the sink, it results in an idempotent write: the existing record will be updated with the values of the second delivery, which are the same.
If records are delivered with the same field or group of fields that are used as the primary key on the target table, but different values, the existing record in the target table will be updated.
Since records are delivered in the order they were written per partition the write is idempotent on failure or restart. Redelivery produces the same result.
This sink supports the following Kafka payloads:
Schema.Struct and Struct (Avro)
Schema.Struct and JSON
No Schema and JSON
The connector supports Error policies.
This page describes the usage of the Stream Reactor Azure Datalake Gen 2 Sink Connector.
This Kafka Connect sink connector facilitates the seamless transfer of records from Kafka to Azure Data Lake Buckets. It offers robust support for various data formats, including AVRO, Parquet, JSON, CSV, and Text, making it a versatile choice for data storage. Additionally, it ensures the reliability of data transfer with built-in support for exactly-once semantics.
For more examples see the tutorials.
You can specify multiple KCQL statements separated by ; to have a connector sink multiple topics. The connector properties topics or topics.regex are required to be set to a value that matches the KCQL statements.
The connector uses KCQL to map topics to Datalake buckets and paths. The full KCQL syntax is:
Please note that you can employ escaping within KCQL for the INSERT INTO, SELECT * FROM, and PARTITIONBY clauses when necessary. For example, an incoming Kafka message stored as JSON can use fields containing `.`:
In this case, you can use the following KCQL statement:
The target bucket and path are specified in the INSERT INTO clause. The path is optional and if not specified, the connector will write to the root of the bucket and append the topic name to the path.
Here are a few examples:
Currently, the connector does not offer support for SQL projection; consequently, anything other than a SELECT * query is disregarded. The connector will faithfully write all fields from Kafka exactly as they are.
The source topic is defined within the FROM clause. To avoid runtime errors, it’s crucial to configure either the `topics` or `topics.regex` property in the connector and ensure proper mapping to the KCQL statements.
Set the FROM clause to *. This will auto map the topic as a partition.
The PROPERTIES clause is optional and adds a layer of configuration to the connector. It enhances versatility by permitting the application of multiple configurations (delimited by ‘,’). The following properties are supported:
The sink connector optimizes performance by padding the output files, a practice that proves beneficial when using the Datalake Source connector to restore data. This file padding ensures that files are ordered lexicographically, allowing the Datalake Source connector to skip the need for reading, sorting, and processing all files, thereby enhancing efficiency.
The object key serves as the filename used to store data in Datalake. There are two options for configuring the object key:
Default: The object key is automatically generated by the connector and follows the Kafka topic-partition structure. The format is $container/[$prefix]/$topic/$partition/offset.extension. The extension is determined by the chosen storage format.
Custom: The object key is driven by the `PARTITIONBY` clause. The format is either $container/[$prefix]/$topic/customKey1=customValue1/customKey2=customValue2/topic(partition_offset).extension (naming style mimicking Hive-like data partitioning) or $container/[$prefix]/customValue/topic(partition_offset).ext. The extension is determined by the selected storage format.
The Connector automatically adds the topic name to the partition. There is no need to add it to the partition clause. If you want to explicitly add the topic or partition you can do so by using __topic and __partition.
The partition clause works on header, key and values fields of the Kafka message.
Custom keys and values can be extracted from the Kafka message key, message value, or message headers, as long as the headers are of types that can be converted to strings. There is no fixed limit to the number of elements that can form the object key, but you should be aware of Azure Datalake key length restrictions.
To extract fields from the message values, simply use the field names in the `PARTITIONBY` clause. For example:
However, note that the message fields must be of primitive types (e.g., string, int, long) to be used for partitioning.
You can also use the entire message key as long as it can be coerced into a primitive type:
In cases where the Kafka message Key is not a primitive but a complex object, you can use individual fields within the message Key to create the Datalake object key name:
Kafka message headers can also be used in the Datalake object key definition, provided the header values are of primitive types easily convertible to strings:
Customizing the object key can leverage various components of the Kafka message. For example:
This flexibility allows you to tailor the object key to your specific needs, extracting meaningful information from Kafka messages to structure Datalake object keys effectively.
To enable Athena-like partitioning, use the following syntax:
Storing data in Azure Datalake and partitioning it by time is a common practice in data management. For instance, you may want to organize your Datalake data in hourly intervals. This partitioning can be seamlessly achieved using the `PARTITIONBY` clause in combination with specifying the relevant time field. However, it’s worth noting that the time field typically doesn’t adjust automatically.
To address this, we offer a Kafka Connect Single Message Transformer (SMT) designed to streamline this process. You can find the transformer plugin and documentation here.
Let’s consider an example where you need the object key to include the wallclock time (the time when the message was processed) and create an hourly window based on a field called `timestamp`. Here’s the connector configuration to achieve this:
In this example, the incoming Kafka message’s Value content includes a field called timestamp, represented as a long value indicating the epoch time in milliseconds. The TimestampConverter SMT will expertly convert this into a string value according to the format specified in the format.to.pattern property. Additionally, the insertWallclock SMT will incorporate the current wallclock time in the format you specify in the format property.
The `PARTITIONBY` clause then leverages both the timestamp field and the wallclock header to craft the object key, providing you with precise control over data partitioning.
While the `STOREAS` clause is optional, it plays a pivotal role in determining the storage format within Azure Datalake. It’s crucial to understand that this format is entirely independent of the data format stored in Kafka. The connector maintains its neutrality towards the storage format at the topic level and relies on the `key.converter` and `value.converter` settings to interpret the data.
Supported storage formats encompass:
AVRO
Parquet
JSON
CSV (including headers)
Text
BYTES
Opting for BYTES ensures that each record is stored in its own separate file. This feature proves particularly valuable for scenarios involving the storage of images or other binary data in Datalake. For cases where you prefer to consolidate multiple records into a single binary file, AVRO or Parquet are the recommended choices.
By default, the connector exclusively stores the Kafka message value. However, you can expand storage to encompass the entire message, including the key, headers, and metadata, by configuring the `store.envelope` property as true. This property operates as a boolean switch, with the default value being false. When the envelope is enabled, the data structure follows this format:
Utilizing the envelope is particularly advantageous in scenarios such as backup and restore or replication, where comprehensive storage of the entire message in Datalake is desired.
Storing the message Value Avro data as Parquet in Datalake:
The converter also facilitates seamless JSON to AVRO/Parquet conversion, eliminating the need for an additional processing step before the data is stored in Datalake.
Enabling the full message stored as JSON in Datalake:
Enabling the full message stored as AVRO in Datalake:
If the restore (see the Datalake Source documentation) happens on the same cluster, then the most performant way is to use the ByteConverter for both Key and Value and store as AVRO or Parquet:
The connector offers three distinct flush options for data management:
Flush by Count - triggers a file flush after a specified number of records have been written to it.
Flush by Size - initiates a file flush once a predetermined size (in bytes) has been attained.
Flush by Interval - enforces a file flush after a defined time interval (in seconds).
It’s worth noting that the interval flush is a continuous process that acts as a fail-safe mechanism, ensuring that files are periodically flushed, even if the other flush options are not configured or haven’t reached their thresholds.
Consider a scenario where the flush size is set to 10MB, and only 9.8MB of data has been written to the file, with no new Kafka messages arriving for an extended period of 6 hours. To prevent undue delays, the interval flush guarantees that the file is flushed after the specified time interval has elapsed. This ensures the timely management of data even in situations where other flush conditions are not met.
The flush options are configured using the flush.count, flush.size, and flush.interval KCQL Properties (see #object-key section). The settings are optional and if not specified the defaults are:
flush.count = 50_000
flush.size = 500000000 (500MB)
flush.interval = 3600 (1 hour)
A connector instance can simultaneously operate on multiple topic partitions. When one partition triggers a flush, it will initiate a flush operation for all of them, even if the other partitions are not yet ready to flush.
The next flush time is calculated based on the time the previous flush completed (the last modified time of the file written to Data Lake). Therefore, by design, the sink connector’s behaviour will have a slight drift based on the time it takes to flush records and whether records are present or not. If Kafka Connect makes no calls to put records, the logic for flushing won't be executed. This ensures a more consistent number of records per file.
AVRO and Parquet offer the capability to compress files as they are written. The Datalake Sink connector provides advanced users with the flexibility to configure compression options. Here are the available options for the `connect.datalake.compression.codec`, along with indications of their support by Avro and Parquet writers:
Please note that not all compression libraries are bundled with the Datalake connector. Therefore, you may need to manually add certain libraries to the classpath to ensure they function correctly.
The connector offers two distinct authentication modes:
Default: This mode relies on the default Azure authentication chain, simplifying the authentication process.
Connection String: This mode enables simpler configuration by relying on the connection string to authenticate with Azure.
Credentials: In this mode, explicit configuration of Azure Access Key and Secret Key is required for authentication.
When selecting the “Credentials” mode, it is essential to provide the necessary access key and secret key properties. Alternatively, if you prefer not to configure these properties explicitly, the connector will follow the credentials retrieval order as described here.
Here’s an example configuration for the “Credentials” mode:
And here is an example configuration using the “Connection String” mode:
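A sketch of such a configuration; the property names below are assumptions following the `connect.datalake.*` convention used on this page, so check them against the option reference, and prefer sourcing the connection string from a secret provider:

```properties
connect.datalake.azure.auth.mode=ConnectionString
connect.datalake.azure.connection.string=<your-connection-string>
```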
For enhanced security and flexibility when using either the “Credentials” or “Connection String” modes, it is highly advisable to utilize Connect Secret Providers.
The connector supports Error policies.
The connector uses the concept of index files that it writes to in order to store information about the latest offsets for Kafka topics and partitions as they are being processed. This allows the connector to quickly resume from the correct position when restarting and provides flexibility in naming the index files.
By default, the root directory for these index files is named .indexes for all connectors. However, each connector will create and store its index files within its own subdirectory inside this `.indexes` directory.
You can configure the root directory for these index files using the property `connect.datalake.indexes.name`. This property specifies the path from the root of the data lake filesystem. Note that even if you configure this property, the connector will still create a subdirectory within the specified root directory.
This page details the configuration options for the Stream Reactor Kafka Connect sink connectors.
Sink connectors read data from Kafka and write to an external system.
Kafka topic retention policies determine how long a message is retained in a topic before it is deleted. If the retention period expires and the connector has not processed the messages, possibly due to not running or other issues, the unprocessed Kafka data will be deleted as per the retention policy. This can lead to significant data loss since the messages will no longer be available for the connector to sink to the target system.
Yes, the data lake connectors natively support exactly-once guarantees.
Field names in Kafka message headers or values may contain dots (`.`). To access these correctly, enclose the entire target in backticks (`` ` ``) and wrap each field-name segment in single quotes (`'`):
For field names with spaces or special characters, use a similar escaping strategy:
Field name with a space: `_value.'full name'`
Field name with special characters: `_value.'$special_characters!'`
This ensures the connector correctly extracts the intended fields and avoids parsing errors.
This page describes the usage of the Stream Reactor HTTP Sink Connector.
A Kafka Connect sink connector for writing records from Kafka to HTTP endpoints.
Support for Json/Avro/String/Protobuf messages via Kafka Connect (in conjunction with converters for Schema-Registry based data storage).
URL, header and content templating ability give you full control of the HTTP request.
Configurable batching of messages, even allowing you to combine them into a single request selecting which data to send with your HTTP request.
For more examples see the tutorials.
The Lenses HTTP sink comes with multiple options for content templating of the HTTP request.
If you do not wish any part of the key, value, headers or other data to form a part of the message, you can use static templating:
When you are confident you will be generating a single HTTP request per Kafka message, then you can use the simpler templating.
In your configuration, in the content property of your config, you can define template substitutions like the following example:
(please note the XML is only an example; your template can consist of any text format that can be submitted in an HTTP request)
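For illustration, a content template might look like the following sketch; the `{{value.field}}`-style placeholder syntax and the field names are assumptions to be checked against the connector's templating reference:

```xml
<order>
  <id>{{value.orderId}}</id>
  <customer>{{value.customer.name}}</customer>
</order>
```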
To collapse multiple messages into a single HTTP request you can use the multiple-message template. This is automatic if the template has a `messages` tag. See the below example:
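A sketch of what a multiple-message template could look like; the exact tag syntax is an assumption and should be verified against the templating reference:

```xml
<orders>
  {{#message}}
  <order>
    <id>{{value.orderId}}</id>
  </order>
  {{/message}}
</orders>
```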
Again, this is an XML example, but your message body can consist of anything, including plain text, JSON or YAML.
Your connector configuration will look like this:
The final result will be HTTP requests with bodies like this:
When using simple and multiple message templating, the following are available:
URL including protocol (e.g. `http://lenses.io`). Template variables can be used.
Currently, the HTTP Sink supports no authentication, BASIC HTTP authentication, and OAuth2 authentication.
By default, no authentication is set. This can be also done by providing a configuration like this:
BASIC auth can be configured by providing a configuration like this:
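A sketch of a BASIC auth fragment; the property names below are assumptions and should be checked against the connector's option reference, with credentials ideally supplied via a secret provider:

```properties
connect.http.authentication.type=basic
connect.http.authentication.basic.username=<username>
connect.http.authentication.basic.password=<password>
```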
OAuth auth can be configured by providing a configuration like this:
To customise the headers sent with your HTTP request you can supply a Headers List.
Example:
The connector offers three distinct flush options for data management:
Flush by Count - triggers a file flush after a specified number of records have been written to it.
Flush by Size - initiates a file flush once a predetermined size (in bytes) has been attained.
Flush by Interval - enforces a file flush after a defined time interval (in seconds).
It's worth noting that the interval flush is a continuous process that acts as a fail-safe mechanism, ensuring that files are periodically flushed, even if the other flush options are not configured or haven't reached their thresholds.
Consider a scenario where the flush size is set to 10MB, and only 9.8MB of data has been written to the file, with no new Kafka messages arriving for an extended period of 6 hours. To prevent undue delays, the interval flush guarantees that the file is flushed after the specified time interval has elapsed. This ensures the timely management of data even in situations where other flush conditions are not met.
The flush options are configured using the `batchCount`, `batchSize` and `timeInterval` properties. The settings are optional and if not specified the defaults are:
Some configuration examples follow on how to apply this connector to different message types.
These include converters, which are required to instruct Kafka Connect on how to read the source content.
In this case the converters are irrelevant as we are not using the message content to populate our message template.
The HTTP request body contains the value of the message, which is retained as a string value via the StringConverter.
Specific fields from the JSON message are substituted into the HTTP request body alongside some static content.
The entirety of the message value is substituted into a placeholder in the message body. The message is treated as a string via the StringConverter.
Fields from the AVRO message are substituted into the message body in the following example:
Starting from version 8.1, as a pilot release, we give our customers the ability to use functionality called Reporter which (if enabled) writes Success and Error processing reports to a specified Kafka topic. Reports don't have a key, and you can find details about the status in the message headers and value.
In order to enable this functionality, we have to enable one (or both, if we want full reporting) of the properties below:
This is the most common scenario for on-premises Kafka clusters used just for monitoring:
This is a more robust scenario when connecting to an external Kafka cluster:
This sink connector supports the following options as part of its configuration:
This page describes the usage of the Stream Reactor GCP Storage Sink Connector.
You can specify multiple KCQL statements separated by `;` to have a connector sink multiple topics. The connector properties `topics` or `topics.regex` are required to be set to a value that matches the KCQL statements.
For more examples see the tutorials.
The connector uses KCQL to map topics to GCP Storage buckets and paths. The full KCQL syntax is:
Please note that you can employ escaping within KCQL for the INSERT INTO, SELECT * FROM, and PARTITIONBY clauses when necessary. For example, an incoming Kafka message stored as JSON can use fields containing `.`:
In this case you can use the following KCQL statement:
The target bucket and path are specified in the INSERT INTO clause. The path is optional and if not specified, the connector will write to the root of the bucket and append the topic name to the path.
Here are a few examples:
Currently, the connector does not offer support for SQL projection; consequently, anything other than a SELECT * query is disregarded. The connector will faithfully write all fields from Kafka exactly as they are.
The source topic is defined within the FROM clause. To avoid runtime errors, it’s crucial to configure either the `topics` or `topics.regex` property in the connector and ensure proper mapping to the KCQL statements.
Set the FROM clause to *. This will auto map the topic as a partition.
The PROPERTIES clause is optional and adds a layer of configurability to the connector. It enhances versatility by permitting the application of multiple configurations (delimited by ‘,’). The following properties are supported:
The sink connector optimizes performance by padding the output files, a practice that proves beneficial when using the GCP Storage Source connector to restore data. This file padding ensures that files are ordered lexicographically, allowing the GCP Storage Source connector to skip the need for reading, sorting, and processing all files, thereby enhancing efficiency.
The object key serves as the filename used to store data in GCP Storage. There are two options for configuring the object key:
Default: The object key is automatically generated by the connector and follows the Kafka topic-partition structure. The format is $container/[$prefix]/$topic/$partition/offset.extension. The extension is determined by the chosen storage format.
Custom: The object key is driven by the `PARTITIONBY` clause. The format is either $container/[$prefix]/$topic/customKey1=customValue1/customKey2=customValue2/topic(partition_offset).extension (Athena-style naming mimicking Hive-like data partitioning) or $container/[$prefix]/customValue/topic(partition_offset).ext. The extension is determined by the selected storage format.
The Connector automatically adds the topic name to the partition. There is no need to add it to the partition clause. If you want to explicitly add the topic or partition you can do so by using __topic and __partition.
The partition clause works on header, key and values fields of the Kafka message.
Custom keys and values can be extracted from the Kafka message key, message value, or message headers, as long as the headers are of types that can be converted to strings. There is no fixed limit to the number of elements that can form the object key, but you should be aware of GCP Storage key length restrictions.
To extract fields from the message values, simply use the field names in the `PARTITIONBY` clause. For example:
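A hypothetical statement, assuming the message value contains `country` and `city` fields (bucket, topic and field names are illustrative):

```sql
INSERT INTO my-bucket SELECT * FROM my-topic PARTITIONBY country, city
```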
However, note that the message fields must be of primitive types (e.g., string, int, long) to be used for partitioning.
You can also use the entire message key as long as it can be coerced into a primitive type:
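A sketch, assuming the message key is a primitive such as a string (bucket and topic names are illustrative):

```sql
INSERT INTO my-bucket SELECT * FROM my-topic PARTITIONBY _key
```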
In cases where the Kafka message Key is not a primitive but a complex object, you can use individual fields within the message Key to create the GCP Storage object key name:
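An illustrative sketch, assuming the message key is a structure with fields named `region` and `customerId`:

```sql
INSERT INTO my-bucket SELECT * FROM my-topic PARTITIONBY _key.region, _key.customerId
```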
Kafka message headers can also be used in the GCP Storage object key definition, provided the header values are of primitive types easily convertible to strings:
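A sketch, assuming a string-valued header named `tenant` (all names illustrative):

```sql
INSERT INTO my-bucket SELECT * FROM my-topic PARTITIONBY _header.tenant
```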
Customizing the object key can leverage various components of the Kafka message. For example:
This flexibility allows you to tailor the object key to your specific needs, extracting meaningful information from Kafka messages to structure GCP Storage object keys effectively.
To enable Athena-like partitioning, use the following syntax:
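A sketch, assuming the `partition.include.keys` KCQL property (documented in the PROPERTIES reference) is what switches on the key=value path style; bucket, topic and field names are illustrative:

```sql
INSERT INTO my-bucket SELECT * FROM my-topic PARTITIONBY country PROPERTIES('partition.include.keys'=true)
```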
Storing data in GCP Storage and partitioning it by time is a common practice in data management. For instance, you may want to organize your GCP Storage data in hourly intervals. This partitioning can be seamlessly achieved using the `PARTITIONBY` clause in combination with specifying the relevant time field. However, it’s worth noting that the time field typically doesn’t adjust automatically.
Let’s consider an example where you need the object key to include the wallclock time (the time when the message was processed) and create an hourly window based on a field called `timestamp`. Here’s the connector configuration to achieve this:
In this example, the incoming Kafka message’s Value content includes a field called timestamp, represented as a long value indicating the epoch time in milliseconds. The TimestampConverter SMT will expertly convert this into a string value according to the format specified in the format.to.pattern property. Additionally, the insertWallclock SMT will incorporate the current wallclock time in the format you specify in the format property.
The `PARTITIONBY` clause then leverages both the timestamp field and the wallclock header to craft the object key, providing you with precise control over data partitioning.
While the `STOREAS` clause is optional, it plays a pivotal role in determining the storage format within GCP Storage. It’s crucial to understand that this format is entirely independent of the data format stored in Kafka. The connector maintains its neutrality towards the storage format at the topic level and relies on the `key.converter` and `value.converter` settings to interpret the data.
Supported storage formats encompass:
AVRO
Parquet
JSON
CSV (including headers)
Text
BYTES
Opting for BYTES ensures that each record is stored in its own separate file. This feature proves particularly valuable for scenarios involving the storage of images or other binary data in GCP Storage. For cases where you prefer to consolidate multiple records into a single binary file, AVRO or Parquet are the recommended choices.
By default, the connector exclusively stores the Kafka message value. However, you can expand storage to encompass the entire message, including the key, headers, and metadata, by configuring the `store.envelope` property as true. This property operates as a boolean switch, with the default value being false. When the envelope is enabled, the data structure follows this format:
Utilizing the envelope is particularly advantageous in scenarios such as backup and restore or replication, where comprehensive storage of the entire message in GCP Storage is desired.
Storing the message Value Avro data as Parquet in GCP Storage:
The converter also facilitates seamless JSON to AVRO/Parquet conversion, eliminating the need for an additional processing step before the data is stored in GCP Storage.
Enabling the full message stored as JSON in GCP Storage:
Enabling the full message stored as AVRO in GCP Storage:
If the restore (see the GCP Storage Source documentation) happens on the same cluster, then the most performant way is to use the ByteArrayConverter for both Key and Value and store as AVRO or Parquet:
The connector offers three distinct flush options for data management:
Flush by Count - triggers a file flush after a specified number of records have been written to it.
Flush by Size - initiates a file flush once a predetermined size (in bytes) has been attained.
Flush by Interval - enforces a file flush after a defined time interval (in seconds).
It’s worth noting that the interval flush is a continuous process that acts as a fail-safe mechanism, ensuring that files are periodically flushed, even if the other flush options are not configured or haven’t reached their thresholds.
Consider a scenario where the flush size is set to 10MB, and only 9.8MB of data has been written to the file, with no new Kafka messages arriving for an extended period of 6 hours. To prevent undue delays, the interval flush guarantees that the file is flushed after the specified time interval has elapsed. This ensures the timely management of data even in situations where other flush conditions are not met.
flush.count = 50000
flush.size = 500000000 (500MB)
flush.interval = 3600 (1 hour)
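Expressed as KCQL PROPERTIES, settings like the defaults above might look like this (bucket and topic names are illustrative):

```sql
INSERT INTO my-bucket SELECT * FROM my-topic PROPERTIES('flush.count'=50000, 'flush.size'=500000000, 'flush.interval'=3600)
```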
A connector instance can simultaneously operate on multiple topic partitions. When one partition triggers a flush, it will initiate a flush operation for all of them, even if the other partitions are not yet ready to flush.
The next flush time is calculated based on the time the previous flush completed (the last modified time of the file written to GCP Storage). Therefore, by design, the sink connector’s behaviour will have a slight drift based on the time it takes to flush records and whether records are present or not. If Kafka Connect makes no calls to put records, the logic for flushing won't be executed. This ensures a more consistent number of records per file.
AVRO and Parquet offer the capability to compress files as they are written. The GCP Storage Sink connector provides advanced users with the flexibility to configure compression options. Here are the available options for the `connect.gcpstorage.compression.codec`, along with indications of their support by Avro and Parquet writers:
Please note that not all compression libraries are bundled with the GCP Storage connector. Therefore, you may need to manually add certain libraries to the classpath to ensure they function correctly.
The connector offers three distinct authentication modes:
Default: This mode relies on the default GCP authentication chain, simplifying the authentication process.
Connection String: This mode enables simpler configuration by relying on the connection string to authenticate with GCP.
Credentials: In this mode, explicit configuration of GCP Access Key and Secret Key is required for authentication.
When selecting the “Credentials” mode, it is essential to provide the necessary access key and secret key properties. Alternatively, if you prefer not to configure these properties explicitly, the connector will follow the credentials retrieval order as described here.
Here’s an example configuration for the “Credentials” mode:
And here is an example configuration using the “Connection String” mode:
The connector uses the concept of index files that it writes to in order to store information about the latest offsets for Kafka topics and partitions as they are being processed. This allows the connector to quickly resume from the correct position when restarting and provides flexibility in naming the index files.
By default, the root directory for these index files is named .indexes for all connectors. However, each connector will create and store its index files within its own subdirectory inside this `.indexes` directory.
You can configure the root directory for these index files using the property `connect.gcpstorage.indexes.name`. This property specifies the path from the root of the GCS bucket. Note that even if you configure this property, the connector will still create a subdirectory within the specified root directory.
This page describes the usage of the Stream Reactor InfluxDB Sink Connector.
For more examples see the tutorials.
You can specify multiple KCQL statements separated by `;` to have a connector sink multiple topics. The connector properties `topics` or `topics.regex` are required to be set to a value that matches the KCQL statements.
The following KCQL is supported:
Examples:
The InfluxDB client API allows a set of tags (key-value pairs) to be provided for each point written. The current connector version allows you to provide them via KCQL.
This is only applicable to value fields; there is no support for nested fields, keys or topic metadata.
This sink supports the following Kafka payloads:
Schema.Struct and Struct (Avro)
Schema.Struct and JSON
No Schema and JSON
This page describes the usage of the Stream Reactor Cassandra Sink Connector.
The connector converts the value of Kafka messages to JSON and uses the Cassandra JSON insert feature to write records.
For more examples see the tutorials.
You can specify multiple KCQL statements separated by `;` to have a connector sink multiple topics. The connector properties `topics` or `topics.regex` are required to be set to a value that matches the KCQL statements.
The following KCQL is supported:
Examples:
Compacted topics in Kafka retain the last message per key. Deletion in Kafka occurs by tombstoning: if compaction is enabled on the topic and a message is sent with a null payload, Kafka flags this record for deletion and it is removed from the topic during compaction.
Deletion in Cassandra is supported based on fields in the key of messages with an empty/null payload. A Cassandra CQL delete statement must be provided, with bind parameters for the field values taken from the key. For example, given the delete statement:
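A hypothetical delete statement for a table keyed on `id` and `product` (keyspace, table and column names are illustrative):

```sql
DELETE FROM my_keyspace.orders WHERE id = ? AND product = ?
```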
If a message was received with an empty/null value and key fields key.id and key.product, the final bound Cassandra statement would be:
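Assuming illustrative key values of 999 and 'widget', the bound statement would take the form:

```sql
DELETE FROM my_keyspace.orders WHERE id = 999 AND product = 'widget'
```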
Deletion will only occur if a message with an empty payload is received from Kafka.
Ensure the ordinal position of the `connect.cassandra.delete.struct_flds` matches the binding order in the Cassandra delete statement!
This sink supports the following Kafka payloads:
Schema.Struct and Struct (Avro)
Schema.Struct and JSON
No Schema and JSON
This page describes the usage of the Stream Reactor Elasticsearch Sink Connector.
For more examples see the tutorials.
You can specify multiple KCQL statements separated by `;` to have a connector sink multiple topics. The connector properties `topics` or `topics.regex` are required to be set to a value that matches the KCQL statements.
The following KCQL is supported:
Examples:
It is possible to configure how the connector handles a null value payload (known as Kafka tombstones). Please use the `behavior.on.null.values` property in your KCQL with one of the possible values:
IGNORE (ignores tombstones entirely)
FAIL (throws an exception when a tombstone is received)
DELETE (deletes the record with the specified id from the index)
Example:
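A sketch showing the property inside a KCQL statement (index and topic names are illustrative):

```sql
INSERT INTO my-index SELECT * FROM my-topic PROPERTIES('behavior.on.null.values'='DELETE')
```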
The PK keyword can be used to specify the fields which will be used for the key value in Elastic. The field values will be concatenated and separated by a `-`. If no fields are set, the topic name, partition and message offset are used.
INSERT writes new records to Elastic, replacing existing records with the same ID set by the PK (Primary Key) keyword. UPSERT replaces existing records if a matching record is found, or inserts a new one if none is found.
`WITHDOCTYPE` allows you to associate a document type with the document inserted.
WITHINDEXSUFFIX allows you to specify a suffix for your index; date formats are supported.
Example:
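A sketch, assuming a date-format suffix (index and topic names are illustrative):

```sql
INSERT INTO index_name SELECT * FROM topicA WITHINDEXSUFFIX=_{YYYY-MM-dd}
```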
To use a static index name, define the target index in the KCQL statement without any prefixes:
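A sketch matching the description that follows:

```sql
INSERT INTO index_name SELECT * FROM topicA
```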
This will consistently create an index named `index_name` for any messages consumed from `topicA`.
To extract an index name from a message header, use the `_header` prefix followed by the header name:
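A sketch matching the description that follows:

```sql
INSERT INTO _header.gate SELECT * FROM topicA
```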
This statement extracts the value from the `gate` header field and uses it as the index name.
For headers with names that include dots, enclose the entire target in backticks and each segment that consists of a field name in single quotes (`'`):
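A sketch matching the description that follows:

```sql
INSERT INTO `_header.'prefix.abc.suffix'` SELECT * FROM topicA
```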
In this case, the value of the header named `prefix.abc.suffix` is used to form the index name.
To use the full value of the message key as the index name, use the `_key` prefix:
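A sketch matching the description that follows:

```sql
INSERT INTO _key SELECT * FROM topicA
```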
For example, if the message key is `"freddie"`, the resulting index name will be `freddie`.
To extract an index name from a field within the message value, use the `_value` prefix followed by the field name:
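A sketch matching the description that follows:

```sql
INSERT INTO _value.name SELECT * FROM topicA
```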
This example uses the value of the `name` field from the message's value. If the field contains `"jason"`, the index name will be `jason`.
Nested Fields in Values
To access nested fields within a value, specify the full path using dot notation:
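A sketch matching the description that follows:

```sql
INSERT INTO _value.name.firstName SELECT * FROM topicA
```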
If the `firstName` field is nested within the `name` structure, its value (e.g., `"hans"`) will be used as the index name.
Fields with Dots in Their Names
For field names that include dots, enclose the entire target in backticks and each segment that consists of a field name in single quotes (`'`):
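A sketch, assuming an illustrative field literally named `first.name`:

```sql
INSERT INTO `_value.'first.name'` SELECT * FROM topicA
```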
If the value structure contains:
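For instance, a value shaped like this (the field name is illustrative):

```json
{ "first.name": "hans" }
```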
The extracted index name will be `hans`.
The Sink will automatically create missing indexes at startup.
Please note that this feature is not compatible with index names extracted from message headers/keys/values.
Enabling SSL connections between Kafka Connect and Elasticsearch ensures that the communication between these services is secure, protecting sensitive data from being intercepted or tampered with. SSL (or TLS) encrypts data in transit, verifying the identity of both parties and ensuring data integrity.
While newer versions of Elasticsearch have SSL enabled by default for internal communication, it’s still necessary to configure SSL for client connections, such as those from Kafka Connect. Even if Elasticsearch has SSL enabled by default, Kafka Connect still needs these configurations to establish a secure connection. By setting up SSL in Kafka Connect, you ensure:
Data encryption: Prevents unauthorized access to data being transferred.
Authentication: Confirms that Kafka Connect and Elasticsearch are communicating with trusted entities.
Compliance: Meets security standards for regulatory requirements (such as GDPR or HIPAA).
Truststore: Holds certificates to check if the node’s certificate is valid.
Keystore: Contains your client’s private key and certificate to prove your identity to the node.
SSL Protocol: Use TLSv1.2 or TLSv1.3 for up-to-date security.
Password Security: Protect passwords by encrypting them or using secure methods like environment variables or secret managers.
This page describes the usage of the Stream Reactor JMS Sink Connector.
For more examples see the tutorials.
You can specify multiple KCQL statements separated by `;` to have a connector sink multiple topics. The connector properties `topics` or `topics.regex` are required to be set to a value that matches the KCQL statements.
The following KCQL is supported:
Examples:
The sink can write to either topics or queues, specified by the WITHTYPE clause.
When a message is sent to a JMS target it can be one of the following:
JSON - Send a TextMessage
AVRO - Send a BytesMessage
MAP - Send a MapMessage
OBJECT - Send an ObjectMessage
This is set by the WITHFORMAT keyword.
This sink supports the following Kafka payloads:
Schema.Struct and Struct (Avro)
Schema.Struct and JSON
No Schema and JSON
This page describes the usage of the Stream Reactor MongoDB Sink Connector.
For more examples see the tutorials.
You can specify multiple KCQL statements separated by `;` to have a connector sink multiple topics. The connector properties `topics` or `topics.regex` are required to be set to a value that matches the KCQL statements.
The following KCQL is supported:
Examples:
Insert is the default write mode of the sink.
The connector supports upserts, which replace the existing row if a match is found on the primary keys. If records are delivered with the same field or group of fields that are used as the primary key on the target table, but different values, the existing record in the target table will be updated.
The BATCH clause controls the batching of writes to MongoDB.
TLS/SSL is supported by setting `?ssl=true` in the `connect.mongo.connection` option. The MongoDB driver will then attempt to load the truststore and keystore using the JVM system properties.
You need to set JVM system properties to ensure that the client is able to validate the SSL certificate presented by the server:
javax.net.ssl.trustStore: the path to a trust store containing the certificate of the signing authority
javax.net.ssl.trustStorePassword: the password to access this trust store
javax.net.ssl.keyStore: the path to a key store containing the client’s SSL certificates
javax.net.ssl.keyStorePassword: the password to access this key store
All authentication methods are supported: X.509, LDAP Plain, Kerberos (GSSAPI), MongoDB-CR and SCRAM-SHA-1. The default as of MongoDB version 3.0 is SCRAM-SHA-1. To set the authentication mechanism, set the `authMechanism` in the `connect.mongo.connection` option.
The mechanism can be set in the connection string, but this requires the password to be in plain text in the connection string; alternatively, set it via the `connect.mongo.auth.mechanism` option.
If the username is set, it overrides the username/password set in the connection string and the `connect.mongo.auth.mechanism` option has precedence.
e.g.
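A hypothetical configuration fragment (host, credentials and the username/password property names are illustrative):

```properties
connect.mongo.connection=mongodb://mongo-host:27017
connect.mongo.auth.mechanism=SCRAM-SHA-1
connect.mongo.username=my_user
connect.mongo.password=my_password
```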
List of fields that should be converted to ISO Date on MongoDB insertion (comma-separated field names), for JSON topics only. Field values may be an epoch time or an ISO8601 datetime string with an offset (offset or ‘Z’ required). If the string does not parse to ISO, it will be written as a string instead.
Subdocument fields can be referred to in the following examples:
topLevelFieldName
topLevelSubDocument.FieldName
topLevelParent.subDocument.subDocument2.FieldName
If a field is converted to ISODate and that same field is named as a PK, then the PK field is also written as an ISODate.
This is controlled via the `connect.mongo.json_datetime_fields` option.
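A sketch (field names are illustrative; note the subdocument path):

```properties
connect.mongo.json_datetime_fields=timestamp,metadata.created
```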
This sink supports the following Kafka payloads:
Schema.Struct and Struct (Avro)
Schema.Struct and JSON
No Schema and JSON
This page describes the usage of the Stream Reactor MQTT Sink Connector.
For more examples see the tutorials.
You can specify multiple KCQL statements separated by `;` to have a connector sink multiple topics. The connector properties `topics` or `topics.regex` are required to be set to a value that matches the KCQL statements.
The following KCQL is supported:
Examples:
The connector can dynamically write to MQTT topics determined by a field in the Kafka message value by using the WITHTARGET target clause and specifying `$field` as the target field to extract.
This sink supports the following Kafka payloads:
Schema.Struct and Struct (Avro)
Schema.Struct and JSON
No Schema and JSON
This page describes the usage of the Stream Reactor Redis Sink Connector.
For more examples see the tutorials.
You can specify multiple KCQL statements separated by `;` to have a connector sink multiple topics. The connector properties `topics` or `topics.regex` are required to be set to a value that matches the KCQL statements.
The following KCQL is supported:
The purpose of this mode is to cache in Redis [Key-Value] pairs. Imagine a Kafka topic with currency foreign exchange rate messages:
You may want to store in Redis: the symbol as the Key and the price as the Value. This will effectively make Redis a caching system, which multiple other applications can access to get the (latest) value. To achieve that using this particular Kafka Redis Sink Connector, you need to specify the KCQL as:
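A sketch, assuming the FX messages arrive on a topic named yahoo-fx with fields `symbol` and `price` (names illustrative):

```sql
SELECT price FROM yahoo-fx PK symbol
```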
This will update the keys USDGBP and EURGBP with the relevant price using the (default) JSON format:
Composite keys are supported with the PK clause, a delimiter can be set with the optional configuration property connect.redis.pk.delimiter.
To insert messages from a Kafka topic into 1 Sorted Set use the following KCQL syntax:
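A sketch matching the description that follows (the topic name is illustrative):

```sql
INSERT INTO cpu_stats SELECT * FROM cpuTopic STOREAS SortedSet(score=timestamp)
```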
This will create and add entries to the (sorted set) named cpu_stats. The entries will be ordered in the Redis set based on the score that we define it to be the value of the timestamp field of the AVRO message from Kafka. In the above example, we are selecting and storing all the fields of the Kafka message.
The TTL statement allows setting a time to live on the sorted set. If not specified, no TTL is set.
The connector can create multiple sorted sets by promoting each value of one field from the Kafka message into one Sorted Set and selecting which values to store in the sorted sets. Use the PK (primary key) KCQL clause to define the field.
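A sketch, promoting the `symbol` field into its own sorted set (topic and field names are illustrative):

```sql
SELECT price, timestamp FROM fxTopic PK symbol STOREAS SortedSet(score=timestamp)
```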
Notice we have dropped the INSERT clause.
The connector can also prefix the name of the Key using the INSERT statement for Multiple SortedSets:
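A sketch matching the description that follows (topic and field names are illustrative):

```sql
INSERT INTO FX- SELECT price, timestamp FROM fxTopic PK symbol STOREAS SortedSet(score=timestamp)
```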
This will create keys with names FX-USDGBP, FX-EURGBP etc.
The TTL statement allows setting a time to live on the sorted set. If not specified, no TTL is set.
To insert messages from a Kafka topic with GEOADD use the following KCQL syntax:
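An illustrative sketch; the `longitudeField`/`latitudeField` parameter names are assumptions, as are the topic and field names:

```sql
SELECT town, country FROM addressTopic PK town STOREAS GeoAdd (longitudeField=lng, latitudeField=lat)
```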
To insert messages from a Kafka topic to a Redis Stream use the following KCQL syntax:
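A sketch (stream and topic names are illustrative):

```sql
INSERT INTO myStream SELECT * FROM myTopic STOREAS STREAM
```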
To insert a message from a Kafka topic to a Redis PubSub use the following KCQL syntax:
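A sketch in which the target channel is taken from the `myfield` payload field (the topic name is illustrative):

```sql
SELECT * FROM topicA STOREAS PubSub (channel=myfield)
```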
The channel to write to in Redis is determined by a field in the payload of the Kafka message set in the KCQL statement, in this case, a field called `myfield`.
This sink supports the following Kafka payloads:
Schema.Struct and Struct (Avro)
Schema.Struct and JSON
No Schema and JSON
The URL is also templated, so it can contain substitutions from the message key/value/headers etc. If you are batching multiple Kafka messages into a single request, the first message will be used for the substitution of the URL.
Each header key and value is also templated, so it can contain substitutions from the message key/value/headers etc. If you are batching multiple Kafka messages into a single request, the first message will be used for the substitution of the headers.
Enabling SSL connections between Kafka Connect and the HTTP endpoint ensures that the communication between these services is secure, protecting sensitive data from being intercepted or tampered with. SSL (or TLS) encrypts data in transit, verifying the identity of both parties and ensuring data integrity. Please check out the relevant section in order to set it up.
Then we need to specify other connectivity properties, just as we would when configuring a Kafka producer. Full configuration options can be found in the Kafka producer configuration documentation. Below you will find two examples: one with local/plain configuration, the other using SASL connection parameters.
To address this, we offer a Kafka Connect Single Message Transformer (SMT) designed to streamline this process. You can find the transformer plugin and documentation .
The flush options are configured using the flush.count, flush.size, and flush.interval KCQL Properties (see section). The settings are optional and if not specified the defaults are:
For enhanced security and flexibility when using either the “Credentials” or “Connection String” modes, it is highly advisable to utilize Connect Secret Providers. You can find detailed information on how to use the Connect Secret Providers . This approach ensures robust security practices while handling access credentials.
The connector supports .
The connector supports .
The connector supports .
The connector supports .
The connector supports .
The connector supports .
The connector supports .
| Name | Description | Type | Default Value |
|---|---|---|---|
| `servicebus.type` | Specifies the Service Bus type: `QUEUE` or `TOPIC` | string | |
| `connect.servicebus.connection.string` | Specifies the Connection String to connect to Service Bus | string | |
| `connect.servicebus.kcql` | Comma-separated output KCQL queries | string | |
| `connect.servicebus.sink.retries.max` | Number of retries if a message has failed to be delivered to Service Bus | int | 3 |
| `connect.servicebus.sink.retries.timeout` | Timeout (in milliseconds) between retries if a message has failed to be delivered to Service Bus | int | 500 |
| Name | Description | Type | Available Values | Default Value |
|---|---|---|---|---|
| `padding.type` | Specifies the type of padding to be applied. | LeftPad, RightPad, NoOp | LeftPad, RightPad, NoOp | LeftPad |
| `padding.char` | Defines the character used for padding. | Char | | ‘0’ |
| `padding.length.partition` | Sets the padding length for the partition. | Int | | 0 |
| `padding.length.offset` | Sets the padding length for the offset. | Int | | 12 |
| `partition.include.keys` | Specifies whether partition keys are included. | Boolean | | false (Custom Partitioning default: true) |
| `store.envelope` | Indicates whether to store the entire Kafka message. | Boolean | | |
| `store.envelope.fields.key` | Indicates whether to store the envelope’s key. | Boolean | | |
| `store.envelope.fields.headers` | Indicates whether to store the envelope’s headers. | Boolean | | |
| `store.envelope.fields.value` | Indicates whether to store the envelope’s value. | Boolean | | |
| `store.envelope.fields.metadata` | Indicates whether to store the envelope’s metadata. | Boolean | | |
| Compression | Avro Support | Avro (requires Level) | Parquet Support |
|---|---|---|---|
| UNCOMPRESSED | ✅ | | ✅ |
| SNAPPY | ✅ | | ✅ |
| GZIP | | | ✅ |
| LZ0 | | | ✅ |
| LZ4 | | | ✅ |
| BROTLI | | | ✅ |
| BZIP2 | ✅ | | |
| ZSTD | ✅ | ⚙️ | ✅ |
| DEFLATE | ✅ | ⚙️ | |
| XZ | ✅ | ⚙️ | |
| Index Name (`connect.s3.indexes.name`) | Resulting Indexes Directory Structure | Description |
|---|---|---|
| `.indexes` (default) | `.indexes/<connector_name>/` | The default setup, where each connector uses its own subdirectory within `.indexes`. |
| `custom-indexes` | `custom-indexes/<connector_name>/` | Custom root directory `custom-indexes`, with a subdirectory for each connector. |
| `indexes/s3-connector-logs` | `indexes/s3-connector-logs/<connector_name>/` | Uses a custom subdirectory `s3-connector-logs` within `indexes`, with a subdirectory for each connector. |
| `logs/indexes` | `logs/indexes/<connector_name>/` | Indexes are stored under `logs/indexes`, with a subdirectory for each connector. |
| Name | Description | Type | Available Values | Default Value |
|---|---|---|---|---|
| `connect.s3.aws.auth.mode` | Specifies the AWS authentication mode for connecting to S3. | string | “Credentials”, “Default” | “Default” |
| `connect.s3.aws.access.key` | The AWS Access Key used for authentication. | string | | (Empty) |
| `connect.s3.aws.secret.key` | The AWS Secret Key used for authentication. | string | | (Empty) |
| `connect.s3.aws.region` | The AWS Region where the S3 bucket is located. | string | | (Empty) |
| `connect.s3.pool.max.connections` | Specifies the maximum number of connections allowed in the AWS Client’s HTTP connection pool when interacting with S3. | int | -1 (undefined) | 50 |
| `connect.s3.custom.endpoint` | Allows for the specification of a custom S3 endpoint URL if needed. | string | | (Empty) |
| `connect.s3.vhost.bucket` | Enables the use of Vhost Buckets for S3 connections. Always set to true when custom endpoints are used. | boolean | true, false | false |
| `connect.s3.error.policy` | Defines the error handling policy when errors occur during data transfer to or from S3. | string | “NOOP”, “THROW”, “RETRY” | “THROW” |
| `connect.s3.max.retries` | Sets the maximum number of retries the connector will attempt before reporting an error to the Connect Framework. | int | | 20 |
| `connect.s3.retry.interval` | Specifies the interval (in milliseconds) between retry attempts by the connector. | int | | 60000 |
| `connect.s3.http.max.retries` | Sets the maximum number of retries for the underlying HTTP client when interacting with S3. | long | | 5 |
| `connect.s3.http.retry.interval` | Specifies the retry interval (in milliseconds) for the underlying HTTP client. An exponential backoff strategy is employed. | long | | 50 |
| `connect.s3.local.tmp.directory` | Enables the use of a local folder as a staging area for data transfer operations. | string | | (Empty) |
| `connect.s3.kcql` | A SQL-like configuration that defines the behavior of the connector. Refer to the KCQL section for details. | string | | (Empty) |
| `connect.s3.compression.codec` | Sets the Parquet compression codec to be used when writing data to S3. | string | “UNCOMPRESSED”, “SNAPPY”, “GZIP”, “LZ0”, “LZ4”, “BROTLI”, “BZIP2”, “ZSTD”, “DEFLATE”, “XZ” | “UNCOMPRESSED” |
| `connect.s3.compression.level` | Sets the compression level when compression is enabled for data transfer to S3. | int | 1-9 | (Empty) |
| `connect.s3.seek.max.files` | Specifies the maximum threshold for the number of files the connector uses to ensure exactly-once processing of data. | int | | 5 |
| `connect.s3.indexes.name` | Configure the indexes root directory for this connector. | string | | ".indexes" |
| `connect.s3.exactly.once.enable` | By setting to 'false', disables exactly-once semantics, opting instead for Kafka Connect’s native at-least-once offset management. | boolean | true, false | true |
| `connect.s3.schema.change.rollover` | When set to `false`, the file will not roll over upon receiving a record with a schema different from the accumulated ones. This is a performance optimization, intended only for cases where schema changes are backward-compatible. | boolean | true, false | true |
connect.documentdb.endpoint
The Azure DocumentDb end point.
string
connect.documentdb.master.key
The connection master key
password
connect.documentdb.consistency.level
Determines the write visibility. There are four possible values: Strong,BoundedStaleness,Session or Eventual
string
Session
connect.documentdb.db
The Azure DocumentDb target database.
string
connect.documentdb.db.create
If set to true it will create the database if it doesn’t exist. If this is set to default(false) an exception will be raised.
boolean
false
connect.documentdb.proxy
Specifies the connection proxy details.
string
connect.documentdb.error.policy
Specifies the action to be taken if an error occurs while inserting the data There are two available options: NOOP - the error is swallowed THROW - the error is allowed to propagate. RETRY - The exception causes the Connect framework to retry the message. The number of retries is based on The error will be logged automatically
string
THROW
connect.documentdb.max.retries
The maximum number of times to try the write again.
int
20
connect.documentdb.retry.interval
The time in milliseconds between retries.
int
60000
connect.documentdb.kcql
KCQL expression describing field selection and data routing to the target DocumentDb.
string
connect.progress.enabled
Enables the output for how many records have been processed
boolean
false
padding.type
Specifies the type of padding to be applied.
LeftPad, RightPad, NoOp
LeftPad, RightPad, NoOp
LeftPad
padding.char
Defines the character used for padding.
Char
‘0’
padding.length.partition
Sets the padding length for the partition.
Int
0
padding.length.offset
Sets the padding length for the offset.
Int
12
partition.include.keys
Specifies whether partition keys are included.
Boolean
false Default (Custom Partitioning): true
store.envelope
Indicates whether to store the entire Kafka message
Boolean
store.envelope.fields.key
Indicates whether to store the envelope’s key.
Boolean
store.envelope.fields.headers
Indicates whether to store the envelope’s headers.
Boolean
store.envelope.fields.value
Indicates whether to store the envelope’s value.
Boolean
store.envelope.fields..metadata
Indicates whether to store the envelope’s metadata.
Boolean
flush.size
Specifies the size (in bytes) for the flush operation.
Long
500000000 (500MB)
flush.count
Specifies the number of records for the flush operation.
Int
50000
flush.interval
Specifies the interval (in seconds) for the flush operation.
Long
3600 (1 hour)
UNCOMPRESSED
✅
✅
SNAPPY
✅
✅
GZIP
✅
LZ0
✅
LZ4
✅
BROTLI
✅
BZIP2
✅
ZSTD
✅
⚙️
✅
DEFLATE
✅
⚙️
XZ
✅
⚙️
Index Name (connect.datalake.indexes.name
)
Resulting Indexes Directory Structure
Description
.indexes
(default)
.indexes/<connector_name>/
The default setup, where each connector uses its own subdirectory within .indexes
.
custom-indexes
custom-indexes/<connector_name>/
Custom root directory custom-indexes
, with a subdirectory for each connector.
indexes/datalake-connector-logs
indexes/datalake-connector-logs/<connector_name>/
Uses a custom subdirectory datalake-connector-logs
within indexes
, with a subdirectory for each connector.
logs/indexes
logs/indexes/<connector_name>/
Indexes are stored under logs/indexes
, with a subdirectory for each connector.
connect.datalake.azure.auth.mode
Specifies the Azure authentication mode for connecting to Datalake.
string
“Credentials”, “ConnectionString” or “Default”
“Default”
connect.datalake.azure.account.key
The Azure Account Key used for authentication.
string
(Empty)
connect.datalake.azure.account.name
The Azure Account Name used for authentication.
string
(Empty)
connect.datalake.pool.max.connections
Specifies the maximum number of connections allowed in the Azure Client’s HTTP connection pool when interacting with Datalake.
int
-1 (undefined)
50
connect.datalake.endpoint
Datalake endpoint URL.
string
(Empty)
connect.datalake.error.policy
Defines the error handling policy when errors occur during data transfer to or from Datalake.
string
“NOOP,” “THROW,” “RETRY”
“THROW”
connect.datalake.max.retries
Sets the maximum number of retries the connector will attempt before reporting an error to the Connect Framework.
int
20
connect.datalake.retry.interval
Specifies the interval (in milliseconds) between retry attempts by the connector.
int
60000
connect.datalake.http.max.retries
Sets the maximum number of retries for the underlying HTTP client when interacting with Datalake.
long
5
connect.datalake.http.retry.interval
Specifies the retry interval (in milliseconds) for the underlying HTTP client. An exponential backoff strategy is employed.
long
50
connect.datalake.local.tmp.directory
Enables the use of a local folder as a staging area for data transfer operations.
string
(Empty)
connect.datalake.kcql
A SQL-like configuration that defines the behavior of the connector. Refer to the KCQL section below for details.
string
(Empty)
connect.datalake.compression.codec
Sets the Parquet compression codec to be used when writing data to Datalake.
string
“UNCOMPRESSED,” “SNAPPY,” “GZIP,” “LZ0,” “LZ4,” “BROTLI,” “BZIP2,” “ZSTD,” “DEFLATE,” “XZ”
“UNCOMPRESSED”
connect.datalake.compression.level
Sets the compression level when compression is enabled for data transfer to Datalake.
int
1-9
(Empty)
connect.datalake.seek.max.files
Specifies the maximum threshold for the number of files the connector uses to ensure exactly-once processing of data.
int
5
connect.datalake.indexes.name
Configure the indexes root directory for this connector.
string
".indexes"
connect.datalake.exactly.once.enable
By setting to 'false', disable exactly-once semantics, opting instead for Kafka Connect’s native at-least-once offset management
boolean
true, false
true
connect.datalake.schema.change.rollover
When set to false
, the file will not roll over upon receiving a record with a schema different from the accumulated ones. This is a performance optimization, intended only for cases where schema changes are backward-compatible.
boolean
true,false
true
MQTT
Sink data from Kafka to MQTT.
AWS S3
Sink data from Kafka to AWS S3 including backing up topics and offsets.
Azure CosmosDB
Sink data from Kafka to Azure CosmosDB.
Azure Data Lake Gen2
Sink data from Kafka to Azure Data Lake Gen2 including backing up topics and offsets.
Azure Event Bus
Load data from Azure Event Hubs into Kafka topics.
Azure Service Bus
Sink data from Kafka to Azure Service Bus topics and queues.
Cassandra
Sink data from Kafka to Cassandra.
Elasticsearch
Sink data from Kafka to Elasticsearch.
GCP PubSub
Sink data from Kafka to GCP PubSub.
GCP Storage
Sink data from Kafka to GCP Storage.
HTTP Sink
Sink data from Kafka to a HTTP endpoint.
InfluxDB
Sink data from Kafka to InfluxDB.
JMS
Sink data from Kafka to JMS.
MongoDB
Sink data from Kafka to MongoDB.
Redis
Sink data from Kafka to Redis.
Header | {{header.correlation-id}} |
Value | {{value}} |
{{value.product.id}} |
Key | {{key}} |
{{key.customer.number}} |
Topic | {{topic}} |
Partition | {{partition}} |
Offset | {{offset}} |
Timestamp | {{timestamp}} |
batchCount | 50_000 records |
batchSize | 500000000 (500MB) |
timeInterval | 3_600 seconds (1 hour) |
Property Name | Description |
| Path to the truststore file containing the trusted CA certificates for verifying broker certificates. |
| Password for the truststore file to protect its integrity. |
| Type of the truststore (e.g., |
| Path to the keystore file containing the client’s private key and certificate chain for client authentication. |
| Password for the keystore to protect the private key. |
| Type of the keystore (e.g., |
| The SSL protocol used for secure connections (e.g., |
| Algorithm used by the KeyManager to manage certificates. Default value is the key manager factory algorithm configured for the Java Virtual Machine. |
| Algorithm used by the TrustManager to manage certificates. Default value is the key manager factory algorithm configured for the Java Virtual Machine. |
Property Name | Description |
| Specifies whether the reporter is enabled. |
| A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. Required if reporter is enabled. |
| Specifies the topic for Reporter to write to. |
| SASL Mechanism used when connecting. |
| JAAS login context parameters for SASL connections in the format used by JAAS configuration files. |
| SASL mechanism used for client connections. This may be any mechanism for which a security provider is available. |
Property Name | Description |
| Specifies whether the reporter is enabled. |
| A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. Required if reporter is enabled. |
| Specifies the topic for Reporter to write to. |
| SASL Mechanism used when connecting. |
| JAAS login context parameters for SASL connections in the format used by JAAS configuration files. |
| SASL mechanism used for client connections. This may be any mechanism for which a security provider is available. |
padding.type | Specifies the type of padding to be applied. | LeftPad, RightPad, NoOp | LeftPad, RightPad, NoOp | LeftPad |
padding.char | Defines the character used for padding. | Char | ‘0’ |
padding.length.partition | Sets the padding length for the partition. | Int | 0 |
padding.length.offset | Sets the padding length for the offset. | Int | 12 |
partition.include.keys | Specifies whether partition keys are included. | Boolean | false Default (Custom Partitioning): true |
store.envelope | Indicates whether to store the entire Kafka message | Boolean |
store.envelope.fields.key | Indicates whether to store the envelope’s key. | Boolean |
store.envelope.fields.headers | Indicates whether to store the envelope’s headers. | Boolean |
store.envelope.fiels.value | Indicates whether to store the envelope’s value. | Boolean |
store.envelope.fields.metadata | Indicates whether to store the envelope’s metadata. | Boolean |
flush.size | Specifies the size (in bytes) for the flush operation. | Long | 500000000 (500MB) |
flush.count | Specifies the number of records for the flush operation. | Int | 50000 |
flush.interval | Specifies the interval (in seconds) for the flush operation. | Long | 3600 (1 hour) |
UNCOMPRESSED | ✅ | ✅ |
SNAPPY | ✅ | ✅ |
GZIP | ✅ |
LZ0 | ✅ |
LZ4 | ✅ |
BROTLI | ✅ |
BZIP2 | ✅ |
ZSTD | ✅ | ⚙️ | ✅ |
DEFLATE | ✅ | ⚙️ |
XZ | ✅ | ⚙️ |
Index Name ( | Resulting Indexes Directory Structure | Description |
|
| The default setup, where each connector uses its own subdirectory within |
|
| Custom root directory |
|
| Uses a custom subdirectory |
|
| Indexes are stored under |
connect.gcpstorage.gcp.auth.mode | Specifies the authentication mode for connecting to GCP. | string | "Credentials", "File" or "Default" | "Default" |
connect.gcpstorage.gcp.credentials | For "auth.mode" credentials: GCP Authentication credentials string. | string | (Empty) |
connect.gcpstorage.gcp.file | For "auth.mode" file: Local file path for file containing GCP authentication credentials. | string | (Empty) |
connect.gcpstorage.gcp.project.id | GCP Project ID. | string | (Empty) |
connect.gcpstorage.gcp.quota.project.id | GCP Quota Project ID. | string | (Empty) |
connect.gcpstorage.endpoint | Endpoint for GCP Storage. | string | (Empty) |
connect.gcpstorage.error.policy | Defines the error handling policy when errors occur during data transfer to or from GCP Storage. | string | "NOOP", "THROW", "RETRY" | "THROW" |
connect.gcpstorage.max.retries | Sets the maximum number of retries the connector will attempt before reporting an error. | int | 20 |
connect.gcpstorage.retry.interval | Specifies the interval (in milliseconds) between retry attempts by the connector. | int | 60000 |
connect.gcpstorage.http.max.retries | Sets the maximum number of retries for the underlying HTTP client when interacting with GCP. | long | 5 |
connect.gcpstorage.http.retry.interval | Specifies the retry interval (in milliseconds) for the underlying HTTP client. | long | 50 |
connect.gcpstorage.http.retry.timeout.multiplier | Specifies the change in delay before the next retry or poll | double | 3.0 |
connect.gcpstorage.local.tmp.directory | Enables the use of a local folder as a staging area for data transfer operations. | string | (Empty) |
connect.gcpstorage.kcql | A SQL-like configuration that defines the behavior of the connector. | string | (Empty) |
connect.gcpstorage.compression.codec | Sets the Parquet compression codec to be used when writing data to GCP Storage. | string | "UNCOMPRESSED", "SNAPPY", "GZIP", "LZ0", "LZ4", "BROTLI", "BZIP2", "ZSTD", "DEFLATE", "XZ" | "UNCOMPRESSED" |
connect.gcpstorage.compression.level | Sets the compression level when compression is enabled for data transfer to GCP Storage. | int | 1-9 | (Empty) |
connect.gcpstorage.seek.max.files | Specifies the maximum threshold for the number of files the connector uses. | int | 5 |
connect.gcpstorage.indexes.name | Configure the indexes root directory for this connector. | string | ".indexes" |
connect.gcpstorage.exactly.once.enable | By setting to 'false', disable exactly-once semantics, opting instead for Kafka Connect’s native at-least-once offset management | boolean | true, false | true |
connect.gcpstorage.schema.change.rollover | When set to | boolean | true,false | true |
connect.influx.url | The InfluxDB database url. | string |
connect.influx.db | The database to store the values to. | string |
connect.influx.username | The user to connect to the influx database | string |
connect.influx.password | The password for the influxdb user. | password |
connect.influx.kcql | KCQL expression describing field selection and target measurements. | string |
connect.progress.enabled | Enables the output for how many records have been processed by the connector | boolean | false |
connect.influx.error.policy | Specifies the action to be taken if an error occurs while inserting the data. There are two available options: NOOP - the error is swallowed THROW - the error is allowed to propagate. RETRY - The exception causes the Connect framework to retry the message. The number of retries is based on The error will be logged automatically | string | THROW |
connect.influx.retry.interval | The time in milliseconds between retries. | int | 60000 |
connect.influx.max.retries | The maximum number of times to try the write again. | int | 20 |
connect.influx.retention.policy | Determines how long InfluxDB keeps the data - the options for specifying the duration of the retention policy are listed below. Note that the minimum retention period is one hour. DURATION determines how long InfluxDB keeps the data - the options for specifying the duration of the retention policy are listed below. Note that the minimum retention period is one hour. m minutes h hours d days w weeks INF infinite Default retention is | string | autogen |
connect.influx.consistency.level | Specifies the write consistency. If any write operations do not meet the configured consistency guarantees, an error will occur and the data will not be indexed. The default consistency-level is ALL. | string | ALL |
connect.cassandra.contact.points | Initial contact point host for Cassandra including port. | string | localhost |
connect.cassandra.port | Cassandra native port. | int | 9042 |
connect.cassandra.key.space | Keyspace to write to. | string |
connect.cassandra.username | Username to connect to Cassandra with. | string |
connect.cassandra.password | Password for the username to connect to Cassandra with. | password |
connect.cassandra.ssl.enabled | Secure Cassandra driver connection via SSL. | boolean | false |
connect.cassandra.trust.store.path | Path to the client Trust Store. | string |
connect.cassandra.trust.store.password | Password for the client Trust Store. | password |
connect.cassandra.trust.store.type | Type of the Trust Store, defaults to JKS | string | JKS |
connect.cassandra.key.store.type | Type of the Key Store, defauts to JKS | string | JKS |
connect.cassandra.ssl.client.cert.auth | Enable client certification authentication by Cassandra. Requires KeyStore options to be set. | boolean | false |
connect.cassandra.key.store.path | Path to the client Key Store. | string |
connect.cassandra.key.store.password | Password for the client Key Store | password |
connect.cassandra.consistency.level | Consistency refers to how up-to-date and synchronized a row of Cassandra data is on all of its replicas. Cassandra offers tunable consistency. For any given read or write operation, the client application decides how consistent the requested data must be. | string |
connect.cassandra.fetch.size | The number of records the Cassandra driver will return at once. | int | 5000 |
connect.cassandra.load.balancing.policy | Cassandra Load balancing policy. ROUND_ROBIN, TOKEN_AWARE, LATENCY_AWARE or DC_AWARE_ROUND_ROBIN. TOKEN_AWARE and LATENCY_AWARE use DC_AWARE_ROUND_ROBIN | string | TOKEN_AWARE |
connect.cassandra.error.policy | Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP - the error is swallowed THROW - the error is allowed to propagate. RETRY - The exception causes the Connect framework to retry the message. The number of retries is set by connect.cassandra.max.retries. All errors will be logged automatically, even if the code swallows them. | string | THROW |
connect.cassandra.max.retries | The maximum number of times to try the write again. | int | 20 |
connect.cassandra.retry.interval | The time in milliseconds between retries. | int | 60000 |
connect.cassandra.threadpool.size | The sink inserts all the data concurrently. To fail fast in case of an error, the sink has its own thread pool. Set the value to zero and the threadpool will default to 4* NO_OF_CPUs. Set a value greater than 0 and that would be the size of this threadpool. | int | 0 |
connect.cassandra.delete.struct_flds | Fields in the key struct data type used in there delete statement. Comma-separated in the order they are found in connect.cassandra.delete.statement. Keep default value to use the record key as a primitive type. | list | [] |
connect.cassandra.delete.statement | Delete statement for cassandra | string |
connect.cassandra.kcql | KCQL expression describing field selection and routes. | string |
connect.cassandra.default.value | By default a column omitted from the JSON map will be set to NULL. Alternatively, if set UNSET, pre-existing value will be preserved. | string |
connect.cassandra.delete.enabled | Enables row deletion from cassandra | boolean | false |
connect.progress.enabled | Enables the output for how many records have been processed | boolean | false |
connect.elastic.protocol | URL protocol (http, https) | string | http |
connect.elastic.hosts | List of hostnames for Elastic Search cluster node, not including protocol or port. | string | localhost |
connect.elastic.port | Port on which Elastic Search node listens on | string | 9300 |
connect.elastic.tableprefix | Table prefix (optional) | string |
connect.elastic.cluster.name | Name of the elastic search cluster, used in local mode for setting the connection | string | elasticsearch |
connect.elastic.write.timeout | The time to wait in millis. Default is 5 minutes. | int | 300000 |
connect.elastic.batch.size | How many records to process at one time. As records are pulled from Kafka it can be 100k+ which will not be feasible to throw at Elastic search at once | int | 4000 |
connect.elastic.use.http.username | Username if HTTP Basic Auth required default is null. | string |
connect.elastic.use.http.password | Password if HTTP Basic Auth required default is null. | string |
connect.elastic.error.policy | Specifies the action to be taken if an error occurs while inserting the data There are two available options: NOOP - the error is swallowed THROW - the error is allowed to propagate. RETRY - The exception causes the Connect framework to retry the message. The number of retries is based on The error will be logged automatically | string | THROW |
connect.elastic.max.retries | The maximum number of times to try the write again. | int | 20 |
connect.elastic.retry.interval | The time in milliseconds between retries. | int | 60000 |
connect.elastic.kcql | KCQL expression describing field selection and routes. | string |
connect.elastic.pk.separator | Separator used when have more that one field in PK | string | - |
connect.progress.enabled | Enables the output for how many records have been processed | boolean | false |
behavior.on.null.values | Specifies behavior on Kafka tombstones: | String | IGNORE |
Property Name | Description |
| Path to the truststore file containing the trusted CA certificates for verifying broker certificates. |
| Password for the truststore file to protect its integrity. |
| Type of the truststore (e.g., |
| Path to the keystore file containing the client’s private key and certificate chain for client authentication. |
| Password for the keystore to protect the private key. |
| Type of the keystore (e.g., |
| The SSL protocol used for secure connections (e.g., |
| Algorithm used by the TrustManager to manage certificates. Default value is the key manager factory algorithm configured for the Java Virtual Machine. |
| Algorithm used by the KeyManager to manage certificates. Default value is the key manager factory algorithm configured for the Java Virtual Machine. |
connect.jms.url | Provides the JMS broker url | string |
connect.jms.initial.context.factory | Initial Context Factory, e.g: org.apache.activemq.jndi.ActiveMQInitialContextFactory | string |
connect.jms.connection.factory | Provides the full class name for the ConnectionFactory compile to use, e.gorg.apache.activemq.ActiveMQConnectionFactory | string | ConnectionFactory |
connect.jms.kcql | connect.jms.kcql | string |
connect.jms.subscription.name | subscription name to use when subscribing to a topic, specifying this makes a durable subscription for topics | string |
connect.jms.password | Provides the password for the JMS connection | password |
connect.jms.username | Provides the user for the JMS connection | string |
connect.jms.error.policy | Specifies the action to be taken if an error occurs while inserting the data. There are two available options: NOOP - the error is swallowed THROW - the error is allowed to propagate. RETRY - The exception causes the Connect framework to retry the message. The number of retries is based on The error will be logged automatically | string | THROW |
connect.jms.retry.interval | The time in milliseconds between retries. | int | 60000 |
connect.jms.max.retries | The maximum number of times to try the write again. | int | 20 |
connect.jms.destination.selector | Selector to use for destination lookup. Either CDI or JNDI. | string | CDI |
connect.jms.initial.context.extra.params | List (comma-separated) of extra properties as key/value pairs with a colon delimiter to supply to the initial context e.g. SOLACE_JMS_VPN:my_solace_vp | list | [] |
connect.jms.batch.size | The number of records to poll for on the target JMS destination in each Connect poll. | int | 100 |
connect.jms.polling.timeout | Provides the timeout to poll incoming messages | long | 1000 |
connect.jms.source.default.converter | Contains a canonical class name for the default converter of a raw JMS message bytes to a SourceRecord. Overrides to the default can be done by using connect.jms.source.converters still. i.e. com.datamountaineer.streamreactor.connect.source.converters.AvroConverter | string |
connect.jms.converter.throw.on.error | If set to false the conversion exception will be swallowed and everything carries on BUT the message is lost!!; true will throw the exception.Default is false. | boolean | false |
connect.converter.avro.schemas | If the AvroConverter is used you need to provide an avro Schema to be able to read and translate the raw bytes to an avro record. The format is $MQTT_TOPIC=$PATH_TO_AVRO_SCHEMA_FILE | string |
connect.jms.headers | Contains collection of static JMS headers included in every SinkRecord The format is connect.jms.headers="$MQTT_TOPIC=rmq.jms.message.type:TextMessage,rmq.jms.message.priority:2;$MQTT_TOPIC2=rmq.jms.message.type:JSONMessage" | string |
connect.progress.enabled | Enables the output for how many records have been processed | boolean | false |
connect.jms.evict.interval.minutes | Removes the uncommitted messages from the internal cache. Each JMS message is linked to the Kafka record to be published. Failure to publish a record to Kafka will mean the JMS message will not be acknowledged. | int | 10 |
connect.jms.evict.threshold.minutes | The number of minutes after which an uncommitted entry becomes evictable from the connector cache. | int | 10 |
connect.jms.scale.type | How the connector tasks parallelization is decided. Available values are kcql and default. If kcql is provided it will be based on the number of KCQL statements written; otherwise it will be driven based on the connector tasks.max |
ssl.cipher.suites | A list of cipher suites. This is a named combination of authentication, encryption, MAC and key exchange algorithm used to negotiate the security settings for a network connection using TLS or SSL network protocol. By default all the available cipher suites are supported. | list |
ssl.enabled.protocols | The list of protocols enabled for SSL connections. | list | [TLSv1.2, TLSv1.1, TLSv1] |
ssl.keystore.password | The store password for the key store file. This is optional for client and only needed if ssl.keystore.location is configured. | password |
ssl.key.password | The password of the private key in the key store file. This is optional for client. | password |
ssl.keystore.type | The file format of the key store file. This is optional for client. | string | JKS |
ssl.truststore.location | The location of the trust store file. | string |
ssl.endpoint.identification.algorithm | The endpoint identification algorithm to validate server hostname using server certificate. | string | https |
ssl.protocol | The SSL protocol used to generate the SSLContext. Default setting is TLS, which is fine for most cases. Allowed values in recent JVMs are TLS, TLSv1.1 and TLSv1.2. SSL, SSLv2 and SSLv3 may be supported in older JVMs, but their usage is discouraged due to known security vulnerabilities. | string | TLS |
ssl.trustmanager.algorithm | The algorithm used by trust manager factory for SSL connections. Default value is the trust manager factory algorithm configured for the Java Virtual Machine. | string | PKIX |
ssl.secure.random.implementation | The SecureRandom PRNG implementation to use for SSL cryptography operations. | string |
ssl.truststore.type | The file format of the trust store file. | string | JKS |
ssl.keymanager.algorithm | The algorithm used by key manager factory for SSL connections. Default value is the key manager factory algorithm configured for the Java Virtual Machine. | string | SunX509 |
ssl.provider | The name of the security provider used for SSL connections. Default value is the default security provider of the JVM. | string |
ssl.keystore.location | The location of the key store file. This is optional for client and can be used for two-way authentication for client. | string |
ssl.truststore.password | The password for the trust store file. If a password is not set access to the truststore is still available, but integrity checking is disabled. | password |
connect.mongo.connection | The mongodb connection in the format mongodb://[username:password@]host1[:port1],host2[:port2],…[,hostN[:portN]]][/[database][?options]]. | string |
connect.mongo.db | The mongodb target database. | string |
connect.mongo.username | The username to use when authenticating | string |
connect.mongo.password | The password for the use when authenticating | password |
connect.mongo.auth.mechanism | String | string | SCRAM-SHA-1 |
connect.mongo.error.policy | Specifies the action to be taken if an error occurs while inserting the data. There are two available options: NOOP - the error is swallowed THROW - the error is allowed to propagate. RETRY - The exception causes the Connect framework to retry the message. The number of retries is based on The error will be logged automatically | string | THROW |
connect.mongo.max.retries | The maximum number of times to try the write again. | int | 20 |
connect.mongo.retry.interval | The time in milliseconds between retries. | int | 60000 |
connect.mongo.kcql | KCQL expression describing field selection and data routing to the target mongo db. | string |
connect.mongo.json_datetime_fields | List of fields that should be converted to ISODate on Mongodb insertion (comma-separated field names). For JSON topics only. Field values may be an integral epoch time or an ISO8601 datetime string with an offset (offset or ‘Z’ required). If string does not parse to ISO, it will be written as a string instead. Subdocument fields can be referred to as in the following examples: “topLevelFieldName”, “topLevelSubDocument.FieldName”, “topLevelParent.subDocument.subDocument2.FieldName”, (etc.) If a field is converted to ISODate and that same field is named as a PK, then the PK field is also written as an ISODate. | list | [] |
connect.progress.enabled | Enables the output for how many records have been processed | boolean | false |
connect.mqtt.hosts | Contains the MQTT connection end points. | string |
connect.mqtt.username | Contains the Mqtt connection user name | string |
connect.mqtt.password | Contains the Mqtt connection password | password |
connect.mqtt.service.quality | Specifies the Mqtt quality of service | int |
connect.mqtt.timeout | Provides the time interval to establish the mqtt connection | int | 3000 |
connect.mqtt.clean | connect.mqtt.clean | boolean | true |
connect.mqtt.keep.alive | The keep alive functionality assures that the connection is still open and both broker and client are connected to the broker during the establishment of the connection. The interval is the longest possible period of time, which broker and client can endure without sending a message. | int | 5000 |
connect.mqtt.client.id | Contains the Mqtt session client id | string |
connect.mqtt.error.policy | Specifies the action to be taken if an error occurs while inserting the data. There are two available options: NOOP - the error is swallowed THROW - the error is allowed to propagate. RETRY - The exception causes the Connect framework to retry the message. The number of retries is based on The error will be logged automatically | string | THROW |
connect.mqtt.retry.interval | The time in milliseconds between retries. | int | 60000 |
connect.mqtt.max.retries | The maximum number of times to try the write again. | int | 20 |
connect.mqtt.retained.messages | Specifies the Mqtt retained flag. | boolean | false |
connect.mqtt.converter.throw.on.error | If set to false, the conversion exception is swallowed and processing carries on, but the message is lost; if set to true, the exception is thrown. | boolean | false |
connect.converter.avro.schemas | If the AvroConverter is used, you need to provide an Avro schema to be able to read and translate the raw bytes into an Avro record. The format is $MQTT_TOPIC=$PATH_TO_AVRO_SCHEMA_FILE for a source converter, or $KAFKA_TOPIC=$PATH_TO_AVRO_SCHEMA_FILE for a sink converter. | string |
connect.mqtt.kcql | Contains the Kafka Connect Query Language (KCQL) statement describing the source MQTT topics and the target Kafka topics | string |
connect.progress.enabled | Enables the output for how many records have been processed | boolean | false |
connect.mqtt.ssl.ca.cert | Provides the path to the CA certificate file to use with the MQTT connection | string |
connect.mqtt.ssl.cert | Provides the path to the certificate file to use with the MQTT connection | string |
connect.mqtt.ssl.key | Provides the path to the certificate private key file to use with the MQTT connection | string |
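The MQTT source options above can be combined into a worker configuration along the following lines. This is a hedged sketch: the connector class name varies between Stream Reactor versions, and the host, credentials, and topic names are placeholders to adapt to your environment:

```properties
# Illustrative MQTT source connector configuration (values are placeholders).
name=mqtt-source
# Class name may differ by Stream Reactor version; check your release's docs.
connector.class=io.lenses.streamreactor.connect.mqtt.source.MqttSourceConnector
connect.mqtt.hosts=tcp://mqtt-broker:1883
connect.mqtt.username=mqtt-user
connect.mqtt.password=mqtt-secret
connect.mqtt.service.quality=1
connect.mqtt.client.id=kafka-connect-mqtt
connect.mqtt.error.policy=THROW
connect.mqtt.kcql=INSERT INTO kafka-topic SELECT * FROM mqtt/source/topic
```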
connect.redis.pk.delimiter | Specifies the redis primary key delimiter | string | . |
ssl.provider | The name of the security provider used for SSL connections. Default value is the default security provider of the JVM. | string |
ssl.protocol | The SSL protocol used to generate the SSLContext. Default setting is TLS, which is fine for most cases. Allowed values in recent JVMs are TLS, TLSv1.1 and TLSv1.2. SSL, SSLv2 and SSLv3 may be supported in older JVMs, but their usage is discouraged due to known security vulnerabilities. | string | TLS |
ssl.truststore.location | The location of the trust store file. | string |
ssl.keystore.password | The store password for the key store file. This is optional for client and only needed if ssl.keystore.location is configured. | password |
ssl.keystore.location | The location of the key store file. This is optional for client and can be used for two-way authentication for client. | string |
ssl.truststore.password | The password for the trust store file. If a password is not set access to the truststore is still available, but integrity checking is disabled. | password |
ssl.keymanager.algorithm | The algorithm used by key manager factory for SSL connections. Default value is the key manager factory algorithm configured for the Java Virtual Machine. | string | SunX509 |
ssl.trustmanager.algorithm | The algorithm used by trust manager factory for SSL connections. Default value is the trust manager factory algorithm configured for the Java Virtual Machine. | string | PKIX |
ssl.keystore.type | The file format of the key store file. This is optional for client. | string | JKS |
ssl.cipher.suites | A list of cipher suites. This is a named combination of authentication, encryption, MAC and key exchange algorithm used to negotiate the security settings for a network connection using TLS or SSL network protocol. By default all the available cipher suites are supported. | list |
ssl.endpoint.identification.algorithm | The endpoint identification algorithm to validate server hostname using server certificate. | string | https |
ssl.truststore.type | The file format of the trust store file. | string | JKS |
ssl.enabled.protocols | The list of protocols enabled for SSL connections. | list | [TLSv1.2, TLSv1.1, TLSv1] |
ssl.key.password | The password of the private key in the key store file. This is optional for client. | password |
ssl.secure.random.implementation | The SecureRandom PRNG implementation to use for SSL cryptography operations. | string |
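Putting the standard SSL options above together, a typical TLS setup for a connector looks something like the fragment below; the store paths and passwords are placeholders:

```properties
# Illustrative SSL settings (paths and passwords are placeholders).
ssl.protocol=TLS
ssl.truststore.location=/var/private/ssl/truststore.jks
ssl.truststore.password=truststore-secret
ssl.truststore.type=JKS
# Keystore settings are only needed for two-way (mutual) authentication.
ssl.keystore.location=/var/private/ssl/keystore.jks
ssl.keystore.password=keystore-secret
ssl.key.password=key-secret
ssl.endpoint.identification.algorithm=https
```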
connect.redis.kcql | KCQL expression describing field selection and routes. | string |
connect.redis.host | Specifies the redis server | string |
connect.redis.port | Specifies the redis connection port | int |
connect.redis.password | Provides the password for the redis connection. | password |
connect.redis.ssl.enabled | Enables SSL for the redis connection | boolean | false |
connect.redis.error.policy | Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP - the error is swallowed; THROW - the error is allowed to propagate; RETRY - the exception causes the Connect framework to retry the message, with the number of retries controlled by connect.redis.max.retries. The error will be logged automatically. | string | THROW |
connect.redis.retry.interval | The time in milliseconds between retries. | int | 60000 |
connect.redis.max.retries | The maximum number of times to try the write again. | int | 20 |
connect.progress.enabled | Enables the output for how many records have been processed | boolean | false |
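The Redis sink options above can be assembled into a configuration like the sketch below. As before, this is illustrative: the connector class name depends on your Stream Reactor version, and the host, password, topic, and primary-key field are placeholders:

```properties
# Illustrative Redis sink configuration (values are placeholders).
name=redis-sink
# Class name may differ by Stream Reactor version; check your release's docs.
connector.class=io.lenses.streamreactor.connect.redis.sink.RedisSinkConnector
connect.redis.host=localhost
connect.redis.port=6379
connect.redis.password=redis-secret
connect.redis.kcql=INSERT INTO redis-cache SELECT * FROM kafka-topic PK id
connect.redis.error.policy=RETRY
connect.redis.max.retries=20
connect.redis.retry.interval=60000
```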
| HttpMethod | Yes | POST, PUT, PATCH |
| String | Yes |
| String | Yes |
| Authentication | No |
| List[String] | No |
| Int | No |
| Int | No |
| Int | No | The time interval in milliseconds to wait before sending the request |
| Int | No | Upload Sync Period (100) - polling time period for uploads in milliseconds |
| Int | No | The number of errors to tolerate before failing the sink (5) |
| List[String] | No | The status codes to retry on (408,429,500,502,503,504) |
| Int | No | The maximum number of retries to attempt (5) |
| Int | No | The maximum time in milliseconds to retry a request. Backoff is used to increase the time between retries, up to this maximum (30000) |
| Int | No | The HTTP connection timeout in milliseconds (10000) |
| Int | No | For each processed topic, the connector maintains an internal queue. This value specifies the maximum number of entries allowed in the queue before the enqueue operation blocks. The default is 100,000. |
| Int | No | The maximum time window, specified in milliseconds, to wait for the internal queue to accept new records. The default is 2 minutes (120,000 milliseconds). |
The number of records to batch before sending the request.
The size of the batch in bytes before sending the request.
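The retry settings above (retriable status codes, a maximum retry count, and a backoff that grows between attempts up to a cap) combine into behaviour along these lines. This is an illustrative Python model of the policy, not connector code; the function names are invented for the sketch:

```python
# Illustrative model of the HTTP sink's retry policy: exponential backoff
# capped at a maximum delay, applied only to retriable status codes.

# Default retriable status codes from the reference table above.
RETRIABLE_STATUS_CODES = {408, 429, 500, 502, 503, 504}


def should_retry(status_code: int) -> bool:
    """Return True if a response with this status code should be retried."""
    return status_code in RETRIABLE_STATUS_CODES


def backoff_delays(retry_interval_ms: int, max_retry_time_ms: int, max_retries: int):
    """Yield up to max_retries delays, doubling each time, capped at the maximum."""
    delay = retry_interval_ms
    for _ in range(max_retries):
        yield min(delay, max_retry_time_ms)
        delay *= 2


# With a 500 ms initial interval and a 30 s cap, the first five delays double:
first_five = list(backoff_delays(500, 30000, 5))
print(first_five)  # [500, 1000, 2000, 4000, 8000]
```

Doubling the delay between attempts spreads retries out under sustained failure, while the cap keeps the worst-case wait bounded by the configured maximum retry time.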