All pages
Powered by GitBook
2 of 11

Sources

This page details the configuration options for the Stream Reactor Kafka Connect source connectors.

Source connectors read data from external systems and write to Kafka.

Cover

AWS S3

Load data from AWS S3 including restoring topics.

Cover

Azure Data Lake Gen2

Load data from Azure Data Lake Gen2 including restoring topics.

Cover

Azure Event Hubs

Load data from Azure Event Hubs into Kafka topics.

Cover

Azure Service Bus

Load data from Azure Service Bus into Kafka topics.

Cover

Cassandra

Load data from Cassandra into Kafka topics.

Cover

GCP PubSub

Load data from GCP PubSub into Kafka topics.

Cover

GCP Storage

Load data from GCP Storage including restoring topics.

Cover

FTP

Load data from files on FTP servers into Kafka topics.

Cover

JMS

Load data from JMS topics and queues into Kafka topics.

Cover

MQTT

Load data from MQTT into Kafka topics.

AWS S3

This page describes the usage of the Stream Reactor AWS S3 Source Connector.

This connector is also available on the AWS Marketplace.

LogoAWS Marketplace: Lenses Stream Reactor: S3 Source & Sink Kafka Connectors

Objects that have been archived to AWS Glacier storage class are skipped, in order to load these objects you must manually restore the objects. Skipped objects are logged in the Connect workers log files.

Connector Class

io.lenses.streamreactor.connect.aws.s3.source.S3SourceConnector

Example

For more examples see the tutorials.

name=aws-s3SourceConnectorParquet
connector.class=io.lenses.streamreactor.connect.aws.s3.source.S3SourceConnector
tasks.max=1
connect.s3.kcql=insert into $TOPIC_NAME select * from $BUCKET_NAME:$PREFIX_NAME STOREAS `parquet`
connect.s3.aws.region=eu-west-2
connect.s3.aws.secret.key=SECRET_KEY
connect.s3.aws.access.key=ACCESS_KEY
connect.s3.aws.auth.mode=Credentials

KCQL Support

You can specify multiple KCQL statements separated by ; to have the connector sink into multiple topics. However, you can not route the same source to different topics, for this use a separate connector instance.

The connector uses a SQL-like syntax to configure the connector behaviour. The full KCQL syntax is:

INSERT INTO $kafka-topic
SELECT *
FROM bucketAddress:pathPrefix
[BATCH=batch]
[STOREAS storage_format]
[LIMIT limit]
[PROPERTIES(
  'property.1'=x,
  'property.2'=x,
)]

Please note that you can employ escaping within KCQL for the INSERT INTO, SELECT * FROM, and PARTITIONBY clauses when necessary. For example, if you need to use a topic name that contains a hyphen, you can escape it as follows:

INSERT INTO `my-topic-with-hyphen`
SELECT *
FROM bucketAddress:pathPrefix

The connector does not support multiple KCQL statements that reference the same source location; to use multiple statements, configure each one in a separate connector instance.

Source Bucket & Path

The S3 source location is defined within the FROM clause. The connector will read all objects from the given location considering the data partitioning and ordering options. Each data partition will be read by a single connector task.

The FROM clause format is:

FROM [bucketname]:pathprefix
//my-bucket-called-pears:my-folder-called-apples 

If your data in AWS was not written by the Lenses AWS sink set to traverse a folder hierarchy in a bucket and load based on the last modified timestamp of the objects in the bucket. If LastModified sorting is used, ensure objects do not arrive late, or use a post-processing step to handle them.

connect.s3.source.partition.extractor.regex=none

connect.s3.source.ordering.type=LastModified

To load in alpha numeric order set the ordering type to AlphaNumeric.

Target Bucket & Path

The target Kafka topic is specified via the INSERT INTO clause. The connector will write all the records to the given topic:

INSERT INTO my-apples-topic SELECT * FROM  my-bucket-called-pears:my-folder-called-apples 

S3 Object formats

The connector supports a range of storage formats, each with its own distinct functionality:

  • JSON: The connector will read objects containing JSON content, each line representing a distinct record.

  • Avro: The connector will read Avro-stored messages from S3 and translate them into Kafka’s native format.

  • Parquet: The connector will read Parquet-stored messages from S3 and translate them into Kafka’s native format.

  • Text: The connector will read objects containing lines of text, each line representing a distinct record.

  • CSV: The connector will read objects containing lines of text, each line representing a distinct record.

  • CSV_WithHeaders: The connector will read objects containing lines of text, each line representing a distinct record while skipping the header row.

  • Bytes: The connector will read objects containing bytes, each object is translated to a Kafka message.

Use the STOREAS clause to configure the storage format. The following options are available:

STOREAS `JSON`
STOREAS `Avro`
STOREAS `Parquet`
STOREAS `Text`
STOREAS `CSV`
STOREAS `CSV_WithHeaders`
STOREAS `Bytes`

Text Processing

When using Text storage, the connector provides additional configuration options to finely control how text content is processed.

Regex

In Regex mode, the connector applies a regular expression pattern, and only when a line matches the pattern is it considered a record. For example, to include only lines that start with a number, you can use the following configuration:

connect.s3.kcql=insert into $kafka-topic select * from lensesio:regex STOREAS `text` PROPERTIES('read.text.mode'='regex', 'read.text.regex'='^[1-9].*')

Start-End line

In Start-End Line mode, the connector reads text content between specified start and end lines, inclusive. This mode is useful when you need to extract records that fall within defined boundaries. For instance, to read records where the first line is ‘SSM’ and the last line is an empty line (’’), you can configure it as follows:

connect.s3.kcql=insert into $kafka-topic select * from lensesio:multi_line STOREAS `text` PROPERTIES('read.text.mode'='startEndLine', 'read.text.start.line'='SSM', 'read.text.end.line'='')

To trim the start and end lines, set the read.text.trim property to true:

connect.s3.kcql=insert into $kafka-topic select * from lensesio:multi_line STOREAS `text` PROPERTIES('read.text.mode'='startEndLine', 'read.text.start.line'='SSM', 'read.text.end.line'='', 'read.text.trim'='true')

Start-End tag

In Start-End Tag mode, the connector reads text content between specified start and end tags, inclusive. This mode is particularly useful when a single line of text in S3 corresponds to multiple output Kafka messages. For example, to read XML records enclosed between ‘’ and ‘’, configure it as follows:

connect.s3.kcql=insert into $kafka-topic select * from lensesio:xml STOREAS `text` PROPERTIES('read.text.mode'='startEndTag', 'read.text.start.tag'='<SSM>', 'read.text.end.tag'='</SSM>')

Storage output matrix

Depending on the storage format of Kafka topics’ messages, the need for replication to a different cluster, and the specific data analysis requirements, there exists a guideline on how to effectively utilize converters for both sink and source operations. This guidance aims to optimize performance and minimize unnecessary CPU and memory usage.

S3 Storage Format
Kafka Output Format
Restore or replicate cluster
Analytics
Sink Converter
Source Converter

JSON

STRING

Same,Other

Yes, No

StringConverter

StringConverter

AVRO,Parquet

STRING

Same,Other

Yes

StringConverter

StringConverter

AVRO,Parquet

STRING

Same,Other

No

ByteArrayConverter

ByteArrayConverter

JSON

JSON

Same,Other

Yes

JsonConverter

StringConverter

JSON

JSON

Same,Other

No

StringConverter

StringConverter

AVRO,Parquet

JSON

Same,Other

Yes,No

JsonConverter

JsonConverter or Avro Converter( Glue, Confluent)

AVRO,Parquet, JSON

BYTES

Same,Other

Yes,No

ByteArrayConverter

ByteArrayConverter

AVRO,Parquet

AVRO

Same

Yes

Avro Converter( Glue, Confluent)

Avro Converter( Glue, Confluent)

AVRO,Parquet

AVRO

Same

No

ByteArrayConverter

ByteArrayConverter

AVRO,Parquet

AVRO

Other

Yes,No

Avro Converter( Glue, Confluent)

Avro Converter( Glue, Confluent)

AVRO,Parquet

Protobuf

Same

Yes

Protobuf Converter( Glue, Confluent)

Protobuf Converter( Glue, Confluent)

AVRO,Parquet

Protobuf

Same

No

ByteArrayConverter

ByteArrayConverter

AVRO,Parquet

Protobuf

Other

Yes,No

Protobuf Converter( Glue, Confluent)

Protobuf Converter( Glue, Confluent)

AVRO,Parquet, JSON

Other

Same, Other

Yes,No

ByteArrayConverter

ByteArrayConverter

Projections

Currently, the connector does not offer support for SQL projection; consequently, anything other than a SELECT * query is disregarded. The connector will faithfully write all the record fields to Kafka exactly as they are.

Ordering

The S3 sink employs zero-padding in object names to ensure precise ordering, leveraging optimizations offered by the S3 API, guaranteeing the accurate sequence of object.

When using the S3 source alongside the S3 sink, the connector can adopt the same ordering method, ensuring data processing follows the correct chronological order. However, there are scenarios where S3 data is generated by applications that do not maintain lexical object key name order.

In such cases, to process object in the correct sequence, the source needs to list all objects in the bucket and sort them based on their last modified timestamp. To enable this behavior, set the connect.s3.source.ordering.type to LastModified. This ensures that the source correctly arranges and processes the data based on the timestamps of the objects.

If using LastModified sorting, ensure objects do not arrive late, or use a post-processing step to handle them.

Throttling

To limit the number of object keys the source reads from S3 in a single poll. The default value, if not specified, is 1000:

BATCH = 100

To limit the number of result rows returned from the source in a single poll operation, you can use the LIMIT clause. The default value, if not specified, is 10000.

