This page details the configuration options for the Stream Reactor Kafka Connect source connectors.
Source connectors read data from external systems and write to Kafka.
This page describes the usage of the Stream Reactor Azure Data Lake Gen2 Source Connector.
Coming soon!
This page describes the usage of the Stream Reactor Azure Service Bus Source Connector.
This Kafka connector is designed to effortlessly ingest records from Azure Service Bus into your Kafka cluster. It leverages the Microsoft Azure API to transfer data from Service Bus entities, preserving both payloads and metadata (see Payload support). It provides an AT-LEAST-ONCE guarantee: a message is committed (marked as read) in Service Bus only once the connector has verified that it was successfully written to the designated Kafka topic. The Azure Service Bus Source Connector supports both types of Service Bus entities: Queues and Topics.
For more examples see the tutorials.
The following example presents all the mandatory configuration properties for the Service Bus connector; optional parameters are listed in the option reference below. Feel free to tweak the configuration to your requirements.
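A minimal sketch of such a configuration, assuming a properties-file deployment. The connector name, the connector class placeholder, and the queue and topic names are illustrative, and the KCQL `PROPERTIES('key'='value')` syntax shown is an assumption based on the KCQL section below:

```properties
name=azure-servicebus-source                      # illustrative connector name
connector.class=<Azure Service Bus source connector class from your installed plugin>
tasks.max=1
# Connection string from the Shared access policies section of the Azure Portal
connect.servicebus.connection.string=Endpoint=sb://my-namespace.servicebus.windows.net/;SharedAccessKeyName=...;SharedAccessKey=...
# Map a Service Bus queue to a Kafka topic (see the KCQL section below)
connect.servicebus.kcql=INSERT INTO my_kafka_topic SELECT * FROM my_servicebus_queue PROPERTIES('servicebus.type'='QUEUE')
```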
You can specify multiple KCQL statements separated by ;
to have the connector map between multiple topics.
The following KCQL is supported:
It allows you to map a Service Bus entity named <your-service-bus> to a Kafka topic named <your-kafka-topic> using the PROPERTIES specified.
The selection of fields from the Service Bus message is not supported.
The Azure Service Bus Connector emits messages with a fixed schema. Please look below for the format of the data transferred to the Kafka topics specified in the KCQL config.
You can connect to an Azure Service Bus by passing your connection string in configuration. The connection string can be found in the Shared access policies section of your Azure Portal.
Learn more about different methods of connecting to Service Bus on the Azure Website.
The Azure Service Bus Connector connects to Service Bus via the Microsoft API. In order to configure your mappings smoothly, pay attention to the PROPERTIES part of your KCQL mappings. There are two cases here: reading from a Service Bus of type QUEUE and of type TOPIC. Please refer to the relevant sections below. In case of further questions, check the Azure Service Bus documentation to learn more about those mechanisms.
In order to read from a queue, there is an additional parameter that you need to pass with your KCQL mapping in the PROPERTIES part. This parameter is servicebus.type and it can take one of two values depending on the type of the Service Bus: QUEUE or TOPIC. For a queue, set it to QUEUE. This is sufficient to create the mapping for your queue, for example:
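A hedged KCQL sketch of a queue mapping; the topic and queue names are placeholders and the PROPERTIES quoting style is an assumption:

```sql
INSERT INTO my_kafka_topic
SELECT * FROM my_servicebus_queue
PROPERTIES('servicebus.type'='QUEUE')
```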
In order to read from a topic, there are two additional parameters that you need to pass with your KCQL mapping in the PROPERTIES part:

- servicebus.type, which can take one of two values depending on the type of the Service Bus: QUEUE or TOPIC. For a topic, set it to TOPIC.
- subscription.name, which takes the (case-sensitive) name of a subscription that you have created for this topic for the connector to use. Please use the Azure Portal to create one.

This is sufficient to create the mapping for your topic, for example:
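A hedged KCQL sketch of a topic mapping; the names are placeholders and the PROPERTIES quoting style is an assumption:

```sql
INSERT INTO my_kafka_topic
SELECT * FROM my_servicebus_topic
PROPERTIES(
  'servicebus.type'='TOPIC',
  'subscription.name'='my-subscription'
)
```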
Please find below all the necessary KCQL properties:
Please find below all the relevant configuration parameters:
This page describes the usage of the Stream Reactor Azure Event Hubs Source Connector.
A Kafka Connect source connector to read events from Azure Event Hubs and push them to Kafka.
In order to leverage the Kafka API in your Event Hubs namespace, it has to be at least on the Standard pricing tier. See the Azure documentation for more info.
For more examples see the tutorials.
The example below presents all the necessary configuration parameters for the Event Hubs connector (and nothing optional, so feel free to tweak it to your needs):
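A minimal sketch of such a configuration, assuming a properties-file deployment. The connector name and class placeholder are illustrative, and the KCQL shape is an assumption based on the KCQL section below:

```properties
name=azure-eventhubs-source                      # illustrative connector name
connector.class=<Azure Event Hubs source connector class from your installed plugin>
tasks.max=1
# Kafka-compatible endpoint of the Event Hubs namespace (Standard tier or above)
connect.eventhubs.source.connection.settings.bootstrap.servers=my-namespace.servicebus.windows.net:9093
# Map an Event Hub to a Kafka topic
connect.eventhubs.kcql=INSERT INTO my_kafka_topic SELECT * FROM my_event_hub
```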
Connector allows for multiple KCQL commands.
The following KCQL is supported:
The selection of fields from the Event Hubs message is not supported.
As of now, the Azure Event Hubs Connector supports raw bytes passthrough from the source hub to the Kafka topic specified in the KCQL config.
You can connect to Azure Event Hubs by passing specific JAAS parameters in the configuration.
The Azure Event Hubs Connector utilizes the Apache Kafka API implemented by Event Hubs. This also allows fine-tuning for user-specific needs, because the connector passes all properties with a specific prefix directly to the consumer. The prefix is connect.eventhubs.connection.settings and when a user specifies a property with it, it is automatically passed to the consumer.
For example, suppose a user wants to fine-tune how many records come through the network at once. They specify the property below as part of their Azure Event Hubs Connector configuration before starting it. It means that the internal Kafka consumer will poll at most 100 records at a time, as max.poll.records is passed directly to it:
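A sketch of that property, combining the prefix described above with the standard Kafka consumer setting:

```properties
connect.eventhubs.connection.settings.max.poll.records=100
```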
Learn more about the different methods of connecting to Event Hubs on the Azure website; the only caveat is to add the connector-specific prefix, as in the example above. There are certain exceptions to this rule, as a couple of properties are used internally to ensure consumption proceeds smoothly. Those exceptions are listed below:
- client.id - the connector sets it by itself
- group.id - the connector sets it by itself
- key.deserializer - the connector transitions bytes 1-to-1
- value.deserializer - the connector transitions bytes 1-to-1
- enable.auto.commit - the connector automatically sets it to false and checks what offsets are committed in the output topic instead
This page describes the usage of the Stream Reactor Google PubSub Source Connector.
The Kafka connector is designed to seamlessly ingest records from GCP Pub/Sub topics and queues into your Kafka cluster. This makes it useful for backing up or streaming data from Pub/Sub to your Kafka infrastructure. This connector provides robust support for at least once semantics (this connector ensures that each record reaches the Kafka topic at least once).
For more examples see the tutorials.
The connector uses a SQL-like syntax to configure the connector behaviour. The full KCQL syntax is:
Please note that you can employ escaping within KCQL for the INSERT INTO and SELECT * FROM clauses when necessary. For example, if you need to use a topic name that contains a hyphen, you can escape it as follows:
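A hedged sketch of such an escaped statement, assuming KCQL backtick escaping; the topic and subscription names are placeholders:

```sql
INSERT INTO `my-topic-with-hyphen`
SELECT * FROM `my-pubsub-subscription`
```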
The source and target of the data are specified via the INSERT INTO... SELECT * FROM
clause. The connector will write all the records to the given topic, from the given subscription:
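A minimal KCQL sketch with placeholder names; the batch.size property comes from the supported-properties table below, and the PROPERTIES quoting style is an assumption:

```sql
INSERT INTO my_kafka_topic
SELECT * FROM my_pubsub_subscription
PROPERTIES('batch.size'='500')
```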
The PROPERTIES
clause is optional and adds a layer of configurability to the connector. It enhances versatility by permitting the application of multiple configurations (delimited by ',').
Properties can be defined in any order.
The following properties are supported:
The connector offers three distinct authentication modes:
Default: This mode relies on the default GCP authentication chain, simplifying the authentication process.
File: This mode uses a local (to the connect worker) path for a file containing GCP authentication credentials.
Credentials: In this mode, explicit configuration of a GCP Credentials string is required for authentication.
The simplest example to configure in the connector is the "Default" mode, as this requires no other configuration.
When selecting the "Credentials" mode, it is essential to provide the necessary credentials. Alternatively, if you prefer not to configure these properties explicitly, the connector will follow the default GCP credentials retrieval order. Here's an example configuration for the "Credentials" mode:
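A minimal sketch using the property names from the options reference below; the project ID and credentials value are placeholders:

```properties
connect.pubsub.gcp.auth.mode=Credentials
connect.pubsub.gcp.project.id=my-gcp-project
connect.pubsub.gcp.credentials=<GCP credentials string, ideally injected via a Connect Secret Provider>
```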
And here is an example configuration using the "File" mode:
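A minimal sketch for the "File" mode; the path is a placeholder and must be readable on every worker:

```properties
connect.pubsub.gcp.auth.mode=File
connect.pubsub.gcp.project.id=my-gcp-project
connect.pubsub.gcp.file=/path/to/gcp-service-account.json
```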
Remember when using file mode the file will need to exist on every worker node in your Kafka connect cluster and be readable by the Kafka Connect process.
For enhanced security and flexibility when using the "Credentials" mode, it is highly advisable to utilize Connect Secret Providers.
Two modes are available: Default Mode and Compatibility Mode.
Compatibility Mode is intended to ensure compatibility with existing tools, while Default Mode offers a simpler modern redesign of the functionality.
You can choose whichever suits your requirements.
Each Pub/Sub message is transformed into a single Kafka record, structured as follows:
Kafka Key: A String of the Pub/Sub MessageID.
Kafka Value: The Pub/Sub message value as BYTES.
Kafka Headers: Includes the "PublishTimestamp" (in seconds) and all Pub/Sub message attributes mapped as separate headers.
The Kafka Key is mapped from the Pub/Sub MessageID, a unique ID for a Pub/Sub message.
The Kafka Value is mapped from the body of the Pub/Sub message.
The Kafka Headers include:
PublishTimestamp: Long value representing the time when the Pub/Sub message was published, in seconds.
GCPProjectID: The GCP Project ID.
PubSubTopicID: The Pub/Sub Topic ID.
PubSubSubscriptionID: The Pub/Sub Subscription ID.
All Pub/Sub message attributes: Each attribute from the Pub/Sub message is mapped as a separate header.
Each Pub/Sub message is transformed into a single Kafka record, structured as follows:
Kafka Key: Comprises the project ID, message ID, and subscription ID of the Pub/Sub message.
Kafka Value: Contains the message data and attributes from the Pub/Sub message.
The Key is a structure with these fields:
The Value is a structure with these fields:
This page describes the usage of the Stream Reactor Cassandra Source Connector.
Kafka Connect Cassandra is a Source Connector for reading data from Cassandra and writing to Kafka.
For more examples see the tutorials.
You can specify multiple KCQL statements separated by ; to have the connector write to multiple topics.
The following KCQL is supported:
Examples:
The connector can write JSON to your Kafka topic using the WITHFORMAT JSON clause but the key and value converters must be set:
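A minimal sketch of the converter settings this implies, assuming the standard Kafka Connect string converter is appropriate for the JSON output:

```properties
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
```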
In order to facilitate scenarios like retaining the latest value for a given device identifier, or support Kafka Streams joins without having to re-map the topic data the connector supports WITHKEY in the KCQL syntax.
Multiple key fields are supported using a delimiter:
The resulting Kafka record key content will be the string concatenation for the values of the fields specified. Optionally the delimiter can be set via the KEYDELIMITER keyword.
Keying is only supported in conjunction with the WITHFORMAT JSON clause
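A hedged KCQL sketch combining these clauses; the table, topic, and field names are hypothetical and the exact clause ordering is an assumption:

```sql
INSERT INTO device_events
SELECT * FROM device_events_table
PK created
WITHFORMAT JSON
WITHKEY(device_id, created)
KEYDELIMITER='|'
```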
This mode tracks new records added to a table. The columns to track are identified by the PK clause in the KCQL statement. Only one column can be used to track new records. The supported Cassandra column data types are:
TIMESTAMP
TIMEUUID
TOKEN
DSESEARCHTIMESTAMP
If set to TOKEN this column value is wrapped inside Cassandra's token function which needs unwrapping with the WITHUNWRAP command.
You must use the Byte Order Partitioner for the TOKEN mode to work correctly or data will be missing from the Kafka topic. This is not recommended due to the creation of hotspots in Cassandra.
DSESEARCHTIMESTAMP will make DSE Search queries using Solr instead of a native Cassandra query.
The connector constantly loads the entire table.
The connector can be configured to:
Start from a particular offset - connect.cassandra.initial.offset
Increase or decrease the poll interval - connect.cassandra.import.poll.interval
Set a slice duration to query for in milliseconds - connect.cassandra.slice.duration
The following CQL data types are supported:
This page describes the usage of the Stream Reactor AWS S3 Source Connector.
This connector is also available on the AWS Marketplace.
For more examples see the tutorials.

Files that have been archived to the AWS Glacier storage class are skipped; in order to load these files you must manually restore them. Skipped files are logged in the Connect workers' log files.
You can specify multiple KCQL statements separated by ; to have the connector write to multiple topics.
The connector uses a SQL-like syntax to configure the connector behaviour. The full KCQL syntax is:
Please note that you can employ escaping within KCQL for the INSERT INTO, SELECT * FROM, and PARTITIONBY clauses when necessary. For example, if you need to use a topic name that contains a hyphen, you can escape it as follows:
The S3 source location is defined within the FROM clause. The connector will read all files from the given location considering the data partitioning and ordering options. Each data partition will be read by a single connector task.
The FROM clause format is:
If your data in AWS was not written by the Lenses AWS sink, set the connector to traverse a folder hierarchy in a bucket and load files based on their last-modified timestamp:

connect.s3.source.partition.extractor.regex=none
connect.s3.source.ordering.type=LastModified

To load in alphanumeric order, set the ordering type to AlphaNumeric.
The target Kafka topic is specified via the INSERT INTO clause. The connector will write all the records to the given topic:
The connector supports a range of storage formats, each with its own distinct functionality:
JSON: The connector will read files containing JSON content, each line representing a distinct record.
Avro: The connector will read Avro-stored messages from S3 and translate them into Kafka’s native format.
Parquet: The connector will read Parquet-stored messages from S3 and translate them into Kafka’s native format.
Text: The connector will read files containing lines of text, each line representing a distinct record.
CSV: The connector will read files containing lines of text, each line representing a distinct record.
CSV_WithHeaders: The connector will read files containing lines of text, each line representing a distinct record while skipping the header row.
Bytes: The connector will read files containing bytes, each file is translated to a Kafka message.
Use the STOREAS
clause to configure the storage format. The following options are available:
When using Text storage, the connector provides additional configuration options to finely control how text content is processed.
In Regex mode, the connector applies a regular expression pattern, and only when a line matches the pattern is it considered a record. For example, to include only lines that start with a number, you can use the following configuration:
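A hedged KCQL sketch of Regex mode. The bucket:prefix location form, the STOREAS value, and the PROPERTIES quoting are assumptions; the property names come from the KCQL properties table below:

```sql
INSERT INTO my_kafka_topic
SELECT * FROM mybucket:myprefix
STOREAS `Text`
PROPERTIES(
  'read.text.mode'='Regex',
  'read.text.regex'='^[0-9]'
)
```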
In Start-End Line mode, the connector reads text content between specified start and end lines, inclusive. This mode is useful when you need to extract records that fall within defined boundaries. For instance, to read records where the first line is ‘SSM’ and the last line is an empty line (’’), you can configure it as follows:
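A similar hedged sketch for Start-End Line mode, using the 'SSM' start line and empty end line from the example above; the same assumptions apply:

```sql
INSERT INTO my_kafka_topic
SELECT * FROM mybucket:myprefix
STOREAS `Text`
PROPERTIES(
  'read.text.mode'='StartEndLine',
  'read.text.start.line'='SSM',
  'read.text.end.line'=''
)
```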
To trim the start and end lines, set the read.text.trim property to true:
In Start-End Tag mode, the connector reads text content between specified start and end tags, inclusive. This mode is particularly useful when a single line of text in S3 corresponds to multiple output Kafka messages. For example, to read XML records enclosed between ‘’ and ‘’, configure it as follows:
Depending on the storage format of Kafka topics’ messages, the need for replication to a different cluster, and the specific data analysis requirements, there exists a guideline on how to effectively utilize converters for both sink and source operations. This guidance aims to optimize performance and minimize unnecessary CPU and memory usage.
Currently, the connector does not offer support for SQL projection; consequently, anything other than a SELECT * query is disregarded. The connector will faithfully write all the record fields to Kafka exactly as they are.
The S3 sink employs zero-padding in file names to ensure precise ordering, leveraging optimizations offered by the S3 API, guaranteeing the accurate sequence of files.
When using the S3 source alongside the S3 sink, the connector can adopt the same ordering method, ensuring data processing follows the correct chronological order. However, there are scenarios where S3 data is generated by applications that do not maintain lexical file name order.
In such cases, to process files in the correct sequence, the source needs to list all files in the bucket and sort them based on their last modified timestamp. To enable this behavior, set the connect.s3.source.ordering.type
to LastModified. This ensures that the source correctly arranges and processes the data based on the timestamps of the files.
To limit the number of file names the source reads from S3 in a single poll (the default value, if not specified, is 1000):
To limit the number of result rows returned from the source in a single poll operation, you can use the LIMIT clause. The default value, if not specified, is 10000.
The AWS S3 Source Connector allows you to filter the files to be processed based on their extensions. This is controlled by two properties: connect.s3.source.extension.excludes and connect.s3.source.extension.includes.

The connect.s3.source.extension.excludes property is a comma-separated list of file extensions to exclude from the source file search. If this property is not configured, all files are considered. For example, to exclude .txt and .csv files, you would set this property as follows:
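A sketch of the exclusion setting; whether the leading dot is included is an assumption here:

```properties
connect.s3.source.extension.excludes=txt,csv
```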
The connect.s3.source.extension.includes property is a comma-separated list of file extensions to include in the source file search. If this property is not configured, all files are considered. For example, to include only .json and .xml files, you would set this property as follows:
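A matching sketch of the inclusion setting, under the same assumption:

```properties
connect.s3.source.extension.includes=json,xml
```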
Note: If both connect.s3.source.extension.excludes and connect.s3.source.extension.includes are set, the connector first applies the exclusion filter and then the inclusion filter.
The PROPERTIES
clause is optional and adds a layer of configuration to the connector. It enhances versatility by permitting the application of multiple configurations (delimited by ‘,’). The following properties are supported:
The connector offers two distinct authentication modes:
Default: This mode relies on the default AWS authentication chain, simplifying the authentication process.
Credentials: In this mode, explicit configuration of AWS Access Key and Secret Key is required for authentication.
When selecting the “Credentials” mode, it is essential to provide the necessary access key and secret key properties. Alternatively, if you prefer not to configure these properties explicitly, the connector will follow the credentials retrieval order as described here.
Here’s an example configuration for the “Credentials” mode:
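A minimal sketch using the property names from the options reference below; the keys and region are placeholders and are best injected via a Connect Secret Provider:

```properties
connect.s3.aws.auth.mode=Credentials
connect.s3.aws.access.key=<aws-access-key-id>
connect.s3.aws.secret.key=<aws-secret-access-key>
connect.s3.aws.region=eu-west-1
```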
For enhanced security and flexibility when using the “Credentials” mode, it is highly advisable to utilize Connect Secret Providers. This approach ensures robust security practices while handling access credentials.
The connector can also be used against API compatible systems provided they implement the following:
This page describes the usage of the Stream Reactor GCP Storage Source Connector.
For more examples see the tutorials.
You can specify multiple KCQL statements separated by ; to have the connector write to multiple topics.
The connector uses a SQL-like syntax to configure the connector behaviour. The full KCQL syntax is:
Please note that you can employ escaping within KCQL for the INSERT INTO, SELECT * FROM, and PARTITIONBY clauses when necessary. For example, if you need to use a topic name that contains a hyphen, you can escape it as follows:
The GCP Storage source location is defined within the FROM clause. The connector will read all files from the given location considering the data partitioning and ordering options. Each data partition will be read by a single connector task.
The FROM clause format is:
If your data in GCS was not written by the Lenses GCS sink, set the connector to traverse a folder hierarchy in a bucket and load files based on their last-modified timestamp:

connect.gcpstorage.source.partition.extractor.regex=none
connect.gcpstorage.source.ordering.type=LastModified

To load in alphanumeric order, set the ordering type to AlphaNumeric.
The target Kafka topic is specified via the INSERT INTO clause. The connector will write all the records to the given topic:
The connector supports a range of storage formats, each with its own distinct functionality:
JSON: The connector will read files containing JSON content, each line representing a distinct record.
Avro: The connector will read Avro-stored messages from GCP Storage and translate them into Kafka’s native format.
Parquet: The connector will read Parquet-stored messages from GCP Storage and translate them into Kafka’s native format.
Text: The connector will read files containing lines of text, each line representing a distinct record.
CSV: The connector will read files containing lines of text, each line representing a distinct record.
CSV_WithHeaders: The connector will read files containing lines of text, each line representing a distinct record while skipping the header row.
Bytes: The connector will read files containing bytes, each file is translated to a Kafka message.
Use the STOREAS
clause to configure the storage format. The following options are available:
When using Text storage, the connector provides additional configuration options to finely control how text content is processed.
In Regex mode, the connector applies a regular expression pattern, and only when a line matches the pattern is it considered a record. For example, to include only lines that start with a number, you can use the following configuration:
In Start-End Line mode, the connector reads text content between specified start and end lines, inclusive. This mode is useful when you need to extract records that fall within defined boundaries. For instance, to read records where the first line is ‘SSM’ and the last line is an empty line (’’), you can configure it as follows:
To trim the start and end lines, set the read.text.trim property to true:
In Start-End Tag mode, the connector reads text content between specified start and end tags, inclusive. This mode is particularly useful when a single line of text in GCP Storage corresponds to multiple output Kafka messages. For example, to read XML records enclosed between ‘’ and ‘’, configure it as follows:
Depending on the storage format of Kafka topics’ messages, the need for replication to a different cluster, and the specific data analysis requirements, there exists a guideline on how to effectively utilize converters for both sink and source operations. This guidance aims to optimize performance and minimize unnecessary CPU and memory usage.
Currently, the connector does not offer support for SQL projection; consequently, anything other than a SELECT * query is disregarded. The connector will faithfully write all the record fields to Kafka exactly as they are.
The GCS sink employs zero-padding in file names to ensure precise ordering, leveraging optimizations offered by the GCS API, guaranteeing the accurate sequence of files.

When using the GCS source alongside the GCS sink, the connector can adopt the same ordering method, ensuring data processing follows the correct chronological order. However, there are scenarios where GCS data is generated by applications that do not maintain lexical file name order.
In such cases, to process files in the correct sequence, the source needs to list all files in the bucket and sort them based on their last modified timestamp. To enable this behavior, set the connect.gcpstorage.source.ordering.type
to LastModified
. This ensures that the source correctly arranges and processes the data based on the timestamps of the files.
To limit the number of file names the source reads from GCS in a single poll (the default value, if not specified, is 1000):
To limit the number of result rows returned from the source in a single poll operation, you can use the LIMIT clause. The default value, if not specified, is 10000.
The GCP Storage Source Connector allows you to filter the files to be processed based on their extensions. This is controlled by two properties: connect.gcpstorage.source.extension.excludes and connect.gcpstorage.source.extension.includes.

The connect.gcpstorage.source.extension.excludes property is a comma-separated list of file extensions to exclude from the source file search. If this property is not configured, all files are considered. For example, to exclude .txt and .csv files, you would set this property as follows:
The connect.gcpstorage.source.extension.includes property is a comma-separated list of file extensions to include in the source file search. If this property is not configured, all files are considered. For example, to include only .json and .xml files, you would set this property as follows:
Note: If both connect.gcpstorage.source.extension.excludes and connect.gcpstorage.source.extension.includes are set, the connector first applies the exclusion filter and then the inclusion filter.
The PROPERTIES
clause is optional and adds a layer of configuration to the connector. It enhances versatility by permitting the application of multiple configurations (delimited by ‘,’). The following properties are supported:
The connector offers two distinct authentication modes:
Default: This mode relies on the default GCP authentication chain, simplifying the authentication process.
File: This mode uses a local (to the connect worker) path for a file containing GCP authentication credentials.
Credentials: In this mode, explicit configuration of a GCP Credentials string is required for authentication.
The simplest example to configure in the connector is the “Default” mode, as this requires no other configuration.
When selecting the “Credentials” mode, it is essential to provide the necessary credentials. Alternatively, if you prefer not to configure these properties explicitly, the connector will follow the default GCP credentials retrieval order. Here’s an example configuration for the “Credentials” mode:
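A minimal sketch using the property names from the options reference below; the project ID and credentials value are placeholders:

```properties
connect.gcpstorage.gcp.auth.mode=Credentials
connect.gcpstorage.gcp.project.id=my-gcp-project
connect.gcpstorage.gcp.credentials=<GCP credentials string, ideally injected via a Connect Secret Provider>
```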
And here is an example configuration using the “File” mode:
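A minimal sketch for the “File” mode; the path is a placeholder and must be readable on every worker:

```properties
connect.gcpstorage.gcp.auth.mode=File
connect.gcpstorage.gcp.project.id=my-gcp-project
connect.gcpstorage.gcp.file=/path/to/gcp-service-account.json
```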
Remember when using file mode the file will need to exist on every worker node in your Kafka connect cluster and be readable by the Kafka Connect process.
For enhanced security and flexibility when using the “Credentials” mode, it is highly advisable to utilize Connect Secret Providers. This approach ensures robust security practices while handling access credentials.
When used in tandem with the GCP Storage Sink Connector, the GCP Storage Source Connector becomes a powerful tool for restoring Kafka topics from GCP Storage. To enable this behavior, you should set store.envelope to true. This configuration ensures that the source expects the following data structure in GCP Storage:
When the messages are sent to Kafka, the GCP Storage Source Connector ensures that it correctly maps the key, value, headers, and metadata fields (including timestamp and partition) to their corresponding Kafka message fields. Please note that the envelope functionality can only be used with data stored in GCP Storage as Avro, JSON, or Parquet formats.
When the envelope feature is not in use, and data restoration is required, the responsibility falls on the connector to establish the original topic partition value. To ensure that the source correctly conveys the original partitions back to Kafka Connect during reads from the source, a partition extractor can be configured to extract this information from the GCP Storage object key.
To configure the partition extractor, you can utilize the connect.gcpstorage.source.partition.extractor.type
property, which supports two options:
hierarchical: This option aligns with the default format used by the sink, topic/partition/offset.json.
regex: When selected, you can provide a custom regular expression to extract the partition information. Additionally, when using the regex option, you must also set the connect.gcpstorage.source.partition.extractor.regex
property. It’s important to note that only one lookup group is expected. For an example of a regular expression pattern, please refer to the pattern used for hierarchical, which is:
Azure Service Bus record key schema:

| Field Name | Schema Type | Description |
|---|---|---|
| MessageId | String | The message identifier that uniquely identifies the message and its payload. |

Azure Service Bus record value (payload) schema:

| Field Name | Schema Type | Description |
|---|---|---|
| deliveryCount | int64 | The number of times this message was delivered to clients. |
| enqueuedTimeUtc | int64 | The time at which this message was enqueued in Azure Service Bus. |
| contentType | Optional String | The content type of this message. |
| label | Optional String | The application-specific message label. |
| correlationId | Optional String | The correlation identifier. |
| messageProperties | Optional String | The map of user application properties of this message. |
| partitionKey | Optional String | The partition key for sending a message to a partitioned entity. |
| replyTo | Optional String | The address of an entity to send replies to. |
| replyToSessionId | Optional String | The session identifier augmenting the ReplyTo address. |
| deadLetterSource | Optional String | The name of the queue or subscription that this message was enqueued on, before it was dead-lettered. |
| timeToLive | int64 | The duration before this message expires. |
| lockedUntilUtc | Optional int64 | The time when the lock of this message expires. |
| sequenceNumber | Optional int64 | The unique number assigned to a message by Azure Service Bus. |
| sessionId | Optional String | The session identifier for a session-aware entity. |
| lockToken | Optional String | The lock token for the current message. |
| messageBody | Optional bytes | The body of this message as a byte array. |
| getTo | Optional String | The “to” address. |
Azure Service Bus KCQL properties:

| Name | Description | Type | Default Value |
|---|---|---|---|
| servicebus.type | Specifies the Service Bus type: QUEUE or TOPIC | string | |
| subscription.name | Specifies the subscription name if the Service Bus type is TOPIC | string | |

Azure Service Bus configuration parameters:

| Name | Description | Type | Default Value |
|---|---|---|---|
| connect.servicebus.connection.string | Specifies the Connection String to connect to Service Bus | string | |
| connect.servicebus.kcql | Comma-separated output KCQL queries | string | |
| connect.servicebus.source.task.records.queue.size | Specifies the queue size between Service Bus receivers and Kafka | int | 20 |
Azure Event Hubs configuration parameters:

| Name | Description | Type | Default Value |
|---|---|---|---|
| connect.eventhubs.source.connection.settings.bootstrap.servers | Specifies the Event Hubs server location. | string | |
| connect.eventhubs.source.close.timeout | Amount of time (in seconds) for the consumer to close. | int | 30 |
| connect.eventhubs.source.default.offset | Specifies whether by default we should consume from the earliest (default) or latest offset. | string | earliest |
| connect.eventhubs.kcql | Comma-separated output KCQL queries | string | |

Google Pub/Sub KCQL properties:

| Name | Description | Type | Default Value |
|---|---|---|---|
| batch.size | The maximum number of messages the connector will retrieve and process at one time per polling request (per KCQL mapping). | int | 1000 |
| cache.ttl | The maximum amount of time (in milliseconds) to store message data to allow acknowledgement of a message. | long | 1 hour |
| queue.max | Data is loaded into a queue asynchronously so that it stands ready when a poll occurs; this bounds the size of that queue. | int | 10000 |
Google Pub/Sub Compatibility Mode key schema:

| Field Name | Schema Type | Description |
|---|---|---|
| ProjectId | String | The Pub/Sub project containing the topic from which messages are polled. |
| TopicId | String | The Pub/Sub topic containing the messages. |
| SubscriptionId | String | The Pub/Sub subscription of the Pub/Sub topic. |
| MessageId | String | A unique ID for a Pub/Sub message. |

Google Pub/Sub Compatibility Mode value schema:

| Field Name | Schema Type | Description |
|---|---|---|
| MessageData | Optional String | The body of the Pub/Sub message. |
| AttributeMap | Optional String | The attribute map associated with the Pub/Sub message. |
Cassandra CQL type to Kafka Connect type mapping:

| CQL Type | Connect Data Type |
|---|---|
| TimeUUID | Optional String |
| UUID | Optional String |
| Inet | Optional String |
| Ascii | Optional String |
| Text | Optional String |
| Timestamp | Optional String |
| Date | Optional String |
| Tuple | Optional String |
| UDT | Optional String |
| Boolean | Optional Boolean |
| TinyInt | Optional Int8 |
| SmallInt | Optional Int16 |
| Int | Optional Int32 |
| Decimal | Optional String |
| Float | Optional Float32 |
| Counter | Optional Int64 |
| BigInt | Optional Int64 |
| VarInt | Optional Int64 |
| Double | Optional Int64 |
| Time | Optional Int64 |
| Blob | Optional Bytes |
| Map | Optional [String -> MAP] |
| List | Optional [String -> ARRAY] |
| Set | Optional [String -> ARRAY] |
Cassandra configuration parameters:

| Name | Description | Type | Default Value |
|---|---|---|---|
| connect.cassandra.contact.points | Initial contact point host for Cassandra, including port. | string | localhost |
| connect.cassandra.port | Cassandra native port. | int | 9042 |
| connect.cassandra.key.space | Keyspace to write to. | string | |
| connect.cassandra.username | Username to connect to Cassandra with. | string | |
| connect.cassandra.password | Password for the username to connect to Cassandra with. | password | |
| connect.cassandra.ssl.enabled | Secure Cassandra driver connection via SSL. | boolean | false |
| connect.cassandra.trust.store.path | Path to the client Trust Store. | string | |
| connect.cassandra.trust.store.password | Password for the client Trust Store. | password | |
| connect.cassandra.trust.store.type | Type of the Trust Store, defaults to JKS. | string | JKS |
| connect.cassandra.key.store.type | Type of the Key Store, defaults to JKS. | string | JKS |
| connect.cassandra.ssl.client.cert.auth | Enable client certificate authentication by Cassandra. Requires Key Store options to be set. | boolean | false |
| connect.cassandra.key.store.path | Path to the client Key Store. | string | |
| connect.cassandra.key.store.password | Password for the client Key Store. | password | |
| connect.cassandra.consistency.level | Consistency refers to how up-to-date and synchronized a row of Cassandra data is on all of its replicas. Cassandra offers tunable consistency. For any given read or write operation, the client application decides how consistent the requested data must be. | string | |
| connect.cassandra.fetch.size | The number of records the Cassandra driver will return at once. | int | 5000 |
| connect.cassandra.load.balancing.policy | Cassandra load balancing policy: ROUND_ROBIN, TOKEN_AWARE, LATENCY_AWARE or DC_AWARE_ROUND_ROBIN. TOKEN_AWARE and LATENCY_AWARE use DC_AWARE_ROUND_ROBIN. | string | TOKEN_AWARE |
| connect.cassandra.error.policy | Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP - the error is swallowed; THROW - the error is allowed to propagate; RETRY - the exception causes the Connect framework to retry the message. The number of retries is set by connect.cassandra.max.retries. All errors will be logged automatically, even if the code swallows them. | string | THROW |
| connect.cassandra.max.retries | The maximum number of times to try the write again. | int | 20 |
| connect.cassandra.retry.interval | The time in milliseconds between retries. | int | 60000 |
| connect.cassandra.task.buffer.size | The size of the queue the reader writes to. | int | 10000 |
| connect.cassandra.assigned.tables | The tables a task has been assigned. | string | |
| connect.cassandra.batch.size | The number of records the source task should drain from the reader queue. | int | 100 |
| connect.cassandra.import.poll.interval | The polling interval between queries against tables for bulk mode. | long | 1000 |
| connect.cassandra.time.slice.ms | The range of time in milliseconds the source task will use for the timestamp/timeuuid query. | long | 10000 |
| connect.cassandra.import.allow.filtering | Enable ALLOW FILTERING in incremental selects. | boolean | true |
| connect.cassandra.slice.duration | Duration to query for in the target Cassandra table. Used to restrict the query timestamp span. | long | 10000 |
| connect.cassandra.slice.delay.ms | The delay between the current time and the time range of the query. Used to ensure all of the data in the time slice is available. | long | 30000 |
| connect.cassandra.initial.offset | The initial timestamp to start querying in Cassandra from (yyyy-MM-dd HH:mm:ss.SSS'Z'). | string | 1900-01-01 00:00:00.0000000Z |
| connect.cassandra.mapping.collection.to.json | Map columns with type Map, List and Set to JSON. | boolean | true |
| connect.cassandra.kcql | KCQL expression describing field selection and routes. | string | |
AWS S3 converter guidance:

| S3 Storage Format | Kafka Output Format | Restore or replicate cluster | Analytics | Sink Converter | Source Converter |
|---|---|---|---|---|---|
| JSON | STRING | Same, Other | Yes, No | StringConverter | StringConverter |
| AVRO, Parquet | STRING | Same, Other | Yes | StringConverter | StringConverter |
| AVRO, Parquet | STRING | Same, Other | No | ByteArrayConverter | ByteArrayConverter |
| JSON | JSON | Same, Other | Yes | JsonConverter | StringConverter |
| JSON | JSON | Same, Other | No | StringConverter | StringConverter |
| AVRO, Parquet | JSON | Same, Other | Yes, No | JsonConverter | JsonConverter or Avro Converter (Glue, Confluent) |
| AVRO, Parquet, JSON | BYTES | Same, Other | Yes, No | ByteArrayConverter | ByteArrayConverter |
| AVRO, Parquet | AVRO | Same | Yes | Avro Converter (Glue, Confluent) | Avro Converter (Glue, Confluent) |
| AVRO, Parquet | AVRO | Same | No | ByteArrayConverter | ByteArrayConverter |
| AVRO, Parquet | AVRO | Other | Yes, No | Avro Converter (Glue, Confluent) | Avro Converter (Glue, Confluent) |
| AVRO, Parquet | Protobuf | Same | Yes | Protobuf Converter (Glue, Confluent) | Protobuf Converter (Glue, Confluent) |
| AVRO, Parquet | Protobuf | Same | No | ByteArrayConverter | ByteArrayConverter |
| AVRO, Parquet | Protobuf | Other | Yes, No | Protobuf Converter (Glue, Confluent) | Protobuf Converter (Glue, Confluent) |
| AVRO, Parquet, JSON | Other | Same, Other | Yes, No | ByteArrayConverter | ByteArrayConverter |
AWS S3 KCQL properties:

| Name | Description | Type | Available Values |
|---|---|---|---|
| read.text.mode | Controls how Text content is read | Enum | Regex, StartEndTag, StartEndLine |
| read.text.regex | Regular expression for Text reading (if applicable) | String | |
| read.text.start.tag | Start tag for Text reading (if applicable) | String | |
| read.text.end.tag | End tag for Text reading (if applicable) | String | |
| read.text.buffer.size | Text buffer size (for optimization) | Int | |
| read.text.start.line | Start line for Text reading (if applicable) | String | |
| read.text.end.line | End line for Text reading (if applicable) | String | |
| read.text.trim | Trim text during reading | Boolean | |
| store.envelope | Messages are stored as “Envelope” | Boolean | |
GCP Storage converter guidance:

| GCP Storage Format | Kafka Output Format | Restore or replicate cluster | Analytics | Sink Converter | Source Converter |
|---|---|---|---|---|---|
| JSON | STRING | Same, Other | Yes, No | StringConverter | StringConverter |
| AVRO, Parquet | STRING | Same, Other | Yes | StringConverter | StringConverter |
| AVRO, Parquet | STRING | Same, Other | No | ByteArrayConverter | ByteArrayConverter |
| JSON | JSON | Same, Other | Yes | JsonConverter | StringConverter |
| JSON | JSON | Same, Other | No | StringConverter | StringConverter |
| AVRO, Parquet | JSON | Same, Other | Yes, No | JsonConverter | JsonConverter or Avro Converter (Glue, Confluent) |
| AVRO, Parquet, JSON | BYTES | Same, Other | Yes, No | ByteArrayConverter | ByteArrayConverter |
| AVRO, Parquet | AVRO | Same | Yes | Avro Converter (Glue, Confluent) | Avro Converter (Glue, Confluent) |
| AVRO, Parquet | AVRO | Same | No | ByteArrayConverter | ByteArrayConverter |
| AVRO, Parquet | AVRO | Other | Yes, No | Avro Converter (Glue, Confluent) | Avro Converter (Glue, Confluent) |
| AVRO, Parquet | Protobuf | Same | Yes | Protobuf Converter (Glue, Confluent) | Protobuf Converter (Glue, Confluent) |
| AVRO, Parquet | Protobuf | Same | No | ByteArrayConverter | ByteArrayConverter |
| AVRO, Parquet | Protobuf | Other | Yes, No | Protobuf Converter (Glue, Confluent) | Protobuf Converter (Glue, Confluent) |
| AVRO, Parquet, JSON | Other | Same, Other | Yes, No | ByteArrayConverter | ByteArrayConverter |
GCP Storage KCQL properties:

| Name | Description | Type | Available Values |
|---|---|---|---|
| read.text.mode | Controls how Text content is read | Enum | Regex, StartEndTag, StartEndLine |
| read.text.regex | Regular expression for Text reading (if applicable) | String | |
| read.text.start.tag | Start tag for Text reading (if applicable) | String | |
| read.text.end.tag | End tag for Text reading (if applicable) | String | |
| read.text.buffer.size | Text buffer size (for optimization) | Int | |
| read.text.start.line | Start line for Text reading (if applicable) | String | |
| read.text.end.line | End line for Text reading (if applicable) | String | |
| read.text.trim | Trim text during reading | Boolean | |
| store.envelope | Messages are stored as “Envelope” | Boolean | |
GCP Storage configuration parameters:

| Name | Description | Type | Available Values | Default Value |
|---|---|---|---|---|
| connect.gcpstorage.gcp.auth.mode | Specifies the authentication mode for connecting to GCP. | string | "Credentials", "File" or "Default" | "Default" |
| connect.gcpstorage.gcp.credentials | For "auth.mode" credentials: GCP Authentication credentials string. | string | | (Empty) |
| connect.gcpstorage.gcp.file | For "auth.mode" file: Local file path for the file containing GCP authentication credentials. | string | | (Empty) |
| connect.gcpstorage.gcp.project.id | GCP Project ID. | string | | (Empty) |
| connect.gcpstorage.gcp.quota.project.id | GCP Quota Project ID. | string | | (Empty) |
| connect.gcpstorage.endpoint | Endpoint for GCP Storage. | string | | |
| connect.gcpstorage.error.policy | Defines the error handling policy when errors occur during data transfer to or from GCP Storage. | string | "NOOP", "THROW", "RETRY" | "THROW" |
| connect.gcpstorage.max.retries | Sets the maximum number of retries the connector will attempt before reporting an error to the Connect Framework. | int | | 20 |
| connect.gcpstorage.retry.interval | Specifies the interval (in milliseconds) between retry attempts by the connector. | int | | 60000 |
| connect.gcpstorage.http.max.retries | Sets the maximum number of retries for the underlying HTTP client when interacting with GCP Storage. | long | | 5 |
| connect.gcpstorage.http.retry.interval | Specifies the retry interval (in milliseconds) for the underlying HTTP client. An exponential backoff strategy is employed. | long | | 50 |
| connect.gcpstorage.kcql | Kafka Connect Query Language (KCQL) configuration to control the connector behaviour. | string | See the KCQL support section. | |
| connect.gcpstorage.source.extension.excludes | A comma-separated list of file extensions to exclude from the source file search. | string | See file extension filtering. | |
| connect.gcpstorage.source.extension.includes | A comma-separated list of file extensions to include in the source file search. | string | See file extension filtering. | |
| connect.gcpstorage.source.partition.extractor.type | Type of partition extractor (hierarchical or regex). | string | hierarchical, regex | |
| connect.gcpstorage.source.partition.extractor.regex | Regex pattern for partition extraction (if applicable). | string | | |
| connect.gcpstorage.source.partition.search.continuous | If set to true the connector will continuously search for new partitions. | boolean | true, false | true |
| connect.gcpstorage.source.partition.search.interval | The interval in milliseconds between searching for new partitions. | long | | 300000 |
| connect.gcpstorage.source.partition.search.excludes | A comma-separated list of paths to exclude from the partition search. | string | | ".indexes" |
| connect.gcpstorage.source.partition.search.recurse.levels | Controls how many levels deep to recurse when searching for new partitions. | int | | 0 |
| connect.gcpstorage.ordering.type | Type of ordering for the GCS file names to ensure the processing order. | string | AlphaNumeric, LastModified | AlphaNumeric |
Google Pub/Sub configuration parameters:

| Name | Description | Type | Available Values | Default Value |
|---|---|---|---|---|
| connect.pubsub.gcp.auth.mode | Specifies the authentication mode for connecting to GCP. | string | Credentials, File or Default | Default |
| connect.pubsub.gcp.credentials | For “auth.mode” credentials: GCP Authentication credentials string. | string | | (Empty) |
| connect.pubsub.gcp.file | For “auth.mode” file: Local file path for the file containing GCP authentication credentials. | string | | (Empty) |
| connect.pubsub.gcp.project.id | GCP Project ID. | string | | (Empty) |
| connect.pubsub.kcql | Kafka Connect Query Language (KCQL) configuration to control the connector behaviour. | string | | |
| connect.pubsub.output.mode | Output mode. Please see the Output Modes documentation above. | | Default, Compatibility | Default |
AWS S3 configuration parameters:

| Name | Description | Type | Available Values | Default Value |
|---|---|---|---|---|
| connect.s3.aws.auth.mode | Specifies the AWS authentication mode for connecting to S3. | string | "Credentials", "Default" | "Default" |
| connect.s3.aws.access.key | Access Key for AWS S3 credentials. | string | | |
| connect.s3.aws.secret.key | Secret Key for AWS S3 credentials. | string | | |
| connect.s3.aws.region | AWS region for the S3 bucket. | string | | |
| connect.s3.pool.max.connections | Maximum connections in the connection pool. | int | -1 (undefined) | 50 |
| connect.s3.custom.endpoint | Custom endpoint URL for S3 (if applicable). | string | | |
| connect.s3.kcql | Kafka Connect Query Language (KCQL) configuration to control the connector behaviour. | string | | |
| connect.s3.vhost.bucket | Enable virtual hosted-style buckets for S3. | boolean | true, false | false |
| connect.s3.source.extension.excludes | A comma-separated list of file extensions to exclude from the source file search. | string | See file extension filtering. | |
| connect.s3.source.extension.includes | A comma-separated list of file extensions to include in the source file search. | string | See file extension filtering. | |
| connect.s3.source.partition.extractor.type | Type of partition extractor (hierarchical or regex). | string | hierarchical, regex | |
| connect.s3.source.partition.extractor.regex | Regex pattern for partition extraction (if applicable). | string | | |
| connect.s3.ordering.type | Type of ordering for the S3 file names to ensure the processing order. | string | AlphaNumeric, LastModified | AlphaNumeric |
| connect.s3.source.partition.search.continuous | If set to true the connector will continuously search for new partitions. | boolean | true, false | true |
| connect.s3.source.partition.search.excludes | A comma-separated list of paths to exclude from the partition search. | string | | ".indexes" |
| connect.s3.source.partition.search.interval | The interval in milliseconds between searching for new partitions. | long | | 300000 |
| connect.s3.source.partition.search.recurse.levels | Controls how many levels deep to recurse when searching for new partitions. | int | | 0 |
This page describes the usage of the Stream Reactor FTP Source Connector.
You provide the remote directories, and at the specified interval the list of files in those directories is refreshed. Files are downloaded when they are not yet known, or when their timestamp or size has changed. Only files with a timestamp younger than the specified maximum age are considered. Hashes of the files are maintained and used to check for content changes. Changed files are then fed into Kafka, either as a whole (update) or only the appended part (tail), depending on the configuration. Optionally, file bodies can be transformed through a pluggable system prior to putting them into Kafka.
For more examples see the tutorials.
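A minimal sketch of an FTP source configuration using the parameters from the options reference below; the connector name, class placeholder, host, paths, and topics are illustrative:

```properties
name=ftp-source                                      # illustrative connector name
connector.class=<FTP source connector class from your installed plugin>
tasks.max=1
connect.ftp.address=ftp.example.com:21
connect.ftp.user=ftp_user
connect.ftp.password=ftp_password
connect.ftp.refresh=PT1M                             # poll the server every minute (ISO 8601 duration)
connect.ftp.file.maxage=P7D                          # only consider files younger than 7 days
connect.ftp.keystyle=string
connect.ftp.monitor.tail=/logs/:logs_topic           # tail appended bytes of files under /logs/
connect.ftp.monitor.update=/reports/:reports_topic   # feed whole files under /reports/ on change
```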
Each Kafka record represents a file; its key and value are typed as follows.
The format of the keys is configurable through connect.ftp.keystyle=string|struct. It can be a string with the file name, or a FileInfo structure with the name: string and offset: long. The offset is always 0 for files that are updated as a whole, and hence only relevant for tailed files.
The values of the records contain the body of the file as bytes.
The following rules are used.
Tailed files are only allowed to grow. Bytes that have been appended to them since the last inspection are yielded. Preceding bytes are not allowed to change.
Updated files can grow, shrink and change anywhere. The entire contents are yielded.
Instead of dumping whole file bodies (and the danger of exceeding Kafka’s message.max.bytes), one might want to give an interpretation to the data contained in the files before putting it into Kafka. For example, if the files that are fetched from the FTP are comma-separated values (CSVs), one might prefer to have a stream of CSV records instead. To allow to do so, the connector provides a pluggable conversion of SourceRecords. Right before sending a SourceRecord to the Connect framework, it is run through an object that implements:
The default object that is used is a pass-through converter, an instance of:
To override it, create your own implementation of SourceRecordConverter and place the jar in the plugin.path
.
For more examples of using the FTP Kafka connector, read this blog.
This page describes the usage of the Stream Reactor JMS Source Connector.
A Kafka Connect JMS source connector to subscribe to messages on JMS queues and topics and write them to a Kafka topic.
The connector uses the standard JMS protocols and has been tested against ActiveMQ.
The connector allows for the JMS initial.context.factory
and connection.factory
to be set according to your JMS provider. The appropriate implementation jars must be added to the CLASSPATH of the connect workers or placed in the plugin.path
of the connector.
Each JMS message is committed only when it has been written to Kafka. If a failure happens when writing to Kafka, e.g. the message is too large, then the JMS message will not be acknowledged. It will stay in the queue so it can be actioned upon.
The schema of the messages is fixed and can be found under Data Types unless a converter is used.
You must provide the JMS implementation jars for your JMS service.
For more examples see the tutorials.
You can specify multiple KCQL statements separated by ; to have the connector write to multiple topics.
The following KCQL is supported:
The selection of fields from the JMS message is not supported.
Examples:
The connector supports both TOPICS and QUEUES, controlled by the WITHTYPE KCQL clause.
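A hedged KCQL sketch showing the WITHTYPE clause for a queue and a topic; the source and target names are placeholders:

```sql
INSERT INTO kafka_topic SELECT * FROM jms_queue WITHTYPE QUEUE;
INSERT INTO kafka_topic SELECT * FROM jms_topic WITHTYPE TOPIC
```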
The connector supports converters to handle different message payload formats in the source topic or queue.
If no converter is provided, the JMS message is converted to a Kafka Struct representation.
This page describes the usage of the Stream Reactor MQTT Source Connector.
A Kafka Connect source connector to read events from MQTT and push them to Kafka.
For more examples see the tutorials.
You can specify multiple KCQL statements separated by ; to have the connector write to multiple topics.
The following KCQL is supported:
The selection of fields from the MQTT message is not supported.
Examples:
To facilitate scenarios like retaining the latest value for a given device identifier, or support Kafka Streams joins without having to re-map the topic data the connector supports WITHKEY in the KCQL syntax.
Multiple key fields are supported using a delimiter:
The resulting Kafka record key content will be the string concatenation for the values of the fields specified. Optionally the delimiter can be set via the KEYDELIMITER keyword.
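A hedged KCQL sketch of keying; the MQTT topic, Kafka topic, and field names are hypothetical, and the clause ordering is an assumption:

```sql
INSERT INTO kafka_telemetry
SELECT * FROM /sensors/+/temperature
WITHKEY(deviceId, timestamp)
KEYDELIMITER='-'
```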
The connector supports both wildcard and shared subscriptions but the KCQL command must be placed inside single quotes.
The connector supports converters to handle different message payload formats in the source topic. See source record converters.
FTP connector configuration parameters:

| Name | Description | Type | Default Value |
|---|---|---|---|
| connect.ftp.address | host[:port] of the FTP server | string | |
| connect.ftp.user | Username to connect with | string | |
| connect.ftp.password | Password to connect with | string | |
| connect.ftp.refresh | ISO 8601 duration at which the server is polled | string | |
| connect.ftp.file.maxage | ISO 8601 duration for how old files can be | string | |
| connect.ftp.keystyle | SourceRecord key style, string or struct | string | |
| connect.ftp.protocol | Protocol to use, FTP or FTPS | string | ftp |
| connect.ftp.timeout | FTP connection timeout in milliseconds | int | 30000 |
| connect.ftp.filter | Regular expression to use when selecting files for processing | string | .* |
| connect.ftp.monitor.tail | Comma-separated list of path:destinationtopic; the tail of the file is tracked | string | |
| connect.ftp.monitor.update | Comma-separated list of path:destinationtopic; the whole file is tracked | string | |
| connect.ftp.monitor.slicesize | File slice size in bytes | int | -1 |
| connect.ftp.fileconverter | File converter class | string | com.datamountaineer.streamreactor.connect.ftp.source.SimpleFileConverter |
| connect.ftp.sourcerecordconverter | Source record converter class | string | com.datamountaineer.streamreactor.connect.ftp.source.NopSourceRecordConverter |
| connect.ftp.max.poll.records | Max number of records returned per poll | int | 10000 |
JMS message schema (field name and type):

| Name | Type |
|---|---|
| message_timestamp | Optional int64 |
| correlation_id | Optional string |
| redelivered | Optional boolean |
| reply_to | Optional string |
| destination | Optional string |
| message_id | Optional string |
| mode | Optional int32 |
| type | Optional string |
| priority | Optional int32 |
| bytes_payload | Optional bytes |
| properties | Map of string |
JMS connector configuration parameters:

| Name | Description | Type | Default Value |
|---|---|---|---|
| connect.jms.url | Provides the JMS broker URL | string | |
| connect.jms.initial.context.factory | Initial Context Factory, e.g. org.apache.activemq.jndi.ActiveMQInitialContextFactory | string | |
| connect.jms.connection.factory | Provides the full class name for the ConnectionFactory to use, e.g. org.apache.activemq.ActiveMQConnectionFactory | string | ConnectionFactory |
| connect.jms.kcql | KCQL expression describing the JMS source to Kafka topic mappings | string | |
| connect.jms.subscription.name | Subscription name to use when subscribing to a topic; specifying this makes a durable subscription for topics | string | |
| connect.jms.password | Provides the password for the JMS connection | password | |
| connect.jms.username | Provides the user for the JMS connection | string | |
| connect.jms.error.policy | Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP - the error is swallowed; THROW - the error is allowed to propagate; RETRY - the exception causes the Connect framework to retry the message. The number of retries is based on connect.jms.max.retries. The error will be logged automatically. | string | THROW |
| connect.jms.retry.interval | The time in milliseconds between retries. | int | 60000 |
| connect.jms.max.retries | The maximum number of times to try the write again. | int | 20 |
| connect.jms.destination.selector | Selector to use for destination lookup. Either CDI or JNDI. | string | CDI |
| connect.jms.initial.context.extra.params | List (comma-separated) of extra properties as key/value pairs with a colon delimiter to supply to the initial context, e.g. SOLACE_JMS_VPN:my_solace_vp | list | [] |
| connect.jms.batch.size | The number of records to poll for on the target JMS destination in each Connect poll. | int | 100 |
| connect.jms.polling.timeout | Provides the timeout to poll incoming messages. | long | 1000 |
| connect.jms.source.default.converter | Contains a canonical class name for the default converter of raw JMS message bytes to a SourceRecord. Overrides to the default can still be done via connect.jms.source.converters, e.g. com.datamountaineer.streamreactor.connect.source.converters.AvroConverter | string | |
| connect.jms.converter.throw.on.error | If set to false the conversion exception will be swallowed and everything carries on BUT the message is lost!!; true will throw the exception. Default is false. | boolean | false |
| connect.converter.avro.schemas | If the AvroConverter is used you need to provide an Avro schema to be able to read and translate the raw bytes to an Avro record. The format is $MQTT_TOPIC=$PATH_TO_AVRO_SCHEMA_FILE | string | |
| connect.jms.headers | Contains a collection of static JMS headers included in every SinkRecord. The format is connect.jms.headers="$MQTT_TOPIC=rmq.jms.message.type:TextMessage,rmq.jms.message.priority:2;$MQTT_TOPIC2=rmq.jms.message.type:JSONMessage" | string | |
| connect.progress.enabled | Enables the output for how many records have been processed. | boolean | false |
| connect.jms.evict.interval.minutes | Removes the uncommitted messages from the internal cache. Each JMS message is linked to the Kafka record to be published. Failure to publish a record to Kafka will mean the JMS message will not be acknowledged. | int | 10 |
| connect.jms.evict.threshold.minutes | The number of minutes after which an uncommitted entry becomes evictable from the connector cache. | int | 10 |
| connect.jms.scale.type | How the connector tasks parallelization is decided. Available values are kcql and default. If kcql is provided it will be based on the number of KCQL statements written; otherwise it will be driven based on the connector tasks.max. | | |
MQTT connector configuration parameters:

| Name | Description | Type | Default Value |
|---|---|---|---|
| connect.mqtt.hosts | Contains the MQTT connection end points. | string | |
| connect.mqtt.username | Contains the MQTT connection user name. | string | |
| connect.mqtt.password | Contains the MQTT connection password. | password | |
| connect.mqtt.service.quality | Specifies the MQTT quality of service. | int | |
| connect.mqtt.timeout | Provides the time interval to establish the MQTT connection. | int | 3000 |
| connect.mqtt.clean | Sets the MQTT clean session flag. | boolean | true |
| connect.mqtt.keep.alive | The keep alive functionality assures that the connection is still open and both broker and client are connected to the broker during the establishment of the connection. The interval is the longest possible period of time which broker and client can endure without sending a message. | int | 5000 |
| connect.mqtt.client.id | Contains the MQTT session client id. | string | |
| connect.mqtt.error.policy | Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP - the error is swallowed; THROW - the error is allowed to propagate; RETRY - the exception causes the Connect framework to retry the message. The number of retries is based on connect.mqtt.max.retries. The error will be logged automatically. | string | THROW |
| connect.mqtt.retry.interval | The time in milliseconds between retries. | int | 60000 |
| connect.mqtt.max.retries | The maximum number of times to try the write again. | int | 20 |
| connect.mqtt.retained.messages | Specifies the MQTT retained flag. | boolean | false |
| connect.mqtt.converter.throw.on.error | If set to false the conversion exception will be swallowed and everything carries on BUT the message is lost!!; true will throw the exception. Default is false. | boolean | false |
| connect.converter.avro.schemas | If the AvroConverter is used you need to provide an Avro schema to be able to read and translate the raw bytes to an Avro record. The format is $MQTT_TOPIC=$PATH_TO_AVRO_SCHEMA_FILE in case of a source converter, or $KAFKA_TOPIC=PATH_TO_AVRO_SCHEMA in case of a sink converter. | string | |
| connect.mqtt.log.message | Logs received MQTT messages. | boolean | false |
| connect.mqtt.kcql | Contains the Kafka Connect Query Language describing the source MQTT topics and the target Kafka topics. | string | |
| connect.mqtt.polling.timeout | Provides the timeout to poll incoming messages. | int | 1000 |
| connect.mqtt.share.replicate | Replicate the shared subscriptions to all tasks instead of distributing them. | boolean | false |
| connect.progress.enabled | Enables the output for how many records have been processed. | boolean | false |
| connect.mqtt.ssl.ca.cert | Provides the path to the CA certificate file to use with the MQTT connection. | string | |
| connect.mqtt.ssl.cert | Provides the path to the certificate file to use with the MQTT connection. | string | |
| connect.mqtt.ssl.key | Certificate private key file path. | string | |
| connect.mqtt.process.duplicates | Process duplicate messages. | boolean | false |