This page describes the usage of the Stream Reactor Azure Data Lake Gen2 Source Connector.
Coming soon!
This page details the configuration options for the Stream Reactor Kafka Connect source connectors.
Source connectors read data from external systems and write to Kafka.
This page describes the usage of the Stream Reactor Azure Service Bus Source Connector.
This Kafka connector is designed to effortlessly ingest records from Azure Service Bus into your Kafka cluster. It leverages the Microsoft Azure API to transfer data from Service Bus entities, safely carrying across both payloads and metadata (see Payload support). It provides an AT-LEAST-ONCE guarantee: data is committed (marked as read) in Service Bus only once the connector verifies it was successfully written to the designated Kafka topic. The Azure Service Bus Source Connector supports both types of Service Bus entities: Queues and Topics.
For more examples see the tutorials.
The following example presents all the mandatory configuration properties for the Service Bus connector. Note that there are also optional parameters, listed in the option reference below. Feel free to tweak the configuration to your requirements.
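A minimal sketch of such a configuration is shown below; the connector class name is an assumption to verify against the plugin installed on your Connect workers, and the connection string and KCQL values are placeholders.

```properties
name=azure-service-bus-source
# Assumed connector class name - verify against your installed Stream Reactor plugin
connector.class=io.lenses.streamreactor.connect.azure.servicebus.source.AzureServiceBusSourceConnector
tasks.max=1
# Connection string copied from the Shared access policies section of the Azure Portal
connect.servicebus.connection.string=Endpoint=sb://<your-namespace>.servicebus.windows.net/;SharedAccessKeyName=<policy-name>;SharedAccessKey=<policy-key>
# KCQL mapping from the Service Bus entity to the Kafka topic
connect.servicebus.kcql=INSERT INTO <your-kafka-topic> SELECT * FROM <your-service-bus> PROPERTIES('servicebus.type'='QUEUE');
```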
You can specify multiple KCQL statements separated by ; to have the connector map between multiple topics.
The following KCQL is supported:
It allows you to map a Service Bus named <your-service-bus> to a Kafka topic named <your-kafka-topic> using the PROPERTIES specified. The selection of fields from the Service Bus message is not supported.
The Azure Service Bus connector follows a specific message pattern (schema). The format of the data transferred to the Kafka topics specified in the KCQL configuration is described below.
| Field Name | Schema Type | Description |
|---|---|---|
| MessageId | String | The message identifier that uniquely identifies the message and its payload. |
| deliveryCount | int64 | The number of times this message was delivered to clients. |
| enqueuedTimeUtc | int64 | The time at which this message was enqueued in Azure Service Bus. |
| contentType | Optional String | The content type of this message. |
| label | Optional String | The application-specific message label. |
| correlationId | Optional String | The correlation identifier. |
| messageProperties | Optional String | The map of user application properties of this message. |
| partitionKey | Optional String | The partition key for sending a message to a partitioned entity. |
| replyTo | Optional String | The address of an entity to send replies to. |
| replyToSessionId | Optional String | The session identifier augmenting the ReplyTo address. |
| deadLetterSource | Optional String | The name of the queue or subscription that this message was enqueued on, before it was deadlettered. |
| timeToLive | int64 | The duration before this message expires. |
| lockedUntilUtc | Optional int64 | The time when the lock of this message expires. |
| sequenceNumber | Optional int64 | The unique number assigned to a message by Azure Service Bus. |
| sessionId | Optional String | The session identifier for a session-aware entity. |
| lockToken | Optional String | The lock token for the current message. |
| messageBody | Optional bytes | The body of this message as a byte array. |
| getTo | Optional String | The "to" address. |
You can connect to an Azure Service Bus by passing your connection string in configuration. The connection string can be found in the Shared access policies section of your Azure Portal.
Learn more about different methods of connecting to Service Bus on the Azure Website.
The Azure Service Bus connector connects to Service Bus via the Microsoft API. In order to configure your mappings smoothly, you have to pay attention to the PROPERTIES part of your KCQL mappings. There are two cases here: reading from a Service Bus of type QUEUE and of type TOPIC. Please refer to the relevant sections below. In case of further questions, check the Azure Service Bus documentation to learn more about those mechanisms.
In order to read from a queue, there is an additional parameter that you need to pass with your KCQL mapping in the PROPERTIES part. This parameter is servicebus.type and it can take one of two values depending on the type of the Service Bus: QUEUE or TOPIC. For a queue, pass QUEUE. This is sufficient to enable you to create the mapping with your queue.
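For example, a sketch of a queue mapping using the placeholder naming from above:

```sql
INSERT INTO <your-kafka-topic>
SELECT * FROM <your-queue>
PROPERTIES('servicebus.type'='QUEUE');
```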
In order to read from a topic, there are two additional parameters that you need to pass with your KCQL mapping in the PROPERTIES part:
Parameter servicebus.type, which can take one of two values depending on the type of the Service Bus: QUEUE or TOPIC. For a topic, pass TOPIC.
Parameter subscription.name, which takes the (case-sensitive) name of a subscription that you have created for this topic for the connector to use. Please use the Azure Portal to create one.
Make sure your subscription exists, otherwise you will get an error similar to this:
Caused by: com.azure.core.amqp.exception.AmqpException: The messaging entity 'streamreactor:Topic:my-topic|lenses' could not be found. To know more visit https://aka.ms/sbResourceMgrExceptions.
Create the subscription per topic in the Azure Portal.
This is sufficient to enable you to create the mapping with your topic.
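For example, a sketch of a topic mapping using the placeholder naming from above:

```sql
INSERT INTO <your-kafka-topic>
SELECT * FROM <your-topic>
PROPERTIES('servicebus.type'='TOPIC', 'subscription.name'='<your-subscription>');
```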
Please find below all the necessary KCQL properties:
servicebus.type
Specifies the Service Bus type: QUEUE or TOPIC
string
subscription.name
Specifies subscription name if Service Bus type is TOPIC
string
Please find below all the relevant configuration parameters:
connect.servicebus.connection.string
Specifies the Connection String to connect to Service Bus
string
connect.servicebus.kcql
Comma-separated output KCQL queries
string
connect.servicebus.source.task.records.queue.size
Specifies the Queue size between Service Bus Receivers and Kafka
int
5000
connect.servicebus.source.sleep.on.empty.poll.ms
The duration in milliseconds to sleep when no records are returned from the poll. This avoids a tight loop in Connect.
long
250
connect.servicebus.source.complete.retries.max
The maximum number of retries to complete a message.
int
3
connect.servicebus.source.complete.retries.min.backoff.ms
The minimum duration in milliseconds for the first backoff
long
1000
connect.servicebus.source.prefetch.count
The number of messages to prefetch from the Azure Service Bus.
int
2000
This page describes the usage of the Stream Reactor Azure Event Hubs Source Connector.
A Kafka Connect source connector to read events from Azure Event Hubs and push them to Kafka.
In order to leverage the Kafka API in your Event Hubs namespace, it has to be at least on the Standard pricing tier. More info here.
For more examples see the tutorials.
The example below presents all the necessary configuration parameters for the Event Hubs connector; nothing optional is included, so feel free to tweak it to your needs.
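A minimal sketch of such a configuration, assuming the connector class name shown (verify it against your installed plugin) and the standard Event Hubs Kafka-endpoint SASL/JAAS pattern with placeholder values:

```properties
name=azure-event-hubs-source
# Assumed connector class name - verify against your installed Stream Reactor plugin
connector.class=io.lenses.streamreactor.connect.azure.eventhubs.source.AzureEventHubsSourceConnector
tasks.max=1
connect.eventhubs.kcql=INSERT INTO <your-kafka-topic> SELECT * FROM <your-event-hub>;
# Properties with the connection-settings prefix (as listed in the option reference below) are passed to the underlying Kafka consumer
connect.eventhubs.source.connection.settings.bootstrap.servers=<your-namespace>.servicebus.windows.net:9093
connect.eventhubs.source.connection.settings.security.protocol=SASL_SSL
connect.eventhubs.source.connection.settings.sasl.mechanism=PLAIN
connect.eventhubs.source.connection.settings.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://<your-namespace>.servicebus.windows.net/;SharedAccessKeyName=<policy-name>;SharedAccessKey=<policy-key>";
```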
The connector allows for multiple KCQL statements.
The following KCQL is supported:
The selection of fields from the Event Hubs message is not supported.
For now, the Azure Event Hubs connector supports only raw bytes passthrough from the source hub to the Kafka topic specified in the KCQL config.
You can connect to Azure Event Hubs by passing specific JAAS parameters in the configuration. Learn more about the different methods of connecting to Event Hubs on the Azure website. The only caveat is to add the connector-specific prefix, as in the example above.
The Azure Event Hubs connector utilizes the Apache Kafka API implemented by Event Hubs. This also allows fine-tuning for user-specific needs, because the connector passes all properties with a specific prefix directly to the consumer. The prefix is connect.eventhubs.connection.settings; when a property is specified with this prefix, it is automatically passed to the consumer.
For example, suppose you want to fine-tune how many records come through the network at once. You would specify the property below as part of the Azure Event Hubs connector configuration before starting it.
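A sketch of such a setting, using the prefix described above:

```properties
connect.eventhubs.connection.settings.max.poll.records=100
```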
This means the internal Kafka consumer will poll at most 100 records at a time, as max.poll.records is passed directly to it.
There are certain exceptions to this rule, as a few of these properties are used internally to ensure smooth consumption. These exceptions are listed below:
client.id
- the connector sets it itself
group.id
- the connector sets it itself
key.deserializer
- the connector passes the bytes through unchanged (1-to-1)
value.deserializer
- the connector passes the bytes through unchanged (1-to-1)
enable.auto.commit
- the connector automatically sets it to false and instead checks which offsets have been committed to the output topic
connect.eventhubs.source.connection.settings.bootstrap.servers
Specifies the Event Hubs server location.
string
connect.eventhubs.source.close.timeout
Amount of time (in seconds) for Consumer to close.
int
30
connect.eventhubs.source.default.offset
Specifies whether by default we should consume from earliest (default) or latest offset.
string
earliest
connect.eventhubs.kcql
Comma-separated output KCQL queries
string
This page describes the usage of the Stream Reactor Cassandra Source Connector.
Kafka Connect Cassandra is a Source Connector for reading data from Cassandra and writing to Kafka.
For more examples see the tutorials.
You can specify multiple KCQL statements separated by ; to have the connector write to multiple topics.
The following KCQL is supported:
Examples:
The connector can write JSON to your Kafka topic using the WITHFORMAT JSON clause but the key and value converters must be set:
In order to facilitate scenarios like retaining the latest value for a given device identifier, or supporting Kafka Streams joins without having to re-map the topic data, the connector supports WITHKEY in the KCQL syntax.
Multiple key fields are supported using a delimiter:
The resulting Kafka record key content will be the string concatenation for the values of the fields specified. Optionally the delimiter can be set via the KEYDELIMITER keyword.
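A sketch of such a mapping is shown below; the table name, column names, and the clause ordering are illustrative only and should be adapted to your schema:

```sql
INSERT INTO orders_topic
SELECT * FROM orders
PK created
INCREMENTALMODE=TIMESTAMP
WITHFORMAT JSON
WITHKEY(customer, product)
KEYDELIMITER='|'
```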
Keying is only supported in conjunction with the WITHFORMAT JSON clause
This mode tracks new records added to a table. The columns to track are identified by the PK clause in the KCQL statement. Only one column can be used to track new records. The supported Cassandra column data types are:
TIMESTAMP
TIMEUUID
TOKEN
DSESEARCHTIMESTAMP
If set to TOKEN this column value is wrapped inside Cassandra's token function which needs unwrapping with the WITHUNWRAP command.
You must use the Byte Order Partitioner for the TOKEN mode to work correctly or data will be missing from the Kafka topic. This is not recommended due to the creation of hotspots in Cassandra.
DSESEARCHTIMESTAMP will make DSE Search queries using Solr instead of a native Cassandra query.
The connector constantly loads the entire table.
The connector can be configured to:
Start from a particular offset - connect.cassandra.initial.offset
Increase or decrease the poll interval - connect.cassandra.import.poll.interval
Set a slice duration to query for in milliseconds - connect.cassandra.slice.duration
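For example, a sketch of the options above with illustrative values (see the option reference below for defaults):

```properties
# Start reading from this timestamp instead of the 1900-01-01 default
connect.cassandra.initial.offset=2024-01-01 00:00:00.0000000Z
# Poll the table every 5 seconds
connect.cassandra.import.poll.interval=5000
# Query 10-second slices of the timestamp column
connect.cassandra.slice.duration=10000
```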
The following CQL data types are supported:
This page describes the usage of the Stream Reactor AWS S3 Source Connector.
This connector is also available on the AWS Marketplace.
Objects that have been archived to AWS Glacier storage class are skipped, in order to load these objects you must manually restore the objects. Skipped objects are logged in the Connect workers log files.
You can specify multiple KCQL statements separated by ; to have the connector write to multiple topics.
The connector uses a SQL-like syntax to configure the connector behaviour. The full KCQL syntax is:
Please note that you can employ escaping within KCQL for the INSERT INTO, SELECT * FROM, and PARTITIONBY clauses when necessary. For example, if you need to use a topic name that contains a hyphen, you can escape it as follows:
The S3 source location is defined within the FROM clause. The connector will read all objects from the given location considering the data partitioning and ordering options. Each data partition will be read by a single connector task.
The FROM clause format is:
If your data in AWS was not written by the Lenses AWS sink, set the connector to traverse a folder hierarchy in a bucket and load based on the last modified timestamp of the objects in the bucket. If LastModified sorting is used, ensure objects do not arrive late, or use a post-processing step to handle them.
connect.s3.source.partition.extractor.regex=none
connect.s3.source.ordering.type=LastModified
To load in alphanumeric order, set the ordering type to AlphaNumeric.
The target Kafka topic is specified via the INSERT INTO clause. The connector will write all the records to the given topic:
The connector supports a range of storage formats, each with its own distinct functionality:
JSON: The connector will read objects containing JSON content, each line representing a distinct record.
Avro: The connector will read Avro-stored messages from S3 and translate them into Kafka’s native format.
Parquet: The connector will read Parquet-stored messages from S3 and translate them into Kafka’s native format.
Text: The connector will read objects containing lines of text, each line representing a distinct record.
CSV: The connector will read objects containing lines of text, each line representing a distinct record.
CSV_WithHeaders: The connector will read objects containing lines of text, each line representing a distinct record while skipping the header row.
Bytes: The connector will read objects containing bytes, each object is translated to a Kafka message.
Use the STOREAS clause to configure the storage format. The following options are available:
When using Text storage, the connector provides additional configuration options to finely control how text content is processed.
In Regex mode, the connector applies a regular expression pattern, and only when a line matches the pattern is it considered a record. For example, to include only lines that start with a number, you can use the following configuration:
In Start-End Line mode, the connector reads text content between specified start and end lines, inclusive. This mode is useful when you need to extract records that fall within defined boundaries. For instance, to read records where the first line is ‘SSM’ and the last line is an empty line (’’), you can configure it as follows:
To trim the start and end lines, set the read.text.trim property to true:
In Start-End Tag mode, the connector reads text content between specified start and end tags, inclusive. This mode is particularly useful when a single line of text in S3 corresponds to multiple output Kafka messages. For example, to read XML records enclosed between ‘’ and ‘’, configure it as follows:
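(The sketches below express the three modes as KCQL PROPERTIES; bucket, prefix, topic names, and the start/end tag values are placeholders.)

```sql
-- Regex mode: only lines starting with a number become records
INSERT INTO `my-topic` SELECT * FROM `my-bucket:my-prefix` STOREAS `Text`
PROPERTIES('read.text.mode'='Regex', 'read.text.regex'='^[0-9].*')

-- Start-End Line mode: records start at 'SSM' and end at an empty line, with trimming enabled
INSERT INTO `my-topic` SELECT * FROM `my-bucket:my-prefix` STOREAS `Text`
PROPERTIES('read.text.mode'='StartEndLine', 'read.text.start.line'='SSM', 'read.text.end.line'='', 'read.text.trim'='true')

-- Start-End Tag mode: records are the content between the given start and end tags
INSERT INTO `my-topic` SELECT * FROM `my-bucket:my-prefix` STOREAS `Text`
PROPERTIES('read.text.mode'='StartEndTag', 'read.text.start.tag'='<record>', 'read.text.end.tag'='</record>')
```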
Depending on the storage format of Kafka topics’ messages, the need for replication to a different cluster, and the specific data analysis requirements, there exists a guideline on how to effectively utilize converters for both sink and source operations. This guidance aims to optimize performance and minimize unnecessary CPU and memory usage.
Currently, the connector does not offer support for SQL projection; consequently, anything other than a SELECT * query is disregarded. The connector will faithfully write all the record fields to Kafka exactly as they are.
The S3 sink employs zero-padding in object names to ensure precise ordering, leveraging optimizations offered by the S3 API, guaranteeing the accurate sequence of objects.
When using the S3 source alongside the S3 sink, the connector can adopt the same ordering method, ensuring data processing follows the correct chronological order. However, there are scenarios where S3 data is generated by applications that do not maintain lexical object key name order.
In such cases, to process objects in the correct sequence, the source needs to list all objects in the bucket and sort them based on their last modified timestamp. To enable this behavior, set connect.s3.source.ordering.type to LastModified. This ensures that the source correctly arranges and processes the data based on the timestamps of the objects.
If using LastModified sorting, ensure objects do not arrive late, or use a post-processing step to handle them.
You can limit the number of object keys the source reads from S3 in a single poll; the default value, if not specified, is 1000. To limit the number of result rows returned from the source in a single poll operation, you can use the LIMIT clause; the default value, if not specified, is 10000.
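A sketch combining both limits; the LIMIT clause is described above, while the BATCH clause used here for object keys per poll is an assumption to verify against the full KCQL syntax:

```sql
INSERT INTO `my-topic`
SELECT * FROM `my-bucket:my-prefix`
BATCH = 500
STOREAS `JSON`
LIMIT 5000
```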
The AWS S3 Source Connector allows you to filter the objects to be processed based on their extensions. This is controlled by two properties: connect.s3.source.extension.excludes and connect.s3.source.extension.includes.
The connect.s3.source.extension.excludes property is a comma-separated list of object extensions to exclude from the source object search. If this property is not configured, all objects are considered. For example, to exclude .txt and .csv objects, you would set this property as follows:
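(A sketch; the extensions are given without the leading dot here, adjust if your deployment expects otherwise.)

```properties
connect.s3.source.extension.excludes=txt,csv
```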
The connect.s3.source.extension.includes property is a comma-separated list of object extensions to include in the source object search. If this property is not configured, all objects are considered. For example, to include only .json and .xml objects, you would set this property as follows:
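(A sketch, following the same convention as above.)

```properties
connect.s3.source.extension.includes=json,xml
```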
Note: If both connect.s3.source.extension.excludes and connect.s3.source.extension.includes are set, the connector first applies the exclusion filter and then the inclusion filter.
Post-processing options offer flexibility in managing how objects are handled after they have been processed. By configuring these options, users can automate tasks such as deleting objects to save storage space or moving files to an archive for compliance and data retention purposes. These features are crucial for efficient data lifecycle management, particularly in environments where storage considerations or regulatory requirements dictate the need for systematic handling of processed data.
Deleting Objects After Processing
For scenarios where freeing up storage is critical and reprocessing is not necessary, configure the connector to delete objects after they are processed. This option is particularly useful in environments with limited storage capacity or where processed data is redundantly stored elsewhere.
Example:
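(A sketch using the post.process.action property from the KCQL properties reference; bucket, prefix, and topic names are placeholders.)

```sql
INSERT INTO `my-topic`
SELECT * FROM `my-bucket:my-prefix`
STOREAS `JSON`
PROPERTIES('post.process.action'='DELETE')
```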
Result: Objects are permanently removed from the S3 bucket after processing, effectively reducing storage usage and preventing reprocessing.
Moving Objects to an Archive Bucket
To preserve processed objects for archiving or compliance reasons, set the connector to move them to a designated archive bucket. This use case applies to organizations needing data retention strategies or for regulatory adherence by keeping processed records accessible but not in active use.
Example:
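(A sketch using the MOVE action together with the target bucket and prefix properties; names are placeholders.)

```sql
INSERT INTO `my-topic`
SELECT * FROM `my-bucket:my-prefix`
STOREAS `JSON`
PROPERTIES(
  'post.process.action'='MOVE',
  'post.process.action.bucket'='archive-bucket',
  'post.process.action.prefix'='processed/'
)
```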
Result: Objects are transferred to an archive-bucket, stored with an updated path that includes the processed/ prefix, maintaining an organized archive structure.
The PROPERTIES clause is optional and adds a layer of configuration to the connector. It enhances versatility by permitting the application of multiple configurations (delimited by ','). The following properties are supported:
The connector offers two distinct authentication modes:
Default: This mode relies on the default AWS authentication chain, simplifying the authentication process.
Credentials: In this mode, explicit configuration of AWS Access Key and Secret Key is required for authentication.
When selecting the “Credentials” mode, it is essential to provide the necessary access key and secret key properties. Alternatively, if you prefer not to configure these properties explicitly, the connector will follow the credentials retrieval order as described here.
Here’s an example configuration for the “Credentials” mode:
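(A sketch with placeholder values; ideally the keys are supplied via a Connect Secret Provider rather than stored in plain text.)

```properties
connect.s3.aws.auth.mode=Credentials
connect.s3.aws.access.key=<your-access-key>
connect.s3.aws.secret.key=<your-secret-key>
connect.s3.aws.region=eu-west-1
```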
For enhanced security and flexibility when using the “Credentials” mode, it is highly advisable to utilize Connect Secret Providers. This approach ensures robust security practices while handling access credentials.
The connector can also be used against API compatible systems provided they implement the following:
This page describes the usage of the Stream Reactor FTP Source Connector.
Provide the remote directories; on the specified intervals, the list of files in the directories is refreshed. Files are downloaded when they were not known before, or when their timestamp or size has changed. Only files with a timestamp younger than the specified maximum age are considered. Hashes of the files are maintained and used to check for content changes. Changed files are then fed into Kafka, either as a whole (update) or only the appended part (tail), depending on the configuration. Optionally, file bodies can be transformed through a pluggable system prior to putting them into Kafka.
For more examples see the tutorials.
Each Kafka record represents a file and has the following types.
The format of the keys is configurable through connect.ftp.keystyle=string|struct. It can be a string with the file name, or a FileInfo structure with the name: string and offset: long. The offset is always 0 for files that are updated as a whole, and hence only relevant for tailed files.
The values of the records contain the body of the file as bytes.
The following rules are used.
Tailed files are only allowed to grow. Bytes that have been appended to it since the last inspection are yielded. Preceding bytes are not allowed to change;
Updated files can grow, shrink and change anywhere. The entire contents are yielded.
Instead of dumping whole file bodies (and the danger of exceeding Kafka’s message.max.bytes), one might want to give an interpretation to the data contained in the files before putting it into Kafka. For example, if the files that are fetched from the FTP are comma-separated values (CSVs), one might prefer to have a stream of CSV records instead. To allow to do so, the connector provides a pluggable conversion of SourceRecords. Right before sending a SourceRecord to the Connect framework, it is run through an object that implements:
The default object that is used is a pass-through converter, an instance of:
To override it, create your own implementation of SourceRecordConverter and place the jar in the plugin.path.
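For example, the converter is selected through the connect.ftp.sourcerecordconverter option; the default class name comes from the option reference below, while the custom class name is hypothetical:

```properties
# Default pass-through converter shipped with the connector
connect.ftp.sourcerecordconverter=com.datamountaineer.streamreactor.connect.ftp.source.NopSourceRecordConverter
# Or a custom implementation placed on the plugin.path (hypothetical class name)
#connect.ftp.sourcerecordconverter=com.example.ftp.CsvSourceRecordConverter
```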
This page describes the usage of the Stream Reactor JMS Source Connector.
A Kafka Connect JMS source connector to subscribe to messages on JMS queues and topics and write them to a Kafka topic.
The connector uses the standard JMS protocols and has been tested against ActiveMQ.
The connector allows for the JMS initial.context.factory and connection.factory to be set according to your JMS provider. The appropriate implementation jars must be added to the CLASSPATH of the Connect workers or placed in the plugin.path of the connector.
Each JMS message is committed only when it has been written to Kafka. If a failure happens when writing to Kafka, e.g. the message is too large, then the JMS message will not be acknowledged. It will stay in the queue so it can be actioned upon.
The schema of the messages is fixed and can be found under Data Types unless a converter is used.
You must provide the JMS implementation jars for your JMS service.
For more examples see the tutorials.
You can specify multiple KCQL statements separated by ; to have the connector write to multiple topics.
The following KCQL is supported:
The selection of fields from the JMS message is not supported.
Examples:
The connector supports both TOPICS and QUEUES, controlled by the WITHTYPE KCQL clause.
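For example, sketches with placeholder names:

```sql
-- Subscribe to a JMS queue
INSERT INTO kafka_topic SELECT * FROM jms_queue WITHTYPE QUEUE

-- Subscribe to a JMS topic
INSERT INTO kafka_topic SELECT * FROM jms_topic WITHTYPE TOPIC
```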
The connector supports converters to handle different message payload formats in the source topic or queue.
If no converter is provided, the JMS message is converted to a Kafka Struct representation.
This page describes the usage of the Stream Reactor Google PubSub Source Connector.
The Kafka connector is designed to seamlessly ingest records from GCP Pub/Sub topics and queues into your Kafka cluster. This makes it useful for backing up or streaming data from Pub/Sub to your Kafka infrastructure. This connector provides robust support for at least once semantics (this connector ensures that each record reaches the Kafka topic at least once).
For more examples see the tutorials.
The connector uses a SQL-like syntax to configure the connector behaviour. The full KCQL syntax is:
Please note that you can employ escaping within KCQL for the INSERT INTO and SELECT * FROM clauses when necessary. For example, if you need to use a topic name that contains a hyphen, you can escape it as follows:
The source and target of the data are specified via the INSERT INTO ... SELECT * FROM clause. The connector will write all the records to the given topic, from the given subscription:
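(A sketch with placeholder names, using backtick escaping for the hyphens as described above.)

```sql
INSERT INTO `my-kafka-topic`
SELECT * FROM `my-pubsub-subscription`
```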
The PROPERTIES clause is optional and adds a layer of configurability to the connector. It enhances versatility by permitting the application of multiple configurations (delimited by ',').
Properties can be defined in any order.
The following properties are supported:
The connector offers three distinct authentication modes:
Default: This mode relies on the default GCP authentication chain, simplifying the authentication process.
File: This mode uses a local (to the connect worker) path for a file containing GCP authentication credentials.
Credentials: In this mode, explicit configuration of a GCP Credentials string is required for authentication.
The simplest example to configure in the connector is the "Default" mode, as this requires no other configuration.
Here's an example configuration for the "Credentials" mode:
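(A sketch with placeholder values; ideally the credentials string is supplied via a Connect Secret Provider rather than stored in plain text.)

```properties
connect.pubsub.gcp.auth.mode=Credentials
connect.pubsub.gcp.project.id=my-gcp-project
connect.pubsub.gcp.credentials=<json-credentials-string>
```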
And here is an example configuration using the "File" mode:
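(A sketch with placeholder values; the file path must exist on every Connect worker.)

```properties
connect.pubsub.gcp.auth.mode=File
connect.pubsub.gcp.project.id=my-gcp-project
connect.pubsub.gcp.file=/path/to/gcp-service-account.json
```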
Remember when using file mode the file will need to exist on every worker node in your Kafka connect cluster and be readable by the Kafka Connect process.
For enhanced security and flexibility when using the "Credentials" mode, it is highly advisable to utilize Connect Secret Providers.
Two modes are available: Default Mode and Compatibility Mode.
Compatibility Mode is intended to ensure compatibility with existing tools, while Default Mode offers a simpler modern redesign of the functionality.
You can choose whichever suits your requirements.
Each Pub/Sub message is transformed into a single Kafka record, structured as follows:
Kafka Key: A String of the Pub/Sub MessageID.
Kafka Value: The Pub/Sub message value as BYTES.
Kafka Headers: Includes the "PublishTimestamp" (in seconds) and all Pub/Sub message attributes mapped as separate headers.
The Kafka Key is mapped from the Pub/Sub MessageID, a unique ID for a Pub/Sub message.
The Kafka Value is mapped from the body of the Pub/Sub message.
The Kafka Headers include:
PublishTimestamp: Long value representing the time when the Pub/Sub message was published, in seconds.
GCPProjectID: The GCP Project
PubSubTopicID: The Pub/Sub Topic ID.
PubSubSubscriptionID: The Pub/Sub Subscription ID.
All Pub/Sub message attributes: Each attribute from the Pub/Sub message is mapped as a separate header.
Each Pub/Sub message is transformed into a single Kafka record, structured as follows:
Kafka Key: Comprises the project ID, message ID, and subscription ID of the Pub/Sub message.
Kafka Value: Contains the message data and attributes from the Pub/Sub message.
The Key is a structure with these fields:
The Value is a structure with these fields:
This page describes the usage of the Stream Reactor GCP Storage Source Connector.
For more examples see the tutorials.
You can specify multiple KCQL statements separated by ; to have the connector write to multiple topics.
The connector uses a SQL-like syntax to configure the connector behaviour. The full KCQL syntax is:
Please note that you can employ escaping within KCQL for the INSERT INTO, SELECT * FROM, and PARTITIONBY clauses when necessary. For example, if you need to use a topic name that contains a hyphen, you can escape it as follows:
The GCP Storage source location is defined within the FROM clause. The connector will read all objects from the given location considering the data partitioning and ordering options. Each data partition will be read by a single connector task.
The FROM clause format is:
If your data in GCS was not written by the Lenses GCS sink, set the connector to traverse a folder hierarchy in a bucket and load based on the last modified timestamp of the objects in the bucket. If LastModified sorting is used, ensure objects do not arrive late, or use a post-processing step to handle them.
connect.gcpstorage.source.partition.extractor.regex=none
connect.gcpstorage.source.ordering.type=LastModified
To load in alphanumeric order, set the ordering type to AlphaNumeric.
The target Kafka topic is specified via the INSERT INTO clause. The connector will write all the records to the given topic:
The connector supports a range of storage formats, each with its own distinct functionality:
JSON: The connector will read objects containing JSON content, each line representing a distinct record.
Avro: The connector will read Avro-stored messages from GCP Storage and translate them into Kafka’s native format.
Parquet: The connector will read Parquet-stored messages from GCP Storage and translate them into Kafka’s native format.
Text: The connector will read objects containing lines of text, each line representing a distinct record.
CSV: The connector will read objects containing lines of text, each line representing a distinct record.
CSV_WithHeaders: The connector will read objects containing lines of text, each line representing a distinct record while skipping the header row.
Bytes: The connector will read objects containing bytes, each object is translated to a Kafka message.
Use the STOREAS clause to configure the storage format. The following options are available:
When using Text storage, the connector provides additional configuration options to finely control how text content is processed.
In Regex mode, the connector applies a regular expression pattern, and only when a line matches the pattern is it considered a record. For example, to include only lines that start with a number, you can use the following configuration:
In Start-End Line mode, the connector reads text content between specified start and end lines, inclusive. This mode is useful when you need to extract records that fall within defined boundaries. For instance, to read records where the first line is ‘SSM’ and the last line is an empty line (’’), you can configure it as follows:
To trim the start and end lines, set the read.text.trim property to true:
In Start-End Tag mode, the connector reads text content between specified start and end tags, inclusive. This mode is particularly useful when a single line of text in GCP Storage corresponds to multiple output Kafka messages. For example, to read XML records enclosed between '' and '', configure it as follows:
Depending on the storage format of Kafka topics’ messages, the need for replication to a different cluster, and the specific data analysis requirements, there exists a guideline on how to effectively utilize converters for both sink and source operations. This guidance aims to optimize performance and minimize unnecessary CPU and memory usage.
Currently, the connector does not offer support for SQL projection; consequently, anything other than a SELECT * query is disregarded. The connector will faithfully write all the record fields to Kafka exactly as they are.
When using the GCS source alongside the GCS sink, the connector can adopt the same ordering method, ensuring data processing follows the correct chronological order. However, there are scenarios where GCS data is generated by applications that do not maintain lexical object name order.
In such cases, to process objects in the correct sequence, the source needs to list all objects in the bucket and sort them based on their last modified timestamp. To enable this behavior, set connect.gcpstorage.source.ordering.type to LastModified. This ensures that the source correctly arranges and processes the data based on the timestamps of the objects.
If using LastModified sorting, ensure objects do not arrive late, or use a post-processing step to handle them.
You can limit the number of object names the source reads from GCS in a single poll; the default value, if not specified, is 1000. To limit the number of result rows returned from the source in a single poll operation, you can use the LIMIT clause; the default value, if not specified, is 10000.
The GCP Storage Source Connector allows you to filter the objects to be processed based on their extensions. This is controlled by two properties: connect.gcpstorage.source.extension.excludes and connect.gcpstorage.source.extension.includes.
The connect.gcpstorage.source.extension.excludes property is a comma-separated list of object extensions to exclude from the source object search. If this property is not configured, all objects are considered. For example, to exclude .txt and .csv objects, you would set this property as follows:
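(A sketch; the extensions are given without the leading dot here, adjust if your deployment expects otherwise.)

```properties
connect.gcpstorage.source.extension.excludes=txt,csv
```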
The connect.gcpstorage.source.extension.includes property is a comma-separated list of object extensions to include in the source object search. If this property is not configured, all objects are considered. For example, to include only .json and .xml objects, you would set this property as follows:
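(A sketch, following the same convention as above.)

```properties
connect.gcpstorage.source.extension.includes=json,xml
```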
Note: If both connect.gcpstorage.source.extension.excludes and connect.gcpstorage.source.extension.includes are set, the connector first applies the exclusion filter and then the inclusion filter.
Post-processing options offer flexibility in managing how objects are handled after they have been processed. By configuring these options, users can automate tasks such as deleting objects to save storage space or moving objects to an archive for compliance and data retention purposes. These features are crucial for efficient data lifecycle management, particularly in environments where storage considerations or regulatory requirements dictate the need for systematic handling of processed data.
Deleting Objects After Processing
For scenarios where freeing up storage is critical and reprocessing is not necessary, configure the connector to delete objects after they are processed. This option is particularly useful in environments with limited storage capacity or where processed data is redundantly stored elsewhere.
Example:
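(A sketch using the post.process.action property from the KCQL properties reference; bucket, prefix, and topic names are placeholders.)

```sql
INSERT INTO `my-topic`
SELECT * FROM `my-bucket:my-prefix`
STOREAS `JSON`
PROPERTIES('post.process.action'='DELETE')
```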
Result: Objects are permanently removed from the GCS bucket after processing, effectively reducing storage usage and preventing reprocessing.
Moving Objects to an Archive Bucket
To preserve processed objects for archiving or compliance reasons, set the connector to move them to a designated archive bucket. This use case applies to organizations needing data retention strategies or for regulatory adherence by keeping processed records accessible but not in active use.
Example:
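(A sketch using the MOVE action together with the target bucket and prefix properties; names are placeholders.)

```sql
INSERT INTO `my-topic`
SELECT * FROM `my-bucket:my-prefix`
STOREAS `JSON`
PROPERTIES(
  'post.process.action'='MOVE',
  'post.process.action.bucket'='archive-bucket',
  'post.process.action.prefix'='processed/'
)
```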
Result: Objects are transferred to an archive-bucket, stored with an updated path that includes the processed/ prefix, maintaining an organized archive structure.
The PROPERTIES clause is optional and adds a layer of configuration to the connector. It enhances versatility by permitting the application of multiple configurations (delimited by ','). The following properties are supported:
The connector offers two distinct authentication modes:
Default: This mode relies on the default GCP authentication chain, simplifying the authentication process.
File: This mode uses a local (to the connect worker) path for a file containing GCP authentication credentials.
Credentials: In this mode, explicit configuration of a GCP Credentials string is required for authentication.
The simplest example to configure in the connector is the “Default” mode, as this requires no other configuration.
Here’s an example configuration for the “Credentials” mode:
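(A sketch with placeholder values; ideally the credentials string is supplied via a Connect Secret Provider rather than stored in plain text.)

```properties
connect.gcpstorage.gcp.auth.mode=Credentials
connect.gcpstorage.gcp.project.id=my-gcp-project
connect.gcpstorage.gcp.credentials=<json-credentials-string>
```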
And here is an example configuration using the “File” mode:
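(A sketch with placeholder values; the file path must exist on every Connect worker.)

```properties
connect.gcpstorage.gcp.auth.mode=File
connect.gcpstorage.gcp.project.id=my-gcp-project
connect.gcpstorage.gcp.file=/path/to/gcp-service-account.json
```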
Remember when using file mode the file will need to exist on every worker node in your Kafka connect cluster and be readable by the Kafka Connect process.
For enhanced security and flexibility when using the “Credentials” mode, it is highly advisable to utilize Connect Secret Providers. This approach ensures robust security practices while handling access credentials.
When used in tandem with the GCP Storage Sink Connector, the GCP Storage Source Connector becomes a powerful tool for restoring Kafka topics from GCP Storage. To enable this behavior, you should set store.envelope to true. This configuration ensures that the source expects the following data structure in GCP Storage:
When the messages are sent to Kafka, the GCP Storage Source Connector ensures that it correctly maps the key, value, headers, and metadata fields (including timestamp and partition) to their corresponding Kafka message fields. Please note that the envelope functionality can only be used with data stored in GCP Storage as Avro, JSON, or Parquet formats.
When the envelope feature is not in use, and data restoration is required, the responsibility falls on the connector to establish the original topic partition value. To ensure that the source correctly conveys the original partitions back to Kafka Connect during reads from the source, a partition extractor can be configured to extract this information from the GCP Storage object key.
To configure the partition extractor, you can utilize the connect.gcpstorage.source.partition.extractor.type property, which supports two options:
hierarchical: This option aligns with the default format used by the sink, topic/partition/offset.json.
regex: When selected, you can provide a custom regular expression to extract the partition information. Additionally, when using the regex option, you must also set the connect.gcpstorage.source.partition.extractor.regex property. It's important to note that only one lookup group is expected. For an example of a regular expression pattern, please refer to the pattern used for hierarchical.
For a more detailed explanation of how to use these options, see the sections above.
For more examples see the tutorials.
For more examples of using the FTP Kafka connector, see the tutorials.
See the tutorials.
When selecting the "Credentials" mode, it is essential to provide the necessary credentials. Alternatively, if you prefer not to configure these properties explicitly, the connector will follow the credentials retrieval order as described here.
The GCS sink employs zero-padding in object names to ensure precise ordering, leveraging optimizations offered by the GCS API, guaranteeing the accurate sequence of objects.
When selecting the "Credentials" mode, it is essential to provide the necessary credentials. Alternatively, if you prefer not to configure these properties explicitly, the connector will follow the credentials retrieval order as described here.
TimeUUID
Optional String
UUID
Optional String
Inet
Optional String
Ascii
Optional String
Text
Optional String
Timestamp
Optional String
Date
Optional String
Tuple
Optional String
UDT
Optional String
Boolean
Optional Boolean
TinyInt
Optional Int8
SmallInt
Optional Int16
Int
Optional Int32
Decimal
Optional String
Float
Optional Float32
Counter
Optional Int64
BigInt
Optional Int64
VarInt
Optional Int64
Double
Optional Int64
Time
Optional Int64
Blob
Optional Bytes
Map
Optional [String -> MAP]
List
Optional [String -> ARRAY]
Set
Optional [String -> ARRAY]
connect.cassandra.contact.points
Initial contact point host for Cassandra including port.
string
localhost
connect.cassandra.port
Cassandra native port.
int
9042
connect.cassandra.key.space
Keyspace to write to.
string
connect.cassandra.username
Username to connect to Cassandra with.
string
connect.cassandra.password
Password for the username to connect to Cassandra with.
password
connect.cassandra.ssl.enabled
Secure Cassandra driver connection via SSL.
boolean
false
connect.cassandra.trust.store.path
Path to the client Trust Store.
string
connect.cassandra.trust.store.password
Password for the client Trust Store.
password
connect.cassandra.trust.store.type
Type of the Trust Store, defaults to JKS
string
JKS
connect.cassandra.key.store.type
Type of the Key Store, defaults to JKS
string
JKS
connect.cassandra.ssl.client.cert.auth
Enable client certification authentication by Cassandra. Requires KeyStore options to be set.
boolean
false
connect.cassandra.key.store.path
Path to the client Key Store.
string
connect.cassandra.key.store.password
Password for the client Key Store
password
connect.cassandra.consistency.level
Consistency refers to how up-to-date and synchronized a row of Cassandra data is on all of its replicas. Cassandra offers tunable consistency. For any given read or write operation, the client application decides how consistent the requested data must be.
string
connect.cassandra.fetch.size
The number of records the Cassandra driver will return at once.
int
5000
connect.cassandra.load.balancing.policy
Cassandra Load balancing policy. ROUND_ROBIN, TOKEN_AWARE, LATENCY_AWARE or DC_AWARE_ROUND_ROBIN. TOKEN_AWARE and LATENCY_AWARE use DC_AWARE_ROUND_ROBIN
string
TOKEN_AWARE
connect.cassandra.error.policy
Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP - the error is swallowed; THROW - the error is allowed to propagate; RETRY - the exception causes the Connect framework to retry the message. The number of retries is set by connect.cassandra.max.retries. All errors will be logged automatically, even if the code swallows them.
string
THROW
connect.cassandra.max.retries
The maximum number of times to try the write again.
int
20
connect.cassandra.retry.interval
The time in milliseconds between retries.
int
60000
connect.cassandra.task.buffer.size
The size of the queue the Cassandra reader writes records to.
int
10000
connect.cassandra.assigned.tables
The tables a task has been assigned.
string
connect.cassandra.batch.size
The number of records the source task should drain from the reader queue.
int
100
connect.cassandra.import.poll.interval
The polling interval between queries against tables for bulk mode.
long
1000
connect.cassandra.time.slice.ms
The range of time in milliseconds that the source task will use for the timestamp/timeuuid query.
long
10000
connect.cassandra.import.allow.filtering
Enable ALLOW FILTERING in incremental selects.
boolean
true
connect.cassandra.slice.duration
Duration to query for in target Cassandra table. Used to restrict query timestamp span
long
10000
connect.cassandra.slice.delay.ms
The delay between the current time and the time range of the query. Used to ensure all of the data in the time slice is available.
long
30000
connect.cassandra.initial.offset
The initial timestamp to start querying in Cassandra from (yyyy-MM-dd HH:mm:ss.SSS’Z’). Default 1900-01-01 00:00:00.0000000Z
string
1900-01-01 00:00:00.0000000Z
connect.cassandra.mapping.collection.to.json
Maps columns of type Map, List and Set to JSON.
boolean
true
connect.cassandra.kcql
KCQL expression describing field selection and routes.
string
JSON
STRING
Same,Other
Yes, No
StringConverter
StringConverter
AVRO,Parquet
STRING
Same,Other
Yes
StringConverter
StringConverter
AVRO,Parquet
STRING
Same,Other
No
ByteArrayConverter
ByteArrayConverter
JSON
JSON
Same,Other
Yes
JsonConverter
StringConverter
JSON
JSON
Same,Other
No
StringConverter
StringConverter
AVRO,Parquet
JSON
Same,Other
Yes,No
JsonConverter
JsonConverter or Avro Converter( Glue, Confluent)
AVRO,Parquet, JSON
BYTES
Same,Other
Yes,No
ByteArrayConverter
ByteArrayConverter
AVRO,Parquet
AVRO
Same
Yes
Avro Converter( Glue, Confluent)
Avro Converter( Glue, Confluent)
AVRO,Parquet
AVRO
Same
No
ByteArrayConverter
ByteArrayConverter
AVRO,Parquet
AVRO
Other
Yes,No
Avro Converter( Glue, Confluent)
Avro Converter( Glue, Confluent)
AVRO,Parquet
Protobuf
Same
Yes
Protobuf Converter( Glue, Confluent)
Protobuf Converter( Glue, Confluent)
AVRO,Parquet
Protobuf
Same
No
ByteArrayConverter
ByteArrayConverter
AVRO,Parquet
Protobuf
Other
Yes,No
Protobuf Converter( Glue, Confluent)
Protobuf Converter( Glue, Confluent)
AVRO,Parquet, JSON
Other
Same, Other
Yes,No
ByteArrayConverter
ByteArrayConverter
read.text.mode
Controls how Text content is read
Enum
Regex, StartEndTag, StartEndLine
read.text.regex
Regular Expression for Text Reading (if applicable)
String
read.text.start.tag
Start Tag for Text Reading (if applicable)
String
read.text.end.tag
End Tag for Text Reading (if applicable)
String
read.text.buffer.size
Text Buffer Size (for optimization)
Int
read.text.start.line
Start Line for Text Reading (if applicable)
String
read.text.end.line
End Line for Text Reading (if applicable)
String
read.text.trim
Trim Text During Reading
Boolean
store.envelope
Messages are stored as “Envelope”
Boolean
post.process.action
Defines the action to perform on source objects after successful processing.
Enum
DELETE or MOVE
post.process.action.bucket
Specifies the target bucket for the MOVE
action (required for MOVE
).
String
post.process.action.prefix
Specifies a new prefix for the object’s location when using the MOVE
action (required for MOVE
).
String
post.process.action.retain.dirs
Ensure that paths are retained after a post-process action, using a zero-byte object to represent the path.
Boolean
false
connect.s3.aws.auth.mode
Specifies the AWS authentication mode for connecting to S3.
string
"Credentials," "Default"
"Default"
connect.s3.aws.access.key
Access Key for AWS S3 Credentials
string
connect.s3.aws.secret.key
Secret Key for AWS S3 Credentials
string
connect.s3.aws.region
AWS Region for S3 Bucket
string
connect.s3.pool.max.connections
Maximum Connections in the Connection Pool
int
-1 (undefined)
50
connect.s3.custom.endpoint
Custom Endpoint URL for S3 (if applicable)
string
connect.s3.kcql
Kafka Connect Query Language (KCQL) Configuration to control the connector behaviour
string
connect.s3.vhost.bucket
Enable Virtual Hosted-style Buckets for S3
boolean
true, false
false
connect.s3.source.extension.excludes
A comma-separated list of object extensions to exclude from the source object search.
string
[Object extension filtering]({{< relref "#object-extension-filtering" >}})
connect.s3.source.extension.includes
A comma-separated list of object extensions to include in the source object search.
string
[object extension filtering]({{< relref "#object-extension-filtering" >}})
connect.s3.source.partition.extractor.type
Type of Partition Extractor (Hierarchical or Regex)
string
hierarchical, regex
connect.s3.source.partition.extractor.regex
Regex Pattern for Partition Extraction (if applicable)
string
connect.s3.ordering.type
Type of ordering for the S3 object keys to ensure the processing order.
string
AlphaNumeric, LastModified
AlphaNumeric
connect.s3.source.partition.search.continuous
If set to true the connector will continuously search for new partitions.
boolean
true, false
true
connect.s3.source.partition.search.excludes
A comma-separated list of paths to exclude from the partition search.
string
".indexes"
connect.s3.source.partition.search.interval
The interval in milliseconds between searching for new partitions.
long
300000
connect.s3.source.partition.search.recurse.levels
Controls how many levels deep to recurse when searching for new partitions
int
0
connect.s3.source.empty.results.backoff.initial.delay
Initial delay before retrying when no results are found.
long
1000 Milliseconds
connect.s3.source.empty.results.backoff.max.delay
Maximum delay before retrying when no results are found.
long
10000 Milliseconds
connect.s3.source.empty.results.backoff.multiplier
Multiplier to apply to the delay when retrying when no results are found.
double
2.0 Multiplier (x)
connect.s3.source.write.watermark.header
Write the record with kafka headers including details of the source and line number of the file.
boolean
true, false
false
connect.ftp.address
host[:port] of the ftp server
string
connect.ftp.user
Username to connect with
string
connect.ftp.password
Password to connect with
string
connect.ftp.refresh
iso8601 duration that the server is polled
string
connect.ftp.file.maxage
iso8601 duration for how old files can be
string
connect.ftp.keystyle
SourceRecord keystyle, string or struct
string
connect.ftp.protocol
Protocol to use, FTP or FTPS
string
ftp
connect.ftp.timeout
Ftp connection timeout in milliseconds
int
30000
connect.ftp.filter
Regular expression to use when selecting files for processing
string
.*
connect.ftp.monitor.tail
Comma-separated list of path:destinationtopic; the tail of the file is tracked
string
connect.ftp.monitor.update
Comma-separated list of path:destinationtopic; the whole file is tracked
string
connect.ftp.monitor.slicesize
File slice size in bytes
int
-1
connect.ftp.fileconverter
File converter class
string
com.datamountaineer.streamreactor.connect.ftp.source.SimpleFileConverter
connect.ftp.sourcerecordconverter
Source record converter class
string
com.datamountaineer.streamreactor.connect.ftp.source.NopSourceRecordConverter
connect.ftp.max.poll.records
Max number of records returned per poll
int
10000
message_timestamp
Optional int64
correlation_id
Optional string
redelivered
Optional boolean
reply_to
Optional string
destination
Optional string
message_id
Optional string
mode
Optional int32
type
Optional string
priority
Optional int32
bytes_payload
Optional bytes
properties
Map of string
connect.jms.url
Provides the JMS broker url
string
connect.jms.initial.context.factory
Initial Context Factory, e.g: org.apache.activemq.jndi.ActiveMQInitialContextFactory
string
connect.jms.connection.factory
Provides the full class name for the ConnectionFactory to use, e.g. org.apache.activemq.ActiveMQConnectionFactory
string
ConnectionFactory
connect.jms.kcql
connect.jms.kcql
string
connect.jms.subscription.name
The subscription name to use when subscribing to a topic; specifying this makes a durable subscription for topics
string
connect.jms.password
Provides the password for the JMS connection
password
connect.jms.username
Provides the user for the JMS connection
string
connect.jms.error.policy
Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP - the error is swallowed; THROW - the error is allowed to propagate; RETRY - the exception causes the Connect framework to retry the message. The number of retries is set by connect.jms.max.retries. The error will be logged automatically.
string
THROW
connect.jms.retry.interval
The time in milliseconds between retries.
int
60000
connect.jms.max.retries
The maximum number of times to try the write again.
int
20
connect.jms.destination.selector
Selector to use for destination lookup. Either CDI or JNDI.
string
CDI
connect.jms.initial.context.extra.params
List (comma-separated) of extra properties as key/value pairs with a colon delimiter to supply to the initial context e.g. SOLACE_JMS_VPN:my_solace_vp
list
[]
connect.jms.batch.size
The number of records to poll for on the target JMS destination in each Connect poll.
int
100
connect.jms.polling.timeout
Provides the timeout to poll incoming messages
long
1000
connect.jms.source.default.converter
Contains a canonical class name for the default converter of raw JMS message bytes to a SourceRecord. Overrides to the default can still be done by using connect.jms.source.converters, e.g. com.datamountaineer.streamreactor.connect.source.converters.AvroConverter
string
connect.jms.converter.throw.on.error
If set to false, the conversion exception will be swallowed and processing carries on, BUT the message is lost; if set to true, the exception will be thrown. The default is false.
boolean
false
connect.converter.avro.schemas
If the AvroConverter is used you need to provide an avro Schema to be able to read and translate the raw bytes to an avro record. The format is $MQTT_TOPIC=$PATH_TO_AVRO_SCHEMA_FILE
string
connect.jms.headers
Contains collection of static JMS headers included in every SinkRecord The format is connect.jms.headers="$MQTT_TOPIC=rmq.jms.message.type:TextMessage,rmq.jms.message.priority:2;$MQTT_TOPIC2=rmq.jms.message.type:JSONMessage"
string
connect.progress.enabled
Enables the output for how many records have been processed
boolean
false
connect.jms.evict.interval.minutes
Removes the uncommitted messages from the internal cache. Each JMS message is linked to the Kafka record to be published. Failure to publish a record to Kafka will mean the JMS message will not be acknowledged.
int
10
connect.jms.evict.threshold.minutes
The number of minutes after which an uncommitted entry becomes evictable from the connector cache.
int
10
connect.jms.scale.type
How the connector tasks parallelization is decided. Available values are kcql and default. If kcql is provided, parallelization is based on the number of KCQL statements written; otherwise it is driven by the connector's tasks.max setting.
batch.size
The maximum number of messages the connector will retrieve and process at one time per polling request (per KCQL mapping).
int
1000
cache.ttl
The maximum amount of time (in milliseconds) to store message data to allow acknowledgement of a message.
long
1 hour
queue.max
Data is loaded into a queue asynchronously so that it stands ready when the poll call is made. Controls the maximum number of records to hold in the queue per KCQL mapping.
int
10000
ProjectId
String
The Pub/Sub project containing the topic from which messages are polled.
TopicId
String
The Pub/Sub topic containing the messages.
SubscriptionId
String
The Pub/Sub subscription of the Pub/Sub topic.
MessageId
String
A unique ID for a Pub/Sub message
MessageData
Optional String
The body of the Pub/Sub message.
AttributeMap
Optional String
The attribute map associated with the Pub/Sub message.
connect.pubsub.gcp.auth.mode
Specifies the authentication mode for connecting to GCP.
string
Credentials, File or Default
Default
connect.pubsub.gcp.credentials
For “auth.mode” credentials: GCP Authentication credentials string.
string
(Empty)
connect.pubsub.gcp.file
For “auth.mode” file: Local file path for file containing GCP authentication credentials.
string
(Empty)
connect.pubsub.gcp.project.id
GCP Project ID.
string
(Empty)
connect.pubsub.kcql
Kafka Connect Query Language (KCQL) Configuration to control the connector behaviour
string
connect.pubsub.output.mode
Output mode. Please see Output Modes documentation below.
Default or Compatibility
Default
JSON
STRING
Same,Other
Yes, No
StringConverter
StringConverter
AVRO,Parquet
STRING
Same,Other
Yes
StringConverter
StringConverter
AVRO,Parquet
STRING
Same,Other
No
ByteArrayConverter
ByteArrayConverter
JSON
JSON
Same,Other
Yes
JsonConverter
StringConverter
JSON
JSON
Same,Other
No
StringConverter
StringConverter
AVRO,Parquet
JSON
Same,Other
Yes,No
JsonConverter
JsonConverter or Avro Converter( Glue, Confluent)
AVRO,Parquet, JSON
BYTES
Same,Other
Yes,No
ByteArrayConverter
ByteArrayConverter
AVRO,Parquet
AVRO
Same
Yes
Avro Converter( Glue, Confluent)
Avro Converter( Glue, Confluent)
AVRO,Parquet
AVRO
Same
No
ByteArrayConverter
ByteArrayConverter
AVRO,Parquet
AVRO
Other
Yes,No
Avro Converter( Glue, Confluent)
Avro Converter( Glue, Confluent)
AVRO,Parquet
Protobuf
Same
Yes
Protobuf Converter( Glue, Confluent)
Protobuf Converter( Glue, Confluent)
AVRO,Parquet
Protobuf
Same
No
ByteArrayConverter
ByteArrayConverter
AVRO,Parquet
Protobuf
Other
Yes,No
Protobuf Converter( Glue, Confluent)
Protobuf Converter( Glue, Confluent)
AVRO,Parquet, JSON
Other
Same, Other
Yes,No
ByteArrayConverter
ByteArrayConverter
read.text.mode
Controls how Text content is read
Enum
Regex, StartEndTag, StartEndLine
read.text.regex
Regular Expression for Text Reading (if applicable)
String
read.text.start.tag
Start Tag for Text Reading (if applicable)
String
read.text.end.tag
End Tag for Text Reading (if applicable)
String
read.text.buffer.size
Text Buffer Size (for optimization)
Int
read.text.start.line
Start Line for Text Reading (if applicable)
String
read.text.end.line
End Line for Text Reading (if applicable)
String
read.text.trim
Trim Text During Reading
Boolean
store.envelope
Messages are stored as “Envelope”
Boolean
post.process.action
Defines the action to perform on source objects after successful processing.
Enum
DELETE or MOVE
post.process.action.bucket
Specifies the target bucket for the MOVE
action (required for MOVE
).
String
post.process.action.prefix
Specifies a new prefix for the object’s location when using the MOVE
action (required for MOVE
).
String
connect.gcpstorage.gcp.auth.mode
Specifies the authentication mode for connecting to GCP.
string
"Credentials", "File" or "Default"
"Default"
connect.gcpstorage.gcp.credentials
For "auth.mode" credentials: GCP Authentication credentials string.
string
(Empty)
connect.gcpstorage.gcp.file
For "auth.mode" file: Local file path for file containing GCP authentication credentials.
string
(Empty)
connect.gcpstorage.gcp.project.id
GCP Project ID.
string
(Empty)
connect.gcpstorage.gcp.quota.project.id
GCP Quota Project ID.
string
(Empty)
connect.gcpstorage.endpoint
Endpoint for GCP Storage.
string
connect.gcpstorage.error.policy
Defines the error handling policy when errors occur during data transfer to or from GCP Storage.
string
"NOOP," "THROW," "RETRY"
"THROW"
connect.gcpstorage.max.retries
Sets the maximum number of retries the connector will attempt before reporting an error to the Connect Framework.
int
20
connect.gcpstorage.retry.interval
Specifies the interval (in milliseconds) between retry attempts by the connector.
int
60000
connect.gcpstorage.http.max.retries
Sets the maximum number of retries for the underlying HTTP client when interacting with GCP Storage.
long
5
connect.gcpstorage.http.retry.interval
Specifies the retry interval (in milliseconds) for the underlying HTTP client. An exponential backoff strategy is employed.
long
50
connect.gcpstorage.kcql
Kafka Connect Query Language (KCQL) Configuration to control the connector behaviour
string
[kcql configuration]({{< relref "#kcql-support" >}})
connect.gcpstorage.source.extension.excludes
A comma-separated list of object extensions to exclude from the source object search.
string
[object extension filtering]({{< relref "#object-extension-filtering" >}})
connect.gcpstorage.source.extension.includes
A comma-separated list of object extensions to include in the source object search.
string
[object extension filtering]({{< relref "#object-extension-filtering" >}})
connect.gcpstorage.source.partition.extractor.type
Type of Partition Extractor (Hierarchical or Regex)
string
hierarchical, regex
connect.gcpstorage.source.partition.extractor.regex
Regex Pattern for Partition Extraction (if applicable)
string
connect.gcpstorage.source.partition.search.continuous
If set to true the connector will continuously search for new partitions.
boolean
true, false
true
connect.gcpstorage.source.partition.search.interval
The interval in milliseconds between searching for new partitions.
long
300000
connect.gcpstorage.source.partition.search.excludes
A comma-separated list of paths to exclude from the partition search.
string
".indexes"
connect.gcpstorage.source.partition.search.recurse.levels
Controls how many levels deep to recurse when searching for new partitions
int
0
connect.gcpstorage.ordering.type
Type of ordering for the GCS object keys to ensure the processing order.
string
AlphaNumeric, LastModified
AlphaNumeric
connect.gcpstorage.source.empty.results.backoff.initial.delay
Initial delay before retrying when no results are found.
long
1000 Milliseconds
connect.gcpstorage.source.empty.results.backoff.max.delay
Maximum delay before retrying when no results are found.
long
10000 Milliseconds
connect.gcpstorage.source.empty.results.backoff.multiplier
Multiplier to apply to the delay when retrying when no results are found.
double
2.0 Multiplier (x)
connect.gcpstorage.source.write.watermark.header
Write the record with kafka headers including details of the source and line number of the file.
boolean
true, false
false
This page describes the usage of the Stream Reactor MQTT Source Connector.
A Kafka Connect source connector to read events from MQTT and push them to Kafka.
For more examples see the tutorials.
You can specify multiple KCQL statements separated by ; to have the connector write to multiple topics.
The following KCQL is supported:
The selection of fields from the MQTT message is not supported.
Examples:
To facilitate scenarios like retaining the latest value for a given device identifier, or support Kafka Streams joins without having to re-map the topic data the connector supports WITHKEY in the KCQL syntax.
Multiple key fields are supported using a delimiter:
The resulting Kafka record key content will be the string concatenation for the values of the fields specified. Optionally the delimiter can be set via the KEYDELIMITER keyword.
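A sketch of such a mapping; the MQTT topic and field names are placeholders:

```sql
INSERT INTO sensor_readings
SELECT * FROM /sensors/device1/reading
WITHKEY(deviceId, timestamp)
KEYDELIMITER='|'
```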
The connector supports both wildcard and shared subscriptions but the KCQL command must be placed inside single quotes.
The connector supports converters to handle different message payload formats in the source topic. See source record converters.
connect.mqtt.hosts
Contains the MQTT connection end points.
string
connect.mqtt.username
Contains the Mqtt connection user name
string
connect.mqtt.password
Contains the Mqtt connection password
password
connect.mqtt.service.quality
Specifies the Mqtt quality of service
int
connect.mqtt.timeout
Provides the time interval to establish the mqtt connection
int
3000
connect.mqtt.clean
connect.mqtt.clean
boolean
true
connect.mqtt.keep.alive
The keep alive functionality assures that the connection is still open and both broker and client are connected to the broker during the establishment of the connection. The interval is the longest possible period of time, which broker and client can endure without sending a message.
int
5000
connect.mqtt.client.id
Contains the Mqtt session client id
string
connect.mqtt.error.policy
Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP - the error is swallowed; THROW - the error is allowed to propagate; RETRY - the exception causes the Connect framework to retry the message. The number of retries is set by connect.mqtt.max.retries. The error will be logged automatically.
string
THROW
connect.mqtt.retry.interval
The time in milliseconds between retries.
int
60000
connect.mqtt.max.retries
The maximum number of times to try the write again.
int
20
connect.mqtt.retained.messages
Specifies the Mqtt retained flag.
boolean
false
connect.mqtt.converter.throw.on.error
If set to false, the conversion exception will be swallowed and processing carries on, BUT the message is lost; if set to true, the exception will be thrown. The default is false.
boolean
false
connect.converter.avro.schemas
If the AvroConverter is used you need to provide an avro Schema to be able to read and translate the raw bytes to an avro record. The format is $MQTT_TOPIC=$PATH_TO_AVRO_SCHEMA_FILE in case of source converter, or $KAFKA_TOPIC=PATH_TO_AVRO_SCHEMA in case of sink converter
string
connect.mqtt.log.message
Logs received MQTT messages
boolean
false
connect.mqtt.kcql
Contains the Kafka Connect Query Language describing the source MQTT topics and the target Kafka topics
string
connect.mqtt.polling.timeout
Provides the timeout to poll incoming messages
int
1000
connect.mqtt.share.replicate
Replicate the shared subscriptions to all tasks instead of distributing them
boolean
false
connect.progress.enabled
Enables the output for how many records have been processed
boolean
false
connect.mqtt.ssl.ca.cert
Provides the path to the CA certificate file to use with the Mqtt connection
string
connect.mqtt.ssl.cert
Provides the path to the certificate file to use with the Mqtt connection
string
connect.mqtt.ssl.key
Certificate private key file path.
string
connect.mqtt.process.duplicates
Process duplicate messages
boolean
false