LIMIT 10000

Object Extension Filtering

The AWS S3 Source Connector allows you to filter the objects to be processed based on their extensions. This is controlled by two properties: connect.s3.source.extension.excludes and connect.s3.source.extension.includes.

Excluding Object Extensions

The connect.s3.source.extension.excludes property is a comma-separated list of object extensions to exclude from the source object search. If this property is not configured, all objects are considered. For example, to exclude .txt and .csv objects, you would set this property as follows:

connect.s3.source.extension.excludes=txt,csv

Including Object Extensions

The connect.s3.source.extension.includes property is a comma-separated list of object extensions to include in the source object search. If this property is not configured, all objects are considered. For example, to include only .json and .xml objects, you would set this property as follows:

connect.s3.source.extension.includes=json,xml

Note: If both connect.s3.source.extension.excludes and connect.s3.source.extension.includes are set, the connector first applies the exclusion filter and then the inclusion filter.

Post-Processing Options

Post-processing options offer flexibility in managing how objects are handled after they have been processed. By configuring these options, users can automate tasks such as deleting objects to save storage space or moving files to an archive for compliance and data retention purposes. These features are crucial for efficient data lifecycle management, particularly in environments where storage considerations or regulatory requirements dictate the need for systematic handling of processed data.

Use Cases for Post-Processing Options

  1. Deleting Objects After Processing

    For scenarios where freeing up storage is critical and reprocessing is not necessary, configure the connector to delete objects after they are processed. This option is particularly useful in environments with limited storage capacity or where processed data is redundantly stored elsewhere.

    Example:

    INSERT INTO `my-topic`
    SELECT * FROM `my-s3-bucket:my-prefix`
    PROPERTIES (
        'post.process.action'=`DELETE`
    )

    Result: Objects are permanently removed from the S3 bucket after processing, effectively reducing storage usage and preventing reprocessing.

  2. Moving Objects to an Archive Bucket

    To preserve processed objects for archiving or compliance reasons, set the connector to move them to a designated archive bucket. This use case applies to organizations needing data retention strategies or for regulatory adherence by keeping processed records accessible but not in active use.

    Example:

    INSERT INTO `my-topic`
    SELECT * FROM `my-s3-bucket:my-prefix`
    PROPERTIES (
        'post.process.action'=`MOVE`,
        'post.process.action.bucket'=`archive-bucket`,
        'post.process.action.prefix'=`processed/`
    )

    Result: Objects are transferred to an archive-bucket, stored with an updated path that includes the processed/ prefix, maintaining an organized archive structure.

Properties

The PROPERTIES clause is optional and adds a layer of configuration to the connector. It enhances versatility by permitting the application of multiple configurations (delimited by ‘,’). The following properties are supported:

Name
Description
Type
Available Values

read.text.mode

Controls how Text content is read

Enum

Regex, StartEndTag, StartEndLine

read.text.regex

Regular Expression for Text Reading (if applicable)

String

read.text.start.tag

Start Tag for Text Reading (if applicable)

String

read.text.end.tag

End Tag for Text Reading (if applicable)

String

read.text.buffer.size

Text Buffer Size (for optimization)

Int

read.text.start.line

Start Line for Text Reading (if applicable)

String

read.text.end.line

End Line for Text Reading (if applicable)

String

read.text.trim

Trim Text During Reading

Boolean

store.envelope

Messages are stored as “Envelope”

Boolean

post.process.action

Defines the action to perform on source objects after successful processing.

Enum

DELETE or MOVE

post.process.action.bucket

Specifies the target bucket for the MOVE action (required for MOVE).

String

post.process.action.prefix

Specifies a new prefix for the object’s location when using the MOVE action (required for MOVE).

String

post.process.action.retain.dirs

Ensure that paths are retained after a post-process action, using a zero-byte object to represent the path.

Boolean

false

Authentication

The connector offers two distinct authentication modes:

  • Default: This mode relies on the default AWS authentication chain, simplifying the authentication process.

  • Credentials: In this mode, explicit configuration of AWS Access Key and Secret Key is required for authentication.

When selecting the “Credentials” mode, it is essential to provide the necessary access key and secret key properties. Alternatively, if you prefer not to configure these properties explicitly, the connector will follow the credentials retrieval order as described here.

Here’s an example configuration for the “Credentials” mode:

connect.s3.aws.auth.mode=Credentials
connect.s3.aws.region=eu-west-2
connect.s3.aws.access.key=$AWS_ACCESS_KEY
connect.s3.aws.secret.key=$AWS_SECRET_KEY

For enhanced security and flexibility when using the “Credentials” mode, it is highly advisable to utilize Connect Secret Providers. This approach ensures robust security practices while handling access credentials.

API Compatible systems

The connector can also be used against API compatible systems provided they implement the following:

listObjectsV2
listObjectsV2Pagbinator
putObject
getObject
headObject
deleteObjects
deleteObject

Option Reference

Name
Description
Type
Available Values
Default Value

connect.s3.aws.auth.mode

Specifies the AWS authentication mode for connecting to S3.

string

"Credentials," "Default"

"Default"

connect.s3.aws.access.key

Access Key for AWS S3 Credentials

string

connect.s3.aws.secret.key

Secret Key for AWS S3 Credentials

string

connect.s3.aws.region

AWS Region for S3 Bucket

string

connect.s3.pool.max.connections

Maximum Connections in the Connection Pool

int

-1 (undefined)

50

connect.s3.custom.endpoint

Custom Endpoint URL for S3 (if applicable)

string

connect.s3.kcql

Kafka Connect Query Language (KCQL) Configuration to control the connector behaviour

string

kcql configuration

connect.s3.vhost.bucket

Enable Virtual Hosted-style Buckets for S3

boolean

true, false

false

connect.s3.source.extension.excludes

A comma-separated list of object extensions to exclude from the source object search.

string

[Object extension filtering]({{< relref "#object-extension-filtering" >}})

connect.s3.source.extension.includes

A comma-separated list of object extensions to include in the source object search.

string

[object extension filtering]({{< relref "#object-extension-filtering" >}})

connect.s3.source.partition.extractor.type

Type of Partition Extractor (Hierarchical or Regex)

string

hierarchical, regex

connect.s3.source.partition.extractor.regex

Regex Pattern for Partition Extraction (if applicable)

string

connect.s3.ordering.type

Type of ordering for the S3 object keys to ensure the processing order.

string

AlphaNumeric, LastModified

AlphaNumeric

connect.s3.source.partition.search.continuous

If set to true the connector will continuously search for new partitions.

boolean

true, false

true

connect.s3.source.partition.search.excludes

A comma-separated list of paths to exclude from the partition search.

string

".indexes"

connect.s3.source.partition.search.interval

The interval in milliseconds between searching for new partitions.

long

300000

connect.s3.source.partition.search.recurse.levels

Controls how many levels deep to recurse when searching for new partitions

int

0

connect.s3.source.empty.results.backoff.initial.delay

Initial delay before retrying when no results are found.

long

1000 Milliseconds

connect.s3.source.empty.results.backoff.max.delay

Maximum delay before retrying when no results are found.

long

10000 Milliseconds

connect.s3.source.empty.results.backoff.multiplier

Multiplier to apply to the delay when retrying when no results are found.

double

2.0 Multiplier (x)

connect.s3.source.write.watermark.header

Write the record with kafka headers including details of the source and line number of the file.

boolean

true, false

false

Azure Data Lake Gen2

This page describes the usage of the Stream Reactor Azure Data Lake Gen2 Source Connector.

Coming soon!

Azure Event Hubs

This page describes the usage of the Stream Reactor Azure Event Hubs Source Connector.

A Kafka Connect source connector to read events from Azure Event Hubs and push them to Kafka.

In order to leverage Kafka API in your Event Hubs it has to be at least on Standard Pricing Tier. More info here.

Connector Class

io.lenses.streamreactor.connect.azure.eventhubs.source.AzureEventHubsSourceConnector

Example

For more examples see the tutorials.

Below example presents all the necessary parameters configuration in order to use Event Hubs connector. It contains all the necessary parameters (but nothing optional, so feel free to tweak it to your needs):

name=AzureEventHubsSourceConnector
connector.class=io.lenses.streamreactor.connect.azure.eventhubs.source.AzureEventHubsSourceConnector
tasks.max=1
connect.eventhubs.kcql=INSERT INTO azureoutput SELECT * FROM inputhub;
connect.eventhubs.source.connection.settings.bootstrap.servers=MYNAMESPACE.servicebus.windows.net:9093
connect.eventhubs.source.connection.settings.sasl.mechanism=PLAIN
connect.eventhubs.source.connection.settings.security.protocol=SASL_SSL
connect.eventhubs.source.connection.settings.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://MYNAMESPACE.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SOME_SHARED_ACCESS_STRING;EntityPath=inputhub";

KCQL support

You can specify multiple KCQL statements separated by ; to have the connector sink into multiple topics. However, you can not route the same source to different topics, for this use a separate connector instance.

The following KCQL is supported:

INSERT INTO <your-kafka-topic>
SELECT *
FROM <your-event-hub>;

The selection of fields from the Event Hubs message is not supported.

Payload support

As for now Azure Event Hubs Connector supports raw bytes passthrough from source Hub to Kafka Topic specified in the KCQL config.

Authentication

You can connect to Azure EventHubs passing specific JAAS parameters in configuration.

connect.eventhubs.connection.settings.bootstrap.servers=NAMESPACENAME.servicebus.windows.net:9093
connect.eventhubs.connection.settings.sasl.mechanism=PLAIN
connect.eventhubs.connection.settings.security.protocol=SASL_SSL
connect.eventhubs.connection.settings.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

