Sources

This page details the configuration options for the Stream Reactor Kafka Connect source connectors.

Source connectors read data from external systems and write to Kafka.

• AWS S3: Load data from AWS S3 including restoring topics.

• Azure Event Hubs: Load data from Azure Event Hubs into Kafka topics.

• Azure Service Bus: Load data from Azure Service Bus into Kafka topics.

• Cassandra: Load data from Cassandra into Kafka topics.

• GCP PubSub: Load data from GCP PubSub into Kafka topics.

• GCP Storage: Load data from GCP Storage including restoring topics.

• FTP: Load data from files on FTP servers into Kafka topics.

• JMS: Load data from JMS topics and queues into Kafka topics.

• MQTT: Load data from MQTT into Kafka topics.

Azure Event Hubs

This page describes the usage of the Stream Reactor Azure Event Hubs Source Connector.

A Kafka Connect source connector to read events from Azure Event Hubs and push them to Kafka.

In order to leverage the Kafka API in your Event Hubs namespace, it has to be on at least the Standard pricing tier. More info here.

Connector Class

io.lenses.streamreactor.connect.azure.eventhubs.source.AzureEventHubsSourceConnector

Example

For more examples see the tutorials.

The example below presents all the parameters needed to use the Event Hubs connector. It contains all the necessary parameters (but nothing optional), so feel free to tweak it to your needs:

name=AzureEventHubsSourceConnector
connector.class=io.lenses.streamreactor.connect.azure.eventhubs.source.AzureEventHubsSourceConnector
tasks.max=1
connect.eventhubs.kcql=INSERT INTO azureoutput SELECT * FROM inputhub;
connect.eventhubs.source.connection.settings.bootstrap.servers=MYNAMESPACE.servicebus.windows.net:9093
connect.eventhubs.source.connection.settings.sasl.mechanism=PLAIN
connect.eventhubs.source.connection.settings.security.protocol=SASL_SSL
connect.eventhubs.source.connection.settings.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://MYNAMESPACE.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SOME_SHARED_ACCESS_STRING;EntityPath=inputhub";

KCQL support

You can specify multiple KCQL statements separated by ; to have the connector sink into multiple topics. However, you cannot route the same source to different topics; for this, use a separate connector instance.

The following KCQL is supported:

INSERT INTO <your-kafka-topic>
SELECT *
FROM <your-event-hub>;

The selection of fields from the Event Hubs message is not supported.

Payload support

As of now, the Azure Event Hubs connector supports raw bytes passthrough from the source Event Hub to the Kafka topic specified in the KCQL config.

Authentication

You can connect to Azure Event Hubs by passing specific JAAS parameters in the configuration:

connect.eventhubs.connection.settings.bootstrap.servers=NAMESPACENAME.servicebus.windows.net:9093
connect.eventhubs.connection.settings.sasl.mechanism=PLAIN
connect.eventhubs.connection.settings.security.protocol=SASL_SSL
connect.eventhubs.connection.settings.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

Learn more about different methods of connecting to Event Hubs on the Azure website. The only caveat is to add the connector-specific prefix, as in the example above. See Fine-tuning the Kafka Connector for more info.

Fine-tuning the Kafka Connector

The Azure Event Hubs connector utilizes the Apache Kafka API implemented by Event Hubs. This also allows fine-tuning for user-specific needs, because the connector passes all properties with a specific prefix directly to the consumer. The prefix is connect.eventhubs.connection.settings; any property specified with this prefix is automatically passed to the consumer.

For example, to control how many data records come through the network at once, specify the property below as part of the Azure Event Hubs connector configuration before starting it:

connect.eventhubs.connection.settings.max.poll.records = 100

This means the internal Kafka consumer will poll at most 100 records at a time (as max.poll.records is passed directly to it).

There are certain exceptions to this rule, as a couple of properties are used internally to ensure smooth consumption. Those exceptions are listed below:

  • client.id - the connector sets it by itself

  • group.id - the connector sets it by itself

  • key.deserializer - the connector passes bytes through 1-to-1

  • value.deserializer - the connector passes bytes through 1-to-1

  • enable.auto.commit - the connector automatically sets it to false and checks which offsets are committed in the output topic instead

Option Reference

Name
Description
Type
Default Value

connect.eventhubs.kcql

Comma-separated output KCQL queries

string

connect.eventhubs.source.connection.settings.bootstrap.servers

Specifies the Event Hubs server location.

string

connect.eventhubs.source.close.timeout

Amount of time (in seconds) for the consumer to close.

int

30

connect.eventhubs.source.default.offset

Specifies whether by default we should consume from the earliest (default) or latest offset.

string

earliest

    Azure Service Bus

    This page describes the usage of the Stream Reactor Azure Service Bus Source Connector.

This Kafka connector is designed to effortlessly ingest records from Azure Service Bus into your Kafka cluster. It leverages the Microsoft Azure API to seamlessly transfer data from Service Buses, safely carrying over both payloads and metadata (see Payload support). It provides an AT-LEAST-ONCE guarantee: data is committed (marked as read) in Service Bus once the connector verifies it was successfully committed to the designated Kafka topic. The Azure Service Bus Source Connector supports both types of Service Bus: Queues and Topics.

Connector Class

io.lenses.streamreactor.connect.azure.servicebus.source.AzureServiceBusSourceConnector

Full Config Example

For more examples see the tutorials.

The following example presents all the mandatory configuration properties for the Service Bus connector. Please note there are also optional parameters, listed in the Option Reference below. Feel free to tweak the configuration to your requirements.
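
connector.class=io.lenses.streamreactor.connect.azure.servicebus.source.AzureServiceBusSourceConnector
name=AzureServiceBusSourceConnector
tasks.max=1
connect.servicebus.connection.string="Endpoint=sb://MYNAMESPACE.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SOME_SHARED_ACCESS_STRING";
connect.servicebus.kcql=INSERT INTO output-topic SELECT * FROM servicebus-queue PROPERTIES('servicebus.type'='QUEUE');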

    KCQL support

You can specify multiple KCQL statements separated by ; to have the connector sink into multiple topics. However, you cannot route the same source to different topics; for this, use a separate connector instance.

    The following KCQL is supported:
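
INSERT INTO <your-kafka-topic>
SELECT *
FROM <your-service-bus>
PROPERTIES(...);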

It allows you to map a Service Bus of name <your-service-bus> to a Kafka topic of name <your-kafka-topic> using the PROPERTIES specified.

    The selection of fields from the Service Bus message is not supported.

    Payload support

The Azure Service Bus Connector follows a specific pattern (Schema) for its messages. Please see below for the format of the data transferred to the Kafka topics specified in the KCQL config.

Key Format (Schema)

Field Name
Schema Type
Description

MessageId

String

The message identifier that uniquely identifies the message and its payload.

Payload Format (Schema)

Field Name
Schema Type
Description

deliveryCount

int64

The number of the times this message was delivered to clients.

enqueuedTimeUtc

int64

The time at which this message was enqueued in Azure Service Bus.

contentType

Optional String

The content type of this message.

label

Optional String

The application specific message label.

correlationId

Optional String

The correlation identifier.

messageProperties

Optional String

The map of user application properties of this message.

partitionKey

Optional String

The partition key for sending a message to a partitioned entity.

replyTo

Optional String

The address of an entity to send replies to.

replyToSessionId

Optional String

The session identifier augmenting the ReplyTo address.

deadLetterSource

Optional String

The name of the queue or subscription that this message was enqueued on, before it was deadlettered.

timeToLive

int64

The duration before this message expires.

lockedUntilUtc

Optional int64

The time when the lock of this message expires.

sequenceNumber

Optional int64

The unique number assigned to a message by Azure Service Bus.

sessionId

Optional String

The session identifier for a session-aware entity.

lockToken

Optional String

The lock token for the current message.

messageBody

Optional bytes

The body of this message as a byte array.

getTo

Optional String

The "to" address.

    Authentication

    You can connect to an Azure Service Bus by passing your connection string in configuration. The connection string can be found in the Shared access policies section of your Azure Portal.
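
For example:

connect.servicebus.connection.string=Endpoint=sb://YOURNAMESPACE.servicebus.windows.net/;SharedAccessKeyName=YOUR_KEYNAME;SharedAccessKey=YOUR_ACCESS_KEY=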

Learn more about different methods of connecting to Service Bus on the Azure Website.

    Queues & Topics

The Azure Service Bus Connector connects to Service Bus via the Microsoft API. In order to smoothly configure your mappings you have to pay attention to the PROPERTIES part of your KCQL mappings. There are two cases here: reading from a Service Bus of type QUEUE and of type TOPIC. Please refer to the relevant sections below. In case of further questions, check the Azure Service Bus documentation to learn more about those mechanisms.

    Reading from Queues

In order to read from a queue, there is an additional parameter that you need to pass with your KCQL mapping in the PROPERTIES part. This parameter is servicebus.type and it can take one of two values depending on the type of the Service Bus: QUEUE or TOPIC. For a queue, pass QUEUE.

    This is sufficient to enable you to create the mapping with your queue.
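
For example:

connect.servicebus.kcql=INSERT INTO kafka-topic SELECT * FROM azure-queue PROPERTIES('servicebus.type'='QUEUE');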

    Reading from Topics

    In order to be reading from the topic there are two additional parameters that you need to pass with your KCQL mapping in the PROPERTIES part:

1. Parameter servicebus.type, which can take one of two values depending on the type of the Service Bus: QUEUE or TOPIC. For a topic, pass TOPIC.

    2. Parameter subscription.name which takes the (case-sensitive) value of a subscription name that you've created for this topic for the connector to use. Please use Azure Portal to create one.

Make sure your subscription exists, otherwise you will get an error similar to this:

Caused by: com.azure.core.amqp.exception.AmqpException: The messaging entity 'streamreactor:Topic:my-topic|lenses' could not be found. To know more visit https://aka.ms/sbResourceMgrExceptions.

    Create the subscription per topic in the Azure Portal.

    This is sufficient to enable you to create the mapping with your topic.
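
For example:

connect.servicebus.kcql=INSERT INTO kafka-topic SELECT * FROM azure-topic PROPERTIES('servicebus.type'='TOPIC','subscription.name'='subscription1');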

Option Reference

KCQL Properties

Please find below all the necessary KCQL properties:

Name
Description
Type
Default Value

servicebus.type

Specifies Service Bus type: QUEUE or TOPIC

string

subscription.name

Specifies subscription name if Service Bus type is TOPIC

string

Configuration parameters

Please find below all the relevant configuration parameters:

Name
Description
Type
Default Value

connect.servicebus.connection.string

Specifies the Connection String to connect to Service Bus

string

connect.servicebus.kcql

Comma-separated output KCQL queries

string

connect.servicebus.source.task.records.queue.size

Specifies the Queue size between Service Bus Receivers and Kafka

int

5000

connect.servicebus.source.sleep.on.empty.poll.ms

The duration in milliseconds to sleep when no records are returned from the poll. This avoids a tight loop in Connect.

long

250

connect.servicebus.source.complete.retries.max

The maximum number of retries to complete a message.

int

3

connect.servicebus.source.complete.retries.min.backoff.ms

The minimum duration in milliseconds for the first backoff.

long

1000

connect.servicebus.source.prefetch.count

The number of messages to prefetch from the Azure Service Bus.

int

2000


    GCP PubSub

    This page describes the usage of the Stream Reactor Google PubSub Source Connector.

    The Kafka connector is designed to seamlessly ingest records from GCP Pub/Sub topics and queues into your Kafka cluster. This makes it useful for backing up or streaming data from Pub/Sub to your Kafka infrastructure. This connector provides robust support for at least once semantics (this connector ensures that each record reaches the Kafka topic at least once).

Connector Class

io.lenses.streamreactor.connect.gcp.pubsub.source.GCPPubSubSourceConnector

    JMS

    This page describes the usage of the Stream Reactor JMS Source Connector.

    A Kafka Connect JMS source connector to subscribe to messages on JMS queues and topics and write them to a Kafka topic.

    The connector uses the standard JMS protocols and has been tested against ActiveMQ.

    The connector allows for the JMS initial.context.factory and connection.factory to be set according to your JMS provider. The appropriate implementation jars must be added to the CLASSPATH of the connect workers or placed in the plugin.path of the connector.

Each JMS message is committed only when it has been written to Kafka. If a failure happens when writing to Kafka, e.g. the message is too large, then the JMS message will not be acknowledged. It will stay in the queue so it can be actioned upon.

    The schema of the messages is fixed and can be found under Data Types unless a converter is used.

    MQTT

    This page describes the usage of the Stream Reactor MQTT Source Connector.

    A Kafka Connect source connector to read events from MQTT and push them to Kafka.

Connector Class

io.lenses.streamreactor.connect.mqtt.source.MqttSourceConnector

    Example


    For more examples see the tutorials.

    KCQL Support

You can specify multiple KCQL statements separated by ; to have the connector sink into multiple topics. However, you cannot route the same source to different topics; for this, use a separate connector instance.

    The connector uses a SQL-like syntax to configure the connector behaviour. The full KCQL syntax is:

    Please note that you can employ escaping within KCQL for the INSERT INTO and SELECT * FROM clauses when necessary. For example, if you need to use a topic name that contains a hyphen, you can escape it as follows:
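
INSERT INTO `my-topic-with-hyphen`
SELECT *
FROM `gcp-subscription-id`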

    The connector does not support multiple KCQL statements that reference the same source location; to use multiple statements, configure each one in a separate connector instance.

    Source Subscription ID and Target Topic

    The source and target of the data are specified via the INSERT INTO... SELECT * FROM clause. The connector will write all the records to the given topic, from the given subscription:
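
INSERT INTO my-topic SELECT * FROM subscriptionId;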

    Properties

    The PROPERTIES clause is optional and adds a layer of configurability to the connector. It enhances versatility by permitting the application of multiple configurations (delimited by ',').

    Properties can be defined in any order.

    The following properties are supported:

    Name
    Description
    Type
    Default Value

    batch.size

    The maximum number of messages the connector will retrieve and process at one time per polling request (per KCQL mapping).

    int

    1000

    cache.ttl

    The maximum amount of time (in milliseconds) to store message data to allow acknowledgement of a message.

    long

    1 hour

    queue.max

    Data is loaded into a queue asynchronously so that it stands ready when the poll call is activated. Control the maximum number of records to hold in the queue per KCQL mapping.

int

10000

    Auth Mode

    The connector offers three distinct authentication modes:

    • Default: This mode relies on the default GCP authentication chain, simplifying the authentication process.

    • File: This mode uses a local (to the connect worker) path for a file containing GCP authentication credentials.

    • Credentials: In this mode, explicit configuration of a GCP Credentials string is required for authentication.

    The simplest example to configure in the connector is the "Default" mode, as this requires no other configuration.

    When selecting the "Credentials" mode, it is essential to provide the necessary credentials. Alternatively, if you prefer not to configure these properties explicitly, the connector will follow the credentials retrieval order as described here.

    Here's an example configuration for the "Credentials" mode:
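
connect.pubsub.gcp.auth.mode=Credentials
connect.pubsub.gcp.credentials=$GCP_CREDENTIALS
connect.pubsub.gcp.project.id=$GCP_PROJECT_ID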

    And here is an example configuration using the "File" mode:
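
connect.pubsub.gcp.auth.mode=File
connect.pubsub.gcp.file=/home/secure-stuff/gcp-read-credential.txt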

    Remember when using file mode the file will need to exist on every worker node in your Kafka connect cluster and be readable by the Kafka Connect process.

    For enhanced security and flexibility when using either the "Credentials" mode, it is highly advisable to utilize Connect Secret Providers.

    Output Modes

    Two modes are available: Default Mode and Compatibility Mode.

    Compatibility Mode is intended to ensure compatibility with existing tools, while Default Mode offers a simpler modern redesign of the functionality.

    You can choose whichever suits your requirements.

    Default Mode

    Configuration
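
To select this mode, set:

connect.pubsub.output.mode=DEFAULT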

    Record Schema

    Each Pub/Sub message is transformed into a single Kafka record, structured as follows:

    • Kafka Key: A String of the Pub/Sub MessageID.

    • Kafka Value: The Pub/Sub message value as BYTES.

    • Kafka Headers: Includes the "PublishTimestamp" (in seconds) and all Pub/Sub message attributes mapped as separate headers.

    Key Schema

    The Kafka Key is mapped from the Pub/Sub MessageID, a unique ID for a Pub/Sub message.

    Value Schema

    The Kafka Value is mapped from the body of the Pub/Sub message.

    Headers Schema

    The Kafka Headers include:

    • PublishTimestamp: Long value representing the time when the Pub/Sub message was published, in seconds.

• GCPProjectID: The GCP Project ID.

    • PubSubTopicID: The Pub/Sub Topic ID.

    • PubSubSubscriptionID: The Pub/Sub Subscription ID.

    • All Pub/Sub message attributes: Each attribute from the Pub/Sub message is mapped as a separate header.

    Compatibility Mode

    Configuration
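
To select this mode, set:

connect.pubsub.output.mode=COMPATIBILITY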

    Record Schema

    Each Pub/Sub message is transformed into a single Kafka record, structured as follows:

    • Kafka Key: Comprises the project ID, message ID, and subscription ID of the Pub/Sub message.

    • Kafka Value: Contains the message data and attributes from the Pub/Sub message.

    Key Schema

    The Key is a structure with these fields:

    Field Name
    Schema Type
    Description

    ProjectId

    String

    The Pub/Sub project containing the topic from which messages are polled.

    TopicId

    String

    The Pub/Sub topic containing the messages.

    SubscriptionId

    String

    The Pub/Sub subscription of the Pub/Sub topic.

    MessageId

String

A unique ID for a Pub/Sub message

    Value Schema

    The Value is a structure with these fields:

    Field Name
    Schema Type
    Description

    MessageData

    Optional String

    The body of the Pub/Sub message.

    AttributeMap

    Optional String

    The attribute map associated with the Pub/Sub message.

    Option Reference

    Name
    Description
    Type
    Available Values
    Default Value

    connect.pubsub.gcp.auth.mode

    Specifies the authentication mode for connecting to GCP.

    string

    Credentials, File or Default

    Default

    connect.pubsub.gcp.credentials

    For “auth.mode” credentials: GCP Authentication credentials string.

    string

    (Empty)

    You must provide the JMS implementation jars for your JMS service.

Connector Class

io.lenses.streamreactor.connect.jms.source.JMSSourceConnector

    Example

    For more examples see the tutorials.

    KCQL support

    You can specify multiple KCQL statements separated by ; to have the connector sink into multiple topics.

However, you cannot route the same source to different topics; for this, use a separate connector instance.

    The following KCQL is supported:

    The selection of fields from the JMS message is not supported.

    Examples:
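
-- Select from a JMS queue and write to a Kafka topic
INSERT INTO topicA SELECT * FROM jms_queue WITHTYPE QUEUE

-- Select from a JMS topic and write to a Kafka topic using a converter
INSERT INTO topicA SELECT * FROM jms_topic WITHTYPE TOPIC WITHCONVERTER=`io.lenses.streamreactor.connect.converters.source.AvroConverter`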

    Destination types

    The connector supports both TOPICS and QUEUES, controlled by the WITHTYPE KCQL clause.

    Message Converters

    The connector supports converters to handle different message payload formats in the source topic or queue.

If no converter is provided the JMS message is converted to a Kafka Struct representation.

    See source record converters.

    Data type conversion

    Name
    Type

    message_timestamp

    Optional int64

    correlation_id

    Optional string

    redelivered

    Optional boolean

    reply_to

    Optional string

    destination

    Optional string

    message_id

    Optional string

    Option Reference

    Name
    Description
    Type
    Default Value

    connect.jms.url

    Provides the JMS broker url

    string

    connect.jms.initial.context.factory

Initial Context Factory, e.g. org.apache.activemq.jndi.ActiveMQInitialContextFactory

    string

    connect.jms.connection.factory

Provides the full class name for the ConnectionFactory to use, e.g. org.apache.activemq.ActiveMQConnectionFactory

    string

    For more examples, see the tutorials.

    KCQL support

    You can specify multiple KCQL statements separated by ;

However, you cannot route the same source to different topics; for this, use a separate connector instance.

    The following KCQL is supported:

The selection of fields from the MQTT message is not supported.

    Examples:
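
-- Insert mode, select all fields from /mqttTopicA
-- and write to the topic `topic` using converter myclass
INSERT INTO topic SELECT * FROM /mqttTopicA WITHCONVERTER=`myclass`

-- wildcard subscription
INSERT INTO topic SELECT * FROM /mqttTopicA/+/sensors WITHCONVERTER=`myclass`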

    Keyed JSON format

    To facilitate scenarios like retaining the latest value for a given device identifier, or support Kafka Streams joins without having to re-map the topic data, the connector supports WITHKEY in the KCQL syntax.

    Multiple key fields are supported using a delimiter:
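
// `[` enclosed by `]` denotes optional values
WITHKEY(field1 [, field2.A , field3]) [KEYDELIMITER='.']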

    The resulting Kafka record key content will be the string concatenation of the values of the fields. Optionally, the delimiter can be set via the KEYDELIMITER keyword.

    Shared and Wildcard Subscriptions

    The connector supports wildcard and shared subscriptions, but the KCQL command must be placed inside single quotes.

    Dynamic target topics

When using wildcard subscriptions, you can dynamically route messages to a Kafka topic with the same name as the MQTT topic by using `$` in the KCQL target statement.

You must use a wildcard or a shared subscription format.

Kafka topic names do not support the / character. The resulting topic names will have / replaced by _. For example:

    /mqttSourceTopic/A/test would become mqttSourceTopic_A_test.
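
For example, to route each wildcard match to a Kafka topic named after the MQTT topic:

INSERT INTO `$` SELECT * FROM /mqttTopicA/+/sensors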

    Message converters

    The connector supports converters to handle different message payload formats in the source topic. See source record converters.

    Option Reference

    Name
    Description
    Type
    Default Value

    connect.mqtt.hosts

    Contains the MQTT connection end points.

    string

    connect.mqtt.username

    Contains the Mqtt connection user name

    string

    connect.mqtt.password

    Contains the Mqtt connection password

    password

    name=GcpPubSubSourceDemo
    connector.class=io.lenses.streamreactor.connect.gcp.pubsub.source.GCPPubSubSourceConnector
    topics=kafka_topic_to_write_to
    tasks.max=1
    connect.pubsub.gcp.auth.mode=File
    connect.pubsub.gcp.file=/path/to/gcp-service-account-key.json
    connect.pubsub.gcp.project.id=gcp-project-id
    connect.pubsub.kcql=insert into `kafka_topic_to_write_to` select * from `gcp-subscription-id`
    INSERT INTO kafka-topic
    SELECT *
    FROM subscriptionId
    [PROPERTIES(
      'property.1'=x,
      'property.2'=x,
    )]
    INSERT INTO `my-topic-with-hyphen`
    SELECT *
    FROM bucketAddress:pathPrefix
    INSERT INTO my-topic SELECT * FROM subscriptionId;
    connect.pubsub.gcp.auth.mode=Default
    connect.pubsub.gcp.auth.mode=Credentials
    connect.pubsub.gcp.credentials=$GCP_CREDENTIALS
    connect.pubsub.gcp.project.id=$GCP_PROJECT_ID
    connect.pubsub.gcp.auth.mode=File
    connect.pubsub.gcp.file=/home/secure-stuff/gcp-read-credential.txt
    connect.pubsub.output.mode=DEFAULT
    connect.pubsub.output.mode=COMPATIBILITY
    name=jms-source
    connector.class=io.lenses.streamreactor.connect.jms.source.JMSSourceConnector
    tasks.max=1
    connect.jms.kcql=INSERT INTO jms SELECT * FROM jms-queue WITHTYPE QUEUE
    connect.jms.initial.context.factory=org.apache.activemq.jndi.ActiveMQInitialContextFactory
    connect.jms.url=tcp://activemq:61616
    connect.jms.connection.factory=ConnectionFactory
    INSERT INTO kafka_topic
    SELECT *
    FROM jms_destination
    WITHTYPE [TOPIC|QUEUE]
    [WITHCONVERTER=`myclass`]
    -- Select from a JMS queue and write to a Kafka topic
    INSERT INTO topicA SELECT * FROM jms_queue WITHTYPE QUEUE
    
    -- Select from a JMS topic and write to a Kafka topic with a json converter
    INSERT INTO topicA
    SELECT * FROM jms_topic
    WITHTYPE TOPIC
    WITHCONVERTER=`io.lenses.streamreactor.connect.converters.source.AvroConverter`
    name=mqtt-source
    connector.class=io.lenses.streamreactor.connect.mqtt.source.MqttSourceConnector
    tasks.max=1
    connect.mqtt.kcql=INSERT INTO mqtt SELECT * FROM /mjson WITHCONVERTER=`io.lenses.streamreactor.connect.converters.source.JsonSimpleConverter`
    connect.mqtt.client.id=dm_source_id
    connect.mqtt.hosts=tcp://mqtt:1883
    connect.mqtt.service.quality=1
    INSERT INTO <your-kafka-topic>
    SELECT *
    FROM <your-mqtt-topic>
    [WITHCONVERTER=`myclass`]
    -- Insert mode, select all fields from topicA
    -- and write to topic topic with converter myclass
    INSERT INTO topic SELECT * FROM /mqttTopicA [WITHCONVERTER=myclass]
    
    -- wildcard
    INSERT INTO topic SELECT * FROM /mqttTopicA/+/sensors [WITHCONVERTER=`myclass`]
    // `[` enclosed by `]` denotes optional values
    WITHKEY(field1 [, field2.A , field3]) [KEYDELIMITER='.']
    -- wildcard
    INSERT INTO kafkaTopic1 SELECT * FROM /mqttTopicA/+/sensors WITHCONVERTER=`myclass`
    INSERT INTO `$` SELECT * FROM /mqttTopicA/+/sensors


    connect.pubsub.gcp.file

    For “auth.mode” file: Local file path for file containing GCP authentication credentials.

    string

    (Empty)

    connect.pubsub.gcp.project.id

    GCP Project ID.

    string

    (Empty)

    connect.pubsub.kcql

    Kafka Connect Query Language (KCQL) Configuration to control the connector behaviour

    string

    kcql configuration

    connect.pubsub.output.mode

    Output mode. Please see Output Modes documentation below.

    Default or Compatibility

    Default

    mode

    Optional int32

    type

    Optional string

    priority

    Optional int32

    bytes_payload

    Optional bytes

    properties

    Map of string

    ConnectionFactory

    connect.jms.kcql

    connect.jms.kcql

    string

    connect.jms.subscription.name

The subscription name to use when subscribing to a topic. Specifying this makes a durable subscription for topics.

    string

    connect.jms.password

    Provides the password for the JMS connection

    password

    connect.jms.username

    Provides the user for the JMS connection

    string

    connect.jms.error.policy

Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP - the error is swallowed; THROW - the error is allowed to propagate; RETRY - the exception causes the Connect framework to retry the message. The number of retries is based on connect.jms.max.retries. All errors will be logged automatically, even if the code swallows them.

    string

    THROW

    connect.jms.retry.interval

    The time in milliseconds between retries.

    int

    60000

    connect.jms.max.retries

    The maximum number of times to try the write again.

    int

    20

    connect.jms.destination.selector

    Selector to use for destination lookup. Either CDI or JNDI.

    string

    CDI

    connect.jms.initial.context.extra.params

    List (comma-separated) of extra properties as key/value pairs with a colon delimiter to supply to the initial context e.g. SOLACE_JMS_VPN:my_solace_vp

    list

    []

    connect.jms.batch.size

    The number of records to poll for on the target JMS destination in each Connect poll.

    int

    100

    connect.jms.polling.timeout

    Provides the timeout to poll incoming messages

    long

    1000

    connect.jms.source.default.converter

Contains a canonical class name for the default converter of raw JMS message bytes to a SourceRecord. Overrides to the default can still be done by using connect.jms.source.converters, e.g. io.lenses.streamreactor.connect.converters.source.AvroConverter

    string

    connect.jms.converter.throw.on.error

If set to false, the conversion exception will be swallowed and everything carries on, but the message is lost! If set to true, the exception will be thrown. Default is false.

    boolean

    false

    connect.converter.avro.schemas

    If the AvroConverter is used you need to provide an avro Schema to be able to read and translate the raw bytes to an avro record. The format is $MQTT_TOPIC=$PATH_TO_AVRO_SCHEMA_FILE

    string

    connect.jms.headers

Contains a collection of static JMS headers included in every SinkRecord. The format is connect.jms.headers="$MQTT_TOPIC=rmq.jms.message.type:TextMessage,rmq.jms.message.priority:2;$MQTT_TOPIC2=rmq.jms.message.type:JSONMessage"

    string

    connect.progress.enabled

    Enables the output for how many records have been processed

    boolean

    false

    connect.jms.evict.interval.minutes

    Removes the uncommitted messages from the internal cache. Each JMS message is linked to the Kafka record to be published. Failure to publish a record to Kafka will mean the JMS message will not be acknowledged.

    int

    10

    connect.jms.evict.threshold.minutes

    The number of minutes after which an uncommitted entry becomes evictable from the connector cache.

    int

    10

    connect.jms.scale.type

    How the connector tasks parallelization is decided. Available values are kcql and default. If kcql is provided it will be based on the number of KCQL statements written; otherwise it will be driven based on the connector tasks.max

    connect.mqtt.service.quality

    Specifies the Mqtt quality of service

    int

    connect.mqtt.timeout

    Provides the time interval to establish the mqtt connection

    int

    3000

    connect.mqtt.clean

    connect.mqtt.clean

    boolean

    true

    connect.mqtt.keep.alive

    The keep alive functionality assures that the connection is still open and both broker and client are connected to the broker during the establishment of the connection. The interval is the longest possible period of time, which broker and client can endure without sending a message.

    int

    5000

    connect.mqtt.client.id

    Contains the Mqtt session client id

    string

    connect.mqtt.error.policy

Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP - the error is swallowed; THROW - the error is allowed to propagate; RETRY - the exception causes the Connect framework to retry the message. The number of retries is based on connect.mqtt.max.retries. All errors will be logged automatically, even if the code swallows them.

    string

    THROW

    connect.mqtt.retry.interval

    The time in milliseconds between retries.

    int

    60000

    connect.mqtt.max.retries

    The maximum number of times to try the write again.

    int

    20

    connect.mqtt.retained.messages

    Specifies the Mqtt retained flag.

    boolean

    false

    connect.mqtt.converter.throw.on.error

If set to false, the conversion exception will be swallowed and everything carries on, but the message is lost! If set to true, the exception will be thrown. Default is false.

    boolean

    false

    connect.converter.avro.schemas

    If the AvroConverter is used you need to provide an avro Schema to be able to read and translate the raw bytes to an avro record. The format is $MQTT_TOPIC=$PATH_TO_AVRO_SCHEMA_FILE in case of source converter, or $KAFKA_TOPIC=PATH_TO_AVRO_SCHEMA in case of sink converter

    string

    connect.mqtt.log.message

    Logs received MQTT messages

    boolean

    false

    connect.mqtt.kcql

Contains the Kafka Connect Query Language describing the MQTT sources and the target Kafka topics

    string

    connect.mqtt.polling.timeout

    Provides the timeout to poll incoming messages

    int

    1000

    connect.mqtt.share.replicate

    Replicate the shared subscriptions to all tasks instead of distributing them

    boolean

    false

    connect.progress.enabled

    Enables the output for how many records have been processed

    boolean

    false

    connect.mqtt.ssl.ca.cert

    Provides the path to the CA certificate file to use with the Mqtt connection

    string

    connect.mqtt.ssl.cert

    Provides the path to the certificate file to use with the Mqtt connection

    string

    connect.mqtt.ssl.key

Certificate private key file path.

    string

    connect.mqtt.process.duplicates

    Process duplicate messages

    boolean

    false

    FTP

    This page describes the usage of the Stream Reactor FTP Source Connector.

The connector monitors the configured remote directories and, at the specified interval, refreshes the list of files in those directories. Files are downloaded when they were not known before, or when their timestamp or size has changed. Only files with a timestamp younger than the specified maximum age are considered. Hashes of the files are maintained and used to check for content changes. Changed files are then fed into Kafka, either as a whole (update) or only the appended part (tail), depending on the configuration. Optionally, file bodies can be transformed through a pluggable system prior to putting them into Kafka.

Connector Class

io.lenses.streamreactor.connect.ftp.source.FtpSourceConnector

    Example

For more examples see the tutorials.

    Data types

    Each Kafka record represents a file and has the following types.

    • The format of the keys is configurable through connect.ftp.keystyle=string|struct. It can be a string with the file name, or a FileInfo structure with the name: string and offset: long. The offset is always 0 for files that are updated as a whole, and hence only relevant for tailed files.

    • The values of the records contain the body of the file as bytes.

    Tailing Versus Update as a Whole

    The following rules are used.

Tailed files are only allowed to grow. Bytes that have been appended to them since the last inspection are yielded. Preceding bytes are not allowed to change.

    Updated files can grow, shrink and change anywhere. The entire contents are yielded.

    Data converters

Instead of dumping whole file bodies (and the danger of exceeding Kafka's message.max.bytes), one might want to give an interpretation to the data contained in the files before putting it into Kafka. For example, if the files that are fetched from the FTP are comma-separated values (CSVs), one might prefer to have a stream of CSV records instead. To allow this, the connector provides a pluggable conversion of SourceRecords. Right before sending a SourceRecord to the Connect framework, it is run through an object that implements the SourceRecordConverter trait shown below.

The default object that is used is a pass-through converter, an instance of io.lenses.streamreactor.connect.ftp.source.NopSourceRecordConverter.

    To override it, create your own implementation of SourceRecordConverter and place the jar in the plugin.path.

To learn more about using the FTP Kafka connector, read this blog.

    Option Reference

    Name
    Description
    Type
    Default Value

    connect.ftp.refresh

ISO 8601 duration specifying how often the server is polled

string

connect.ftp.file.maxage

ISO 8601 duration specifying the maximum age of files to consider

    string

    connect.ftp.keystyle

    SourceRecord keystyle, string or struct

    string

    connect.ftp.protocol

    Protocol to use, FTP or FTPS

    string

    ftp

    connect.ftp.timeout

    Ftp connection timeout in milliseconds

    int

    30000

    connect.ftp.filter

    Regular expression to use when selecting files for processing

    string

    .*

    connect.ftp.monitor.tail

Comma-separated list of path:destinationtopic pairs; the tail of each file is tracked

string

connect.ftp.monitor.update

Comma-separated list of path:destinationtopic pairs; the whole file is tracked

    string

    connect.ftp.monitor.slicesize

    File slice size in bytes

    int

    -1

    connect.ftp.fileconverter

    File converter class

    string

    io.lenses.streamreactor.connect.ftp.source.SimpleFileConverter

    connect.ftp.sourcerecordconverter

    Source record converter class

    string

    io.lenses.streamreactor.connect.ftp.source.NopSourceRecordConverter

    connect.ftp.max.poll.records

    Max number of records returned per poll

    int

    10000

    connect.ftp.address

    host[:port] of the ftp server

    string

    connect.ftp.user

    Username to connect with

    string

    connect.ftp.password

    Password to connect with

    string


    name=ftp-source
    connector.class=io.lenses.streamreactor.connect.ftp.source.FtpSourceConnector
    tasks.max=1
    
    #server settings
    connect.ftp.address=localhost:21
    connect.ftp.user=ftp
    connect.ftp.password=ftp
    
    #refresh rate, every minute
    connect.ftp.refresh=PT1M
    
    #ignore files older than 14 days.
    connect.ftp.file.maxage=P14D
    
    #monitor /forecasts/weather/ and /logs/ for appends to files.
    #any updates go to the topics `weather` and `error-logs` respectively.
    connect.ftp.monitor.tail=/forecasts/weather/:weather,/logs/:error-logs
    
    #keep an eye on /statuses/, files are retrieved as a whole and sent to topic `status`
    connect.ftp.monitor.update=/statuses/:status
    
    #keystyle controls the format of the key and can be string or struct.
    #string only provides the file name
    #struct provides a structure with the filename and offset
    connect.ftp.keystyle=struct
    package io.lenses.streamreactor.connect.ftp
    
    trait SourceRecordConverter extends Configurable {
        def convert(in:SourceRecord) : java.util.List[SourceRecord]
    }
    
    class NopSourceRecordConverter extends SourceRecordConverter{
        override def configure(props: util.Map[String, _]): Unit = {}
        override def convert(in: SourceRecord): util.List[SourceRecord] = Seq(in).asJava
    }
    connect.ftp.sourcerecordconverter=your.name.space.YourConverter

    AWS S3

    This page describes the usage of the Stream Reactor AWS S3 Source Connector.

    This connector is also available on the AWS Marketplace.

    Objects that have been archived to AWS Glacier storage class are skipped, in order to load these objects you must manually restore the objects. Skipped objects are logged in the Connect workers log files.

Connector Class

io.lenses.streamreactor.connect.aws.s3.source.S3SourceConnector

    Example

For more examples see the tutorials.

    KCQL Support

You can specify multiple KCQL statements separated by ; to have the connector sink into multiple topics. However, you cannot route the same source to different topics; for this, use a separate connector instance.

    The connector uses a SQL-like syntax to configure the connector behaviour. The full KCQL syntax is:

    Please note that you can employ escaping within KCQL for the INSERT INTO, SELECT * FROM, and PARTITIONBY clauses when necessary. For example, if you need to use a topic name that contains a hyphen, you can escape it as follows:
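
INSERT INTO `my-topic-with-hyphen`
SELECT *
FROM bucketAddress:pathPrefix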

    The connector does not support multiple KCQL statements that reference the same source location; to use multiple statements, configure each one in a separate connector instance.

    Source Bucket & Path

    The S3 source location is defined within the FROM clause. The connector will read all objects from the given location considering the data partitioning and ordering options. Each data partition will be read by a single connector task.

    The FROM clause format is:
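
FROM bucketAddress:pathPrefix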

If your data in AWS was not written by the Lenses AWS sink, set the connector to traverse the folder hierarchy in the bucket and load objects based on their last-modified timestamp. If LastModified sorting is used, ensure objects do not arrive late, or use a post-processing step to handle them:

connect.s3.source.partition.extractor.regex=none

connect.s3.source.ordering.type=LastModified

To load in alphanumeric order, set connect.s3.source.ordering.type to AlphaNumeric.

    Target Bucket & Path

    The target Kafka topic is specified via the INSERT INTO clause. The connector will write all the records to the given topic:

    S3 Object formats

    The connector supports a range of storage formats, each with its own distinct functionality:

    • JSON: The connector will read objects containing JSON content, each line representing a distinct record.

    • Avro: The connector will read Avro-stored messages from S3 and translate them into Kafka’s native format.

    • Parquet: The connector will read Parquet-stored messages from S3 and translate them into Kafka’s native format.

• Text: The connector will read objects containing lines of text, each line representing a distinct record.

• CSV: The connector will read objects containing lines of text, each line representing a distinct record.

• CSV_WithHeaders: The connector will read objects containing lines of text, each line representing a distinct record while skipping the header row.

• Bytes: The connector will read objects containing bytes, each object is translated to a Kafka message.

Use the STOREAS clause to configure the storage format. The following options are available: JSON, Avro, Parquet, Text, CSV, CSV_WithHeaders, Bytes.

    Text Processing

    When using Text storage, the connector provides additional configuration options to finely control how text content is processed.

    Regex

    In Regex mode, the connector applies a regular expression pattern, and only when a line matches the pattern is it considered a record. For example, to include only lines that start with a number, you can use the following configuration:
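
A sketch of such a configuration; only read.text.trim is shown elsewhere on this page, so treat the read.text.mode and read.text.regex property names as assumptions to verify against the option reference:

INSERT INTO my-topic
SELECT * FROM my-bucket:my-prefix
STOREAS `Text`
PROPERTIES('read.text.mode'='Regex', 'read.text.regex'='^[0-9]')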

    Start-End line

    In Start-End Line mode, the connector reads text content between specified start and end lines, inclusive. This mode is useful when you need to extract records that fall within defined boundaries. For instance, to read records where the first line is ‘SSM’ and the last line is an empty line (’’), you can configure it as follows:

    To trim the start and end lines, set the read.text.trim property to true:
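
A sketch under the same assumption about the read.text.* property names:

INSERT INTO my-topic
SELECT * FROM my-bucket:my-prefix
STOREAS `Text`
PROPERTIES('read.text.mode'='StartEndLine', 'read.text.start.line'='SSM', 'read.text.end.line'='', 'read.text.trim'='true')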

    Start-End tag

    In Start-End Tag mode, the connector reads text content between specified start and end tags, inclusive. This mode is particularly useful when a single line of text in S3 corresponds to multiple output Kafka messages. For example, to read XML records enclosed between ‘’ and ‘’, configure it as follows:
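
A sketch under the same assumption about the read.text.* property names; the tag names here are placeholders, since the original tags were lost in rendering:

INSERT INTO my-topic
SELECT * FROM my-bucket:my-prefix
STOREAS `Text`
PROPERTIES('read.text.mode'='StartEndTag', 'read.text.start.tag'='<record>', 'read.text.end.tag'='</record>')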

    Storage output matrix

    Depending on the storage format of Kafka topics’ messages, the need for replication to a different cluster, and the specific data analysis requirements, there exists a guideline on how to effectively utilize converters for both sink and source operations. This guidance aims to optimize performance and minimize unnecessary CPU and memory usage.

    S3 Storage Format
    Kafka Output Format
    Restore or replicate cluster
    Analytics
    Sink Converter
    Source Converter

    Projections

    Currently, the connector does not offer support for SQL projection; consequently, anything other than a SELECT * query is disregarded. The connector will faithfully write all the record fields to Kafka exactly as they are.

    Ordering

The S3 sink employs zero-padding in object names to ensure precise ordering, leveraging optimizations offered by the S3 API, guaranteeing the accurate sequence of objects.

When using the S3 source alongside the S3 sink, the connector can adopt the same ordering method, ensuring data processing follows the correct chronological order. However, there are scenarios where S3 data is generated by applications that do not maintain lexical object key name order.

In such cases, to process objects in the correct sequence, the source needs to list all objects in the bucket and sort them based on their last modified timestamp. To enable this behavior, set the connect.s3.source.ordering.type to LastModified. This ensures that the source correctly arranges and processes the data based on the timestamps of the objects.

    If using LastModified sorting, ensure objects do not arrive late, or use a post-processing step to handle them.

    Throttling

The number of object keys the source reads from S3 in a single poll can be limited; the default value, if not specified, is 1000.

    To limit the number of result rows returned from the source in a single poll operation, you can use the LIMIT clause. The default value, if not specified, is 10000.
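
For example, a sketch that caps a single poll at 5000 rows (the bucket and prefix are placeholders):

INSERT INTO my-topic
SELECT * FROM my-bucket:my-prefix
LIMIT 5000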

    Object Extension Filtering

    The AWS S3 Source Connector allows you to filter the objects to be processed based on their extensions. This is controlled by two properties: connect.s3.source.extension.excludes and connect.s3.source.extension.includes.

    Excluding Object Extensions

    The connect.s3.source.extension.excludes property is a comma-separated list of object extensions to exclude from the source object search. If this property is not configured, all objects are considered. For example, to exclude .txt and .csv objects, you would set this property as follows:
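
For example, a sketch assuming extensions are listed without the leading dot:

connect.s3.source.extension.excludes=txt,csv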

    Including Object Extensions

    The connect.s3.source.extension.includes property is a comma-separated list of object extensions to include in the source object search. If this property is not configured, all objects are considered. For example, to include only .json and .xml objects, you would set this property as follows:
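
For example, using the same value format as above:

connect.s3.source.extension.includes=json,xml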

    Note: If both connect.s3.source.extension.excludes and connect.s3.source.extension.includes are set, the connector first applies the exclusion filter and then the inclusion filter.

    Post-Processing Options

    Post-processing options offer flexibility in managing how objects are handled after they have been processed. By configuring these options, users can automate tasks such as deleting objects to save storage space or moving files to an archive for compliance and data retention purposes. These features are crucial for efficient data lifecycle management, particularly in environments where storage considerations or regulatory requirements dictate the need for systematic handling of processed data.

    Use Cases for Post-Processing Options

    1. Deleting Objects After Processing

      For scenarios where freeing up storage is critical and reprocessing is not necessary, configure the connector to delete objects after they are processed. This option is particularly useful in environments with limited storage capacity or where processed data is redundantly stored elsewhere.

      Example:
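
A sketch of one possible configuration; the post.process.action property name is an assumption (it is not shown elsewhere on this page) and should be checked against the Properties reference for your connector version:

INSERT INTO my-topic
SELECT * FROM my-bucket:my-prefix
PROPERTIES('post.process.action'='DELETE')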

      Result: Objects are permanently removed from the S3 bucket after processing, effectively reducing storage usage and preventing reprocessing.

    2. Moving Objects to an Archive Bucket

      To preserve processed objects for archiving or compliance reasons, set the connector to move them to a designated archive bucket. This use case applies to organizations needing data retention strategies or for regulatory adherence by keeping processed records accessible but not in active use.

    Properties

    The PROPERTIES clause is optional and adds a layer of configuration to the connector. It enhances versatility by permitting the application of multiple configurations (delimited by ‘,’). The following properties are supported:

    Name
    Description
    Type
    Available Values

    Authentication

    The connector offers two distinct authentication modes:

    • Default: This mode relies on the default AWS authentication chain, simplifying the authentication process.

    • Credentials: In this mode, explicit configuration of AWS Access Key and Secret Key is required for authentication.

    When selecting the “Credentials” mode, it is essential to provide the necessary access key and secret key properties. Alternatively, if you prefer not to configure these properties explicitly, the connector will follow the credentials retrieval order as described here.

    Here’s an example configuration for the “Credentials” mode:
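
A minimal sketch, assuming the connect.s3.aws.* property names for the credentials; check the Option Reference for the authoritative names:

connect.s3.aws.auth.mode=Credentials
connect.s3.aws.access.key=$AWS_ACCESS_KEY
connect.s3.aws.secret.key=$AWS_SECRET_KEY
connect.s3.aws.region=eu-west-1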

    For enhanced security and flexibility when using the “Credentials” mode, it is highly advisable to utilize Connect Secret Providers. This approach ensures robust security practices while handling access credentials.

    API Compatible systems

    The connector can also be used against API compatible systems provided they implement the following:

    Option Reference

    Name
    Description
    Type
    Available Values
    Default Value

    Cassandra

    This page describes the usage of the Stream Reactor Cassandra Source Connector.

    Kafka Connect Cassandra is a Source Connector for reading data from Cassandra and writing to Kafka.

Connector Class

io.lenses.streamreactor.connect.cassandra.source.CassandraSourceConnector

    Example

For more examples see the tutorials.

    KCQL support

    You can specify multiple KCQL statements separated by ; to have the connector sink into multiple topics.

However, you cannot route the same source to different topics; for this, use a separate connector instance.

    The following KCQL is supported:

    Examples:
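
-- Select all columns from table orders and insert into a topic called orders-topic,
-- using the column created to track new rows (incremental mode TIMEUUID)
INSERT INTO orders-topic SELECT * FROM orders PK created INCREMENTALMODE=TIMEUUID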

    Keyed JSON Format

    The connector can write JSON to your Kafka topic using the WITHFORMAT JSON clause but the key and value converters must be set:
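
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter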

In order to facilitate scenarios like retaining the latest value for a given device identifier, or to support Kafka Streams joins without having to re-map the topic data, the connector supports WITHKEY in the KCQL syntax.

    Multiple key fields are supported using a delimiter:
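
// `[` enclosed by `]` denotes optional values
WITHKEY(field1 [, field2.A , field3]) [KEYDELIMITER='.']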

The resulting Kafka record key content will be the string concatenation of the values of the fields specified. Optionally, the delimiter can be set via the KEYDELIMITER keyword.

    Keying is only supported in conjunction with the WITHFORMAT JSON clause

    Incremental mode

This mode tracks new records added to a table. The columns to track are identified by the PK clause in the KCQL statement. Only one column can be used to track new records. The supported Cassandra column data types are:

    1. TIMESTAMP

    2. TIMEUUID

    3. TOKEN

    4. DSESEARCHTIMESTAMP

    If set to TOKEN this column value is wrapped inside Cassandra's token function which needs unwrapping with the WITHUNWRAP command.

    You must use the Byte Order Partitioner for the TOKEN mode to work correctly or data will be missing from the Kafka topic. This is not recommended due to the creation of hotspots in Cassandra.

DSESEARCHTIMESTAMP will make DSE Search queries using Solr instead of a native Cassandra query.
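
For example:

INSERT INTO <topic>
SELECT a, b, c, d
FROM keyspace.table
WHERE solr_query='pkCol:{2020-03-23T15:02:21Z TO 2020-03-23T15:30:12.989Z]}'
INCREMENTALMODE=DSESEARCHTIMESTAMP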

    Bulk Mode

    The connector constantly loads the entire table.

    Controlling throughput

    The connector can be configured to:

    • Start from a particular offset - connect.cassandra.initial.offset

    • Increase or decrease the poll interval - connect.cassandra.import.poll.interval

    • Set a slice duration to query for in milliseconds - connect.cassandra.slice.duration

For a more detailed explanation of how to use these options, see the Cassandra to Kafka tutorial.

    Source Data Type Mapping

    The following CQL data types are supported:

    CQL Type
    Connect Data Type

    Option Reference

    Name
    Description
    Type
    Default Value


    Date

    Optional String

    Tuple

    Optional String

    UDT

    Optional String

    Boolean

    Optional Boolean

    TinyInt

    Optional Int8

    SmallInt

    Optional Int16

    Int

    Optional Int32

    Decimal

    Optional String

    Float

    Optional Float32

    Counter

    Optional Int64

    BigInt

    Optional Int64

    VarInt

    Optional Int64

    Double

    Optional Int64

    Time

    Optional Int64

    Blob

    Optional Bytes

    Map

    Optional [String -> MAP]

    List

    Optional [String -> ARRAY]

    Set

    Optional [String -> ARRAY]

    connect.cassandra.username

    Username to connect to Cassandra with.

    string

    connect.cassandra.password

    Password for the username to connect to Cassandra with.

    password

    connect.cassandra.ssl.enabled

    Secure Cassandra driver connection via SSL.

    boolean

    false

    connect.cassandra.trust.store.path

    Path to the client Trust Store.

    string

    connect.cassandra.trust.store.password

    Password for the client Trust Store.

    password

    connect.cassandra.trust.store.type

    Type of the Trust Store, defaults to JKS

    string

    JKS

    connect.cassandra.key.store.type

Type of the Key Store, defaults to JKS

    string

    JKS

    connect.cassandra.ssl.client.cert.auth

    Enable client certification authentication by Cassandra. Requires KeyStore options to be set.

    boolean

    false

    connect.cassandra.key.store.path

    Path to the client Key Store.

    string

    connect.cassandra.key.store.password

    Password for the client Key Store

    password

    connect.cassandra.consistency.level

    Consistency refers to how up-to-date and synchronized a row of Cassandra data is on all of its replicas. Cassandra offers tunable consistency. For any given read or write operation, the client application decides how consistent the requested data must be.

    string

    connect.cassandra.fetch.size

    The number of records the Cassandra driver will return at once.

    int

    5000

    connect.cassandra.load.balancing.policy

    Cassandra Load balancing policy. ROUND_ROBIN, TOKEN_AWARE, LATENCY_AWARE or DC_AWARE_ROUND_ROBIN. TOKEN_AWARE and LATENCY_AWARE use DC_AWARE_ROUND_ROBIN

    string

    TOKEN_AWARE

    connect.cassandra.error.policy

    Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP - the error is swallowed THROW - the error is allowed to propagate. RETRY - The exception causes the Connect framework to retry the message. The number of retries is set by connect.cassandra.max.retries. All errors will be logged automatically, even if the code swallows them.

    string

    THROW

    connect.cassandra.max.retries

    The maximum number of times to try the write again.

    int

    20

    connect.cassandra.retry.interval

    The time in milliseconds between retries.

    int

    60000

    connect.cassandra.task.buffer.size

The size of the queue the Cassandra reader writes to.

    int

    10000

    connect.cassandra.assigned.tables

    The tables a task has been assigned.

    string

    connect.cassandra.batch.size

    The number of records the source task should drain from the reader queue.

    int

    100

    connect.cassandra.import.poll.interval

    The polling interval between queries against tables for bulk mode.

    long

    1000

    connect.cassandra.time.slice.ms

    The range of time in milliseconds the source task the timestamp/timeuuid will use for query

    long

    10000

    connect.cassandra.import.allow.filtering

    Enable ALLOW FILTERING in incremental selects.

    boolean

    true

    connect.cassandra.slice.duration

    Duration to query for in target Cassandra table. Used to restrict query timestamp span

    long

    10000

    connect.cassandra.slice.delay.ms

    The delay between the current time and the time range of the query. Used to insure all of the data in the time slice is available

    long

    30000

    connect.cassandra.initial.offset

    The initial timestamp to start querying in Cassandra from (yyyy-MM-dd HH:mm:ss.SSS’Z’). Default 1900-01-01 00:00:00.0000000Z

    string

    1900-01-01 00:00:00.0000000Z

    connect.cassandra.mapping.collection.to.json

    Mapping columns with type Map, List and Set like json

    boolean

    true

    connect.cassandra.kcql

    KCQL expression describing field selection and routes.

    string
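For instance, a minimal sketch of a retry-oriented error-handling setup using the options above (the values shown are illustrative, not recommendations):

    connect.cassandra.error.policy=RETRY
    connect.cassandra.max.retries=5
    connect.cassandra.retry.interval=30000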

| Cassandra Type | Connect Data Type |
|---|---|
| TimeUUID | Optional String |
| UUID | Optional String |
| Inet | Optional String |
| Ascii | Optional String |
| Text | Optional String |
| Timestamp | Optional String |

| Name | Description | Type | Default Value |
|---|---|---|---|
| connect.cassandra.contact.points | Initial contact point host for Cassandra including port. | string | localhost |
| connect.cassandra.port | Cassandra native port. | int | 9042 |
| connect.cassandra.key.space | Keyspace to read from. | string | |

    Cassandra to Kafka

    name=cassandra
    connector.class=io.lenses.streamreactor.connect.cassandra.source.CassandraSourceConnector
    connect.cassandra.key.space=demo
    connect.cassandra.kcql=INSERT INTO orders-topic SELECT * FROM orders PK created INCREMENTALMODE=TIMEUUID
    connect.cassandra.contact.points=cassandra
    INSERT INTO <your-topic>
    SELECT FIELD,...
    FROM <your-cassandra-table>
    [PK FIELD]
    [WITHFORMAT JSON]
    [INCREMENTALMODE=TIMESTAMP|TIMEUUID|TOKEN|DSESEARCHTIMESTAMP]
    [WITHKEY(<your-key-field>)]
    -- Select all columns from table orders and insert into a topic
    -- called orders-topic, use column created to track new rows.
    -- Incremental mode set to TIMEUUID
    INSERT INTO orders-topic SELECT * FROM orders PK created INCREMENTALMODE=TIMEUUID
    
    -- Select created, product, price from table orders and insert
    -- into a topic called orders-topic, use column created to track new rows.
    INSERT INTO orders-topic SELECT created, product, price FROM orders PK created
    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=org.apache.kafka.connect.storage.StringConverter
    INSERT INTO <topic>
    SELECT <fields>
    FROM <column_family>
    PK <PK_field>
    WITHFORMAT JSON
    WITHUNWRAP INCREMENTALMODE=<mode>
    WITHKEY(<key_field>)
    -- values enclosed by `[` and `]` are optional
    WITHKEY(field1 [, field2.A , field3]) [KEYDELIMITER='.']
    INSERT INTO <topic>
    SELECT a, b, c, d
    FROM keyspace.table
    WHERE solr_query= 'pkCol:{2020-03-23T15:02:21Z TO 2020-03-23T15:30:12.989Z]}'
    INCREMENTALMODE=DSESEARCHTIMESTAMP
  • CSV: The connector will read objects containing lines of text, each line representing a distinct record.

  • CSV_WithHeaders: The connector will read objects containing lines of text, each line representing a distinct record while skipping the header row.

  • Bytes: The connector will read objects containing bytes, each object is translated to a Kafka message.

| S3 Storage Format | Kafka Output Format | Restore or replicate cluster | Analytics | Sink Converter | Source Converter |
|---|---|---|---|---|---|
| JSON | STRING | Same, Other | Yes, No | StringConverter | StringConverter |
| AVRO, Parquet | STRING | Same, Other | Yes | StringConverter | StringConverter |
| AVRO, Parquet | STRING | Same, Other | No | ByteArrayConverter | ByteArrayConverter |
| JSON | JSON | Same, Other | Yes | JsonConverter | StringConverter |
| JSON | JSON | Same, Other | No | StringConverter | StringConverter |
| AVRO, Parquet | JSON | Same, Other | Yes, No | JsonConverter | JsonConverter or Avro Converter (Glue, Confluent) |
| AVRO, Parquet, JSON | BYTES | Same, Other | Yes, No | ByteArrayConverter | ByteArrayConverter |
| AVRO, Parquet | AVRO | Same | Yes | Avro Converter (Glue, Confluent) | Avro Converter (Glue, Confluent) |
| AVRO, Parquet | AVRO | Same | No | ByteArrayConverter | ByteArrayConverter |
| AVRO, Parquet | AVRO | Other | Yes, No | Avro Converter (Glue, Confluent) | Avro Converter (Glue, Confluent) |
| AVRO, Parquet | Protobuf | Same | Yes | Protobuf Converter (Glue, Confluent) | Protobuf Converter (Glue, Confluent) |
| AVRO, Parquet | Protobuf | Same | No | ByteArrayConverter | ByteArrayConverter |
| AVRO, Parquet | Protobuf | Other | Yes, No | Protobuf Converter (Glue, Confluent) | Protobuf Converter (Glue, Confluent) |
| AVRO, Parquet, JSON | Other | Same, Other | Yes, No | ByteArrayConverter | ByteArrayConverter |

    Example:

    Result: Objects are transferred to an archive-bucket, stored with an updated path that includes the processed/ prefix, maintaining an organized archive structure.

    read.text.end.tag

    End Tag for Text Reading (if applicable)

    String

    read.text.buffer.size

    Text Buffer Size (for optimization)

    Int

    read.text.start.line

    Start Line for Text Reading (if applicable)

    String

    read.text.end.line

    End Line for Text Reading (if applicable)

    String

    read.text.trim

    Trim Text During Reading

    Boolean

    store.envelope

    Messages are stored as “Envelope”

    Boolean

    post.process.action

    Defines the action to perform on source objects after successful processing.

    Enum

    DELETE or MOVE

    post.process.action.bucket

    Specifies the target bucket for the MOVE action (required for MOVE).

    String

    post.process.action.prefix

    Specifies a new prefix for the object’s location when using the MOVE action (required for MOVE).

    String

    post.process.action.retain.dirs

    Ensure that paths are retained after a post-process action, using a zero-byte object to represent the path.

    Boolean

    false

    connect.s3.aws.secret.key

    Secret Key for AWS S3 Credentials

    string

    connect.s3.aws.region

    AWS Region for S3 Bucket

    string

    connect.s3.pool.max.connections

    Maximum Connections in the Connection Pool

    int

    -1 (undefined)

    50

    connect.s3.custom.endpoint

    Custom Endpoint URL for S3 (if applicable)

    string

    connect.s3.kcql

    Kafka Connect Query Language (KCQL) Configuration to control the connector behaviour

    string

    connect.s3.vhost.bucket

    Enable Virtual Hosted-style Buckets for S3

    boolean

    true, false

    false

    connect.s3.source.extension.excludes

    A comma-separated list of object extensions to exclude from the source object search.

    string

    Object extension filtering

    connect.s3.source.extension.includes

    A comma-separated list of object extensions to include in the source object search.

    string

    Object extension filtering

    connect.s3.source.partition.extractor.type

    Type of Partition Extractor (Hierarchical or Regex)

    string

    hierarchical, regex

    connect.s3.source.partition.extractor.regex

    Regex Pattern for Partition Extraction (if applicable)

    string

    connect.s3.ordering.type

    Type of ordering for the S3 object keys to ensure the processing order.

    string

    AlphaNumeric, LastModified

    AlphaNumeric

    connect.s3.source.partition.search.continuous

    If set to true the connector will continuously search for new partitions.

    boolean

    true, false

    true

    connect.s3.source.partition.search.excludes

    A comma-separated list of paths to exclude from the partition search.

    string

    ".indexes"

    connect.s3.source.partition.search.interval

    The interval in milliseconds between searching for new partitions.

    long

    300000

    connect.s3.source.partition.search.recurse.levels

    Controls how many levels deep to recurse when searching for new partitions

    int

    0

    connect.s3.source.empty.results.backoff.initial.delay

    Initial delay before retrying when no results are found.

    long

    1000 Milliseconds

    connect.s3.source.empty.results.backoff.max.delay

    Maximum delay before retrying when no results are found.

    long

    10000 Milliseconds

    connect.s3.source.empty.results.backoff.multiplier

    Multiplier to apply to the delay when retrying when no results are found.

    double

    2.0 Multiplier (x)

    connect.s3.source.write.watermark.header

    Write the record with kafka headers including details of the source and line number of the file.

    boolean

    true, false

    false


    read.text.mode

    Controls how Text content is read

    Enum

    Regex, StartEndTag, StartEndLine

    read.text.regex

    Regular Expression for Text Reading (if applicable)

    String

    read.text.start.tag

    Start Tag for Text Reading (if applicable)

    String

    connect.s3.aws.auth.mode

    Specifies the AWS authentication mode for connecting to S3.

    string

    "Credentials," "Default"

    "Default"

    connect.s3.aws.access.key

    Access Key for AWS S3 Credentials

    string



    name=aws-s3SourceConnectorParquet
    connector.class=io.lenses.streamreactor.connect.aws.s3.source.S3SourceConnector
    tasks.max=1
    connect.s3.kcql=insert into $TOPIC_NAME select * from $BUCKET_NAME:$PREFIX_NAME STOREAS `parquet`
    connect.s3.aws.region=eu-west-2
    connect.s3.aws.secret.key=SECRET_KEY
    connect.s3.aws.access.key=ACCESS_KEY
    connect.s3.aws.auth.mode=Credentials
    INSERT INTO $kafka-topic
    SELECT *
    FROM bucketAddress:pathPrefix
    [BATCH=batch]
    [STOREAS storage_format]
    [LIMIT limit]
    [PROPERTIES(
      'property.1'=x,
      'property.2'=x,
    )]
    INSERT INTO `my-topic-with-hyphen`
    SELECT *
    FROM bucketAddress:pathPrefix
    FROM [bucketname]:pathprefix
    //my-bucket-called-pears:my-folder-called-apples 
    INSERT INTO my-apples-topic SELECT * FROM  my-bucket-called-pears:my-folder-called-apples 
    STOREAS `JSON`
    STOREAS `Avro`
    STOREAS `Parquet`
    STOREAS `Text`
    STOREAS `CSV`
    STOREAS `CSV_WithHeaders`
    STOREAS `Bytes`
    connect.s3.kcql=insert into $kafka-topic select * from lensesio:regex STOREAS `text` PROPERTIES('read.text.mode'='regex', 'read.text.regex'='^[1-9].*')
    connect.s3.kcql=insert into $kafka-topic select * from lensesio:multi_line STOREAS `text` PROPERTIES('read.text.mode'='startEndLine', 'read.text.start.line'='SSM', 'read.text.end.line'='')
    connect.s3.kcql=insert into $kafka-topic select * from lensesio:multi_line STOREAS `text` PROPERTIES('read.text.mode'='startEndLine', 'read.text.start.line'='SSM', 'read.text.end.line'='', 'read.text.trim'='true')
    connect.s3.kcql=insert into $kafka-topic select * from lensesio:xml STOREAS `text` PROPERTIES('read.text.mode'='startEndTag', 'read.text.start.tag'='<SSM>', 'read.text.end.tag'='</SSM>')
    BATCH = 100
    LIMIT 10000
    connect.s3.source.extension.excludes=txt,csv
    connect.s3.source.extension.includes=json,xml
    INSERT INTO `my-topic`
    SELECT * FROM `my-s3-bucket:my-prefix`
    PROPERTIES (
        'post.process.action'=`DELETE`
    )
    connect.s3.aws.auth.mode=Credentials
    connect.s3.aws.region=eu-west-2
    connect.s3.aws.access.key=$AWS_ACCESS_KEY
    connect.s3.aws.secret.key=$AWS_SECRET_KEY
    listObjectsV2
    listObjectsV2Paginator
    putObject
    getObject
    headObject
    deleteObjects
    deleteObject
    INSERT INTO `my-topic`
    SELECT * FROM `my-s3-bucket:my-prefix`
    PROPERTIES (
        'post.process.action'=`MOVE`,
        'post.process.action.bucket'=`archive-bucket`,
        'post.process.action.prefix'=`processed/`
    )
    kcql configuration

    GCP Storage

    This page describes the usage of the Stream Reactor GCP Storage Source Connector.

    Connector Class

    Example

    For more examples see the tutorials.

    KCQL Support

    You can specify multiple KCQL statements separated by ; to have the connector sink into multiple topics.

    However, you can not route the same source to different topics, for this use a separate connector instance.

    The connector uses a SQL-like syntax to configure the connector behaviour. The full KCQL syntax is:

    Please note that you can employ escaping within KCQL for the INSERT INTO, SELECT * FROM, and PARTITIONBY clauses when necessary. For example, if you need to use a topic name that contains a hyphen, you can escape it as follows:

    Source Bucket & Path

    The GCP Storage source location is defined within the FROM clause. The connector will read all objects from the given location considering the data partitioning and ordering options. Each data partition will be read by a single connector task.

    The FROM clause format is:
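For illustration only (the bucket and prefix names below are hypothetical):

    FROM my-gcs-bucket:my/path/prefix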

    If your data in GCS was not written by the Lenses GCS sink, the connector can be set to traverse a folder hierarchy in a bucket and load objects based on their last modified timestamp. If LastModified sorting is used, ensure objects do not arrive late, or use a post-processing step to handle them.

    connect.gcpstorage.source.partition.extractor.regex=none

    connect.gcpstorage.source.ordering.type=LastModified

    To load in alphanumeric order, set the ordering type to AlphaNumeric.
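A one-line sketch, assuming the same property name used in the snippet above:

    connect.gcpstorage.source.ordering.type=AlphaNumeric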

    Target Bucket & Path

    The target Kafka topic is specified via the INSERT INTO clause. The connector will write all the records to the given topic:
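A short sketch of the clause with hypothetical names (topic and bucket are illustrative):

    INSERT INTO my-kafka-topic
    SELECT *
    FROM my-gcs-bucket:my/path/prefix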

    GCP Storage object formats

    The connector supports a range of storage formats, each with its own distinct functionality:

    • JSON: The connector will read objects containing JSON content, each line representing a distinct record.

    • Avro: The connector will read Avro-stored messages from GCP Storage and translate them into Kafka’s native format.

    • Parquet: The connector will read Parquet-stored messages from GCP Storage and translate them into Kafka’s native format.

    • Text: The connector will read objects containing lines of text, each line representing a distinct record.

    • CSV: The connector will read objects containing lines of text, each line representing a distinct record.

    • CSV_WithHeaders: The connector will read objects containing lines of text, each line representing a distinct record while skipping the header row.

    • Bytes: The connector will read objects containing bytes, each object is translated to a Kafka message.

    Use the STOREAS clause to configure the storage format. The following options are available:

    Text Processing

    When using Text storage, the connector provides additional configuration options to finely control how text content is processed.

    Regex

    In Regex mode, the connector applies a regular expression pattern, and only when a line matches the pattern is it considered a record. For example, to include only lines that start with a number, you can use the following configuration:

    Start-End line

    In Start-End Line mode, the connector reads text content between specified start and end lines, inclusive. This mode is useful when you need to extract records that fall within defined boundaries. For instance, to read records where the first line is ‘SSM’ and the last line is an empty line (’’), you can configure it as follows:

    To trim the start and end lines, set the read.text.trim property to true:

    Start-End tag

    In Start-End Tag mode, the connector reads text content between specified start and end tags, inclusive. This mode is particularly useful when a single line of text in GCP Storage corresponds to multiple output Kafka messages. For example, to read XML records enclosed between the <SSM> and </SSM> tags, configure it as follows:

    Storage output matrix

    Depending on the storage format of Kafka topics’ messages, the need for replication to a different cluster, and the specific data analysis requirements, there exists a guideline on how to effectively utilize converters for both sink and source operations. This guidance aims to optimize performance and minimize unnecessary CPU and memory usage.

| GCP Storage Format | Kafka Output Format | Restore or replicate cluster | Analytics | Sink Converter | Source Converter |
|---|---|---|---|---|---|
| JSON | STRING | Same, Other | Yes, No | StringConverter | StringConverter |
| AVRO, Parquet | STRING | Same, Other | Yes | StringConverter | StringConverter |
| AVRO, Parquet | STRING | Same, Other | No | ByteArrayConverter | ByteArrayConverter |
| JSON | JSON | Same, Other | Yes | JsonConverter | StringConverter |
| JSON | JSON | Same, Other | No | StringConverter | StringConverter |
| AVRO, Parquet | JSON | Same, Other | Yes, No | JsonConverter | JsonConverter or Avro Converter (Glue, Confluent) |
| AVRO, Parquet, JSON | BYTES | Same, Other | Yes, No | ByteArrayConverter | ByteArrayConverter |
| AVRO, Parquet | AVRO | Same | Yes | Avro Converter (Glue, Confluent) | Avro Converter (Glue, Confluent) |
| AVRO, Parquet | AVRO | Same | No | ByteArrayConverter | ByteArrayConverter |
| AVRO, Parquet | AVRO | Other | Yes, No | Avro Converter (Glue, Confluent) | Avro Converter (Glue, Confluent) |
| AVRO, Parquet | Protobuf | Same | Yes | Protobuf Converter (Glue, Confluent) | Protobuf Converter (Glue, Confluent) |
| AVRO, Parquet | Protobuf | Same | No | ByteArrayConverter | ByteArrayConverter |
| AVRO, Parquet | Protobuf | Other | Yes, No | Protobuf Converter (Glue, Confluent) | Protobuf Converter (Glue, Confluent) |
| AVRO, Parquet, JSON | Other | Same, Other | Yes, No | ByteArrayConverter | ByteArrayConverter |

    Projections

    Currently, the connector does not offer support for SQL projection; consequently, anything other than a SELECT * query is disregarded. The connector will faithfully write all the record fields to Kafka exactly as they are.

    Ordering

    The sink employs zero-padding in object names to ensure precise ordering, leveraging optimizations offered by the GCS API, guaranteeing the accurate sequence of objects.

    When using the GCS source alongside the GCS sink, the connector can adopt the same ordering method, ensuring data processing follows the correct chronological order. However, there are scenarios where GCS data is generated by applications that do not maintain lexical object name order.

    In such cases, to process objects in the correct sequence, the source needs to list all objects in the bucket and sort them based on their last modified timestamp. To enable this behavior, set the connect.gcpstorage.source.ordering.type to LastModified. This ensures that the source correctly arranges and processes the data based on the timestamps of the objects.

    If using LastModified sorting, ensure objects do not arrive late, or use a post-processing step to handle them.

    Throttling

    To limit the number of object names the source reads from GCS in a single poll, use the BATCH clause. The default value, if not specified, is 1000:

    To limit the number of result rows returned from the source in a single poll operation, you can use the LIMIT clause. The default value, if not specified, is 10000.
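A sketch combining both throttling clauses in one KCQL statement (topic, bucket and values are illustrative):

    INSERT INTO my-kafka-topic
    SELECT *
    FROM my-gcs-bucket:my/path/prefix
    BATCH = 500
    LIMIT 2000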

    Object Extension Filtering

    The GCP Storage Source Connector allows you to filter the objects to be processed based on their extensions. This is controlled by two properties: connect.gcpstorage.source.extension.excludes and connect.gcpstorage.source.extension.includes.

    Excluding Object Extensions

    The connect.gcpstorage.source.extension.excludes property is a comma-separated list of object extensions to exclude from the source object search. If this property is not configured, all objects are considered. For example, to exclude .txt and .csv objects, you would set this property as follows:

    Including Object Extensions

    The connect.gcpstorage.source.extension.includes property is a comma-separated list of object extensions to include in the source object search. If this property is not configured, all objects are considered. For example, to include only .json and .xml objects, you would set this property as follows:

    Note: If both connect.gcpstorage.source.extension.excludes and connect.gcpstorage.source.extension.includes are set, the connector first applies the exclusion filter and then the inclusion filter.
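For example, the two filters could be combined as follows (the extensions are illustrative); the exclusion list is applied first, then the inclusion list:

    connect.gcpstorage.source.extension.excludes=tmp,bak
    connect.gcpstorage.source.extension.includes=json,xml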

    Post-Processing Options

    Post-processing options offer flexibility in managing how objects are handled after they have been processed. By configuring these options, users can automate tasks such as deleting objects to save storage space or moving objects to an archive for compliance and data retention purposes. These features are crucial for efficient data lifecycle management, particularly in environments where storage considerations or regulatory requirements dictate the need for systematic handling of processed data.

    Use Cases for Post-Processing Options

    1. Deleting objects After Processing

      For scenarios where freeing up storage is critical and reprocessing is not necessary, configure the connector to delete objects after they are processed. This option is particularly useful in environments with limited storage capacity or where processed data is redundantly stored elsewhere.

      Example:

      Result: objects are permanently removed from the GCP Storage bucket after processing, effectively reducing storage usage and preventing reprocessing.

    2. Moving objects to an Archive Bucket

      To preserve processed objects for archiving or compliance reasons, set the connector to move them to a designated archive bucket. This use case applies to organizations needing data retention strategies or for regulatory adherence by keeping processed records accessible but not in active use.

    Properties

    The PROPERTIES clause is optional and adds a layer of configuration to the connector. It enhances versatility by permitting the application of multiple configurations (delimited by ‘,’). The following properties are supported:

    Name
    Description
    Type
    Available Values

    Authentication

    The connector offers two distinct authentication modes:

    • Default: This mode relies on the default GCP authentication chain, simplifying the authentication process.

    • File: This mode uses a local (to the connect worker) path for a file containing GCP authentication credentials.

    • Credentials: In this mode, explicit configuration of a GCP Credentials string is required for authentication.

    The simplest example to configure in the connector is the “Default” mode, as this requires no other configuration.
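A minimal sketch of the “Default” mode:

    connect.gcpstorage.gcp.auth.mode=Default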

    When selecting the “Credentials” mode, it is essential to provide the necessary credentials. Alternatively, if you prefer not to configure these properties explicitly, the connector will follow the default GCP credentials retrieval order.

    Here’s an example configuration for the “Credentials” mode:
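(A sketch; the $GCP_CREDENTIALS and $GCP_PROJECT_ID placeholders stand for your own values.)

    connect.gcpstorage.gcp.auth.mode=Credentials
    connect.gcpstorage.gcp.credentials=$GCP_CREDENTIALS
    connect.gcpstorage.gcp.project.id=$GCP_PROJECT_ID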

    And here is an example configuration using the “File” mode:
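(A sketch; the credential file path is illustrative.)

    connect.gcpstorage.gcp.auth.mode=File
    connect.gcpstorage.gcp.file=/path/to/gcp-read-credential.json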

    Remember that when using File mode, the file must exist on every worker node in your Kafka Connect cluster and be readable by the Kafka Connect process.

    For enhanced security and flexibility when using the “Credentials” mode, it is highly advisable to utilize Connect Secret Providers. This approach ensures robust security practices while handling access credentials.

    Backup and Restore

    When used in tandem with the GCP Storage Sink Connector, the GCP Storage Source Connector becomes a powerful tool for restoring Kafka topics from GCP Storage. To enable this behavior, you should set store.envelope to true. This configuration ensures that the source expects the following data structure in GCP Storage:

    When the messages are sent to Kafka, the GCP Storage Source Connector ensures that it correctly maps the key, value, headers, and metadata fields (including timestamp and partition) to their corresponding Kafka message fields. Please note that the envelope functionality can only be used with data stored in GCP Storage as Avro, JSON, or Parquet formats.
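A sketch of enabling the envelope via KCQL PROPERTIES (the 'store.envelope' key is taken from the Properties table above; topic, bucket and storage format are illustrative):

    INSERT INTO my-kafka-topic
    SELECT *
    FROM my-gcs-bucket:my/path/prefix
    STOREAS `AVRO`
    PROPERTIES('store.envelope'=true)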

    Partition Extraction

    When the envelope feature is not in use, and data restoration is required, the responsibility falls on the connector to establish the original topic partition value. To ensure that the source correctly conveys the original partitions back to Kafka Connect during reads from the source, a partition extractor can be configured to extract this information from the GCP Storage object key.

    To configure the partition extractor, you can utilize the connect.gcpstorage.source.partition.extractor.type property, which supports two options:

    • hierarchical: This option aligns with the default format used by the sink, topic/partition/offset.json.

    • regex: When selected, you can provide a custom regular expression to extract the partition information. Additionally, when using the regex option, you must also set the connect.gcpstorage.source.partition.extractor.regex property. It’s important to note that only one lookup group is expected. For an example of a regular expression pattern, please refer to the pattern used for hierarchical, which is:
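    (?i)^(?:.*)\/([0-9]*)\/(?:[0-9]*)[.](?:Json|Avro|Parquet|Text|Csv|Bytes)$

As a sketch, a regex-based extractor could then be configured along these lines (the pattern shown is the hierarchical one above; any custom expression must contain exactly one capture group for the partition):

    connect.gcpstorage.source.partition.extractor.type=regex
    connect.gcpstorage.source.partition.extractor.regex=(?i)^(?:.*)\/([0-9]*)\/(?:[0-9]*)[.](?:Json|Avro|Parquet|Text|Csv|Bytes)$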

    Option Reference

    Name
    Description
    Type
    Available Values
    Default Value
    io.lenses.streamreactor.connect.gcp.storage.source.GCPStorageSourceConnector

    Example:

    Result: objects are transferred to an archive-bucket, stored with an updated path that includes the processed/ prefix, maintaining an organized archive structure.

    read.text.end.tag

    End Tag for Text Reading (if applicable)

    String

    read.text.buffer.size

    Text Buffer Size (for optimization)

    Int

    read.text.start.line

    Start Line for Text Reading (if applicable)

    String

    read.text.end.line

    End Line for Text Reading (if applicable)

    String

    read.text.trim

    Trim Text During Reading

    Boolean

    store.envelope

    Messages are stored as “Envelope”

    Boolean

    post.process.action

    Defines the action to perform on source objects after successful processing.

    Enum

    DELETE or MOVE

    post.process.action.bucket

    Specifies the target bucket for the MOVE action (required for MOVE).

    String

    post.process.action.prefix

    Specifies a new prefix for the object’s location when using the MOVE action (required for MOVE).

    String

    For "auth.mode" file: Local file path for file containing GCP authentication credentials.

    string

    (Empty)

    connect.gcpstorage.gcp.project.id

    GCP Project ID.

    string

    (Empty)

    connect.gcpstorage.gcp.quota.project.id

    GCP Quota Project ID.

    string

    (Empty)

    connect.gcpstorage.endpoint

    Endpoint for GCP Storage.

    string

    connect.gcpstorage.error.policy

    Defines the error handling policy when errors occur during data transfer to or from GCP Storage.

    string

    "NOOP," "THROW," "RETRY"

    "THROW"

    connect.gcpstorage.max.retries

    Sets the maximum number of retries the connector will attempt before reporting an error to the Connect Framework.

    int

    20

    connect.gcpstorage.retry.interval

    Specifies the interval (in milliseconds) between retry attempts by the connector.

    int

    60000

    connect.gcpstorage.http.max.retries

    Sets the maximum number of retries for the underlying HTTP client when interacting with GCP Storage.

    long

    5

    connect.gcpstorage.http.retry.interval

    Specifies the retry interval (in milliseconds) for the underlying HTTP client. An exponential backoff strategy is employed.

    long

    50

    connect.gcpstorage.kcql

    Kafka Connect Query Language (KCQL) Configuration to control the connector behaviour

    string

    KCQL configuration

    connect.gcpstorage.source.extension.excludes

    A comma-separated list of object extensions to exclude from the source object search.

    string

    Object extension filtering

    connect.gcpstorage.source.extension.includes

    A comma-separated list of object extensions to include in the source object search.

    string

    Object extension filtering

    connect.gcpstorage.source.partition.extractor.type

    Type of Partition Extractor (Hierarchical or Regex)

    string

    hierarchical, regex

    connect.gcpstorage.source.partition.extractor.regex

    Regex Pattern for Partition Extraction (if applicable)

    string

    connect.gcpstorage.source.partition.search.continuous

    If set to true the connector will continuously search for new partitions.

    boolean

    true, false

    true

    connect.gcpstorage.source.partition.search.interval

    The interval in milliseconds between searching for new partitions.

    long

    300000

    connect.gcpstorage.source.partition.search.excludes

    A comma-separated list of paths to exclude from the partition search.

    string

    ".indexes"

    connect.gcpstorage.source.partition.search.recurse.levels

    Controls how many levels deep to recurse when searching for new partitions

    int

    0

    connect.gcpstorage.ordering.type

    Type of ordering for the GCS object keys to ensure the processing order.

    string

    AlphaNumeric, LastModified

    AlphaNumeric

    connect.gcpstorage.source.empty.results.backoff.initial.delay

    Initial delay before retrying when no results are found.

    long

    1000 Milliseconds

    connect.gcpstorage.source.empty.results.backoff.max.delay

    Maximum delay before retrying when no results are found.

    long

    10000 Milliseconds

    connect.gcpstorage.source.empty.results.backoff.multiplier

    Multiplier to apply to the delay when retrying when no results are found.

    double

    2.0 Multiplier (x)

    connect.gcpstorage.source.write.watermark.header

    Write the record with kafka headers including details of the source and line number of the file.

    boolean

    true, false

    false


    read.text.mode

    Controls how Text content is read

    Enum

    Regex, StartEndTag, StartEndLine

    read.text.regex

    Regular Expression for Text Reading (if applicable)

    String

    read.text.start.tag

    Start Tag for Text Reading (if applicable)

    String

    connect.gcpstorage.gcp.auth.mode

    Specifies the authentication mode for connecting to GCP.

    string

    "Credentials", "File" or "Default"

    "Default"

    connect.gcpstorage.gcp.credentials

    For "auth.mode" credentials: GCP Authentication credentials string.

    string

    (Empty)



    # the connector name can be anything
    name=gcp-storageSourceConnectorParquet
    connector.class=io.lenses.streamreactor.connect.gcp.storage.source.GCPStorageSourceConnector
    tasks.max=1
    connect.gcpstorage.kcql=insert into $TOPIC_NAME select * from $BUCKET_NAME:$PREFIX_NAME STOREAS `parquet`
    connect.gcpstorage.gcp.auth.mode=Credentials
    connect.gcpstorage.gcp.credentials=$GCP_CREDENTIALS
    connect.gcpstorage.gcp.project.id=$GCP_PROJECT_ID
    INSERT INTO $kafka-topic
    SELECT *
    FROM bucketAddress:pathPrefix
    [BATCH=batch]
    [STOREAS storage_format]
    [LIMIT limit]
    [PROPERTIES(
      'property.1'=x,
      'property.2'=x,
    )]
    INSERT INTO `my-topic-with-hyphen`
    SELECT *
    FROM bucketAddress:pathPrefix
    FROM [bucketname]:pathprefix
    //my-bucket-called-pears:my-folder-called-apples 
    INSERT INTO my-apples-topic SELECT * FROM  my-bucket-called-pears:my-folder-called-apples 
    STOREAS `JSON`
    STOREAS `Avro`
    STOREAS `Parquet`
    STOREAS `Text`
    STOREAS `CSV`
    STOREAS `CSV_WithHeaders`
    STOREAS `Bytes`
    connect.gcpstorage.kcql=insert into $kafka-topic select * from lensesio:regex STOREAS `text` PROPERTIES('read.text.mode'='regex', 'read.text.regex'='^[1-9].*')
    connect.gcpstorage.kcql=insert into $kafka-topic select * from lensesio:multi_line STOREAS `text` PROPERTIES('read.text.mode'='startEndLine', 'read.text.start.line'='SSM', 'read.text.end.line'='')
    connect.gcpstorage.kcql=insert into $kafka-topic select * from lensesio:multi_line STOREAS `text` PROPERTIES('read.text.mode'='startEndLine', 'read.text.start.line'='SSM', 'read.text.end.line'='', 'read.text.trim'='true')
     connect.gcpstorage.kcql=insert into $kafka-topic select * from lensesio:xml STOREAS `text` PROPERTIES('read.text.mode'='startEndTag', 'read.text.start.tag'='<SSM>', 'read.text.end.tag'='</SSM>')
    BATCH = 100
    LIMIT 10000
    connect.gcpstorage.source.extension.excludes=txt,csv
    connect.gcpstorage.source.extension.includes=json,xml
    INSERT INTO `my-topic`
    SELECT * FROM `my-gcp-storage-bucket:my-prefix`
    PROPERTIES (
        'post.process.action'=`DELETE`
    )
    connect.gcpstorage.gcp.auth.mode=Default
    connect.gcpstorage.gcp.auth.mode=Credentials
    connect.gcpstorage.gcp.credentials=$GCP_CREDENTIALS
    connect.gcpstorage.gcp.project.id=$GCP_PROJECT_ID
    connect.gcpstorage.gcp.auth.mode=File
    connect.gcpstorage.gcp.file=/home/secure-stuff/gcp-read-credential.txt
    {
      "key": <the message Key, which can be a primitive or a complex object>,
      "value": <the message Value, which can be a primitive or a complex object>,
      "headers": {
        "header1": "value1",
        "header2": "value2"
      },
      "metadata": {
        "offset": 0,
        "partition": 0,
        "timestamp": 0,
        "topic": "topic"
      }
    }
    (?i)^(?:.*)\/([0-9]*)\/(?:[0-9]*)[.](?:Json|Avro|Parquet|Text|Csv|Bytes)$
    INSERT INTO `my-topic`
    SELECT * FROM `my-gcp-storage-bucket:my-prefix`
    PROPERTIES (
        'post.process.action'=`MOVE`,
        'post.process.action.bucket'=`archive-bucket`,
        'post.process.action.prefix'=`processed/`
    )