Learn more about different methods of connecting to Event Hubs on Azure website. The only caveat is to add connector-specific prefix like in example above. See Fine-tunning the Kafka Connector for more info.

Fine-tunning the Kafka Connector

The Azure Event Hubs Connector utilizes the Apache Kafka API implemented by Event Hubs. This also allows fine-tuning for user-specific needs because the Connector passes all of the properties with a specific prefix directly to the consumer. The prefix is connect.eventhubs.connection.settings and when user specifies a property with it, it will be automatically passed to the Consumer.

User wants to fine-tune how much data records comes through the network at once. He specifies below property as part of his configuration for Azure Event Hubs Connector before starting it.

connect.eventhubs.connection.settings.max.poll.records = 100

It means that internal Kafka Consumer will poll at most 100 records at time (as max.poll.records is passed directly to it)

There are certain exceptions to this rule as couple of those are internally used in order to smoothly proceed with consumption. Those exceptions are listed below:

  • client.id - Connector sets it by itself

  • group.id - Connector sets it by itself

  • key.deserializer - Connector transitions bytes 1-to-1

  • value.deserializer - Connector transitions bytes 1-to-1

  • enable.auto.commit - connector automatically sets it to false and checks what offsets are committed in output topic instead

Option Reference

Name
Description
Type
Default Value

connect.eventhubs.source.connection.settings.bootstrap.servers

Specifies the Event Hubs server location.

string

connect.eventhubs.source.close.timeout

Amount of time (in seconds) for Consumer to close.

int

30

connect.eventhubs.source.default.offset

Specifies whether by default we should consume from earliest (default) or latest offset.

string

earliest

connect.eventhubs.kcql

Comma-separated output KCQL queries

string

Azure Service Bus

This page describes the usage of the Stream Reactor Azure Service Bus Source Connector.

This Kafka connector is designed to effortlessly ingest records from Azure Service Bus into your Kafka cluster. It leverages Microsoft Azure API to seamlessly transfer data from Service Buses, allowing for their safe transition and safekeeping both payloads and metadata (see Payload support). It provides its user with AT-LEAST-ONCE guarantee as the data is committed (marked as read) in Service Bus once Connector verifies it was successfully committed to designated Kafka topic. Azure Service Bus Source Connector supports both types of Service Buses: Queues and Topics.

Connector Class

io.lenses.streamreactor.connect.azure.servicebus.source.AzureServiceBusSourceConnector

Full Config Example

For more examples see the tutorials.

The following example presents all the mandatory configuration properties for the Service Bus connector. Please note there are also optional parameters listed (link to option reference??). Feel free to tweak the configuration to your requirements.

connector.class=io.lenses.streamreactor.connect.azure.servicebus.source.AzureServiceBusSourceConnector
name=AzureServiceBusSourceConnector
tasks.max=1
connect.servicebus.connection.string="Endpoint=sb://MYNAMESPACE.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SOME_SHARED_ACCESS_STRING";
connect.servicebus.kcql=INSERT INTO output-topic SELECT * FROM servicebus-queue PROPERTIES('servicebus.type'='QUEUE');

KCQL support

You can specify multiple KCQL statements separated by ; to have the connector sink into multiple topics. However, you can not route the same source to different topics, for this use a separate connector instance.

The following KCQL is supported:

INSERT INTO <your-kafka-topic>
SELECT *
FROM <your-service-bus> 
PROPERTIES(...); 

It allows you to map Service Bus of name <your-service-bus> to Kafka topic of name <your-kafka-topic> using the PROPERTIES specified.

The selection of fields from the Service Bus message is not supported.

Payload support

Azure Service Bus Connector follows specific pattern (Schema) of messages. Please look below for the format of the data transferred to Kafka Topics specified in the KCQL config.

Key Format (Schema)

Field Name
Schema Type
Description

MessageId

String

The message identifier that uniquely identifies the message and its payload.

Payload Format (Schema)

Field Name

Schema Type

Description

deliveryCount

int64

The number of the times this message was delivered to clients.

enqueuedTimeUtc

int64

The time at which this message was enqueued in Azure Service Bus.

contentType

Optional String

The content type of this message.

label

Optional String

The application specific message label.

correlationId

Optional String

The correlation identifier.

messageProperties

Optional String

The map of user application properties of this message.

partitionKey

Optional String

The partition key for sending a message to a partitioned entity.

replyTo

Optional String

The address of an entity to send replies to.

replyToSessionId

Optional String

The session identifier augmenting the ReplyTo address.

deadLetterSource

Optional String

The name of the queue or subscription that this message was enqueued on, before it was deadlettered.

timeToLive

int64

The duration before this message expires.

lockedUntilUtc

Optional int64

The time when the lock of this message expires.

sequenceNumber

Optional int64

The unique number assigned to a message by Azure Service Bus.

sessionId

Optional String

The session identifier for a session-aware entity.

lockToken

Optional String

The lock token for the current message.

messageBody

Optional bytes

The body of this message as a byte array.

getTo

Optional String

The “to” address.

Authentication

You can connect to an Azure Service Bus by passing your connection string in configuration. The connection string can be found in the Shared access policies section of your Azure Portal.

connect.servicebus.connection.string=Endpoint=sb://YOURNAMESPACE.servicebus.windows.net/;SharedAccessKeyName=YOUR_KEYNAME;SharedAccessKey=YOUR_ACCESS_KEY=

Learn more about different methods of connecting to Service Bus on the Azure Website.

Queues & Topics

The Azure Service Bus Connector connects to Service Bus via Microsoft API. In order to smoothly configure your mappings you have to pay attention to PROPERTIES part of your KCQL mappings. There are two cases here: reading from Service Bus of type QUEUE and of type TOPIC. Please refer to the relevant sections below. In case of further questions check Azure Service Bus documentation to learn more about those mechanisms.

Reading from Queues

In order to be reading from the queue there's an additional parameter that you need to pass with your KCQL mapping in the PROPERTIES part. This parameter is servicebus.type and it can take one of two values depending on the type of the service bus: QUEUE or TOPIC. Naturally for Queue we're interested in QUEUE here and we need to pass it.

connect.servicebus.kcql=INSERT INTO kafka-topic SELECT * FROM azure-queue PROPERTIES('servicebus.type'='QUEUE');

This is sufficient to enable you to create the mapping with your queue.

Reading from Topics

In order to be reading from the topic there are two additional parameters that you need to pass with your KCQL mapping in the PROPERTIES part:

  1. Parameter servicebus.type which can take one of two values depending on the type of the service bus: QUEUE or TOPIC. For topic we're interested in TOPIC here and we need to pass it.

  2. Parameter subscription.name which takes the (case-sensitive) value of a subscription name that you've created for this topic for the connector to use. Please use Azure Portal to create one.

Make sure your subscription exists otherwise you will get a similar error to this

Caused by: com.azure.core.amqp.exception.AmqpException: The messaging entity 'streamreactor:Topic:my-topic|lenses' could not be found. To know more visit https://aka.ms/sbResourceMgrExceptions.

Create the subscription per topic in the Azure Portal.

connect.servicebus.kcql=INSERT INTO kafka-topic SELECT * FROM azure-topic PROPERTIES('servicebus.type'='TOPIC','subscription.name'='subscription1');

This is sufficient to enable you to create the mapping with your topic.

Option Reference

KCQL Properties

Please find below all the necessary KCQL properties:

Name
Description
Type
Default Value

servicebus.type

Specifies Service Bus type: QUEUE or TOPIC

string

subscription.name

Specifies subscription name if Service Bus type is TOPIC

string

Configuration parameters

Please find below all the relevant configuration parameters:

Name
Description
Type
Default Value

connect.servicebus.connection.string

Specifies the Connection String to connect to Service Bus

string

connect.servicebus.kcql

Comma-separated output KCQL queries

string

connect.servicebus.source.task.records.queue.size

Specifies the Queue size between Service Bus Receivers and Kafka

int

5000

connect.servicebus.source.sleep.on.empty.poll.ms

The duration in milliseconds to sleep when no records are returned from the poll. This avoids a tight loop in Connect.

long

250

connect.servicebus.source.complete.retries.max

The maximum number of retries to complete a message.

int

3

connect.servicebus.source.complete.retries.min.backoff.ms

The minimum duration in milliseconds for the first backoff

long

1000

connect.servicebus.source.prefetch.count

The number of messages to prefetch from the Azure Service Bus.

int

2000

Cassandra

This page describes the usage of the Stream Reactor Cassandra Source Connector.

Kafka Connect Cassandra is a Source Connector for reading data from Cassandra and writing to Kafka.

Connector Class

io.lenses.streamreactor.connect.cassandra.source.CassandraSourceConnector

Example

For more examples see the tutorials.

name=cassandra
connector.class=io.lenses.streamreactor.connect.cassandra.source.CassandraSourceConnector
connect.cassandra.key.space=demo
connect.cassandra.kcql=INSERT INTO orders-topic SELECT * FROM orders PK created INCREMENTALMODE=TIMEUUID
connect.cassandra.contact.points=cassandra

KCQL support

You can specify multiple KCQL statements separated by ; to have the connector sink into multiple topics.

However, you can not route the same source to different topics, for this use a separate connector instance.

The following KCQL is supported:

INSERT INTO <your-topic>
SELECT FIELD,...
FROM <your-cassandra-table>
[PK FIELD]
[WITHFORMAT JSON]
[INCREMENTALMODE=TIMESTAMP|TIMEUUID|TOKEN|DSESEARCHTIMESTAMP]
[WITHKEY(<your-key-field>)]

Examples:

-- Select all columns from table orders and insert into a topic
-- called orders-topic, use column created to track new rows.
-- Incremental mode set to TIMEUUID
INSERT INTO orders-topic SELECT * FROM orders PK created INCREMENTALMODE=TIMEUUID

-- Select created, product, price from table orders and insert
-- into a topic called orders-topic, use column created to track new rows.
INSERT INTO orders-topic SELECT created, product, price FROM orders PK created.

Keyed JSON Format

The connector can write JSON to your Kafka topic using the WITHFORMAT JSON clause but the key and value converters must be set:

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

In order to facilitate scenarios like retaining the latest value for a given device identifier, or support Kafka Streams joins without having to re-map the topic data the connector supports WITHKEY in the KCQL syntax.

INSERT INTO <topic>
SELECT <fields>
FROM <column_family>
PK <PK_field>
WITHFORMAT JSON
WITHUNWRAP INCREMENTALMODE=<mode>
WITHKEY(<key_field>)

Multiple key fields are supported using a delimiter:

// `[` enclosed by `]` denotes optional values
WITHKEY(field1 [, field2.A , field3]) [KEYDELIMITER='.']

The resulting Kafka record key content will be the string concatenation for the values of the fields specified. Optionally the delimiter can be set via the KEYDELIMITER keyword.

Keying is only supported in conjunction with the WITHFORMAT JSON clause

Incremental mode

This mode tracks new records added to a table. The columns to track are identified by the PK clause in the KCQL statement. Only one column can be used to track new records. The support Cassandra column data types are:

  1. TIMESTAMP

  2. TIMEUUID

  3. TOKEN

  4. DSESEARCHTIMESTAMP

If set to TOKEN this column value is wrapped inside Cassandra's token function which needs unwrapping with the WITHUNWRAP command.

You must use the Byte Order Partitioner for the TOKEN mode to work correctly or data will be missing from the Kafka topic. This is not recommended due to the creation of hotspots in Cassandra.

DSESEARCHTIMESTAMP will make a DSE Search queries using Solr instead of a native Cassandra query.

INSERT INTO <topic>
SELECT a, b, c, d
FROM keyspace.table
WHERE solr_query= 'pkCol:{2020-03-23T15:02:21Z TO 2020-03-23T15:30:12.989Z]}'
INCREMENTALMODE=DSESEARCHTIMESTAMP

Bulk Mode

The connector constantly loads the entire table.

Controlling throughput

The connector can be configured to:

  • Start from a particular offset - connect.cassandra.initial.offset

  • Increase or decrease the poll interval - connect.cassandra.import.poll.interval

  • Set a slice duration to query for in milliseconds - connect.cassandra.slice.duration

For a more detailed explanation of how to use Cassandra to Kafka options.

Source Data Type Mapping

The following CQL data types are supported:

CQL Type
Connect Data Type

TimeUUID

Optional String

UUID

Optional String

Inet

Optional String

Ascii

Optional String

Text

Optional String

Timestamp

Optional String

Date

Optional String

Tuple

Optional String

UDT

Optional String

Boolean

Optional Boolean

TinyInt

Optional Int8

SmallInt

Optional Int16

Int

Optional Int32

Decimal

Optional String

Float

Optional Float32

Counter

Optional Int64

BigInt

Optional Int64

VarInt

Optional Int64

Double

Optional Int64

Time

Optional Int64

Blob

Optional Bytes

Map

Optional [String -> MAP]

List

Optional [String -> ARRAY]

Set

Optional [String -> ARRAY]

Option Reference

Name
Description
Type
Default Value

connect.cassandra.contact.points

Initial contact point host for Cassandra including port.

string

localhost

connect.cassandra.port

Cassandra native port.

int

9042

connect.cassandra.key.space

Keyspace to write to.

string

connect.cassandra.username

Username to connect to Cassandra with.

string

connect.cassandra.password

Password for the username to connect to Cassandra with.

password

connect.cassandra.ssl.enabled

Secure Cassandra driver connection via SSL.

boolean

false

connect.cassandra.trust.store.path

Path to the client Trust Store.

string

connect.cassandra.trust.store.password

Password for the client Trust Store.

password

connect.cassandra.trust.store.type

Type of the Trust Store, defaults to JKS

string

JKS

connect.cassandra.key.store.type

Type of the Key Store, defauts to JKS

string

JKS

connect.cassandra.ssl.client.cert.auth

Enable client certification authentication by Cassandra. Requires KeyStore options to be set.

boolean

false

connect.cassandra.key.store.path

Path to the client Key Store.

string

connect.cassandra.key.store.password

Password for the client Key Store

password

connect.cassandra.consistency.level

Consistency refers to how up-to-date and synchronized a row of Cassandra data is on all of its replicas. Cassandra offers tunable consistency. For any given read or write operation, the client application decides how consistent the requested data must be.

string

connect.cassandra.fetch.size

The number of records the Cassandra driver will return at once.

int

5000

connect.cassandra.load.balancing.policy

Cassandra Load balancing policy. ROUND_ROBIN, TOKEN_AWARE, LATENCY_AWARE or DC_AWARE_ROUND_ROBIN. TOKEN_AWARE and LATENCY_AWARE use DC_AWARE_ROUND_ROBIN

string

TOKEN_AWARE

connect.cassandra.error.policy

Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP - the error is swallowed THROW - the error is allowed to propagate. RETRY - The exception causes the Connect framework to retry the message. The number of retries is set by connect.cassandra.max.retries. All errors will be logged automatically, even if the code swallows them.

string

THROW

connect.cassandra.max.retries

The maximum number of times to try the write again.

int

20

connect.cassandra.retry.interval

The time in milliseconds between retries.

int

60000

connect.cassandra.task.buffer.size

The size of the queue as read writes to.

int

10000

connect.cassandra.assigned.tables

The tables a task has been assigned.

string

connect.cassandra.batch.size

The number of records the source task should drain from the reader queue.

int

100

connect.cassandra.import.poll.interval

The polling interval between queries against tables for bulk mode.

long

1000

connect.cassandra.time.slice.ms

The range of time in milliseconds the source task the timestamp/timeuuid will use for query

long

10000

connect.cassandra.import.allow.filtering

Enable ALLOW FILTERING in incremental selects.

boolean

true

connect.cassandra.slice.duration

Duration to query for in target Cassandra table. Used to restrict query timestamp span

long

10000

connect.cassandra.slice.delay.ms

The delay between the current time and the time range of the query. Used to insure all of the data in the time slice is available

long

30000

connect.cassandra.initial.offset

The initial timestamp to start querying in Cassandra from (yyyy-MM-dd HH:mm:ss.SSS’Z’). Default 1900-01-01 00:00:00.0000000Z

string

1900-01-01 00:00:00.0000000Z

connect.cassandra.mapping.collection.to.json

Mapping columns with type Map, List and Set like json

boolean

true

connect.cassandra.kcql

KCQL expression describing field selection and routes.

string

\

GCP PubSub

This page describes the usage of the Stream Reactor Google PubSub Source Connector.

The Kafka connector is designed to seamlessly ingest records from GCP Pub/Sub topics and queues into your Kafka cluster. This makes it useful for backing up or streaming data from Pub/Sub to your Kafka infrastructure. This connector provides robust support for at least once semantics (this connector ensures that each record reaches the Kafka topic at least once).

Connector Class

io.lenses.streamreactor.connect.gcp.pubsub.source.GCPPubSubSourceConnector

Example

For more examples see the tutorials.

name=GcpPubSubSourceDemo
connector.class=io.lenses.streamreactor.connect.gcp.pubsub.source.GCPPubSubSourceConnector
topics=kafka_topic_to_write_to
tasks.max=1
connect.pubsub.gcp.auth.mode=File
connect.pubsub.gcp.file=/path/to/gcp-service-account-key.json
connect.pubsub.gcp.project.id=gcp-project-id
connect.pubsub.kcql=insert into `kafka_topic_to_write_to` select * from `gcp-subscription-id`

KCQL Support

You can specify multiple KCQL statements separated by ; to have the connector sink into multiple topics. However, you can not route the same source to different topics, for this use a separate connector instance.

The connector uses a SQL-like syntax to configure the connector behaviour. The full KCQL syntax is:

INSERT INTO kafka-topic
SELECT *
FROM subscriptionId
[PROPERTIES(
  'property.1'=x,
  'property.2'=x,
)]

Please note that you can employ escaping within KCQL for the INSERT INTO and SELECT * FROM clauses when necessary. For example, if you need to use a topic name that contains a hyphen, you can escape it as follows:

INSERT INTO `my-topic-with-hyphen`
SELECT *
FROM bucketAddress:pathPrefix

The connector does not support multiple KCQL statements that reference the same source location; to use multiple statements, configure each one in a separate connector instance.

Source Subscription ID and Target Topic

The source and target of the data are specified via the INSERT INTO... SELECT * FROM clause. The connector will write all the records to the given topic, from the given subscription:

INSERT INTO my-topic SELECT * FROM subscriptionId;

Properties

The PROPERTIES clause is optional and adds a layer of configurability to the connector. It enhances versatility by permitting the application of multiple configurations (delimited by ',').

Properties can be defined in any order.

The following properties are supported:

Name
Description
Type
Default Value

batch.size

The maximum number of messages the connector will retrieve and process at one time per polling request (per KCQL mapping).

int

1000

cache.ttl

The maximum amount of time (in milliseconds) to store message data to allow acknowledgement of a message.

long

1 hour

queue.max

Data is loaded into a queue asynchronously so that it stands ready when the poll call is activated. Control the maximum number of records to hold in the queue per KCQL mapping.

int

10000

Auth Mode

The connector offers three distinct authentication modes:

  • Default: This mode relies on the default GCP authentication chain, simplifying the authentication process.

  • File: This mode uses a local (to the connect worker) path for a file containing GCP authentication credentials.

  • Credentials: In this mode, explicit configuration of a GCP Credentials string is required for authentication.

The simplest example to configure in the connector is the "Default" mode, as this requires no other configuration.

connect.pubsub.gcp.auth.mode=Default

When selecting the "Credentials" mode, it is essential to provide the necessary credentials. Alternatively, if you prefer not to configure these properties explicitly, the connector will follow the credentials retrieval order as described here.

Here's an example configuration for the "Credentials" mode:

connect.pubsub.gcp.auth.mode=Credentials
connect.pubsub.gcp.credentials=$GCP_CREDENTIALS
connect.pubsub.gcp.project.id=$GCP_PROJECT_ID

And here is an example configuration using the "File" mode:

connect.pubsub.gcp.auth.mode=File
connect.pubsub.gcp.file=/home/secure-stuff/gcp-read-credential.txt

Remember when using file mode the file will need to exist on every worker node in your Kafka connect cluster and be readable by the Kafka Connect process.

For enhanced security and flexibility when using either the "Credentials" mode, it is highly advisable to utilize Connect Secret Providers.

Output Modes

Two modes are available: Default Mode and Compatibility Mode.

Compatibility Mode is intended to ensure compatibility with existing tools, while Default Mode offers a simpler modern redesign of the functionality.

You can choose whichever suits your requirements.

Default Mode

Configuration

connect.pubsub.output.mode=DEFAULT

Record Schema

Each Pub/Sub message is transformed into a single Kafka record, structured as follows:

  • Kafka Key: A String of the Pub/Sub MessageID.

  • Kafka Value: The Pub/Sub message value as BYTES.

  • Kafka Headers: Includes the "PublishTimestamp" (in seconds) and all Pub/Sub message attributes mapped as separate headers.

Key Schema

The Kafka Key is mapped from the Pub/Sub MessageID, a unique ID for a Pub/Sub message.

Value Schema

The Kafka Value is mapped from the body of the Pub/Sub message.

Headers Schema

The Kafka Headers include:

  • PublishTimestamp: Long value representing the time when the Pub/Sub message was published, in seconds.

  • GCPProjectID: The GCP Project

  • PubSubTopicID: The Pub/Sub Topic ID.

  • PubSubSubscriptionID: The Pub/Sub Subscription ID.

  • All Pub/Sub message attributes: Each attribute from the Pub/Sub message is mapped as a separate header.

Compatibility Mode

Configuration

connect.pubsub.output.mode=COMPATIBILITY

Record Schema

Each Pub/Sub message is transformed into a single Kafka record, structured as follows:

  • Kafka Key: Comprises the project ID, message ID, and subscription ID of the Pub/Sub message.

  • Kafka Value: Contains the message data and attributes from the Pub/Sub message.

Key Schema

The Key is a structure with these fields:

Field Name
Schema Type
Description

ProjectId

String

The Pub/Sub project containing the topic from which messages are polled.

TopicId

String

The Pub/Sub topic containing the messages.

SubscriptionId

String

The Pub/Sub subscription of the Pub/Sub topic.

MessageId

String

A unique ID for a Pub/Sub message

Value Schema

The Value is a structure with these fields:

Field Name
Schema Type
Description

MessageData

Optional String

The body of the Pub/Sub message.

AttributeMap

Optional String

The attribute map associated with the Pub/Sub message.

Option Reference

Name
Description
Type
Available Values
Default Value

connect.pubsub.gcp.auth.mode

Specifies the authentication mode for connecting to GCP.

string

Credentials, File or Default

Default

connect.pubsub.gcp.credentials

For “auth.mode” credentials: GCP Authentication credentials string.

string

(Empty)

connect.pubsub.gcp.file

For “auth.mode” file: Local file path for file containing GCP authentication credentials.

string

(Empty)

connect.pubsub.gcp.project.id

GCP Project ID.

string

(Empty)

connect.pubsub.kcql

Kafka Connect Query Language (KCQL) Configuration to control the connector behaviour

string

kcql configuration

connect.pubsub.output.mode

Output mode. Please see Output Modes documentation below.

Default or Compatibility

Default

GCP Storage

This page describes the usage of the Stream Reactor GCP Storage Source Connector.

Connector Class

io.lenses.streamreactor.connect.gcp.storage.source.GCPStorageSourceConnector

Example

For more examples see the tutorials.

name=gcp-storageSourceConnectorParquet # this can be anything
connector.class=io.lenses.streamreactor.connect.gcp.storage.source.GCPStorageSourceConnector
tasks.max=1
connect.gcpstorage.kcql=insert into $TOPIC_NAME select * from $BUCKET_NAME:$PREFIX_NAME STOREAS `parquet`
connect.gcpstorage.gcp.auth.mode=Credentials
connect.gcpstorage.gcp.credentials=$GCP_CREDENTIALS
connect.gcpstorage.gcp.project.id=$GCP_PROJECT_ID

KCQL Support

You can specify multiple KCQL statements separated by ; to have the connector sink into multiple topics.

However, you can not route the same source to different topics, for this use a separate connector instance.

The connector uses a SQL-like syntax to configure the connector behaviour. The full KCQL syntax is:

INSERT INTO $kafka-topic
SELECT *
FROM bucketAddress:pathPrefix
[BATCH=batch]
[STOREAS storage_format]
[LIMIT limit]
[PROPERTIES(
  'property.1'=x,
  'property.2'=x,
)]

Please note that you can employ escaping within KCQL for the INSERT INTO, SELECT * FROM, and PARTITIONBY clauses when necessary. For example, if you need to use a topic name that contains a hyphen, you can escape it as follows:

INSERT INTO `my-topic-with-hyphen`
SELECT *
FROM bucketAddress:pathPrefix

Source Bucket & Path

The GCP Storage source location is defined within the FROM clause. The connector will read all objects from the given location considering the data partitioning and ordering options. Each data partition will be read by a single connector task.

The FROM clause format is:

FROM [bucketname]:pathprefix
//my-bucket-called-pears:my-folder-called-apples 

If your data in GCS was not written by the Lenses GCS sink set to traverse a folder hierarchy in a bucket and load based on the last modified timestamp of the objects in the bucket. If LastModified sorting is used, ensure objects do not arrive late, or use a post-processing step to handle them.

connect.gcpstorage.source.partition.extractor.regex=none

connect.gcpstorage.source.ordering.type=LastModified

To load in alpha numeric order set the ordering type to AlphaNumeric.

Target Bucket & Path

The target Kafka topic is specified via the INSERT INTO clause. The connector will write all the records to the given topic:

INSERT INTO my-apples-topic SELECT * FROM  my-bucket-called-pears:my-folder-called-apples 

GCP Storage object formats

The connector supports a range of storage formats, each with its own distinct functionality:

  • JSON: The connector will read objects containing JSON content, each line representing a distinct record.

  • Avro: The connector will read Avro-stored messages from GCP Storage and translate them into Kafka’s native format.

  • Parquet: The connector will read Parquet-stored messages from GCP Storage and translate them into Kafka’s native format.

  • Text: The connector will read objects containing lines of text, each line representing a distinct record.

  • CSV: The connector will read objects containing lines of text, each line representing a distinct record.

  • CSV_WithHeaders: The connector will read objects containing lines of text, each line representing a distinct record while skipping the header row.

  • Bytes: The connector will read objects containing bytes, each object is translated to a Kafka message.

Use the STOREAS clause to configure the storage format. The following options are available:

STOREAS `JSON`
STOREAS `Avro`
STOREAS `Parquet`
STOREAS `Text`
STOREAS `CSV`
STOREAS `CSV_WithHeaders`
STOREAS `Bytes`

Text Processing

When using Text storage, the connector provides additional configuration options to finely control how text content is processed.

Regex

In Regex mode, the connector applies a regular expression pattern, and only when a line matches the pattern is it considered a record. For example, to include only lines that start with a number, you can use the following configuration:

connect.gcpstorage.kcql=insert into $kafka-topic select * from lensesio:regex STOREAS `text` PROPERTIES('read.text.mode'='regex', 'read.text.regex'='^[1-9].*')

Start-End line

In Start-End Line mode, the connector reads text content between specified start and end lines, inclusive. This mode is useful when you need to extract records that fall within defined boundaries. For instance, to read records where the first line is ‘SSM’ and the last line is an empty line (’’), you can configure it as follows:

connect.gcpstorage.kcql=insert into $kafka-topic select * from lensesio:multi_line STOREAS `text` PROPERTIES('read.text.mode'='startEndLine', 'read.text.start.line'='SSM', 'read.text.end.line'='')

To trim the start and end lines, set the read.text.trim property to true:

connect.gcpstorage.kcql=insert into $kafka-topic select * from lensesio:multi_line STOREAS `text` PROPERTIES('read.text.mode'='startEndLine', 'read.text.start.line'='SSM', 'read.text.end.line'='', 'read.text.trim'='true')

Start-End tag

In Start-End Tag mode, the connector reads text content between specified start and end tags, inclusive. This mode is particularly useful when a single line of text in S3 corresponds to multiple output Kafka messages. For example, to read XML records enclosed between ‘’ and ‘’, configure it as follows:

 connect.gcpstorage.kcql=insert into $kafka-topic select * from lensesio:xml STOREAS `text` PROPERTIES('read.text.mode'='startEndTag', 'read.text.start.tag'='<SSM>', 'read.text.end.tag'='</SSM>')

Storage output matrix

Depending on the storage format of Kafka topics’ messages, the need for replication to a different cluster, and the specific data analysis requirements, there exists a guideline on how to effectively utilize converters for both sink and source operations. This guidance aims to optimize performance and minimize unnecessary CPU and memory usage.

S3 Storage Format
Kafka Output Format
Restore or replicate cluster
Analytics
Sink Converter
Source Converter

JSON

STRING

Same,Other

Yes, No

StringConverter

StringConverter

AVRO,Parquet

STRING

Same,Other

Yes

StringConverter

StringConverter

AVRO,Parquet

STRING

Same,Other

No

ByteArrayConverter

ByteArrayConverter

JSON

JSON

Same,Other

Yes

JsonConverter

StringConverter

JSON

JSON

Same,Other

No

StringConverter

StringConverter

AVRO,Parquet

JSON

Same,Other

Yes,No

JsonConverter

JsonConverter or Avro Converter( Glue, Confluent)

AVRO,Parquet, JSON

BYTES

Same,Other

Yes,No

ByteArrayConverter

ByteArrayConverter

AVRO,Parquet

AVRO

Same

Yes

Avro Converter( Glue, Confluent)

Avro Converter( Glue, Confluent)

AVRO,Parquet

AVRO

Same

No

ByteArrayConverter

ByteArrayConverter

AVRO,Parquet

AVRO

Other

Yes,No

Avro Converter( Glue, Confluent)

Avro Converter( Glue, Confluent)

AVRO,Parquet

Protobuf

Same

Yes

Protobuf Converter( Glue, Confluent)

Protobuf Converter( Glue, Confluent)

AVRO,Parquet

Protobuf

Same

No

ByteArrayConverter

ByteArrayConverter

AVRO,Parquet

Protobuf

Other

Yes,No

Protobuf Converter( Glue, Confluent)

Protobuf Converter( Glue, Confluent)

AVRO,Parquet, JSON

Other

Same, Other

Yes,No

ByteArrayConverter

ByteArrayConverter

Projections

Currently, the connector does not offer support for SQL projection; consequently, anything other than a SELECT * query is disregarded. The connector will faithfully write all the record fields to Kafka exactly as they are.

Ordering

The sink employs zero-padding in object name[^3]s to ensure precise ordering, leveraging optimizations offered by the GCS API, guaranteeing the accurate sequence of objects.

When using the GCS source alongside the GCS sink, the connector can adopt the same ordering method, ensuring data processing follows the correct chronological order. However, there are scenarios where GCS data is generated by applications that do not maintain lexical object name order.

In such cases, to process objects in the correct sequence, the source needs to list all objects in the bucket and sort them based on their last modified timestamp. To enable this behavior, set the connect.gcpstorage.source.ordering.type to LastModified. This ensures that the source correctly arranges and processes the data based on the timestamps of the objects.

If using LastModified sorting, ensure objects do not arrive late, or use a post-processing step to handle them.

Throttling

To limit the number of object names the source reads from GCS in a single poll. The default value, if not specified, is 1000:

BATCH = 100

To limit the number of result rows returned from the source in a single poll operation, you can use the LIMIT clause. The default value, if not specified, is 10000.

LIMIT 10000

Object Extension Filtering

The GCP Storage Source Connector allows you to filter the objects to be processed based on their extensions. This is controlled by two properties: connect.gcpstorage.source.extension.excludes and connect.gcpstorage.source.extension.includes.

Excluding Object Extensions

The connect.gcpstorage.source.extension.excludes property is a comma-separated list of object extensions to exclude from the source object search. If this property is not configured, all objects are considered. For example, to exclude .txt and .csv objects, you would set this property as follows:

connect.gcpstorage.source.extension.excludes=txt,csv

Including Object Extensions

The connect.gcpstorage.source.extension.includes property is a comma-separated list of object extensions to include in the source object search. If this property is not configured, all objects are considered. For example, to include only .json and .xml objects, you would set this property as follows:

connect.gcpstorage.source.extension.includes=json,xml

Note: If both connect.gcpstorage.source.extension.excludes and connect.gcpstorage.source.extension.includes are set, the connector first applies the exclusion filter and then the inclusion filter.

Post-Processing Options

Post-processing options offer flexibility in managing how objects are handled after they have been processed. By configuring these options, users can automate tasks such as deleting objects to save storage space or moving objects to an archive for compliance and data retention purposes. These features are crucial for efficient data lifecycle management, particularly in environments where storage considerations or regulatory requirements dictate the need for systematic handling of processed data.

Use Cases for Post-Processing Options

  1. Deleting objects After Processing

    For scenarios where freeing up storage is critical and reprocessing is not necessary, configure the connector to delete objects after they are processed. This option is particularly useful in environments with limited storage capacity or where processed data is redundantly stored elsewhere.

    Example:

    INSERT INTO `my-topic`
    SELECT * FROM `my-gcp-storage-bucket:my-prefix`
    PROPERTIES (
        'post.process.action'=`DELETE`
    )

    Result: objects are permanently removed from the S3 bucket after processing, effectively reducing storage usage and preventing reprocessing.

  2. Moving objects to an Archive Bucket

    To preserve processed objects for archiving or compliance reasons, set the connector to move them to a designated archive bucket. This use case applies to organizations needing data retention strategies or for regulatory adherence by keeping processed records accessible but not in active use.

    Example:

    INSERT INTO `my-topic`
    SELECT * FROM `my-gcp-storage-bucket:my-prefix`
    PROPERTIES (
        'post.process.action'=`MOVE`,
        'post.process.action.bucket'=`archive-bucket`,
        'post.process.action.prefix'=`processed/`
    )

    Result: objects are transferred to an archive-bucket, stored with an updated path that includes the processed/ prefix, maintaining an organized archive structure.

Properties

The PROPERTIES clause is optional and adds a layer of configuration to the connector. It enhances versatility by permitting the application of multiple configurations (delimited by ‘,’). The following properties are supported:

Name
Description
Type
Available Values

read.text.mode

Controls how Text content is read

Enum

Regex, StartEndTag, StartEndLine

read.text.regex

Regular Expression for Text Reading (if applicable)

String

read.text.start.tag

Start Tag for Text Reading (if applicable)

String

read.text.end.tag

End Tag for Text Reading (if applicable)

String

read.text.buffer.size

Text Buffer Size (for optimization)

Int

read.text.start.line

Start Line for Text Reading (if applicable)

String

read.text.end.line

End Line for Text Reading (if applicable)

String

read.text.trim

Trim Text During Reading

Boolean

store.envelope

Messages are stored as “Envelope”

Boolean

post.process.action

Defines the action to perform on source objects after successful processing.

Enum

DELETE or MOVE

post.process.action.bucket

Specifies the target bucket for the MOVE action (required for MOVE).

String

post.process.action.prefix

Specifies a new prefix for the object’s location when using the MOVE action (required for MOVE).

String

Authentication

The connector offers two distinct authentication modes:

  • Default: This mode relies on the default GCP authentication chain, simplifying the authentication process.

  • File: This mode uses a local (to the connect worker) path for a file containing GCP authentication credentials.

  • Credentials: In this mode, explicit configuration of a GCP Credentials string is required for authentication.

The simplest example to configure in the connector is the “Default” mode, as this requires no other configuration.

connect.gcpstorage.gcp.auth.mode=Default

When selecting the “Credentials” mode, it is essential to provide the necessary credentials. Alternatively, if you prefer not to configure these properties explicitly, the connector will follow the credentials retrieval order as described here.

Here’s an example configuration for the “Credentials” mode:

connect.gcpstorage.gcp.auth.mode=Credentials
connect.gcpstorage.gcp.credentials=$GCP_CREDENTIALS
connect.gcpstorage.gcp.project.id=$GCP_PROJECT_ID

And here is an example configuration using the “File” mode:

connect.gcpstorage.gcp.auth.mode=File
connect.gcpstorage.gcp.file=/home/secure-stuff/gcp-read-credential.txt

Remember when using file mode the file will need to exist on every worker node in your Kafka connect cluster and be readable by the Kafka Connect process.

For enhanced security and flexibility when using the “Credentials” mode, it is highly advisable to utilize Connect Secret Providers. This approach ensures robust security practices while handling access credentials.

Backup and Restore

When used in tandem with the GCP Storage Sink Connector, the GCP Storage Source Connector becomes a powerful tool for restoring Kafka topics from GCP Storage. To enable this behavior, you should set store.envelope to true. This configuration ensures that the source expects the following data structure in GCP Storage:

{
  "key": <the message Key, which can be a primitive or a complex object>,
  "value": <the message Value, which can be a primitive or a complex object>,
  "headers": {
    "header1": "value1",
    "header2": "value2"
  },
  "metadata": {
    "offset": 0,
    "partition": 0,
    "timestamp": 0,
    "topic": "topic"
  }
}

When the messages are sent to Kafka, the GCP Storage Source Connector ensures that it correctly maps the key, value, headers, and metadata fields (including timestamp and partition) to their corresponding Kafka message fields. Please note that the envelope functionality can only be used with data stored in GCP Storage as Avro, JSON, or Parquet formats.

Partition Extraction

When the envelope feature is not in use, and data restoration is required, the responsibility falls on the connector to establish the original topic partition value. To ensure that the source correctly conveys the original partitions back to Kafka Connect during reads from the source, a partition extractor can be configured to extract this information from the GCP Storage object key.

To configure the partition extractor, you can utilize the connect.gcpstorage.source.partition.extractor.type property, which supports two options:

  • hierarchical: This option aligns with the default format used by the sink, topic/partition/offset.json.

  • regex: When selected, you can provide a custom regular expression to extract the partition information. Additionally, when using the regex option, you must also set the connect.gcpstorage.source.partition.extractor.regex property. It’s important to note that only one lookup group is expected. For an example of a regular expression pattern, please refer to the pattern used for hierarchical, which is:

(?i)^(?:.*)\/([0-9]*)\/(?:[0-9]*)[.](?:Json|Avro|Parquet|Text|Csv|Bytes)$

Option Reference

Name
Description
Type
Available Values
Default Value

connect.gcpstorage.gcp.auth.mode

Specifies the authentication mode for connecting to GCP.

string

"Credentials", "File" or "Default"

"Default"

connect.gcpstorage.gcp.credentials

For "auth.mode" credentials: GCP Authentication credentials string.

string

(Empty)

connect.gcpstorage.gcp.file

For "auth.mode" file: Local file path for file containing GCP authentication credentials.

string

(Empty)

connect.gcpstorage.gcp.project.id

GCP Project ID.

string

(Empty)

connect.gcpstorage.gcp.quota.project.id

GCP Quota Project ID.

string

(Empty)

connect.gcpstorage.endpoint

Endpoint for GCP Storage.

string

connect.gcpstorage.error.policy

Defines the error handling policy when errors occur during data transfer to or from GCP Storage.

string

"NOOP," "THROW," "RETRY"

"THROW"

connect.gcpstorage.max.retries

Sets the maximum number of retries the connector will attempt before reporting an error to the Connect Framework.

int

20

connect.gcpstorage.retry.interval

Specifies the interval (in milliseconds) between retry attempts by the connector.

int

60000

connect.gcpstorage.http.max.retries

Sets the maximum number of retries for the underlying HTTP client when interacting with GCP Storage.

long

5

connect.gcpstorage.http.retry.interval

Specifies the retry interval (in milliseconds) for the underlying HTTP client. An exponential backoff strategy is employed.

long

50

connect.gcpstorage.kcql

Kafka Connect Query Language (KCQL) Configuration to control the connector behaviour

string

[kcql configuration]({{< relref "#kcql-support" >}})

connect.gcpstorage.source.extension.excludes

A comma-separated list of object extensions to exclude from the source object search.

string

[object extension filtering]({{< relref "#object-extension-filtering" >}})

connect.gcpstorage.source.extension.includes

A comma-separated list of object extensions to include in the source object search.

string

[object extension filtering]({{< relref "#object-extension-filtering" >}})

connect.gcpstorage.source.partition.extractor.type

Type of Partition Extractor (Hierarchical or Regex)

string

hierarchical, regex

connect.gcpstorage.source.partition.extractor.regex

Regex Pattern for Partition Extraction (if applicable)

string

connect.gcpstorage.source.partition.search.continuous

If set to true the connector will continuously search for new partitions.

boolean

true, false

true

connect.gcpstorage.source.partition.search.interval

The interval in milliseconds between searching for new partitions.

long

300000

connect.gcpstorage.source.partition.search.excludes

A comma-separated list of paths to exclude from the partition search.

string

".indexes"

connect.gcpstorage.source.partition.search.recurse.levels

Controls how many levels deep to recurse when searching for new partitions

int

0

connect.gcpstorage.ordering,type

Type of ordering for the GCS object keys to ensure the processing order.

string

AlphaNumeric, LastModified

AlphaNumeric

connect.gcpstorage.source.empty.results.backoff.initial.delay

Initial delay before retrying when no results are found.

long

1000 Milliseconds

connect.gcpstorage.source.empty.results.backoff.max.delay

Maximum delay before retrying when no results are found.

long

10000 Milliseconds

connect.gcpstorage.source.empty.results.backoff.multiplier

Multiplier to apply to the delay when retrying when no results are found.

double

2.0 Multiplier (x)

connect.gcpstorage.source.write.watermark.header

Write the record with kafka headers including details of the source and line number of the file.

boolean

true, false

false

FTP

This page describes the usage of the Stream Reactor FTP Source Connector.

Provide the remote directories and on specified intervals, the list of files in the directories is refreshed. Files are downloaded when they were not known before, or when their timestamp or size are changed. Only files with a timestamp younger than the specified maximum age are considered. Hashes of the files are maintained and used to check for content changes. Changed files are then fed into Kafka, either as a whole (update) or only the appended part (tail), depending on the configuration. Optionally, file bodies can be transformed through a pluggable system prior to putting them into Kafka.

Connector Class

io.lenses.streamreactor.connect.ftp.source.FtpSourceConnector

Example

For more examples see the tutorials.

name=ftp-source
connector.class=io.lenses.streamreactor.connect.ftp.source.FtpSourceConnector
tasks.max=1

#server settings
connect.ftp.address=localhost:21
connect.ftp.user=ftp
connect.ftp.password=ftp

#refresh rate, every minute
connect.ftp.refresh=PT1M

#ignore files older than 14 days.
connect.ftp.file.maxage=P14D

#monitor /forecasts/weather/ and /logs/ for appends to files.
#any updates go to the topics `weather` and `error-logs` respectively.
connect.ftp.monitor.tail=/forecasts/weather/:weather,/logs/:error-logs

#keep an eye on /statuses/, files are retrieved as a whole and sent to topic `status`
connect.ftp.monitor.update=/statuses/:status

#keystyle controls the format of the key and can be string or struct.
#string only provides the file name
#struct provides a structure with the filename and offset
connect.ftp.keystyle=struct

Data types

Each Kafka record represents a file and has the following types.

  • The format of the keys is configurable through connect.ftp.keystyle=string|struct. It can be a string with the file name, or a FileInfo structure with the name: string and offset: long. The offset is always 0 for files that are updated as a whole, and hence only relevant for tailed files.

  • The values of the records contain the body of the file as bytes.

Tailing Versus Update as a Whole

The following rules are used.

Tailed files are only allowed to grow. Bytes that have been appended to it since the last inspection are yielded. Preceding bytes are not allowed to change;

Updated files can grow, shrink and change anywhere. The entire contents are yielded.

Data converters

Instead of dumping whole file bodies (and the danger of exceeding Kafka’s message.max.bytes), one might want to give an interpretation to the data contained in the files before putting it into Kafka. For example, if the files that are fetched from the FTP are comma-separated values (CSVs), one might prefer to have a stream of CSV records instead. To allow to do so, the connector provides a pluggable conversion of SourceRecords. Right before sending a SourceRecord to the Connect framework, it is run through an object that implements:

package io.lenses.streamreactor.connect.ftp

trait SourceRecordConverter extends Configurable {
    def convert(in:SourceRecord) : java.util.List[SourceRecord]
}

The default object that is used is a pass-through converter, an instance of:

class NopSourceRecordConverter extends SourceRecordConverter{
    override def configure(props: util.Map[String, _]): Unit = {}
    override def convert(in: SourceRecord): util.List[SourceRecord] = Seq(in).asJava
}

To override it, create your own implementation of SourceRecordConverter and place the jar in the plugin.path.

connect.ftp.sourcerecordconverter=your.name.space.YourConverter

To learn more examples of using the FTP Kafka connector read this blog.

Option Reference

Name
Description
Type
Default Value

connect.ftp.address

host[:port] of the ftp server

string

connect.ftp.user

Username to connect with

string

connect.ftp.password

Password to connect with

string

connect.ftp.refresh

iso8601 duration that the server is polled

string

connect.ftp.file.maxage

iso8601 duration for how old files can be

string

connect.ftp.keystyle

SourceRecord keystyle, string or struct

string

connect.ftp.protocol

Protocol to use, FTP or FTPS

string

ftp

connect.ftp.timeout

Ftp connection timeout in milliseconds

int

30000

connect.ftp.filter

Regular expression to use when selecting files for processing

string

.*

connect.ftp.monitor.tail

Comma separated lists of path:destinationtopic; tail of file to tracked

string

connect.ftp.monitor.update

Comma separated lists of path:destinationtopic; whole file is tracked

string

connect.ftp.monitor.slicesize

File slice size in bytes

int

-1

connect.ftp.fileconverter

File converter class

string

io.lenses.streamreactor.connect.ftp.source.SimpleFileConverter

connect.ftp.sourcerecordconverter

Source record converter class

string

io.lenses.streamreactor.connect.ftp.source.NopSourceRecordConverter

connect.ftp.max.poll.records

Max number of records returned per poll

int

10000

JMS

This page describes the usage of the Stream Reactor JMS Source Connector.

A Kafka Connect JMS source connector to subscribe to messages on JMS queues and topics and write them to a Kafka topic.

The connector uses the standard JMS protocols and has been tested against ActiveMQ.

The connector allows for the JMS initial.context.factory and connection.factory to be set according to your JMS provider. The appropriate implementation jars must be added to the CLASSPATH of the connect workers or placed in the plugin.path of the connector.

Each JMS message is committed only when it has been written to Kafka. If a failure happens when writing to Kafka, i.e. the message is too large, then the JMS message will not be acknowledged. It will stay in the queue so it can be actioned upon.

The schema of the messages is fixed and can be found under Data Types unless a converter is used.

You must provide the JMS implementation jars for your JMS service.

Connector Class

io.lenses.streamreactor.connect.jms.source.JMSSourceConnector

Example

For more examples see the tutorials.

name=jms-source
connector.class=io.lenses.streamreactor.connect.jms.source.JMSSourceConnector
tasks.max=1
connect.jms.kcql=INSERT INTO jms SELECT * FROM jms-queue WITHTYPE QUEUE
connect.jms.initial.context.factory=org.apache.activemq.jndi.ActiveMQInitialContextFactory
connect.jms.url=tcp://activemq:61616
connect.jms.connection.factory=ConnectionFactory

KCQL support

You can specify multiple KCQL statements separated by ; to have the connector sink into multiple topics.

However, you can not route the same source to different topics, for this use a separate connector instance.

The following KCQL is supported:

INSERT INTO kafka_topic
SELECT *
FROM jms_destination
WITHTYPE [TOPIC|QUEUE]
[WITHCONVERTER=`myclass`]

The selection of fields from the JMS message is not supported.

Examples:

-- Select from a JMS queue and write to a Kafka topic
INSERT INTO topicA SELECT * FROM jms_queue WITHTYPE QUEUE

-- Select from a JMS topic and write to a Kafka topic with a json converter
INSERT INTO topicA
SELECT * FROM jms_topic
WITHTYPE TOPIC
WITHCONVERTER=`io.lenses.streamreactor.connect.converters.source.AvroConverter`

Destination types

The connector supports both TOPICS and QUEUES, controlled by the WITHTYPE KCQL clause.

Message Converters

The connector supports converters to handle different message payload formats in the source topic or queue.

If no converter is provided the JMS message is converter to a Kafka Struct representation.

See source record converters.

Data type conversion

Name
Type

message_timestamp

Optional int64

correlation_id

Optional string

redelivered

Optional boolean

reply_to

Optional string

destination

Optional string

message_id

Optional string

mode

Optional int32

type

Optional string

priority

Optional int32

bytes_payload

Optional bytes

properties

Map of string

Option Reference

Name
Description
Type
Default Value

connect.jms.url

Provides the JMS broker url

string

connect.jms.initial.context.factory

Initial Context Factory, e.g: org.apache.activemq.jndi.ActiveMQInitialContextFactory

string

connect.jms.connection.factory

Provides the full class name for the ConnectionFactory compile to use, e.gorg.apache.activemq.ActiveMQConnectionFactory

string

ConnectionFactory

connect.jms.kcql

connect.jms.kcql

string

connect.jms.subscription.name

subscription name to use when subscribing to a topic, specifying this makes a durable subscription for topics

string

connect.jms.password

Provides the password for the JMS connection

password

connect.jms.username

Provides the user for the JMS connection

string

connect.jms.error.policy

Specifies the action to be taken if an error occurs while inserting the data. There are two available options: NOOP - the error is swallowed THROW - the error is allowed to propagate. RETRY - The exception causes the Connect framework to retry the message. The number of retries is based on The error will be logged automatically

string

THROW

connect.jms.retry.interval

The time in milliseconds between retries.

int

60000

connect.jms.max.retries

The maximum number of times to try the write again.

int

20

connect.jms.destination.selector

Selector to use for destination lookup. Either CDI or JNDI.

string

CDI

connect.jms.initial.context.extra.params

List (comma-separated) of extra properties as key/value pairs with a colon delimiter to supply to the initial context e.g. SOLACE_JMS_VPN:my_solace_vp

list

[]

connect.jms.batch.size

The number of records to poll for on the target JMS destination in each Connect poll.

int

100

connect.jms.polling.timeout

Provides the timeout to poll incoming messages

long

1000

connect.jms.source.default.converter

Contains a canonical class name for the default converter of a raw JMS message bytes to a SourceRecord. Overrides to the default can be done by using connect.jms.source.converters still. i.e. io.lenses.streamreactor.connect.converters.source.AvroConverter

string

connect.jms.converter.throw.on.error

If set to false the conversion exception will be swallowed and everything carries on BUT the message is lost!!; true will throw the exception.Default is false.

boolean

false

connect.converter.avro.schemas

If the AvroConverter is used you need to provide an avro Schema to be able to read and translate the raw bytes to an avro record. The format is $MQTT_TOPIC=$PATH_TO_AVRO_SCHEMA_FILE

string

connect.jms.headers

Contains collection of static JMS headers included in every SinkRecord The format is connect.jms.headers="$MQTT_TOPIC=rmq.jms.message.type:TextMessage,rmq.jms.message.priority:2;$MQTT_TOPIC2=rmq.jms.message.type:JSONMessage"

string

connect.progress.enabled

Enables the output for how many records have been processed

boolean

false

connect.jms.evict.interval.minutes

Removes the uncommitted messages from the internal cache. Each JMS message is linked to the Kafka record to be published. Failure to publish a record to Kafka will mean the JMS message will not be acknowledged.

int

10

connect.jms.evict.threshold.minutes

The number of minutes after which an uncommitted entry becomes evictable from the connector cache.

int

10

connect.jms.scale.type

How the connector tasks parallelization is decided. Available values are kcql and default. If kcql is provided it will be based on the number of KCQL statements written; otherwise it will be driven based on the connector tasks.max

MQTT

This page describes the usage of the Stream Reactor MQTT Source Connector.

A Kafka Connect source connector to read events from MQTT and push them to Kafka.

Connector Class

io.lenses.streamreactor.connect.mqtt.source.MqttSourceConnector

Example

For more examples, see the tutorials.

name=mqtt-source
connector.class=io.lenses.streamreactor.connect.mqtt.source.MqttSourceConnector
tasks.max=1
connect.mqtt.kcql=INSERT INTO mqtt SELECT * FROM /mjson WITHCONVERTER=`io.lenses.streamreactor.connect.converters.source.JsonSimpleConverter`
connect.mqtt.client.id=dm_source_id
connect.mqtt.hosts=tcp://mqtt:1883
connect.mqtt.service.quality=1

KCQL support

You can specify multiple KCQL statements separated by ;

However, you can not route the same source to different topics, for this use a separate connector instance.

The following KCQL is supported:

INSERT INTO <your-kafka-topic>
SELECT *
FROM <your-mqtt-topic>
[WITHCONVERTER=`myclass`]

The selection of fields from the JMS message is not supported.

Examples:

-- Insert mode, select all fields from topicA
-- and write to topic topic with converter myclass
INSERT INTO topic SELECT * FROM /mqttTopicA [WITHCONVERTER=myclass]

-- wildcard
INSERT INTO topic SELECT * FROM /mqttTopicA/+/sensors [WITHCONVERTER=`myclass`]

Keyed JSON format

To facilitate scenarios like retaining the latest value for a given device identifier, or support Kafka Streams joins without having to re-map the topic data, the connector supports WITHKEY in the KCQL syntax.

Multiple key fields are supported using a delimiter:

// `[` enclosed by `]` denotes optional values
WITHKEY(field1 [, field2.A , field3]) [KEYDELIMITER='.']

The resulting Kafka record key content will be the string concatenation of the values of the fields. Optionally, the delimiter can be set via the KEYDELIMITER keyword.

Shared and Wildcard Subscriptions

The connector supports wildcard and shared subscriptions, but the KCQL command must be placed inside single quotes.

-- wildcard
INSERT INTO kafkaTopic1 SELECT * FROM /mqttTopicA/+/sensors WITHCONVERTER=`myclass`

Dynamic target topics

When using wildcard subscriptions, you can dynamically route messages to a Kafka topic with the same name as the MQTT topic by using `$`in the KCQL target statement.

You must use a wildcard or a shared subscription format.

Kafka does not support / . The result topic names will have/replaced by _. For example:

/mqttSourceTopic/A/test would become mqttSourceTopic_A_test.

INSERT INTO `$` SELECT * FROM /mqttTopicA/+/sensors

Message converters

The connector supports converters to handle different message payload formats in the source topic. See source record converters.

Option Reference

Name
Description
Type
Default Value

connect.mqtt.hosts

Contains the MQTT connection end points.

string

connect.mqtt.username

Contains the Mqtt connection user name

string

connect.mqtt.password

Contains the Mqtt connection password

password

connect.mqtt.service.quality

Specifies the Mqtt quality of service

int

connect.mqtt.timeout

Provides the time interval to establish the mqtt connection

int

3000

connect.mqtt.clean

connect.mqtt.clean

boolean

true

connect.mqtt.keep.alive

The keep alive functionality assures that the connection is still open and both broker and client are connected to the broker during the establishment of the connection. The interval is the longest possible period of time, which broker and client can endure without sending a message.

int

5000

connect.mqtt.client.id

Contains the Mqtt session client id

string

connect.mqtt.error.policy

Specifies the action to be taken if an error occurs while inserting the data. There are two available options: NOOP - the error is swallowed THROW - the error is allowed to propagate. RETRY - The exception causes the Connect framework to retry the message. The number of retries is based on The error will be logged automatically

string

THROW

connect.mqtt.retry.interval

The time in milliseconds between retries.

int

60000

connect.mqtt.max.retries

The maximum number of times to try the write again.

int

20

connect.mqtt.retained.messages

Specifies the Mqtt retained flag.

boolean

false

connect.mqtt.converter.throw.on.error

If set to false the conversion exception will be swallowed and everything carries on BUT the message is lost!!; true will throw the exception.Default is false.

boolean

false

connect.converter.avro.schemas

If the AvroConverter is used you need to provide an avro Schema to be able to read and translate the raw bytes to an avro record. The format is $MQTT_TOPIC=$PATH_TO_AVRO_SCHEMA_FILE in case of source converter, or $KAFKA_TOPIC=PATH_TO_AVRO_SCHEMA in case of sink converter

string

connect.mqtt.log.message

Logs received MQTT messages

boolean

false

connect.mqtt.kcql

Contains the Kafka Connect Query Language describing the sourced MQTT source and the target Kafka topics

string

connect.mqtt.polling.timeout

Provides the timeout to poll incoming messages

int

1000

connect.mqtt.share.replicate

Replicate the shared subscriptions to all tasks instead of distributing them

boolean

false

connect.progress.enabled

Enables the output for how many records have been processed

boolean

false

connect.mqtt.ssl.ca.cert

Provides the path to the CA certificate file to use with the Mqtt connection

string

connect.mqtt.ssl.cert

Provides the path to the certificate file to use with the Mqtt connection

string

connect.mqtt.ssl.key

Certificate private [config] key file path.

string

connect.mqtt.process.duplicates

Process duplicate messages

boolean

false