
Azure Event Hubs

This page describes the usage of the Stream Reactor Azure Event Hubs Sink Connector.

Coming soon!

Sinks

This page details the configuration options for the Stream Reactor Kafka Connect sink connectors.

Sink connectors read data from Kafka and write to an external system.


FAQ

Can the datalake sinks lose data?

Kafka topic retention policies determine how long a message is retained in a topic before it is deleted. If the retention period expires and the connector has not processed the messages, possibly due to not running or other issues, the unprocessed Kafka data will be deleted as per the retention policy. This can lead to significant data loss since the messages will no longer be available for the connector to sink to the target system.

Do the datalake sinks support exactly-once semantics?

Yes, the datalake connectors natively support exactly-once guarantees.

How do I escape dots in field names in KCQL?

Field names in Kafka message headers or values may contain dots (.). To access these correctly, enclose the entire target in backticks (`) and each segment which consists of a field name in single quotes ('):

INSERT INTO `_value.'customer.name'.'first.name'` SELECT * FROM topicA

How do I escape other special characters in field names in KCQL?

For field names with spaces or special characters, use a similar escaping strategy:

  • Field name with a space: `_value.'full name'`

  • Field name with special characters: `_value.'$special_characters!'`

This ensures the connector correctly extracts the intended fields and avoids parsing errors.

AWS S3

Sink data from Kafka to AWS S3 including backing up topics and offsets.

Azure CosmosDB

Sink data from Kafka to Azure CosmosDB.

Azure Event Hubs

Sink data from Kafka to Azure Event Hubs.

Azure Service Bus

Sink data from Kafka to Azure Service Bus topics and queues.

Cassandra

Sink data from Kafka to Cassandra.

Elasticsearch

Sink data from Kafka to Elasticsearch.

GCP Storage

Sink data from Kafka to GCP Storage.

HTTP Sink

Sink data from Kafka to an HTTP endpoint.

InfluxDB

Sink data from Kafka to InfluxDB.

JMS

Sink data from Kafka to JMS.

MongoDB

Sink data from Kafka to MongoDB.

MQTT

Sink data from Kafka to MQTT.

Redis

Sink data from Kafka to Redis.


Azure CosmosDB

This page describes the usage of the Stream Reactor Azure CosmosDB Sink Connector.


Version 10.0.0 introduces breaking changes: the connector has been renamed from DocumentDB, uses the official CosmosDB SDK, and supports new bulk and key strategies.


A Kafka Connect sink connector for writing records from Kafka to Azure CosmosDB using the SQL API.

Connector Class

io.lenses.streamreactor.connect.azure.cosmosdb.sink.CosmosDbSinkConnector

Example

name=cosmosdb
connector.class=io.lenses.streamreactor.connect.azure.cosmosdb.sink.CosmosDbSinkConnector
tasks.max=1
topics=orders-string
connect.cosmosdb.kcql=INSERT INTO orders SELECT * FROM orders-string
connect.cosmosdb.db=dm
connect.cosmosdb.endpoint=[YOUR_AZURE_ENDPOINT]
connect.cosmosdb.db.create=true
connect.cosmosdb.master.key=[YOUR_MASTER_KEY]
connect.cosmosdb.batch.size=10

For more examples see the tutorials.

KCQL support

You can specify multiple KCQL statements separated by ; to have a connector sink multiple topics. The connector properties topics or topics.regex are required to be set to a value that matches the KCQL statements.

The following KCQL is supported:

INSERT | UPSERT
INTO <your-collection>
SELECT FIELD, ...
FROM kafka_topic
[PK FIELDS,...]

Examples:

-- Insert mode, select all fields from topicA
-- and write to tableA
INSERT INTO collectionA SELECT * FROM topicA

-- UPSERT mode, select 3 fields and
-- rename from topicB and write to tableB
-- with primary key as the field id from the topic
UPSERT INTO tableB SELECT x AS a, y, z AS c FROM topicB PK id

Insert Mode

Insert is the default write mode of the sink. It inserts messages from Kafka topics into CosmosDB.

Upsert Mode

The Sink supports CosmosDB upsert functionality, which replaces the existing document if a match is found on the primary keys.

This mode works with at-least-once delivery semantics on Kafka, as ordering is guaranteed within partitions. If the same record is delivered twice to the sink, it results in an idempotent write: the existing document is simply overwritten with the identical values of the second delivery.

If records are delivered with the same field or group of fields that are used as the primary key on the target collection, but with different values, the existing record in the target collection will be updated.

Since records are delivered in the order they were written per partition, the write is idempotent on failure or restart. Redelivery produces the same result.

Bulk Mode

Bulk mode enables efficient batching of writes to CosmosDB, reducing API calls and improving throughput. Enable it with:

connect.cosmosdb.bulk.enabled=true

When enabled, you can control batching behavior using the KCQL PROPERTIES clause (flush.size, flush.count, flush.interval). If disabled, records are written individually.

Custom Key Strategy

You can control how the connector populates the id field in CosmosDB documents using:

  • connect.cosmosdb.key.source (Key, Metadata, KeyPath, ValuePath)

  • connect.cosmosdb.key.path (field path, used with KeyPath or ValuePath)

This allows you to use the Kafka record key, metadata, or a specific field from the key or value as the document ID.
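As an illustrative sketch (the field name orderId is hypothetical; the properties are those documented above), a configuration that takes the document ID from a field inside the record value might look like:

```properties
# Take the CosmosDB document id from a field inside the Kafka record value
connect.cosmosdb.key.source=ValuePath
# Path of the field within the value (hypothetical field name)
connect.cosmosdb.key.path=orderId
```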

Error Handling

Configure error handling and retry behavior with:

  • connect.cosmosdb.error.policy (NOOP, THROW, RETRY)

  • connect.cosmosdb.max.retries (max retry attempts)

  • connect.cosmosdb.retry.interval (milliseconds between retries)

These settings control how the connector responds to errors during writes.
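For instance, a sketch that switches the policy to RETRY (the retry values shown match the documented defaults for these options):

```properties
# Retry failed writes instead of failing the task outright
connect.cosmosdb.error.policy=RETRY
# Up to 20 attempts, 60 seconds apart
connect.cosmosdb.max.retries=20
connect.cosmosdb.retry.interval=60000
```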

Throughput

You can set the manual throughput (RU/s) for new CosmosDB collections with:

connect.cosmosdb.collection.throughput=400

The default is 400 RU/s, which is the minimum allowed by Azure Cosmos DB and is cost-effective for most workloads.

Proxy

If you need to connect via a proxy, specify the proxy details with:

connect.cosmosdb.proxy=<proxy-uri>

Progress Reporting

Enable progress reporting to log how many records have been processed:

connect.progress.enabled=true

Option Reference

Name
Description
Type
Default Value

KCQL Properties

Note: The following KCQL PROPERTIES options are only available when connect.cosmosdb.bulk.enabled=true. If bulk mode is disabled, records are written individually.

The CosmosDB Sink Connector supports the following KCQL PROPERTIES options to control flushing behavior:

Property
Description
Type
Default Value

Flush Options Explained:

  • Flush by Count: Triggers a flush after a specified number of records have been written.

  • Flush by Size: Initiates a flush once a predetermined size (in bytes) has been reached.

  • Flush by Interval: Enforces a flush after a defined time interval (in seconds), acting as a fail-safe to ensure timely data management even if other flush conditions are not met.

You can use these properties in your KCQL statement's PROPERTIES clause, for example:

INSERT INTO myCollection SELECT * FROM myTopic PROPERTIES('flush.size'=1000000, 'flush.interval'=30, 'flush.count'=5000)

Kafka payload support

This sink supports the following Kafka payloads:

  • Schema.Struct and Struct (Avro)

  • Schema.Struct and JSON

  • No Schema and JSON

Error policies

The connector supports Error policies.

Endpoint and Master Key Configuration

To connect to your Azure CosmosDB instance, the connector requires two essential configuration properties:

  • connect.cosmosdb.endpoint: This specifies the URI of your CosmosDB account. It should point to the connection endpoint provided in the Azure CosmosDB dashboard.

  • connect.cosmosdb.master.key: This is the authentication key used to access the database. Note: Azure often refers to this as the primary key, which can be confusing—the master.key in the connector configuration corresponds to Azure's primary key value.

Both properties are mandatory and must be set to establish a connection with CosmosDB.
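A minimal sketch with placeholder account name and key:

```properties
# Connection endpoint shown on the Azure CosmosDB dashboard (placeholder)
connect.cosmosdb.endpoint=https://myaccount.documents.azure.com:443/
# Azure labels this value the "primary key"
connect.cosmosdb.master.key=[YOUR_PRIMARY_KEY]
```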

Key Population Strategy

The connector offers flexible options for populating the id field of documents in CosmosDB. The behavior is controlled by two configurations:

connect.cosmosdb.key.source

  • Description: Defines the strategy used to extract or generate the document ID.

  • Valid Values:

    • Key (default): Use the Kafka record key.

    • Metadata: Use Kafka metadata (topic/partition/offset).

    • KeyPath: Extract from a field within the Kafka key.

    • ValuePath: Extract from a field within the Kafka value.

connect.cosmosdb.key.path

  • Description: Specifies the field path to extract the key from, when using KeyPath or ValuePath.

  • Default: id

  • Example: If using ValuePath with connect.cosmosdb.key.path=id, the connector will use value.id as the document ID.

Azure Service Bus

This page describes the usage of the Stream Reactor Azure Service Bus Sink Connector.

Stream Reactor Azure Service Bus Sink Connector is designed to effortlessly translate Kafka records into your Azure Service Bus cluster. It leverages Microsoft Azure API to transfer data to Service Bus in a seamless manner, allowing for their safe transition and safekeeping both payloads and metadata (see Payload support). It supports both types of Service Buses: Queues and Topics. Azure Service Bus Source Connector provides its user with AT-LEAST-ONCE guarantee as the data is committed (marked as read) in Kafka topic (for assigned topic and partition) once Connector verifies it was successfully committed to designated Service Bus topic.

Connector Class

io.lenses.streamreactor.connect.azure.servicebus.sink.AzureServiceBusSinkConnector

Full Config Example

For more examples see the tutorials.

The following example presents all the mandatory configuration properties for the Service Bus connector. Note there are also optional parameters listed in the Option Reference. Feel free to tweak the configuration to your requirements.

connector.class=io.lenses.streamreactor.connect.azure.servicebus.sink.AzureServiceBusSinkConnector
name=AzureServiceBusSinkConnector
tasks.max=1
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
connect.servicebus.connection.string="Endpoint=sb://MYNAMESPACE.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SOME_SHARED_ACCESS_STRING";
connect.servicebus.kcql=INSERT INTO output-servicebus SELECT * FROM input-topic PROPERTIES('servicebus.type'='QUEUE');

KCQL support

You can specify multiple KCQL statements separated by ; to have the connector map between multiple topics.

The following KCQL is supported:

INSERT INTO <your-service-bus>
SELECT *
FROM <your-kafka-topic>
PROPERTIES(...);

It allows you to map a Kafka topic named <your-kafka-topic> to a Service Bus entity named <your-service-bus> using the PROPERTIES specified (see the KCQL Properties section for the necessary properties).

The selection of fields from the Service Bus message is not supported.

Authentication

You can connect to an Azure Service Bus by passing your connection string in the configuration. The connection string can be found in the Shared access policies section of your Azure Portal.

connect.servicebus.connection.string=Endpoint=sb://YOURNAMESPACE.servicebus.windows.net/;SharedAccessKeyName=YOUR_KEYNAME;SharedAccessKey=YOUR_ACCESS_KEY=

Learn more about different methods of connecting to Service Bus in the Azure Service Bus documentation.

QUEUE and TOPIC Mappings

The Azure Service Bus Connector connects to Service Bus via the Microsoft API. To configure your mappings smoothly, pay attention to the PROPERTIES part of your KCQL statements. There are two cases here: writing to a Service Bus of type QUEUE and of type TOPIC. Please refer to the relevant sections below. If you have further questions, check the Azure Service Bus documentation to learn more about these mechanisms.

Writing to QUEUE ServiceBus

To write to a queue, there is an additional parameter that you need to pass with your KCQL mapping in the PROPERTIES part. This parameter is servicebus.type and it can take one of two values depending on the type of the service bus: QUEUE or TOPIC. For a queue, pass QUEUE:

connect.servicebus.kcql=INSERT INTO azure-queue SELECT * FROM kafka-topic PROPERTIES('servicebus.type'='QUEUE');

This is sufficient to create the mapping with your queue.

Writing to TOPIC ServiceBus

To write to a topic, there is an additional parameter that you need to pass with your KCQL mapping in the PROPERTIES part: servicebus.type, which can take one of two values depending on the type of the service bus: QUEUE or TOPIC. For a topic, pass TOPIC:

connect.servicebus.kcql=INSERT INTO azure-topic SELECT * FROM kafka-topic PROPERTIES('servicebus.type'='TOPIC');

This is sufficient to create the mapping with your topic.

Disabling batching

If the Connector is supposed to transfer big messages (one megabyte and more), Service Bus may not accept a batch of such payloads, failing the Connector task. To remediate this you can set the batch.enabled parameter to false. This sacrifices the ability to send messages in batches (possibly making transfer slower) but should let you transfer them safely.

For most use cases we recommend omitting it (it is set to true by default).
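A sketch of a KCQL mapping with batching disabled (the queue and topic names are hypothetical; servicebus.type and batch.enabled are the KCQL PROPERTIES documented on this page):

```properties
connect.servicebus.kcql=INSERT INTO azure-queue SELECT * FROM kafka-topic PROPERTIES('servicebus.type'='QUEUE', 'batch.enabled'='false');
```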

Kafka payload support

This sink supports the following Kafka payloads:

  • String Schema Key and Binary payload (then MessageId in Service Bus is set with Kafka Key)

  • any other key (or keyless) and Binary payload (this causes Service Bus messages to not have specified MessageId)

  • No Schema and JSON

Null Payload Transfer

Azure Service Bus does not allow sending messages with null content (payload).

A null payload (sometimes referred to as a Kafka tombstone) is a well-known concept in the Kafka world. However, because of this Service Bus limitation, the connector cannot send messages with a null payload and has to drop them instead.

Please keep that in mind when using Service Bus and designing business logic around null payloads!
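If you prefer to filter tombstones out explicitly before they reach the sink, a predicate-guarded Filter SMT can be added to the connector configuration. This is standard Kafka Connect (2.6+) functionality, not something specific to this connector:

```properties
# Drop tombstone (null-payload) records before the sink sees them
transforms=dropTombstones
transforms.dropTombstones.type=org.apache.kafka.connect.transforms.Filter
transforms.dropTombstones.predicate=isTombstone
predicates=isTombstone
predicates.isTombstone.type=org.apache.kafka.connect.transforms.predicates.RecordIsTombstone
```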

Option Reference

KCQL Properties

Please find below all the necessary KCQL properties:

Name
Description
Type
Default Value

Configuration parameters

Please find below all the relevant configuration parameters:

Name
Description
Type
Default Value

connect.cosmosdb.proxy

Specifies the connection proxy details.

string

Database & Throughput

connect.cosmosdb.db

The Azure CosmosDB target database.

string

connect.cosmosdb.db.create

If set to true the connector will create the database if it doesn't exist. If left at the default (false) and the database does not exist, an exception will be raised.

boolean

false

connect.cosmosdb.collection.throughput

The manual throughput to provision for new Cosmos DB collections (RU/s). The default is 400 RU/s, which is the minimum allowed by Azure Cosmos DB and is cost-effective for most workloads.

int

400

Key & Partitioning

connect.cosmosdb.key.source

The source of the key. There are 4 possible values: Key, Metadata, KeyPath or ValuePath.

string

Key

connect.cosmosdb.key.path

When used with key.source configurations of KeyPath or ValuePath, this is the path to the field in the object that will be used as the key. Defaults to 'id'.

string

id

Write & Consistency

connect.cosmosdb.consistency.level

Determines the write visibility. There are four possible values: Strong, BoundedStaleness, Session or Eventual.

string

Session

connect.cosmosdb.bulk.enabled

Enable bulk mode to reduce chatter.

boolean

false

connect.cosmosdb.kcql

KCQL expression describing field selection and data routing to the target CosmosDb.

string

Error Handling & Retries

connect.cosmosdb.error.policy

Specifies the action to be taken if an error occurs while inserting the data. Options: NOOP (swallow error), THROW (propagate error), RETRY (retry message). The number of retries is based on the error.

string

THROW

connect.cosmosdb.max.retries

The maximum number of times to try the write again.

int

20

connect.cosmosdb.retry.interval

The time in milliseconds between retries.

int

60000

connect.cosmosdb.error.threshold

The number of errors to tolerate before failing the sink.

int

5

Queue & Performance

connect.cosmosdb.flush.count.enable

Flush on count can be disabled by setting this property to 'false'.

boolean

true

connect.cosmosdb.upload.sync.period

The time in milliseconds to wait before sending the request.

int

100

connect.cosmosdb.executor.threads

The number of threads to use for processing the records.

int

1

connect.cosmosdb.max.queue.size

The maximum number of records to queue per topic before blocking. If the queue limit is reached the connector will throw RetriableException and the connector settings to handle retries will be used.

int

1000000

connect.cosmosdb.max.queue.offer.timeout.ms

The maximum time in milliseconds to wait for the queue to accept a record. If the queue does not accept the record within this time, the connector will throw RetriableException and the connector settings to handle retries will be used.

int

120000

Monitoring & Progress

connect.progress.enabled

Enables the output for how many records have been processed.

boolean

false


    Authentication & Connection

    connect.cosmosdb.endpoint

    The Azure CosmosDB end point.

    string

    connect.cosmosdb.master.key

    The connection master key.

    password

    flush.size

    Specifies the size (in bytes) for the flush operation.

    Long

    (connector default)

    flush.count

    Specifies the number of records for the flush operation.

    Int

    (connector default)

flush.interval

Specifies the interval (in seconds) for the flush operation.

Long

(connector default)

    connect.servicebus.sink.retries.timeout

    Timeout (in milliseconds) between retries if message has failed to be delivered to Service Bus

    int

    500

    servicebus.type

    Specifies Service Bus type: QUEUE or TOPIC

    string

    batch.enabled

    Specifies if the Connector can send messages in batch, see Azure Service Bus

    boolean

    true

    connect.servicebus.connection.string

    Specifies the Connection String to connect to Service Bus

    string

    connect.servicebus.kcql

    Comma-separated output KCQL queries

    string

connect.servicebus.sink.retries.max

Number of retries if message has failed to be delivered to Service Bus

int

3


    JMS

    This page describes the usage of the Stream Reactor JMS Sink Connector.

Connector Class

io.lenses.streamreactor.connect.jms.sink.JMSSinkConnector

Example

For more examples see the tutorials.

name=jms
connector.class=io.lenses.streamreactor.connect.jms.sink.JMSSinkConnector
tasks.max=1
topics=orders
connect.jms.url=tcp://activemq:61616
connect.jms.initial.context.factory=org.apache.activemq.jndi.ActiveMQInitialContextFactory
connect.jms.connection.factory=ConnectionFactory
connect.jms.kcql=INSERT INTO orders SELECT * FROM orders WITHTYPE QUEUE WITHFORMAT JSON

KCQL support

You can specify multiple KCQL statements separated by ; to have a connector sink multiple topics. The connector properties topics or topics.regex are required to be set to a value that matches the KCQL statements.

The following KCQL is supported:

INSERT INTO <jms-destination>
SELECT FIELD, ...
FROM <your-kafka-topic>
[WITHFORMAT AVRO|JSON|MAP|OBJECT]
WITHTYPE TOPIC|QUEUE

Examples:

-- Select all fields from topicA and write to jmsA queue
INSERT INTO jmsA SELECT * FROM topicA WITHTYPE QUEUE

-- Select 3 fields and rename from topicB and write
-- to jmsB topic as JSON in a TextMessage
INSERT INTO jmsB SELECT x AS a, y, z FROM topicB WITHFORMAT JSON WITHTYPE TOPIC

    JMS Topics and Queues

    The sink can write to either topics or queues, specified by the WITHTYPE clause.

    JMS Payload

    When a message is sent to a JMS target it can be one of the following:

    • JSON - Send a TextMessage

    • AVRO - Send a BytesMessage

    • MAP - Send a MapMessage

    • OBJECT - Send an ObjectMessage

    This is set by the WITHFORMAT keyword.

    Kafka payload support

    This sink supports the following Kafka payloads:

    • Schema.Struct and Struct (Avro)

    • Schema.Struct and JSON

    • No Schema and JSON

Error policies

The connector supports Error policies.

    Option Reference

    Name
    Description
    Type
    Default Value

    Deprecated Cassandra

    This page describes the usage of the Stream Reactor Cassandra Sink Connector part of the kafka-connect-cassandra-*** artifact.

The connector converts the value of Kafka messages to JSON and uses the Cassandra JSON insert feature to write records.

As of Stream-Reactor version 10, the sink is deprecated. No other changes or fixes will be provided. Follow the Cassandra link to see the new sink implementation details.
connect.jms.kcql

KCQL expression describing field selection and routes.

string

    connect.jms.subscription.name

The subscription name to use when subscribing to a topic; specifying this makes a durable subscription for topics.

    string

    connect.jms.password

    Provides the password for the JMS connection

    password

    connect.jms.username

    Provides the user for the JMS connection

    string

connect.jms.error.policy

Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP (the error is swallowed), THROW (the error is allowed to propagate) and RETRY (the exception causes the Connect framework to retry the message; the number of retries is set by connect.jms.max.retries). All errors are logged automatically.

string

THROW

    connect.jms.retry.interval

    The time in milliseconds between retries.

    int

    60000

    connect.jms.max.retries

    The maximum number of times to try the write again.

    int

    20

    connect.jms.destination.selector

    Selector to use for destination lookup. Either CDI or JNDI.

    string

    CDI

    connect.jms.initial.context.extra.params

    List (comma-separated) of extra properties as key/value pairs with a colon delimiter to supply to the initial context e.g. SOLACE_JMS_VPN:my_solace_vp

    list

    []

    connect.jms.batch.size

    The number of records to poll for on the target JMS destination in each Connect poll.

    int

    100

    connect.jms.polling.timeout

    Provides the timeout to poll incoming messages

    long

    1000

    connect.jms.source.default.converter

Contains the canonical class name for the default converter of raw JMS message bytes to a SourceRecord, e.g. io.lenses.streamreactor.connect.converters.source.AvroConverter. Overrides to the default can still be made using connect.jms.source.converters.

    string

    connect.jms.converter.throw.on.error

If set to false, the conversion exception will be swallowed and processing carries on, BUT the message is lost! If set to true, the exception is thrown. Default is false.

    boolean

    false

    connect.converter.avro.schemas

    If the AvroConverter is used you need to provide an avro Schema to be able to read and translate the raw bytes to an avro record. The format is $MQTT_TOPIC=$PATH_TO_AVRO_SCHEMA_FILE

    string

    connect.jms.headers

    Contains collection of static JMS headers included in every SinkRecord The format is connect.jms.headers="$MQTT_TOPIC=rmq.jms.message.type:TextMessage,rmq.jms.message.priority:2;$MQTT_TOPIC2=rmq.jms.message.type:JSONMessage"

    string

    connect.progress.enabled

    Enables the output for how many records have been processed

    boolean

    false

    connect.jms.evict.interval.minutes

    Removes the uncommitted messages from the internal cache. Each JMS message is linked to the Kafka record to be published. Failure to publish a record to Kafka will mean the JMS message will not be acknowledged.

    int

    10

    connect.jms.evict.threshold.minutes

    The number of minutes after which an uncommitted entry becomes evictable from the connector cache.

    int

    10

    connect.jms.scale.type

    How the connector tasks parallelization is decided. Available values are kcql and default. If kcql is provided it will be based on the number of KCQL statements written; otherwise it will be driven based on the connector tasks.max

    connect.jms.url

    Provides the JMS broker url

    string

    connect.jms.initial.context.factory

    Initial Context Factory, e.g: org.apache.activemq.jndi.ActiveMQInitialContextFactory

    string

connect.jms.connection.factory

Provides the full class name for the ConnectionFactory to use, e.g. org.apache.activemq.ActiveMQConnectionFactory

string

ConnectionFactory


Connector Class

io.lenses.streamreactor.connect.cassandra.sink.CassandraSinkConnector

Example

For more examples see the tutorials.

name=cassandra-sink
connector.class=io.lenses.streamreactor.connect.cassandra.sink.CassandraSinkConnector
tasks.max=1
topics=orders
connect.cassandra.kcql=INSERT INTO orders SELECT * FROM orders
connect.cassandra.port=9042
connect.cassandra.key.space=demo
connect.cassandra.contact.points=cassandra

KCQL support

You can specify multiple KCQL statements separated by ; to have a connector sink multiple topics. The connector properties topics or topics.regex are required to be set to a value that matches the KCQL statements.

The following KCQL is supported:

INSERT INTO <your-cassandra-table>
SELECT FIELD,...
FROM <your-table>
[TTL=Time to live]

Examples:

-- Insert mode, select all fields from topicA and
-- write to tableA
INSERT INTO tableA SELECT * FROM topicA

-- Insert mode, select 3 fields and rename from topicB
-- and write to tableB
INSERT INTO tableB SELECT x AS a, y, c FROM topicB

-- Insert mode, select 3 fields and rename from topicB
-- and write to tableB with TTL
INSERT INTO tableB SELECT x, y FROM topicB TTL=100000

    Deletion in Cassandra

Compacted topics in Kafka retain the last message per key. Deletion in Kafka occurs by tombstoning: if compaction is enabled on the topic and a message is sent with a null payload, Kafka flags this record for deletion and it is removed from the topic during compaction.

Deletion in Cassandra is supported based on fields in the key of messages with an empty/null payload. A Cassandra CQL delete statement must be provided, with parameters to which field values from the key are bound, for example:

DELETE FROM orders WHERE id = ? and product = ?

If a message was received with an empty/null value and key fields key.id and key.product, the final bound Cassandra statement would be:

# Message
# "{ "key": { "id" : 999, "product" : "DATAMOUNTAINEER" }, "value" : null }"
# DELETE FROM orders WHERE id = 999 and product = "DATAMOUNTAINEER"

# connect.cassandra.delete.enabled=true
# connect.cassandra.delete.statement=DELETE FROM orders WHERE id = ? and product = ?
# connect.cassandra.delete.struct_flds=id,product

    Deletion will only occur if a message with an empty payload is received from Kafka.

Ensure the ordinal position of connect.cassandra.delete.struct_flds matches the binding order in the Cassandra delete statement!

    Kafka payload support

    This sink supports the following Kafka payloads:

    • Schema.Struct and Struct (Avro)

    • Schema.Struct and JSON

    • No Schema and JSON

    Error policies

    The connector supports Error policies.

    Option Reference

    Name
    Description
    Type
    Default Value

    connect.cassandra.contact.points

    Initial contact point host for Cassandra including port.

    string

    localhost

    connect.cassandra.port

    Cassandra native port.

    int

    9042

    connect.cassandra.key.space

    Keyspace to write to.

    string

    Redis

    This page describes the usage of the Stream Reactor Redis Sink Connector.

This connector has been retired as of version 11.0.0.


    connect.cassandra.username

    Username to connect to Cassandra with.

    string

    connect.cassandra.password

    Password for the username to connect to Cassandra with.

    password

    connect.cassandra.ssl.enabled

    Secure Cassandra driver connection via SSL.

    boolean

    false

    connect.cassandra.trust.store.path

    Path to the client Trust Store.

    string

    connect.cassandra.trust.store.password

    Password for the client Trust Store.

    password

    connect.cassandra.trust.store.type

    Type of the Trust Store, defaults to JKS

    string

    JKS

    connect.cassandra.key.store.type

Type of the Key Store, defaults to JKS

    string

    JKS

    connect.cassandra.ssl.client.cert.auth

    Enable client certification authentication by Cassandra. Requires KeyStore options to be set.

    boolean

    false

    connect.cassandra.key.store.path

    Path to the client Key Store.

    string

    connect.cassandra.key.store.password

    Password for the client Key Store

    password

    connect.cassandra.consistency.level

    Consistency refers to how up-to-date and synchronized a row of Cassandra data is on all of its replicas. Cassandra offers tunable consistency. For any given read or write operation, the client application decides how consistent the requested data must be.

    string

    connect.cassandra.fetch.size

    The number of records the Cassandra driver will return at once.

    int

    5000

    connect.cassandra.load.balancing.policy

    Cassandra Load balancing policy. ROUND_ROBIN, TOKEN_AWARE, LATENCY_AWARE or DC_AWARE_ROUND_ROBIN. TOKEN_AWARE and LATENCY_AWARE use DC_AWARE_ROUND_ROBIN

    string

    TOKEN_AWARE

    connect.cassandra.error.policy

    Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP - the error is swallowed THROW - the error is allowed to propagate. RETRY - The exception causes the Connect framework to retry the message. The number of retries is set by connect.cassandra.max.retries. All errors will be logged automatically, even if the code swallows them.

    string

    THROW

    connect.cassandra.max.retries

    The maximum number of times to try the write again.

    int

    20

    connect.cassandra.retry.interval

    The time in milliseconds between retries.

    int

    60000

    connect.cassandra.threadpool.size

    The sink inserts all the data concurrently. To fail fast in case of an error, the sink has its own thread pool. Set the value to zero and the thread pool will default to 4 × the number of CPUs. Set a value greater than 0 to use that value as the thread pool size.

    int

    0

    connect.cassandra.delete.struct_flds

    Fields in the key struct data type used in the delete statement. Comma-separated in the order they are found in connect.cassandra.delete.statement. Keep the default value to use the record key as a primitive type.

    list

    []

    connect.cassandra.delete.statement

    Delete statement for Cassandra.

    string

    connect.cassandra.kcql

    KCQL expression describing field selection and routes.

    string

    connect.cassandra.default.value

    By default a column omitted from the JSON map will be set to NULL. Alternatively, if set to UNSET, the pre-existing value will be preserved.

    string

    connect.cassandra.delete.enabled

    Enables row deletion from Cassandra.

    boolean

    false

    connect.progress.enabled

    Enables logging of how many records have been processed.

    boolean

    false

    hashtag
    Connector Class

    hashtag
    Example

    For more examples see the tutorials.

    hashtag
    KCQL support

    You can specify multiple KCQL statements separated by ; to have a connector sink multiple topics. The connector properties topics or topics.regex are required to be set to a value that matches the KCQL statements.

    The following KCQL is supported:

    hashtag
    Cache mode

    The purpose of this mode is to cache in Redis [Key-Value] pairs. Imagine a Kafka topic with currency foreign exchange rate messages:

    You may want to store the symbol as the Key and the price as the Value in Redis. This will effectively make Redis a caching system, which multiple other applications can access to get the (latest) value. To achieve that using this particular Kafka Redis Sink Connector, you need to specify the KCQL as:

    This will update the keys USDGBP, EURGBP with the relevant price using the (default) JSON format:

    Composite keys are supported with the PK clause, a delimiter can be set with the optional configuration property connect.redis.pk.delimiter.
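
For example, a hedged sketch of a composite key: the configuration below joins two key fields with a `-` delimiter (the `exchange` field is hypothetical, for illustration only):

```properties
# Illustrative only: 'exchange' is a hypothetical field in the message payload.
connect.redis.pk.delimiter=-
connect.redis.kcql=SELECT price FROM yahoo-fx PK symbol, exchange
```

With this configuration the Redis keys take the form `<symbol>-<exchange>`.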

    hashtag
    Sorted Sets

    To insert messages from a Kafka topic into 1 Sorted Set use the following KCQL syntax:

    This will create and add entries to the (sorted set) named cpu_stats. The entries will be ordered in the Redis set based on the score that we define it to be the value of the timestamp field of the AVRO message from Kafka. In the above example, we are selecting and storing all the fields of the Kafka message.
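
Conceptually, each record becomes a (score, member) pair, with the score taken from the designated field (here timestamp), so Redis keeps the entries time-ordered. A small stdlib sketch of that ordering, assuming nothing beyond the behaviour described above:

```python
import bisect
import json

# Minimal stand-in for a Redis sorted set: (score, member) pairs kept ordered.
cpu_stats = []

def zadd(sorted_set, score, member):
    """Mimic Redis ZADD: insert the member keeping the set ordered by score."""
    bisect.insort(sorted_set, (score, member))

# Records may arrive out of order; the score (the timestamp field) orders them.
zadd(cpu_stats, 1700000002, json.dumps({"cpu": 91, "timestamp": 1700000002}))
zadd(cpu_stats, 1700000001, json.dumps({"cpu": 87, "timestamp": 1700000001}))

print([score for score, _ in cpu_stats])  # [1700000001, 1700000002]
```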

    The TTL statement allows setting a time to live on the sorted set. If not specified, no TTL is set.

    hashtag
    Multiple Sorted Sets

    The connector can create multiple sorted sets by promoting each value of one field from the Kafka message into its own Sorted Set, and selecting which values to store in the sorted sets. Use the PK (primary key) clause in the KCQL statement to define that field.

    Notice we have dropped the INSERT clause.

    The connector can also prefix the name of the Key using the INSERT statement for Multiple SortedSets:

    This will create keys named FX-USDGBP, FX-EURGBP, etc.

    The TTL statement allows setting a time to live on the sorted set. If not specified, no TTL is set.

    hashtag
    Geospatial add

    To insert messages from a Kafka topic with GEOADD use the following KCQL syntax:

    hashtag
    Streams

    To insert messages from a Kafka topic to a Redis Stream use the following KCQL syntax:

    hashtag
    PubSub

    To insert a message from a Kafka topic to a Redis PubSub use the following KCQL syntax:

    The channel to write to in Redis is determined by a field in the payload of the Kafka message set in the KCQL statement, in this case, a field called myfield.

    hashtag
    Kafka payload support

    This sink supports the following Kafka payloads:

    • Schema.Struct and Struct (Avro)

    • Schema.Struct and JSON

    • No Schema and JSON

    hashtag
    Error policies

    The connector supports Error policies.

    hashtag
    Option Reference

    Name
    Description
    Type
    Default Value

    connect.redis.pk.delimiter

    Specifies the redis primary key delimiter

    string

    .

    ssl.provider

    The name of the security provider used for SSL connections. Default value is the default security provider of the JVM.

    string

    ssl.protocol

    The SSL protocol used to generate the SSLContext. Default setting is TLS, which is fine for most cases. Allowed values in recent JVMs are TLS, TLSv1.1 and TLSv1.2. SSL, SSLv2 and SSLv3 may be supported in older JVMs, but their usage is discouraged due to known security vulnerabilities.

    string

    AWS S3

    This page describes the usage of the Stream Reactor AWS S3 Sink Connector.

    This Kafka Connect sink connector facilitates the seamless transfer of records from Kafka to AWS S3 Buckets. It offers robust support for various data formats, including AVRO, Parquet, JSON, CSV, and Text, making it a versatile choice for data storage. Additionally, it ensures the reliability of data transfer with built-in support for exactly-once semantics.

    hashtag
    Connector Class

    hashtag
    Example

    For more examples see the tutorials.

    This example writes to a bucket called demo, partitioning by a field called ts, stored as JSON.

    hashtag
    KCQL Support

    You can specify multiple KCQL statements separated by ; to have a connector sink multiple topics. The connector properties topics or topics.regex are required to be set to a value that matches the KCQL statements.

    The connector uses KCQL to map topics to S3 buckets and paths. The full KCQL syntax is:

    Please note that you can employ escaping within KCQL for the INSERT INTO, SELECT * FROM, and PARTITIONBY clauses when necessary. For example, an incoming Kafka message stored as JSON can use fields containing .:

    In this case, you can use the following KCQL statement:

    hashtag
    Target Bucket and Path

    The target bucket and path are specified in the INSERT INTO clause. The path is optional and if not specified, the connector will write to the root of the bucket and append the topic name to the path.

    Here are a few examples:

    hashtag
    SQL Projection

    Currently, the connector does not offer support for SQL projection; consequently, anything other than a SELECT * query is disregarded. The connector will faithfully write all fields from Kafka exactly as they are.

    hashtag
    Source Topic

    To avoid runtime errors, make sure the topics or topics.regex setting matches your KCQL statements. If the connector receives data for a topic without matching KCQL, it will throw an error. When using a regex to select topics, follow this KCQL pattern:

    In this case the topic name will be appended to the $target destination.

    hashtag
    Partitioning & Object Keys

    The object key serves as the filename used to store data in S3. There are two options for configuring the object key:

    • Default: The object key is automatically generated by the connector and follows the Kafka topic-partition structure. The format is $bucket/[$prefix]/$topic/$partition/offset.extension. The extension is determined by the chosen storage format.

    • Custom: The object key is driven by the PARTITIONBY clause. The format is either $bucket/[$prefix]/$topic/customKey1=customValue1/customKey2=customValue2/topic(partition_offset).extension (AWS Athena naming style mimicking Hive-like data partitioning) or $bucket/[$prefix]/customValue/topic(partition_offset).ext. The extension is determined by the selected storage format.

    Custom keys and values can be extracted from the Kafka message key, message value, or message headers, as long as the headers are of types that can be converted to strings. There is no fixed limit to the number of elements that can form the object key, but you should be aware of AWS S3 key length restrictions.
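
The default key layout can be sketched as a small function. This is illustrative only, not connector code; the offset padding shown mirrors the padding.length.offset and padding.char properties described later, and is what makes keys sort lexicographically:

```python
def default_object_key(bucket, topic, partition, offset,
                       prefix=None, extension="json",
                       offset_pad=12, pad_char="0"):
    """Build a key following $bucket/[$prefix]/$topic/$partition/offset.extension.

    Offsets are left-padded (default length 12, pad char '0') so objects sort
    lexicographically, letting a reader resume in order without sorting.
    """
    parts = [bucket]
    if prefix:
        parts.append(prefix)
    parts += [topic, str(partition)]
    padded = str(offset).rjust(offset_pad, pad_char)
    return "/".join(parts) + f"/{padded}.{extension}"

print(default_object_key("demo", "orders", 3, 42))
# demo/orders/3/000000000042.json
```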

    The Connector automatically adds the topic name to the partition. There is no need to add it to the partition clause. If you want to explicitly add the topic or partition you can do so by using _topic and _partition.

    The partition clause works on header, key and values fields of the Kafka message.

    To extract fields from the message values, simply use the field names in the PARTITIONBY clause. For example:

    However, note that the message fields must be of primitive types (e.g., string, int, long) to be used for partitioning.

    You can also use the entire message key as long as it can be coerced into a primitive type:

    In cases where the Kafka message Key is not a primitive but a complex object, you can use individual fields within the message Key to create the S3 object key name:

    Kafka message headers can also be used in the S3 object key definition, provided the header values are of primitive types easily convertible to strings:

    Customizing the object key can leverage various components of the Kafka message. For example:

    This flexibility allows you to tailor the object key to your specific needs, extracting meaningful information from Kafka messages to structure S3 object keys effectively.

    To enable Athena-like partitioning, use the following syntax:

    hashtag
    Rolling Windows

    Storing data in Amazon S3 and partitioning it by time is a common practice in data management. For instance, you may want to organize your S3 data in hourly intervals. This partitioning can be seamlessly achieved using the PARTITIONBY clause in combination with specifying the relevant time field. However, it’s worth noting that the time field typically doesn’t adjust automatically.

    To address this, we offer a Kafka Connect Single Message Transformer (SMT) designed to streamline this process.

    Let’s consider an example where you need the object key to include the wallclock time (the time when the message was processed) and create an hourly window based on a field called timestamp. Here’s the connector configuration to achieve this:

    In this example, the incoming Kafka message’s Value content includes a field called timestamp, represented as a long value indicating the epoch time in milliseconds. The TimestampConverter SMT will expertly convert this into a string value according to the format specified in the format.to.pattern property. Additionally, the insertWallclock SMT will incorporate the current wallclock time in the format you specify in the format property.

    The PARTITIONBY clause then leverages both the timestamp field and the wallclock header to craft the object key, providing you with precise control over data partitioning.
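
To illustrate the effect of the TimestampConverter SMT on the timestamp field, the sketch below renders epoch milliseconds with the yyyy-MM-dd-HH pattern, so all messages within the same hour share one partition value (illustrative only; the SMT performs this inside Connect):

```python
from datetime import datetime, timezone

def hourly_window(epoch_ms: int) -> str:
    """Render epoch milliseconds as yyyy-MM-dd-HH (one value per hour, UTC)."""
    return datetime.fromtimestamp(epoch_ms / 1000, tz=timezone.utc).strftime("%Y-%m-%d-%H")

# Two messages 10 minutes apart land in the same hourly partition.
print(hourly_window(1700000000000))  # 2023-11-14-22
print(hourly_window(1700000600000))  # 2023-11-14-22
```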

    hashtag
    Data Storage Format

    While the STOREAS clause is optional, it plays a pivotal role in determining the storage format within AWS S3. It’s crucial to understand that this format is entirely independent of the data format stored in Kafka. The connector maintains its neutrality towards the storage format at the topic level and relies on the key.converter and value.converter settings to interpret the data.

    Supported storage formats encompass:

    • AVRO

    • Parquet

    • JSON

    • CSV (including headers)

    • Text

    • BYTES

    Opting for BYTES ensures that each record is stored in its own separate file. This feature proves particularly valuable for scenarios involving the storage of images or other binary data in S3. For cases where you prefer to consolidate multiple records into a single binary file, AVRO or Parquet are the recommended choices.

    By default, the connector exclusively stores the Kafka message value. However, you can expand storage to encompass the entire message, including the key, headers, and metadata, by configuring the store.envelope property as true. This property operates as a boolean switch, with the default value being false. When the envelope is enabled, the data structure follows this format:

    Not supported with a custom partition strategy.

    Utilizing the envelope is particularly advantageous in scenarios such as backup and restore or replication, where comprehensive storage of the entire message in S3 is desired.

    hashtag
    Examples

    Storing the message Value Avro data as Parquet in S3:

    The converter also facilitates seamless JSON to AVRO/Parquet conversion, eliminating the need for an additional processing step before the data is stored in S3.

    Enabling the full message stored as JSON in S3:

    Enabling the full message stored as AVRO in S3:

    If the restore (see the S3 Source documentation) happens on the same cluster, then the most performant way is to use the ByteConverter for both Key and Value and store as AVRO or Parquet:

    hashtag
    Flush Options

    The connector offers three distinct flush options for data management:

    • Flush by Count - triggers a file flush after a specified number of records have been written to it.

    • Flush by Size - initiates a file flush once a predetermined size (in bytes) has been attained.

    • Flush by Interval - enforces a file flush after a defined time interval (in seconds).

    It’s worth noting that the interval flush is a continuous process that acts as a fail-safe mechanism, ensuring that files are periodically flushed, even if the other flush options are not configured or haven’t reached their thresholds.

    Consider a scenario where the flush size is set to 10MB, and only 9.8MB of data has been written to the object, with no new Kafka messages arriving for an extended period of 6 hours. To prevent undue delays, the interval flush guarantees that the object is flushed after the specified time interval has elapsed. This ensures the timely management of data even in situations where other flush conditions are not met.
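
The three triggers combine with OR semantics: whichever threshold is crossed first rolls the file. A minimal sketch of that decision, using the documented default thresholds:

```python
import time

FLUSH_COUNT = 50_000          # flush.count default
FLUSH_SIZE = 500_000_000      # flush.size default (bytes)
FLUSH_INTERVAL = 3_600        # flush.interval default (seconds)

def should_flush(record_count, bytes_written, last_flush_ts, now=None):
    """Return True when any configured flush threshold has been reached."""
    now = time.time() if now is None else now
    return (record_count >= FLUSH_COUNT
            or bytes_written >= FLUSH_SIZE
            or now - last_flush_ts >= FLUSH_INTERVAL)

# 9.8MB written, no new records for 6 hours: the interval trigger fires.
print(should_flush(1_000, 9_800_000, last_flush_ts=0, now=6 * 3600))  # True
```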

    The flush options are configured using the flush.count, flush.size, and flush.interval properties. The settings are optional and if not specified the defaults are:

    • flush.count = 50_000

    • flush.size = 500000000 (500MB)

    • flush.interval = 3_600 (1 hour)

    A connector instance can simultaneously operate on multiple topic partitions. When one partition triggers a flush, it will initiate a flush operation for all of them, even if the other partitions are not yet ready to flush.

    When connect.s3.latest.schema.optimization.enabled is set to true, it reduces unnecessary data flushes when writing to Avro or Parquet formats. Specifically, it leverages schema compatibility to avoid flushing data when messages with older but backward-compatible schemas are encountered. Consider the following sequence of messages and their associated schemas:

    hashtag
    Flushing By Interval

    The next flush time is calculated based on the time the previous flush completed (the last modified time of the object written to S3). Therefore, by design, the sink connector’s behaviour will have a slight drift based on the time it takes to flush records and whether records are present or not. If Kafka Connect makes no calls to put records, the logic for flushing won't be executed. This ensures a more consistent number of records per object.

    hashtag
    Properties

    The PROPERTIES clause is optional and adds a layer of configuration to the connector. It enhances versatility by permitting the application of multiple configurations (delimited by ‘,’). The following properties are supported:

    Name
    Description
    Type
    Available Values
    Default Value

    The sink connector optimizes performance by padding the output objects. This proves beneficial when using the S3 Source connector to restore data. This object name padding ensures that objects are ordered lexicographically, allowing the S3 Source connector to skip the need for reading, sorting, and processing all objects, thereby enhancing efficiency.

    hashtag
    Compression

    AVRO and Parquet offer the capability to compress files as they are written. The S3 Sink connector provides advanced users with the flexibility to configure compression options.

    Here are the available options for the connect.s3.compression.codec, along with indications of their support by Avro, Parquet and JSON writers:

    Compression
    Avro Support
    Avro (requires Level)
    Parquet Support
    JSON

    Please note that not all compression libraries are bundled with the S3 connector. Therefore, you may need to manually add certain libraries to the classpath to ensure they function correctly.
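
For example, to enable ZSTD (a sketch; the codec names are those accepted by connect.s3.compression.codec in the option reference):

```properties
connect.s3.compression.codec=ZSTD
```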

    hashtag
    Authentication

    The connector offers two distinct authentication modes:

    • Default: This mode relies on the default AWS authentication chain, simplifying the authentication process.

    • Credentials: In this mode, explicit configuration of AWS Access Key and Secret Key is required for authentication.

    Here’s an example configuration for the Credentials mode:
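
A sketch of the Credentials mode using the property names from the option reference; the key values and region below are placeholders:

```properties
connect.s3.aws.auth.mode=Credentials
connect.s3.aws.access.key=<your-access-key>
connect.s3.aws.secret.key=<your-secret-key>
connect.s3.aws.region=eu-west-1
```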

    For enhanced security and flexibility when using the Credentials mode, it is highly advisable to utilize Connect Secret Providers.

    hashtag
    Error policies

    The connector supports Error policies.

    hashtag
    API Compatible systems

    The connector can also be used against API compatible systems provided they implement the following:

    hashtag
    Indexes Directory

    The connector uses the concept of index objects that it writes to in order to store information about the latest offsets for Kafka topics and partitions as they are being processed. This allows the connector to quickly resume from the correct position when restarting and provides flexibility in naming the index objects.

    By default, the index objects are grouped within a prefix named .indexes for all connectors. However, each connector will create and store its index objects within its own nested prefix inside this .indexes prefix.

    You can configure the prefix for these index objects using the property connect.s3.indexes.name. This property specifies the path from the root of the S3 bucket. Note that even if you configure this property, the connector will still place the indexes within a nested prefix of the specified prefix.
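
For instance, to place the indexes under a custom prefix (an illustrative value):

```properties
connect.s3.indexes.name=custom-indexes
```

The connector then writes its indexes under custom-indexes/<connector_name>/.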

    hashtag
    Examples

    hashtag
    Option Reference

    Name
    Description
    Type
    Available Values
    Default Value

    InfluxDB

    This page describes the usage of the Stream Reactor InfluxDB Sink Connector.

    hashtag
    This connector has been retired as of version 11.0.0

    io.lenses.streamreactor.connect.redis.sink.RedisSinkConnector
    name=redis
    connector.class=io.lenses.streamreactor.connect.redis.sink.RedisSinkConnector
    tasks.max=1
    topics=redis
    connect.redis.host=redis
    connect.redis.port=6379
    connect.redis.kcql=INSERT INTO lenses SELECT * FROM redis STOREAS STREAM
    [INSERT INTO <redis-cache>]
    SELECT FIELD, ...
    FROM <kafka-topic>
    [PK FIELD]
    [STOREAS SortedSet(key=FIELD)|GEOADD|STREAM]
    { "symbol": "USDGBP" , "price": 0.7943 }
    { "symbol": "EURGBP" , "price": 0.8597 }
    SELECT price from yahoo-fx PK symbol
    Key=USDGBP  Value={ "price": 0.7943 }
    INSERT INTO cpu_stats SELECT * from cpuTopic STOREAS SortedSet(score=timestamp) TTL=60
    SELECT temperature, humidity FROM sensorsTopic PK sensorID STOREAS SortedSet(score=timestamp)
    INSERT INTO FX- SELECT price from yahoo-fx PK symbol STOREAS SortedSet(score=timestamp) TTL=60
    INSERT INTO cpu_stats SELECT * from cpuTopic STOREAS GEOADD
    INSERT INTO redis_stream_name SELECT * FROM my-kafka-topic STOREAS STREAM
    SELECT * FROM topic STOREAS PubSub (channel=myfield)
    io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector

    TLS

    ssl.truststore.location

    The location of the trust store file.

    string

    ssl.keystore.password

    The store password for the key store file. This is optional for client and only needed if ssl.keystore.location is configured.

    password

    ssl.keystore.location

    The location of the key store file. This is optional for client and can be used for two-way authentication for client.

    string

    ssl.truststore.password

    The password for the trust store file. If a password is not set access to the truststore is still available, but integrity checking is disabled.

    password

    ssl.keymanager.algorithm

    The algorithm used by key manager factory for SSL connections. Default value is the key manager factory algorithm configured for the Java Virtual Machine.

    string

    SunX509

    ssl.trustmanager.algorithm

    The algorithm used by trust manager factory for SSL connections. Default value is the trust manager factory algorithm configured for the Java Virtual Machine.

    string

    PKIX

    ssl.keystore.type

    The file format of the key store file. This is optional for client.

    string

    JKS

    ssl.cipher.suites

    A list of cipher suites. This is a named combination of authentication, encryption, MAC and key exchange algorithm used to negotiate the security settings for a network connection using TLS or SSL network protocol. By default all the available cipher suites are supported.

    list

    ssl.endpoint.identification.algorithm

    The endpoint identification algorithm to validate server hostname using server certificate.

    string

    https

    ssl.truststore.type

    The file format of the trust store file.

    string

    JKS

    ssl.enabled.protocols

    The list of protocols enabled for SSL connections.

    list

    [TLSv1.2, TLSv1.1, TLSv1]

    ssl.key.password

    The password of the private key in the key store file. This is optional for client.

    password

    ssl.secure.random.implementation

    The SecureRandom PRNG implementation to use for SSL cryptography operations.

    string

    connect.redis.kcql

    KCQL expression describing field selection and routes.

    string

    connect.redis.host

    Specifies the redis server

    string

    connect.redis.port

    Specifies the redis connection port

    int

    connect.redis.password

    Provides the password for the redis connection.

    password

    connect.redis.ssl.enabled

    Enables ssl for the redis connection

    boolean

    false

    connect.redis.error.policy

    Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP - the error is swallowed; THROW - the error is allowed to propagate; RETRY - the exception causes the Connect framework to retry the message. The number of retries is set by connect.redis.max.retries. All errors will be logged automatically, even if the code swallows them.

    string

    THROW

    connect.redis.retry.interval

    The time in milliseconds between retries.

    int

    60000

    connect.redis.max.retries

    The maximum number of times to try the write again.

    int

    20

    connect.progress.enabled

    Enables logging of how many records have been processed.

    boolean

    false

    padding.length.partition

    Sets the padding length for the partition.

    Int

    0

    padding.length.offset

    Sets the padding length for the offset.

    Int

    12

    partition.include.keys

    Specifies whether partition keys are included.

    Boolean

    false (true when custom partitioning is used)

    store.envelope

    Indicates whether to store the entire Kafka message

    Boolean

    store.envelope.fields.key

    Indicates whether to store the envelope’s key.

    Boolean

    store.envelope.fields.headers

    Indicates whether to store the envelope’s headers.

    Boolean

    store.envelope.fields.value

    Indicates whether to store the envelope’s value.

    Boolean

    store.envelope.fields.metadata

    Indicates whether to store the envelope’s metadata.

    Boolean

    flush.size

    Specifies the size (in bytes) for the flush operation.

    Long

    500000000 (500MB)

    flush.count

    Specifies the number of records for the flush operation.

    Int

    50000

    flush.interval

    Specifies the interval (in seconds) for the flush operation

    Long

    3600(1h)

    key.suffix

    When specified, it appends the given value to the resulting object key before the extension (avro, json, etc.) is added.

    String

    <empty>

    GZIP

    ✅

    ✅

    LZO

    ✅

    LZ4

    ✅

    BROTLI

    ✅

    BZIP2

    ✅

    ZSTD

    ✅

    ⚙️

    ✅

    DEFLATE

    ✅

    ⚙️

    XZ

    ✅

    ⚙️

    connect.s3.aws.secret.key

    The AWS Secret Key used for authentication.

    string

    (Empty)

    connect.s3.aws.region

    The AWS Region where the S3 bucket is located.

    string

    (Empty)

    connect.s3.pool.max.connections

    Specifies the maximum number of connections allowed in the AWS Client’s HTTP connection pool when interacting with S3.

    int

    -1 (undefined)

    50

    connect.s3.custom.endpoint

    Allows for the specification of a custom S3 endpoint URL if needed.

    string

    (Empty)

    [Deprecated]

    connect.s3.vhost.bucket

    Enables the use of Vhost Buckets for S3 connections. Always set to true when custom endpoints are used. Deprecation: this setting maps directly to the AWS SDK’s pathStyleAccessEnabled() method, but the two concepts are semantically opposite. Use connect.s3.path.style.access instead, available since version 11.3.0.

    boolean

    true, false

    false

    connect.s3.path.style.access

    When set to true it enables path-style access (matches AWS SDK semantics). When set to false it enables virtual-hosted style access.

    boolean

    true, false

    connect.s3.error.policy

    Defines the error handling policy when errors occur during data transfer to or from S3.

    string

    “NOOP,” “THROW,” “RETRY”

    “THROW”

    connect.s3.max.retries

    Sets the maximum number of retries the connector will attempt before reporting an error to the Connect Framework.

    int

    20

    connect.s3.retry.interval

    Specifies the interval (in milliseconds) between retry attempts by the connector.

    int

    60000

    connect.s3.http.max.retries

    Sets the maximum number of retries for the underlying HTTP client when interacting with S3.

    long

    5

    connect.s3.http.retry.interval

    Specifies the retry interval (in milliseconds) for the underlying HTTP client. An exponential backoff strategy is employed.

    long

    50

    connect.s3.local.tmp.directory

    Enables the use of a local folder as a staging area for data transfer operations.

    string

    (Empty)

    connect.s3.kcql

    A SQL-like configuration that defines the behavior of the connector. Refer to the KCQL section below for details.

    string

    (Empty)

    connect.s3.compression.codec

    Sets the Parquet compression codec to be used when writing data to S3.

    string

    “UNCOMPRESSED,” “SNAPPY,” “GZIP,” “LZO,” “LZ4,” “BROTLI,” “BZIP2,” “ZSTD,” “DEFLATE,” “XZ”

    “UNCOMPRESSED”

    connect.s3.compression.level

    Sets the compression level when compression is enabled for data transfer to S3.

    int

    1-9

    (Empty)

    connect.s3.seek.max.files

    Specifies the maximum threshold for the number of files the connector uses to ensure exactly-once processing of data.

    int

    5

    connect.s3.indexes.name

    Configure the indexes prefix for this connector.

    string

    ".indexes"

    connect.s3.exactly.once.enable

    Set to 'false' to disable exactly-once semantics, opting instead for Kafka Connect’s native at-least-once offset management.

    boolean

    true, false

    true

    connect.s3.schema.change.detector

    Configure how the file will roll over upon receiving a record with a schema different from the accumulated ones. This property configures schema change detection with default (object equality), version (version field comparison), or compatibility (Avro compatibility checking).

    string

    default, version, compatibility

    default

    connect.s3.skip.null.values

    Skip records with null values (a.k.a. tombstone records).

    boolean

    true, false

    false

    connect.s3.latest.schema.optimization.enabled

    When set to true, reduces unnecessary data flushes when writing to Avro or Parquet formats. Specifically, it leverages schema compatibility to avoid flushing data when messages with older but backward-compatible schemas are encountered.

    boolean

    true,false

    false

    padding.type

    Specifies the type of padding to be applied.

    LeftPad, RightPad, NoOp

    LeftPad, RightPad, NoOp

    LeftPad

    padding.char

    Defines the character used for padding.

    Char

    ‘0’

    UNCOMPRESSED

    ✅

    ✅

    ✅

    SNAPPY

    ✅

    ✅

    Index Name (connect.s3.indexes.name)

    Resulting Indexes Prefix Structure

    Description

    .indexes (default)

    .indexes/<connector_name>/

    The default setup, where each connector uses its own subdirectory within .indexes.

    custom-indexes

    custom-indexes/<connector_name>/

    Custom root directory custom-indexes, with a subdirectory for each connector.

    indexes/s3-connector-logs

    indexes/s3-connector-logs/<connector_name>/

    Uses a custom subdirectory s3-connector-logs within indexes, with a subdirectory for each connector.

    logs/indexes

    logs/indexes/<connector_name>/

    Indexes are stored under logs/indexes, with a subdirectory for each connector.

    connect.s3.aws.auth.mode

    Specifies the AWS authentication mode for connecting to S3.

    string

    “Credentials,” “Default”

    “Default”

    connect.s3.aws.access.key

    The AWS Access Key used for authentication.

    string

    (Empty)




    connector.class=io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector
    connect.s3.kcql=insert into lensesio:demo select * from demo PARTITIONBY _value.ts STOREAS `JSON` PROPERTIES ('flush.size'=1000000, 'flush.interval'=30, 'flush.count'=5000)
    topics=demo
    name=demo
    INSERT INTO bucketAddress[:pathPrefix]
    SELECT *
    FROM kafka-topic
    [[PARTITIONBY (partition[, partition] ...)] | NOPARTITION]
    [STOREAS storage_format]
    [PROPERTIES(
      'property.1'=x,
      'property.2'=x,
    )]
    {
      ...
      "a.b": "value",
      ...
    }
    INSERT INTO `bucket-name`:`prefix` SELECT * FROM `kafka-topic` PARTITIONBY `a.b`
    INSERT INTO testbucket:pathToWriteTo SELECT * FROM topicA;
    INSERT INTO testbucket SELECT * FROM topicA;
    INSERT INTO testbucket:path/To/Write/To SELECT * FROM topicA PARTITIONBY fieldA;
    topics.regex = ^sensor_data_\d+$
    connect.s3.kcql= INSERT INTO $target SELECT * FROM  `*` ....
    PARTITIONBY fieldA, fieldB
    PARTITIONBY _key
    PARTITIONBY _key.fieldA, _key.fieldB
    PARTITIONBY _header.<header_key1>[, _header.<header_key2>]
    PARTITIONBY fieldA, _key.fieldB, _headers.fieldC
    INSERT INTO $bucket[:$prefix]
    SELECT * FROM $topic
    PARTITIONBY fieldA, _key.fieldB, _headers.fieldC
    STOREAS `AVRO`
    PROPERTIES (
        'partition.include.keys'=true,
    )
    connector.class=io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector
    connect.s3.kcql=insert into lensesio:demo select * from demo PARTITIONBY _value.metadata_id, _value.customer_id, _header.ts, _header.wallclock STOREAS `JSON` PROPERTIES ('flush.size'=1000000, 'flush.interval'=30, 'flush.count'=5000)
    topics=demo
    name=demo
    value.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter=org.apache.kafka.connect.storage.StringConverter
    transforms=insertFormattedTs,insertWallclock
    transforms.insertFormattedTs.type=io.lenses.connect.smt.header.TimestampConverter
    transforms.insertFormattedTs.header.name=ts
    transforms.insertFormattedTs.field=timestamp
    transforms.insertFormattedTs.target.type=string
    transforms.insertFormattedTs.format.to.pattern=yyyy-MM-dd-HH
    transforms.insertWallclock.type=io.lenses.connect.smt.header.InsertWallclock
    transforms.insertWallclock.header.name=wallclock
    transforms.insertWallclock.value.type=format
    transforms.insertWallclock.format=yyyy-MM-dd-HH
    {
      "key": <the message Key, which can be a primitive or a complex object>,
      "value": <the message Value, which can be a primitive or a complex object>,
      "headers": {
        "header1": "value1",
        "header2": "value2"
      },
      "metadata": {
        "offset": 0,
        "partition": 0,
        "timestamp": 0,
        "topic": "topic"
      }
    }
    ...
    connect.s3.kcql=INSERT INTO lensesioaws:car_speed SELECT * FROM car_speed_events STOREAS `PARQUET` 
    value.converter=io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url=http://localhost:8081
    key.converter=org.apache.kafka.connect.storage.StringConverter
    ...
    ...
    connect.s3.kcql=INSERT INTO lensesioaws:car_speed SELECT * FROM car_speed_events STOREAS `PARQUET` 
    value.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter=org.apache.kafka.connect.storage.StringConverter
    ...
    ...
    connect.s3.kcql=INSERT INTO lensesioaws:car_speed SELECT * FROM car_speed_events STOREAS `JSON` PROPERTIES('store.envelope'=true)
    value.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter=org.apache.kafka.connect.storage.StringConverter
    ...
    ...
    connect.s3.kcql=INSERT INTO lensesioaws:car_speed SELECT * FROM car_speed_events STOREAS `AVRO` PROPERTIES('store.envelope'=true)
    value.converter=io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url=http://localhost:8081
    key.converter=org.apache.kafka.connect.storage.StringConverter
    ...
    ...
    connect.s3.kcql=INSERT INTO lensesioaws:car_speed SELECT * FROM car_speed_events STOREAS `AVRO` PROPERTIES('store.envelope'=true)
    value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
    key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
    ...
    message1 -> schema1  
    message2 -> schema1  
      (No flush needed – same schema)
    
    message3 -> schema2  
      (Flush occurs – new schema introduced)
    
    message4 -> schema2  
      (No flush needed – same schema)
    
    message5 -> schema1  
      Without optimization: would trigger a flush  
      With optimization: no flush – schema1 is backward-compatible with schema2
    
    message6 -> schema2  
    message7 -> schema2  
      (No flush needed – same schema, it would happen based on the flush thresholds)
    ...
    connect.s3.aws.auth.mode=Credentials
    connect.s3.aws.region=eu-west-2
    connect.s3.aws.access.key=$AWS_ACCESS_KEY
    connect.s3.aws.secret.key=$AWS_SECRET_KEY
    ...
    listObjectsV2
    listObjectsV2Paginator
    putObject
    getObject
    headObject
    deleteObjects
    deleteObject
    hashtag
    Connector Class

    hashtag
    Example

    circle-check

    For more examples see the tutorials.

    hashtag
    KCQL support

    circle-check

    You can specify multiple KCQL statements separated by ; to have a connector sink multiple topics. The connector properties topics or topics.regex are required to be set to a value that matches the KCQL statements.
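For example, a single connector instance can sink two topics by providing one statement per topic (a sketch; measurement and topic names are hypothetical):

```properties
# Two KCQL statements, separated by ';', one per source topic
topics=sensors,alerts
connect.influx.kcql=INSERT INTO sensorMeasure SELECT * FROM sensors;INSERT INTO alertMeasure SELECT * FROM alerts
```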

    The following KCQL is supported:

    Examples:

    hashtag
    Tags

    The InfluxDB client API allows a set of tags (key-value pairs) to be attached to each point that is written. The current connector version allows you to provide them via KCQL.

    circle-info

    Only applicable to value fields. No support for nested fields, keys or topic metadata.

    hashtag
    Kafka payload support

    This sink supports the following Kafka payloads:

    • Schema.Struct and Struct (Avro)

    • Schema.Struct and JSON

    • No Schema and JSON

    hashtag
    Error policies

    The connector supports Error policies.
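As a sketch (values are illustrative), a RETRY policy for the InfluxDB sink can be configured with the properties from the option reference below:

```properties
# Retry failed writes up to 20 times, waiting 60s between attempts
connect.influx.error.policy=RETRY
connect.influx.max.retries=20
connect.influx.retry.interval=60000
```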

    hashtag
    Option Reference

    Name
    Description
    Type
    Default Value

    connect.influx.url

    The InfluxDB database URL.

    string

    connect.influx.db

    The database to store the values to.

    string

    connect.influx.username

    The username used to connect to the InfluxDB database.

    string

    Elasticsearch

    This page describes the usage of the Stream Reactor Elasticsearch Sink Connector.

    hashtag
    Connector Class

    hashtag
    Elasticsearch 6

    io.lenses.streamreactor.connect.influx.InfluxSinkConnector
    name=influxdb
    connector.class=io.lenses.streamreactor.connect.influx.InfluxSinkConnector
    tasks.max=1
    topics=influx
    connect.influx.url=http://influxdb:8086
    connect.influx.db=mydb
    connect.influx.username=admin
    connect.influx.kcql=INSERT INTO influxMeasure SELECT * FROM influx WITHTIMESTAMP sys_time()
    INSERT INTO <your-measure>
    SELECT FIELD, ...
    FROM kafka_topic_name
    [WITHTIMESTAMP FIELD|sys_time]
    [WITHTAG(FIELD|(constant_key=constant_value)]
    -- Insert mode, select all fields from topicA and write to indexA
    INSERT INTO measureA SELECT * FROM topicA
    
    -- Insert mode, select 3 fields and rename from topicB and write to indexB,
    -- use field Y as the point measurement
    INSERT INTO measureB SELECT x AS a, y AS b, c FROM topicB WITHTIMESTAMP y
    
    -- Insert mode, select 3 fields and rename from topicB and write to indexB,
    -- use field Y as the current system time for Point measurement
    INSERT INTO measureB SELECT x AS a, y AS b, z FROM topicB WITHTIMESTAMP sys_time()
    
    -- Tagging using constants
    INSERT INTO measureA SELECT * FROM topicA WITHTAG (DataMountaineer=awesome, Influx=rulz!)
    
    -- Tagging using fields in the payload. Say we have a Payment structure
    -- with these fields: amount, from, to, note
    INSERT INTO measureA SELECT * FROM topicA WITHTAG (from, to)
    
    -- Tagging using a combination of fields in the payload and constants.
    -- Say we have a Payment structure with these fields: amount, from, to, note
    INSERT INTO measureA SELECT * FROM topicA WITHTAG (from, to, provider=DataMountaineer)

    connect.influx.password

    The password for the influxdb user.

    password

    connect.influx.kcql

    KCQL expression describing field selection and target measurements.

    string

    connect.progress.enabled

    Enables logging of how many records have been processed by the connector.

    boolean

    false

    connect.influx.error.policy

    Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP - the error is swallowed; THROW - the error is allowed to propagate; RETRY - the exception causes the Connect framework to retry the message. The number of retries is based on connect.influx.max.retries. The error will be logged automatically.

    string

    THROW

    connect.influx.retry.interval

    The time in milliseconds between retries.

    int

    60000

    connect.influx.max.retries

    The maximum number of times to try the write again.

    int

    20

    connect.influx.retention.policy

    Determines how long InfluxDB keeps the data. Note that the minimum retention period is one hour. The duration of the retention policy can be specified with the following units: m (minutes), h (hours), d (days), w (weeks), INF (infinite). The default retention policy is autogen from version 1.0 onwards, or default for any previous version.

    string

    autogen

    connect.influx.consistency.level

    Specifies the write consistency. If any write operations do not meet the configured consistency guarantees, an error will occur and the data will not be indexed. The default consistency-level is ALL.

    string

    ALL

    hashtag
    Elasticsearch 7

    hashtag
    Example

    circle-check

    For more examples see the tutorials.

    hashtag
    KCQL support

    circle-check

    You can specify multiple KCQL statements separated by ; to have a connector sink multiple topics. The connector properties topics or topics.regex are required to be set to a value that matches the KCQL statements.

    The following KCQL is supported:

    Examples:

    hashtag
    Kafka Tombstone Handling

    It is possible to configure how the Connector handles a null value payload (called Kafka tombstones). Please use the behavior.on.null.values property in your KCQL with one of the possible values:

    • IGNORE (ignores tombstones entirely)

    • FAIL (throws Exception if tombstone happens)

    • DELETE (deletes the document with the specified id)

    Example:

    hashtag
    Primary Keys

    The PK keyword allows you to specify fields that will be used to generate the key value in Elasticsearch. The values of the selected fields are concatenated and separated by a hyphen (-).

    If no fields are defined, the connector defaults to using the topic name, partition, and message offset to construct the key.

    Field Prefixes

    When defining fields, specific prefixes can be used to determine where the data should be extracted from:

    • _key Prefix Specifies that the value should be extracted from the message key.

      • If a path is provided after _key, it identifies the location within the key where the field value resides.

      • If no path is provided, the entire message key is used as the value.

    • _value Prefix Specifies that the value should be extracted from the message value.

      • The remainder of the path identifies the specific location within the message value to extract the field.

    • _header Prefix Specifies that the value should be extracted from the message header.

      • The remainder of the path indicates the name of the header to be used for the field value.
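Putting the prefixes together (index and field names are hypothetical), a document id can be composed from several locations; with the default separator, the selected values are joined with a hyphen:

```sql
-- id built from a key field and a value field, e.g. a result like "42-eu"
UPSERT INTO customers SELECT * FROM customer_topic PK _key.customerId, _value.region
```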

    hashtag
    Insert and Upsert modes

    INSERT writes new records to Elastic, replacing existing records with the same ID set by the PK (Primary Key) keyword. UPSERT replaces an existing record if a matching one is found, or inserts a new record if none is found.

    hashtag
    Document Type

    WITHDOCTYPE allows you to associate a document type to the document inserted.

    hashtag
    Index Suffix

    WITHINDEXSUFFIX allows you to specify a suffix for your index name; date formats are supported.

    Example:


    hashtag
    Index Names

    hashtag
    Static Index Names

    To use a static index name, define the target index in the KCQL statement without any prefixes:

    This will consistently create an index named index_name for any messages consumed from topicA.

    hashtag
    Extracting Index Names from Headers, Keys, and Values

    hashtag
    Headers

    To extract an index name from a message header, use the _header prefix followed by the header name:

    This statement extracts the value from the gate header field and uses it as the index name.

    For headers with names that include dots, enclose the entire target in backticks (`) and each field-name segment in single quotes ('):

    In this case, the value of the header named prefix.abc.suffix is used to form the index name.

    hashtag
    Keys

    To use the full value of the message key as the index name, use the _key prefix:

    For example, if the message key is "freddie", the resulting index name will be freddie.

    hashtag
    Values

    To extract an index name from a field within the message value, use the _value prefix followed by the field name:

    This example uses the value of the name field from the message's value. If the field contains "jason", the index name will be jason.

    Nested Fields in Values

    To access nested fields within a value, specify the full path using dot notation:

    If the firstName field is nested within the name structure, its value (e.g., "hans") will be used as the index name.

    Fields with Dots in Their Names

    For field names that include dots, enclose the entire target in backticks (`) and each field-name segment in single quotes ('):

    If the value structure contains:

    The extracted index name will be hans.

    hashtag
    Auto Index Creation

    The Sink will automatically create missing indexes at startup.

    Please note that this feature is not compatible with index names extracted from message headers/keys/values.

    hashtag
    Options Reference

    Name
    Description
    Type
    Default Value

    connect.elastic.protocol

    URL protocol (http, https)

    string

    http

    connect.elastic.hosts

    List of hostnames for the Elasticsearch cluster nodes, not including protocol or port.

    string

    localhost

    connect.elastic.port

    Port on which the Elasticsearch node listens.

    string

    hashtag
    KCQL Properties

    Name
    Description
    Type
    Default Value

    behavior.on.null.values

    Specifies behavior on Kafka tombstones: IGNORE, DELETE or FAIL

    String

    IGNORE

    hashtag
    SSL Configuration Properties

    Property Name

    Description

    ssl.truststore.location

    Path to the truststore file containing the trusted CA certificates for verifying broker certificates.

    ssl.truststore.password

    Password for the truststore file to protect its integrity.

    ssl.truststore.type

    Type of the truststore (e.g., JKS, PKCS12). Default is JKS.

    ssl.keystore.location

    Path to the keystore file containing the client’s private key and certificate chain for client authentication.

    ssl.keystore.password

    Password for the keystore to protect the private key.

    ssl.keystore.type

    Type of the keystore (e.g., JKS, PKCS12). Default is JKS.

    hashtag
    SSL Configuration

    Enabling SSL connections between Kafka Connect and Elasticsearch ensures that the communication between these services is secure, protecting sensitive data from being intercepted or tampered with. SSL (or TLS) encrypts data in transit, verifying the identity of both parties and ensuring data integrity.

    While newer versions of Elasticsearch have SSL enabled by default for internal communication, it’s still necessary to configure SSL for client connections, such as those from Kafka Connect. Even if Elasticsearch has SSL enabled by default, Kafka Connect still needs these configurations to establish a secure connection. By setting up SSL in Kafka Connect, you ensure:

    • Data encryption: Prevents unauthorized access to data being transferred.

    • Authentication: Confirms that Kafka Connect and Elasticsearch are communicating with trusted entities.

    • Compliance: Meets security standards for regulatory requirements (such as GDPR or HIPAA).

    hashtag
    Configuration Example

    hashtag
    Terminology:

    • Truststore: Holds certificates to check if the node’s certificate is valid.

    • Keystore: Contains your client’s private key and certificate to prove your identity to the node.

    • SSL Protocol: Use TLSv1.2 or TLSv1.3 for up-to-date security.

    • Password Security: Protect passwords by encrypting them or using secure methods like environment variables or secret managers.

    MQTT

    This page describes the usage of the Stream Reactor MQTT Sink Connector.

    hashtag
    Connector Class

    hashtag
    Example

    io.lenses.streamreactor.connect.elastic6.ElasticSinkConnector
    io.lenses.streamreactor.connect.elastic7.ElasticSinkConnector
    name=elastic
    connector.class=io.lenses.streamreactor.connect.elastic7.ElasticSinkConnector
    tasks.max=1
    topics=orders
    connect.elastic.protocol=http
    connect.elastic.hosts=elastic
    connect.elastic.port=9200
    connect.elastic.cluster.name=elasticsearch
    connect.elastic.kcql=INSERT INTO orders SELECT * FROM orders
    connect.progress.enabled=true
    INSERT | UPSERT
    INTO <elastic_index >
    SELECT FIELD, ...
    FROM kafka_topic
    [PK FIELD,...]
    [WITHDOCTYPE=<your_document_type>]
    [WITHINDEXSUFFIX=<your_suffix>]
    -- Insert mode, select all fields from topicA and write to indexA
    INSERT INTO indexA SELECT * FROM topicA
    
    -- Insert mode, select 3 fields and rename from topicB
    -- and write to indexB
    INSERT INTO indexB SELECT x AS a, y, zc FROM topicB PK y
    
    -- UPSERT
    UPSERT INTO indexC SELECT id, string_field FROM topicC PK id
    INSERT INTO indexA SELECT * FROM topicA PROPERTIES ('behavior.on.null.values'='IGNORE')
    WITHINDEXSUFFIX=_suffix_{YYYY-MM-dd}
    INSERT INTO index_name SELECT * FROM topicA
    INSERT INTO _header.gate SELECT * FROM topicA
    INSERT INTO `_header.'prefix.abc.suffix'` SELECT * FROM topicA
    INSERT INTO _key SELECT * FROM topicA
    INSERT INTO _value.name SELECT * FROM topicA
    INSERT INTO _value.name.firstName SELECT * FROM topicA
    INSERT INTO `_value.'customer.name'.'first.name'` SELECT * FROM topicA
    {
      "customer.name": {
        "first.name": "hans"
      }
    }
    ssl.truststore.location=/path/to/truststore.jks
    ssl.truststore.password=your_truststore_password
    ssl.truststore.type=JKS  # Can also be PKCS12 if applicable
    
    ssl.keystore.location=/path/to/keystore.jks
    ssl.keystore.password=your_keystore_password
    ssl.keystore.type=JKS  # Can also be PKCS12 if applicable
    
    ssl.protocol=TLSv1.2  # Or TLSv1.3 for stronger security
    
    ssl.trustmanager.algorithm=PKIX  # Default algorithm for managing certificates
    ssl.keymanager.algorithm=PKIX  # Default algorithm for managing certificates

    9300

    connect.elastic.tableprefix

    Table prefix (optional)

    string

    connect.elastic.cluster.name

    Name of the Elasticsearch cluster, used in local mode for setting up the connection.

    string

    elasticsearch

    connect.elastic.write.timeout

    The time to wait, in milliseconds, for a write to complete. Default is 5 minutes.

    int

    300000

    connect.elastic.batch.size

    How many records to process at one time. Records pulled from Kafka can number 100k+, which is not feasible to send to Elasticsearch at once.

    int

    4000

    connect.elastic.use.http.username

    Username if HTTP Basic Auth is required. Default is null.

    string

    connect.elastic.use.http.password

    Password if HTTP Basic Auth is required. Default is null.

    string

    connect.elastic.error.policy

    Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP - the error is swallowed; THROW - the error is allowed to propagate; RETRY - the exception causes the Connect framework to retry the message. The number of retries is based on connect.elastic.max.retries. The error will be logged automatically.

    string

    THROW

    connect.elastic.max.retries

    The maximum number of times to try the write again.

    int

    20

    connect.elastic.retry.interval

    The time in milliseconds between retries.

    int

    60000

    connect.elastic.kcql

    KCQL expression describing field selection and routes.

    string

    connect.elastic.pk.separator

    Separator used when more than one field is defined in the PK.

    string

    -

    connect.progress.enabled

    Enables logging of how many records have been processed.

    boolean

    false

    ssl.protocol

    The SSL protocol used for secure connections (e.g., TLSv1.2, TLSv1.3). Default is TLS.

    ssl.trustmanager.algorithm

    Algorithm used by the TrustManager to manage certificates. Default value is the key manager factory algorithm configured for the Java Virtual Machine.

    ssl.keymanager.algorithm

    Algorithm used by the KeyManager to manage certificates. Default value is the key manager factory algorithm configured for the Java Virtual Machine.

    circle-check

    For more examples see the tutorials.

    hashtag
    KCQL Support

    circle-check

    You can specify multiple KCQL statements separated by ; to have a connector sink multiple topics. The connector properties topics or topics.regex are required to be set to a value that matches the KCQL statements.

    The following KCQL is supported:

    Examples:

    hashtag
    Dynamic targets

    The connector can route the messages to specific MQTT targets. These are the possible options:

    1. Given Constant Topic:

      Route messages to a specified MQTT topic using a constant value:

    2. Using a Field Value for Topic:

      Direct messages to an MQTT topic based on a specified field's value. Example expression: fieldA.fieldB.fieldC.

    3. Utilizing Message Key as Topic:

      Determine the MQTT topic by the incoming message key, expected to be a string.

    4. Leveraging Kafka Message Topic:

      Set the MQTT topic using the incoming Kafka message’s original topic.
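As a sketch of the first two routing options (topic and field names are hypothetical; the field-based form follows the v9 syntax shown in the Migration section below):

```sql
-- 1. Constant MQTT topic
INSERT INTO /sensors/out SELECT * FROM kafka_topic

-- 2. MQTT topic taken from a field of the message value
INSERT INTO `fieldA.fieldB.fieldC` SELECT * FROM kafka_topic PROPERTIES('mqtt.target.from.field'='true')
```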

    hashtag
    Kafka payload support

    The sink publishes each Kafka record to MQTT according to the type of the record’s value:

    Payload type
    Action taken by the sink

    Binary (byte[])

    Forwarded unchanged (“pass-through”).

    String

    Published as its UTF-8 byte sequence.

    Connect Struct produced by Avro, Protobuf, or JSON Schema converters

    Converted to JSON, then published as the JSON string’s UTF-8 bytes.

    java.util.Map or other Java Collection

    Serialized to JSON and published as the JSON string’s UTF-8 bytes.

    In short, non-binary objects are first turned into a JSON string; everything that reaches MQTT is ultimately a sequence of bytes.
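The rule above can be illustrated with a small Python sketch (this mimics the behaviour for clarity; it is not the connector's actual code):

```python
import json

def to_mqtt_payload(value):
    # Rough illustration of the sink's rule: raw bytes pass through,
    # strings become their UTF-8 byte sequence, and anything else is
    # serialized to a JSON string first, then encoded as UTF-8.
    if isinstance(value, (bytes, bytearray)):
        return bytes(value)
    if isinstance(value, str):
        return value.encode("utf-8")
    return json.dumps(value).encode("utf-8")

print(to_mqtt_payload({"id": 1, "temp": 21.5}))
```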

    hashtag
    Error policies

    The connector supports Error policies.

    hashtag
    Option Reference

    Name
    Description
    Type
    Default Value

    connect.mqtt.hosts

    Contains the MQTT connection end points.

    string

    connect.mqtt.username

    Contains the MQTT connection username.

    string

    connect.mqtt.password

    Contains the MQTT connection password.

    password

    hashtag
    Migration

    Version 9 introduces two breaking changes that affect how you route data and project fields in KCQL. Review the sections below and update your configurations before restarting the connector.

    hashtag
    WITHTARGET is no longer used

    Before (v < 9.0.0)

    After (v ≥ 9.0.0)

    INSERT INTO SELECT * FROM control.boxes.test WITHTARGET ${path}

    INSERT INTO ${path} SELECT * FROM control.boxes.test PROPERTIES('mqtt.target.from.field'='true')

    Migration step

    1. Delete every WITHTARGET … clause.

    2. Move the placeholder (or literal) that held the target path into the INSERT INTO expression.

    3. Add mqtt.target.from.field=true to the KCQL PROPERTIES list.

    hashtag
    KCQL field projections are ignored

    Before (v < 9.0.0)

    After (v ≥ 9.0.0)

    Handled directly in the KCQL SELECT list: SELECT id, temp AS temperature …

    The connector passes the full record. Any projection, renaming, or value transformation must be done with a Kafka Connect Single Message Transformer (SMT), KStreams, or another preprocessing step.

    Migration step

    • Remove field lists and aliases from KCQL.

    • Attach an SMT such as org.apache.kafka.connect.transforms.ExtractField$Value, org.apache.kafka.connect.transforms.MaskField$Value, or your own custom SMT to perform the same logic.
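For instance, a projection such as SELECT id, temp AS temperature can be reproduced with the built-in ReplaceField SMT (a sketch; field names are hypothetical):

```properties
# Keep only id and temp, and rename temp to temperature
transforms=project
transforms.project.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.project.include=id,temp
transforms.project.renames=temp:temperature
```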

    Azure Data Lake Gen2

    This page describes the usage of the Stream Reactor Azure Datalake Gen 2 Sink Connector.

    This Kafka Connect sink connector facilitates the seamless transfer of records from Kafka to Azure Data Lake Buckets. It offers robust support for various data formats, including AVRO, Parquet, JSON, CSV, and Text, making it a versatile choice for data storage. Additionally, it ensures the reliability of data transfer with built-in support for exactly-once semantics.

    hashtag
    Connector Class

    hashtag
    Example

    circle-check

    For more examples see the tutorials.

    hashtag
    KCQL Support

    circle-check

    You can specify multiple KCQL statements separated by ; to have a connector sink multiple topics. The connector properties topics or topics.regex are required to be set to a value that matches the KCQL statements.

    The connector uses KCQL to map topics to Datalake buckets and paths. The full KCQL syntax is:

    Please note that you can employ escaping within KCQL for the INSERT INTO, SELECT * FROM, and PARTITIONBY clauses when necessary. For example, an incoming Kafka message stored as JSON can use fields containing .:

    In this case, you can use the following KCQL statement:

    hashtag
    Target Bucket and Path

    The target bucket and path are specified in the INSERT INTO clause. The path is optional and if not specified, the connector will write to the root of the bucket and append the topic name to the path.

    Here are a few examples:

    hashtag
    SQL Projection

    Currently, the connector does not offer support for SQL projection; consequently, anything other than a SELECT * query is disregarded. The connector will faithfully write all fields from Kafka exactly as they are.

    hashtag
    Source Topic

    To avoid runtime errors, make sure the topics or topics.regex setting matches your KCQL statements. If the connector receives data for a topic without matching KCQL, it will throw an error. When using a regex to select topics, follow this KCQL pattern:

    In this case the topic name will be appended to the $target destination.
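For example (container name is hypothetical, and it is assumed here that the datalake sink's KCQL property is connect.datalake.kcql):

```properties
# Sink every topic matching the regex; each topic name is appended to the target
topics.regex=^sensor_data_\d+$
connect.datalake.kcql=INSERT INTO mycontainer SELECT * FROM `*`
```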

    hashtag
    KCQL Properties

    The PROPERTIES clause is optional and adds a layer of configuration to the connector. It enhances versatility by permitting the application of multiple configurations (delimited by ‘,’). The following properties are supported:

    Name
    Description
    Type
    Available Values
    Default Value

    The sink connector optimizes performance by padding the output files, a practice that proves beneficial when using the Datalake Source connector to restore data. This file padding ensures that files are ordered lexicographically, allowing the Datalake Source connector to skip the need for reading, sorting, and processing all files, thereby enhancing efficiency.

    hashtag
    Partitioning & File names

    The object key serves as the filename used to store data in Datalake. There are two options for configuring the object key:

    • Default: The object key is automatically generated by the connector and follows the Kafka topic-partition structure. The format is $container/[$prefix]/$topic/$partition/offset.extension. The extension is determined by the chosen storage format.

    • Custom: The object key is driven by the PARTITIONBY clause. The format is either $container/[$prefix]/$topic/customKey1=customValue1/customKey2=customValue2/topic(partition_offset).extension (naming style mimicking Hive-like data partitioning) or $container/[$prefix]/customValue/topic(partition_offset).ext. The extension is determined by the selected storage format.
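For example (container, prefix, and values are hypothetical), the two layouts would produce object keys such as:

```text
Default:  mycontainer/prefix/orders/2/1042.json
Custom:   mycontainer/prefix/orders/country=FR/orders(2_1042).json
```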

    circle-exclamation

    The Connector automatically adds the topic name to the partition. There is no need to add it to the partition clause. If you want to explicitly add the topic or partition you can do so by using _topic and _partition.

    The partition clause works on Header, Key and Values fields of the Kafka message.

    Custom keys and values can be extracted from the Kafka message key, message value, or message headers, as long as the headers are of types that can be converted to strings. There is no fixed limit to the number of elements that can form the object key, but you should be aware of Azure Datalake key length restrictions.

    To extract fields from the message values, simply use the field names in the PARTITIONBY clause. For example:

    However, note that the message fields must be of primitive types (e.g., string, int, long) to be used for partitioning.

    You can also use the entire message key as long as it can be coerced into a primitive type:

    In cases where the Kafka message Key is not a primitive but a complex object, you can use individual fields within the message Key to create the Datalake object key name:

    Kafka message headers can also be used in the Datalake object key definition, provided the header values are of primitive types easily convertible to strings:

    Customizing the object key can leverage various components of the Kafka message. For example:

    This flexibility allows you to tailor the object key to your specific needs, extracting meaningful information from Kafka messages to structure Datalake object keys effectively.

    To enable Athena-like partitioning, use the following syntax:
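A sketch (container and field names are hypothetical) using the partition.include.keys property to produce Hive-style key=value paths:

```sql
INSERT INTO mycontainer:prefix
SELECT * FROM orders
PARTITIONBY country, _header.ts
STOREAS `AVRO`
PROPERTIES ('partition.include.keys'=true)
```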

    hashtag
    Rolling Windows

    Storing data in Azure Datalake and partitioning it by time is a common practice in data management. For instance, you may want to organize your Datalake data in hourly intervals. This partitioning can be seamlessly achieved using the PARTITIONBY clause in combination with specifying the relevant time field. However, it’s worth noting that the time field typically doesn’t adjust automatically.

    To address this, we offer a Kafka Connect Single Message Transformer (SMT) designed to streamline this process; the transformer plugin and its documentation are published separately.

    Let’s consider an example where you need the object key to include the wallclock time (the time when the message was processed) and create an hourly window based on a field called timestamp. Here’s the connector configuration to achieve this:

    In this example, the incoming Kafka message’s Value content includes a field called timestamp, represented as a long value indicating the epoch time in milliseconds. The TimestampConverter SMT will expertly convert this into a string value according to the format specified in the format.to.pattern property. Additionally, the insertWallclock SMT will incorporate the current wallclock time in the format you specify in the format property.

    The PARTITIONBY clause then leverages both the timestamp field and the wallclock header to craft the object key, providing you with precise control over data partitioning.

    hashtag
    Data Storage Format

    While the STOREAS clause is optional, it plays a pivotal role in determining the storage format within Azure Datalake. It’s crucial to understand that this format is entirely independent of the data format stored in Kafka. The connector maintains its neutrality towards the storage format at the topic level and relies on the key.converter and value.converter settings to interpret the data.

    Supported storage formats encompass:

    • AVRO

    • Parquet

    • JSON

    • CSV (including headers)

    • Text

    • BYTES

    Opting for BYTES ensures that each record is stored in its own separate file. This feature proves particularly valuable for scenarios involving the storage of images or other binary data in Datalake. For cases where you prefer to consolidate multiple records into a single binary file, AVRO or Parquet are the recommended choices.

    By default, the connector exclusively stores the Kafka message value. However, you can expand storage to encompass the entire message, including the key, headers, and metadata, by configuring the store.envelope property as true. This property operates as a boolean switch, with the default value being false. When the envelope is enabled, the data structure follows this format:

    circle-exclamation

    Not supported with a custom partition strategy.

    Utilizing the envelope is particularly advantageous in scenarios such as backup and restore or replication, where comprehensive storage of the entire message in Datalake is desired.

    hashtag
    Examples

    Storing the message Value Avro data as Parquet in Datalake:

    The converter also facilitates seamless JSON to AVRO/Parquet conversion, eliminating the need for an additional processing step before the data is stored in Datalake.

    Enabling the full message stored as JSON in Datalake:

    Enabling the full message stored as AVRO in Datalake:

    If the restore (see the Datalake Source documentation) happens on the same cluster, then the most performant way is to use the ByteConverter for both Key and Value and store as AVRO or Parquet:

    hashtag
    Flush Options

    The connector offers three distinct flush options for data management:

    • Flush by Count - triggers a file flush after a specified number of records have been written to it.

    • Flush by Size - initiates a file flush once a predetermined size (in bytes) has been attained.

    • Flush by Interval - enforces a file flush after a defined time interval (in seconds).

    It’s worth noting that the interval flush is a continuous process that acts as a fail-safe mechanism, ensuring that files are periodically flushed, even if the other flush options are not configured or haven’t reached their thresholds.

    Consider a scenario where the flush size is set to 10MB, and only 9.8MB of data has been written to the file, with no new Kafka messages arriving for an extended period of 6 hours. To prevent undue delays, the interval flush guarantees that the file is flushed after the specified time interval has elapsed. This ensures the timely management of data even in situations where other flush conditions are not met.

    The flush options are configured using the flush.count, flush.size, and flush.interval KCQL properties (see the KCQL Properties section). The settings are optional and if not specified the defaults are:

    • flush.count = 50_000

    • flush.size = 500000000 (500MB)

    • flush.interval = 3600 (1 hour)
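For example, a hedged sketch overriding all three defaults in one KCQL statement (container, prefix and topic names are hypothetical):

```properties
# Flush after 1,000 records, 50MB, or 5 minutes - whichever threshold is reached first
connect.datalake.kcql=INSERT INTO my-container:payments SELECT * FROM payments STOREAS `PARQUET` PROPERTIES('flush.count'=1000, 'flush.size'=50000000, 'flush.interval'=300)
```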

    circle-check

    A connector instance can simultaneously operate on multiple topic partitions. When one partition triggers a flush, it will initiate a flush operation for all of them, even if the other partitions are not yet ready to flush.

    When connect.datalake.latest.schema.optimization.enabled is set to true, it reduces unnecessary data flushes when writing to Avro or Parquet formats. Specifically, it leverages schema compatibility to avoid flushing data when messages with older but backward-compatible schemas are encountered. Consider the following sequence of messages and their associated schemas:

    hashtag
    Flushing By Interval

    The next flush time is calculated from the time the previous flush completed (the last modified time of the file written to the Data Lake). By design, the sink connector’s behaviour will therefore drift slightly, depending on how long each flush takes and whether records are present. If Kafka Connect makes no calls to put records, the flush logic is not executed. This ensures a more consistent number of records per file.

    hashtag
    Compression

    AVRO and Parquet offer the capability to compress files as they are written. The Datalake Sink connector provides advanced users with the flexibility to configure compression options.

    Here are the available options for connect.datalake.compression.codec, along with indications of their support by the Avro, Parquet and JSON writers:

    | Compression  | Avro Support | Avro (requires Level) | Parquet Support | JSON |
    |--------------|--------------|-----------------------|-----------------|------|
    | UNCOMPRESSED | ✅           |                       | ✅              | ✅   |
    | SNAPPY       | ✅           |                       | ✅              |      |
    | GZIP         | ✅           |                       | ✅              |      |
    | LZ0          | ✅           |                       |                 |      |
    | LZ4          | ✅           |                       |                 |      |
    | BROTLI       |              |                       | ✅              |      |
    | BZIP2        | ✅           |                       |                 |      |
    | ZSTD         | ✅           | ⚙️                    | ✅              |      |
    | DEFLATE      | ✅           | ⚙️                    |                 |      |
    | XZ           | ✅           | ⚙️                    |                 |      |

    Please note that not all compression libraries are bundled with the Datalake connector. Therefore, you may need to manually add certain libraries to the classpath to ensure they function correctly.

    hashtag
    Authentication

    The connector offers three distinct authentication modes:

    • Default: This mode relies on the default Azure authentication chain, simplifying the authentication process.

    • Connection String: This mode enables simpler configuration by relying on the connection string to authenticate with Azure.

    • Credentials: In this mode, explicit configuration of Azure Access Key and Secret Key is required for authentication.

    When selecting the “Credentials” mode, it is essential to provide the necessary access key and secret key properties. Alternatively, if you prefer not to configure these properties explicitly, the connector will follow the credentials retrieval order as described here.

    Here’s an example configuration for the “Credentials” mode:

    And here is an example configuration using the “Connection String” mode:

    For enhanced security and flexibility when using either the “Credentials” or “Connection String” modes, it is highly advisable to utilize Connect Secret Providers.

    hashtag
    Error policies

    The connector supports Error policies.

    hashtag
    Indexes Directory

    The connector uses the concept of index files that it writes to in order to store information about the latest offsets for Kafka topics and partitions as they are being processed. This allows the connector to quickly resume from the correct position when restarting and provides flexibility in naming the index files.

    By default, the root directory for these index files is named .indexes for all connectors. However, each connector will create and store its index files within its own subdirectory inside this .indexes directory.

    You can configure the root directory for these index files using the property connect.datalake.indexes.name. This property specifies the path from the root of the data lake filesystem. Note that even if you configure this property, the connector will still create a subdirectory within the specified root directory.
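For instance, the following sketch stores this connector’s index files under logs/indexes/<connector_name>/:

```properties
# Index files are written to logs/indexes/<connector_name>/
connect.datalake.indexes.name=logs/indexes
```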

    hashtag
    Examples

    hashtag
    Option Reference

    Name
    Description
    Type
    Available Values
    Default Value

    MongoDB

    This page describes the usage of the Stream Reactor MongoDB Sink Connector.

    hashtag
    Connector Class

    hashtag
    Example

    INSERT INTO lenses-io-demo ...
    INSERT INTO `<path to field>` 
    SELECT * FROM control.boxes.test 
    PROPERTIES('mqtt.target.from.field'='true')
    io.lenses.streamreactor.connect.mqtt.sink.MqttSinkConnector
    name=mqtt
    connector.class=io.lenses.streamreactor.connect.mqtt.sink.MqttSinkConnector
    tasks.max=1
    topics=orders
    connect.mqtt.hosts=tcp://mqtt:1883
    connect.mqtt.clean=true
    connect.mqtt.timeout=1000
    connect.mqtt.keep.alive=1000
    connect.mqtt.service.quality=1
    connect.mqtt.client.id=dm_sink_id
    connect.mqtt.kcql=INSERT INTO /lenses/orders SELECT * FROM orders
    INSERT
    INTO <mqtt-topic>
    SELECT * //no field projection supported
    FROM <kafka-topic>
    //no WHERE clause supported
    -- Insert into /landoop/demo all fields from kafka_topicA
    INSERT INTO `/landoop/demo` SELECT * FROM kafka_topicA
    
    -- Insert into /landoop/demo all fields from dynamic field
    INSERT INTO `<field path>` SELECT * FROM control.boxes.test PROPERTIES('mqtt.target.from.field'='true')
    io.lenses.streamreactor.connect.datalake.sink.DatalakeSinkConnector

    connect.mqtt.service.quality

    Specifies the Mqtt quality of service

    int

    connect.mqtt.timeout

    Provides the time interval to establish the mqtt connection

    int

    3000

    connect.mqtt.clean

    Specifies whether the Mqtt connection uses a clean session

    boolean

    true

    connect.mqtt.keep.alive

    The keep alive functionality assures that the connection is still open and both broker and client are connected to the broker during the establishment of the connection. The interval is the longest possible period of time, which broker and client can endure without sending a message.

    int

    5000

    connect.mqtt.client.id

    Contains the Mqtt session client id

    string

    connect.mqtt.error.policy

    Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP - the error is swallowed; THROW - the error is allowed to propagate; RETRY - the exception causes the Connect framework to retry the message. The number of retries is based on connect.mqtt.max.retries. The error will be logged automatically.

    string

    THROW

    connect.mqtt.retry.interval

    The time in milliseconds between retries.

    int

    60000

    connect.mqtt.max.retries

    The maximum number of times to try the write again.

    int

    20

    connect.mqtt.retained.messages

    Specifies the Mqtt retained flag.

    boolean

    false

    connect.mqtt.converter.throw.on.error

    If set to false, the conversion exception will be swallowed and processing carries on, but the message is lost. If set to true, the exception will be thrown. Default is false.

    boolean

    false

    connect.converter.avro.schemas

    If the AvroConverter is used you need to provide an Avro schema to be able to read and translate the raw bytes to an Avro record. The format is $MQTT_TOPIC=$PATH_TO_AVRO_SCHEMA_FILE in the case of a source converter, or $KAFKA_TOPIC=$PATH_TO_AVRO_SCHEMA in the case of a sink converter.

    string

    connect.mqtt.kcql

    Contains the Kafka Connect Query Language describing the source Kafka topics and the target MQTT topics

    string

    connect.progress.enabled

    Enables the output for how many records have been processed

    boolean

    false

    connect.mqtt.ssl.ca.cert

    Provides the path to the CA certificate file to use with the Mqtt connection

    string

    connect.mqtt.ssl.cert

    Provides the path to the certificate file to use with the Mqtt connection

    string

    connect.mqtt.ssl.key

    Provides the path to the certificate private key file to use with the Mqtt connection

    string

    padding.length.partition

    Sets the padding length for the partition.

    Int

    0

    padding.length.offset

    Sets the padding length for the offset.

    Int

    12

    partition.include.keys

    Specifies whether partition keys are included.

    Boolean

    false (default); true when custom partitioning is used

    store.envelope

    Indicates whether to store the entire Kafka message

    Boolean

    store.envelope.fields.key

    Indicates whether to store the envelope’s key.

    Boolean

    store.envelope.fields.headers

    Indicates whether to store the envelope’s headers.

    Boolean

    store.envelope.fields.value

    Indicates whether to store the envelope’s value.

    Boolean

    store.envelope.fields.metadata

    Indicates whether to store the envelope’s metadata.

    Boolean

    flush.size

    Specifies the size (in bytes) for the flush operation.

    Long

    500000000 (500MB)

    flush.count

    Specifies the number of records for the flush operation.

    Int

    50000

    flush.interval

    Specifies the interval (in seconds) for the flush operation.

    Long

    3600 (1 hour)

    key.suffix

    When specified it appends the given value to the resulting object key before the "extension" (avro, json, etc) is added

    String

    <empty>


    connect.datalake.azure.account.name

    The Azure Account Name used for authentication.

    string

    (Empty)

    connect.datalake.pool.max.connections

    Specifies the maximum number of connections allowed in the Azure Client’s HTTP connection pool when interacting with Datalake.

    int

    -1 (undefined)

    50

    connect.datalake.endpoint

    Datalake endpoint URL.

    string

    (Empty)

    connect.datalake.error.policy

    Defines the error handling policy when errors occur during data transfer to or from Datalake.

    string

    “NOOP,” “THROW,” “RETRY”

    “THROW”

    connect.datalake.max.retries

    Sets the maximum number of retries the connector will attempt before reporting an error to the Connect Framework.

    int

    20

    connect.datalake.retry.interval

    Specifies the interval (in milliseconds) between retry attempts by the connector.

    int

    60000

    connect.datalake.http.max.retries

    Sets the maximum number of retries for the underlying HTTP client when interacting with Datalake.

    long

    5

    connect.datalake.http.retry.interval

    Specifies the retry interval (in milliseconds) for the underlying HTTP client. An exponential backoff strategy is employed.

    long

    50

    connect.datalake.local.tmp.directory

    Enables the use of a local folder as a staging area for data transfer operations.

    string

    (Empty)

    connect.datalake.kcql

    A SQL-like configuration that defines the behavior of the connector. Refer to the KCQL section below for details.

    string

    (Empty)

    connect.datalake.compression.codec

    Sets the Parquet compression codec to be used when writing data to Datalake.

    string

    “UNCOMPRESSED,” “SNAPPY,” “GZIP,” “LZ0,” “LZ4,” “BROTLI,” “BZIP2,” “ZSTD,” “DEFLATE,” “XZ”

    “UNCOMPRESSED”

    connect.datalake.compression.level

    Sets the compression level when compression is enabled for data transfer to Datalake.

    int

    1-9

    (Empty)

    connect.datalake.seek.max.files

    Specifies the maximum threshold for the number of files the connector uses to ensure exactly-once processing of data.

    int

    5

    connect.datalake.indexes.name

    Configure the indexes root directory for this connector.

    string

    ".indexes"

    connect.datalake.exactly.once.enable

    Set to 'false' to disable exactly-once semantics and rely instead on Kafka Connect’s native at-least-once offset management

    boolean

    true, false

    true

    connect.datalake.schema.change.detector

    Configure how the file will roll over upon receiving a record with a schema different from the accumulated ones. This property configures schema change detection with default (object equality), version (version field comparison), or compatibility (Avro compatibility checking).

    string

    default, version, compatibility

    default

    connect.datalake.skip.null.values

    Skip records with null values (a.k.a. tombstone records).

    boolean

    true, false

    false

    connect.datalake.latest.schema.optimization.enabled

    When set to true, reduces unnecessary data flushes when writing to Avro or Parquet formats. Specifically, it leverages schema compatibility to avoid flushing data when messages with older but backward-compatible schemas are encountered.

    boolean

    true,false

    false

    padding.type

    Specifies the type of padding to be applied.

    string

    LeftPad, RightPad, NoOp

    LeftPad

    padding.char

    Defines the character used for padding.

    Char

    ‘0’


    Index Name (connect.datalake.indexes.name)

    Resulting Indexes Directory Structure

    Description

    .indexes (default)

    .indexes/<connector_name>/

    The default setup, where each connector uses its own subdirectory within .indexes.

    custom-indexes

    custom-indexes/<connector_name>/

    Custom root directory custom-indexes, with a subdirectory for each connector.

    indexes/datalake-connector-logs

    indexes/datalake-connector-logs/<connector_name>/

    Uses a custom subdirectory datalake-connector-logs within indexes, with a subdirectory for each connector.

    logs/indexes

    logs/indexes/<connector_name>/

    Indexes are stored under logs/indexes, with a subdirectory for each connector.

    connect.datalake.azure.auth.mode

    Specifies the Azure authentication mode for connecting to Datalake.

    string

    “Credentials”, “ConnectionString” or “Default”

    “Default”

    connect.datalake.azure.account.key

    The Azure Account Key used for authentication.

    string

    (Empty)


    INSERT INTO `_key` 
    SELECT ...
    INSERT INTO `_topic` 
    SELECT ...
    connector.class=io.lenses.streamreactor.connect.datalake.sink.DatalakeSinkConnector
    connect.datalake.kcql=insert into lensesio:demo select * from demo PARTITIONBY _value.metadata_id, _value.customer_id, _header.ts, _header.wallclock STOREAS `JSON` PROPERTIES('flush.interval'=600, 'flush.size'=1000000, 'flush.count'=5000)
    topics=demo
    name=demo
    value.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter=org.apache.kafka.connect.storage.StringConverter
    transforms=insertFormattedTs,insertWallclock
    transforms.insertFormattedTs.type=io.lenses.connect.smt.header.TimestampConverter
    transforms.insertFormattedTs.header.name=ts
    transforms.insertFormattedTs.field=timestamp
    transforms.insertFormattedTs.target.type=string
    transforms.insertFormattedTs.format.to.pattern=yyyy-MM-dd-HH
    transforms.insertWallclock.type=io.lenses.connect.smt.header.InsertWallclock
    transforms.insertWallclock.header.name=wallclock
    transforms.insertWallclock.value.type=format
    transforms.insertWallclock.format=yyyy-MM-dd-HH
    INSERT INTO bucketAddress[:pathPrefix]
    SELECT *
    FROM kafka-topic
    [[PARTITIONBY (partition[, partition] ...)] | NOPARTITION]
    [STOREAS storage_format]
    [PROPERTIES(
      'property.1'=x,
      'property.2'=x,
    )]
    {
      ...
      "a.b": "value",
      ...
    }
    INSERT INTO `container-name`:`prefix` SELECT * FROM `kafka-topic` PARTITIONBY `a.b`
    INSERT INTO testcontainer:pathToWriteTo SELECT * FROM topicA;
    INSERT INTO testcontainer SELECT * FROM topicA;
    INSERT INTO testcontainer:path/To/Write/To SELECT * FROM topicA PARTITIONBY fieldA;
    topics.regex = ^sensor_data_\d+$
    connect.datalake.kcql= INSERT INTO $target SELECT * FROM  `*` ....
    PARTITIONBY fieldA, fieldB
    PARTITIONBY _key
    PARTITIONBY _key.fieldA, _key.fieldB
    PARTITIONBY _header.<header_key1>[, _header.<header_key2>]
    PARTITIONBY fieldA, _key.fieldB, _headers.fieldC
    INSERT INTO $container[:$prefix]
    SELECT * FROM $topic
    PARTITIONBY fieldA, _key.fieldB, _headers.fieldC
    STOREAS `AVRO`
    PROPERTIES (
        'partition.include.keys'=true,
    )
    connector.class=io.lenses.streamreactor.connect.azure.datalake.sink.DatalakeSinkConnector
    connect.datalake.kcql=insert into lensesio:demo select * from demo PARTITIONBY _value.metadata_id, _value.customer_id, _header.ts, _header.wallclock STOREAS `JSON` PROPERTIES('flush.interval'=30, 'flush.size'=1000000, 'flush.count'=5000)
    topics=demo
    name=demo
    value.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter=org.apache.kafka.connect.storage.StringConverter
    transforms=insertFormattedTs,insertWallclock
    transforms.insertFormattedTs.type=io.lenses.connect.smt.header.TimestampConverter
    transforms.insertFormattedTs.header.name=ts
    transforms.insertFormattedTs.field=timestamp
    transforms.insertFormattedTs.target.type=string
    transforms.insertFormattedTs.format.to.pattern=yyyy-MM-dd-HH
    transforms.insertWallclock.type=io.lenses.connect.smt.header.InsertWallclock
    transforms.insertWallclock.header.name=wallclock
    transforms.insertWallclock.value.type=format
    transforms.insertWallclock.format=yyyy-MM-dd-HH
    {
      "key": <the message Key, which can be a primitive or a complex object>,
      "value": <the message Value, which can be a primitive or a complex object>,
      "headers": {
        "header1": "value1",
        "header2": "value2"
      },
      "metadata": {
        "offset": 0,
        "partition": 0,
        "timestamp": 0,
        "topic": "topic"
      }
    }
    ...
    connect.datalake.kcql=INSERT INTO lensesioazure:car_speed SELECT * FROM car_speed_events STOREAS `PARQUET` 
    value.converter=io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url=http://localhost:8081
    key.converter=org.apache.kafka.connect.storage.StringConverter
    ...
    ...
    connect.datalake.kcql=INSERT INTO lensesioazure:car_speed SELECT * FROM car_speed_events STOREAS `PARQUET` 
    value.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter=org.apache.kafka.connect.storage.StringConverter
    ...
      ...
      connect.datalake.kcql=INSERT INTO lensesioazure:car_speed SELECT * FROM car_speed_events STOREAS `JSON` PROPERTIES('store.envelope'=true)
      value.converter=org.apache.kafka.connect.json.JsonConverter
      key.converter=org.apache.kafka.connect.storage.StringConverter
      ...
    ...
    connect.datalake.kcql=INSERT INTO lensesioazure:car_speed SELECT * FROM car_speed_events STOREAS `AVRO` PROPERTIES('store.envelope'=true)
    value.converter=io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url=http://localhost:8081
    key.converter=org.apache.kafka.connect.storage.StringConverter
    ...
    ...
    connect.datalake.kcql=INSERT INTO lensesioazure:car_speed SELECT * FROM car_speed_events STOREAS `AVRO` PROPERTIES('store.envelope'=true)
    value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
    key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
    ...
    message1 -> schema1
    message2 -> schema1  
      (No flush needed – same schema)
    
    message3 -> schema2  
      (Flush occurs – new schema introduced)
    
    message4 -> schema2  
      (No flush needed – same schema)
    
    message5 -> schema1  
      Without optimization: would trigger a flush  
      With optimization: no flush – schema1 is backward-compatible with schema2
    
    message6 -> schema2  
    message7 -> schema2  
      (No flush needed – same schema, it would happen based on the flush thresholds)
    ...
    connect.datalake.azure.auth.mode=Credentials
    connect.datalake.azure.account.name=$AZURE_ACCOUNT_NAME
    connect.datalake.azure.account.key=$AZURE_ACCOUNT_KEY
    ...
    ...
    connect.datalake.azure.auth.mode=ConnectionString
    connect.datalake.azure.connection.string=$AZURE_CONNECTION_STRING
    ...
    circle-check

    For more examples see the tutorials.

    hashtag
    KCQL support

    circle-check

    You can specify multiple KCQL statements separated by ; to have a connector sink multiple topics. The connector properties topics or topics.regex are required to be set to a value that matches the KCQL statements.
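A minimal sketch of a multi-topic configuration (topic, container and prefix names are hypothetical):

```properties
# Two KCQL statements, separated by ';', sinking two topics
topics=orders,payments
connect.datalake.kcql=INSERT INTO my-container:orders SELECT * FROM orders STOREAS `AVRO`; INSERT INTO my-container:payments SELECT * FROM payments STOREAS `AVRO`
```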

    The following KCQL is supported:

    Examples:

    hashtag
    Insert Mode

    Insert is the default write mode of the sink.

    hashtag
    Upsert Mode

    The connector supports MongoDB upserts, which replace the existing row if a match is found on the primary keys. If records are delivered with the same field or group of fields that are used as the primary key on the target table, but different values, the existing record in the target table will be updated.

    hashtag
    Batching

    The BATCH clause controls the batching of writes to MongoDB.

    hashtag
    TLS/SSL

    TLS/SSL is supported by setting ?ssl=true in the connect.mongo.connection option. The MongoDB driver will then attempt to load the truststore and keystore using the JVM system properties.

    You need to set JVM system properties to ensure that the client is able to validate the SSL certificate presented by the server:

    • javax.net.ssl.trustStore: the path to a trust store containing the certificate of the signing authority

    • javax.net.ssl.trustStorePassword: the password to access this trust store

    • javax.net.ssl.keyStore: the path to a key store containing the client’s SSL certificates

    • javax.net.ssl.keyStorePassword: the password to access this key store
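One way to pass these to the Connect worker JVM is via KAFKA_OPTS; the store paths and passwords below are placeholders:

```shell
# Placeholder paths and passwords - replace with your own trust store and key store
export KAFKA_OPTS="-Djavax.net.ssl.trustStore=/var/private/ssl/truststore.jks \
  -Djavax.net.ssl.trustStorePassword=changeit \
  -Djavax.net.ssl.keyStore=/var/private/ssl/keystore.jks \
  -Djavax.net.ssl.keyStorePassword=changeit"
```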

    hashtag
    Authentication Mechanism

    All authentication methods are supported: X.509, LDAP Plain, Kerberos (GSSAPI), MongoDB-CR and SCRAM-SHA-1. The default as of MongoDB version 3.0 is SCRAM-SHA-1. To set the authentication mechanism, set authMechanism in the connect.mongo.connection option.

    The mechanism can be set either in the connection string (although this requires the password to appear in plain text in the connection string) or via the connect.mongo.auth.mechanism option.

    If the username is set, it overrides the username/password set in the connection string, and the connect.mongo.auth.mechanism option takes precedence.

    e.g.

    hashtag
    JSON Field dates

    List of fields that should be converted to ISO Date on MongoDB insertion (comma-separated field names), for JSON topics only. Field values may be an epoch time or an ISO8601 datetime string with an offset (offset or ‘Z’ required). If the string does not parse to ISO, it will be written as a string instead.

    Subdocument fields can be referred to in the following examples:

    • topLevelFieldName

    • topLevelSubDocument.FieldName

    • topLevelParent.subDocument.subDocument2.FieldName

    If a field is converted to ISODate and that same field is named as a PK, then the PK field is also written as an ISODate.

    This is controlled via the connect.mongo.json_datetime_fields option.
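A hedged sketch (the field names are hypothetical) converting a top-level field and a subdocument field to ISODate:

```properties
# Hypothetical field names: created_at at the top level, placed_at inside the order subdocument
connect.mongo.json_datetime_fields=created_at,order.placed_at
```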

    hashtag
    Kafka payload support

    This sink supports the following Kafka payloads:

    • Schema.Struct and Struct (Avro)

    • Schema.Struct and JSON

    • No Schema and JSON

    hashtag
    Error policies

    The connector supports Error policies.

    hashtag
    Option Reference

    Name
    Description
    Type
    Default Value

    ssl.cipher.suites

    A list of cipher suites. This is a named combination of authentication, encryption, MAC and key exchange algorithm used to negotiate the security settings for a network connection using TLS or SSL network protocol. By default all the available cipher suites are supported.

    list

    ssl.enabled.protocols

    The list of protocols enabled for SSL connections.

    list

    [TLSv1.2, TLSv1.1, TLSv1]

    ssl.keystore.password

    The store password for the key store file. This is optional for client and only needed if ssl.keystore.location is configured.

    password

    io.lenses.streamreactor.connect.mongodb.sink.MongoSinkConnector
    name=mongo
    connector.class=io.lenses.streamreactor.connect.mongodb.sink.MongoSinkConnector
    tasks.max=1
    topics=orders
    connect.mongo.kcql=INSERT INTO orders SELECT * FROM orders
    connect.mongo.db=connect
    connect.mongo.connection=mongodb://mongo:27017
    INSERT | UPSERT
    INTO <collection_name>
    SELECT FIELD, ...
    FROM <kafka-topic>
    BATCH = 100
    --  Select all fields from topic fx_prices and insert into the fx collection
    INSERT INTO fx SELECT * FROM fx_prices
    
    --  Select all fields from topic fx_prices and upsert into the fx collection,
    --  The assumption is there will be a ticker field in the incoming json:
    UPSERT INTO fx SELECT * FROM fx_prices PK ticker
    # default of scram
    mongodb://host1/?authSource=db1
    # scram explict
    mongodb://host1/?authSource=db1&authMechanism=SCRAM-SHA-1
    # mongo-cr
    mongodb://host1/?authSource=db1&authMechanism=MONGODB-CR
    # x.509
    mongodb://host1/?authSource=db1&authMechanism=MONGODB-X509
    # kerberos
    mongodb://host1/?authSource=db1&authMechanism=GSSAPI
    # ldap
    mongodb://host1/?authSource=db1&authMechanism=PLAIN

    ssl.key.password

    The password of the private key in the key store file. This is optional for client.

    password

    ssl.keystore.type

    The file format of the key store file. This is optional for client.

    string

    JKS

    ssl.truststore.location

    The location of the trust store file.

    string

    ssl.endpoint.identification.algorithm

    The endpoint identification algorithm to validate server hostname using server certificate.

    string

    https

    ssl.protocol

    The SSL protocol used to generate the SSLContext. Default setting is TLS, which is fine for most cases. Allowed values in recent JVMs are TLS, TLSv1.1 and TLSv1.2. SSL, SSLv2 and SSLv3 may be supported in older JVMs, but their usage is discouraged due to known security vulnerabilities.

    string

    TLS

    ssl.trustmanager.algorithm

    The algorithm used by trust manager factory for SSL connections. Default value is the trust manager factory algorithm configured for the Java Virtual Machine.

    string

    PKIX

    ssl.secure.random.implementation

    The SecureRandom PRNG implementation to use for SSL cryptography operations.

    string

    ssl.truststore.type

    The file format of the trust store file.

    string

    JKS

    ssl.keymanager.algorithm

    The algorithm used by key manager factory for SSL connections. Default value is the key manager factory algorithm configured for the Java Virtual Machine.

    string

    SunX509

    ssl.provider

    The name of the security provider used for SSL connections. Default value is the default security provider of the JVM.

    string

    ssl.keystore.location

    The location of the key store file. This is optional for client and can be used for two-way authentication for client.

    string

    ssl.truststore.password

    The password for the trust store file. If a password is not set access to the truststore is still available, but integrity checking is disabled.

    password

    connect.mongo.connection

    The mongodb connection in the format mongodb://[username:password@]host1[:port1][,host2[:port2],…[,hostN[:portN]]][/[database][?options]].

    string

    connect.mongo.db

    The mongodb target database.

    string

    connect.mongo.username

    The username to use when authenticating

    string

    connect.mongo.password

    The password to use when authenticating

    password

    connect.mongo.auth.mechanism

    The authentication mechanism to use when connecting to MongoDB

    string

    SCRAM-SHA-1

    connect.mongo.error.policy

    Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP - the error is swallowed; THROW - the error is allowed to propagate; RETRY - the exception causes the Connect framework to retry the message. The number of retries is based on connect.mongo.max.retries. The error will be logged automatically.

    string

    THROW

    connect.mongo.max.retries

    The maximum number of times to try the write again.

    int

    20

    connect.mongo.retry.interval

    The time in milliseconds between retries.

    int

    60000

    connect.mongo.kcql

    KCQL expression describing field selection and data routing to the target mongo db.

    string

    connect.mongo.json_datetime_fields

    List of fields that should be converted to ISODate on Mongodb insertion (comma-separated field names). For JSON topics only. Field values may be an integral epoch time or an ISO8601 datetime string with an offset (offset or ‘Z’ required). If string does not parse to ISO, it will be written as a string instead. Subdocument fields can be referred to as in the following examples: “topLevelFieldName”, “topLevelSubDocument.FieldName”, “topLevelParent.subDocument.subDocument2.FieldName”, (etc.) If a field is converted to ISODate and that same field is named as a PK, then the PK field is also written as an ISODate.

    list

    []

    connect.progress.enabled

    Enables logging of how many records have been processed

    boolean

    false

    Cassandra

    This page describes the usage of the Stream Reactor Cassandra Sink Connector bundled in kafka-connect-cassandra-sink artifact.

    Stream Reactor Cassandra Sink Connector is designed to move data from Apache Kafka to Apache Cassandra. It supports Apache Cassandra 3.0 and later, DataStax Enterprise 4.7 and later, and DataStax Astra databases.

    hashtag
    Connector Class

    The release artifact is called kafka-connect-cassandra-sink.

    hashtag
    Features

    • Database authentication : User-password, LDAP, Kerberos

    • Input formats: The connector supports Avro, Json Schema, Protobuf, Json (schemaless) and primitive data formats. For Schema-Registry based formats, like Avro, Protobuf and Json Schema, it needs the Schema Registry settings.

    • At least once semantics : The Connect framework stores record offsets in Kafka and resumes from the last committed offset on restart. This ensures reliable delivery but may occasionally result in duplicate record processing during failures.

    hashtag
    Example

    circle-check

    For more examples see the tutorials.

    hashtag
    Connection

    The connector requires a connection to the database. To enable it, specify the following configuration entries. See Configuration Reference for details.
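    For example, drawing on the connection settings from the Configuration Reference:

    ```properties
    connect.cassandra.contact.points=[host list]
    connect.cassandra.port=9042
    connect.cassandra.load.balancing.local.dc=datacentre-name
    # optional entries
    # connect.cassandra.max.concurrent.requests = 100
    # connect.cassandra.max.batch.size = 64
    # connect.cassandra.connection.pool.size = 4
    ```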

    hashtag
    Security

    hashtag
    SSL connection

    When the database cluster has enabled client encryption, configure the SSL Keys and certificates:
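    For example (deployment-specific values left blank):

    ```properties
    connect.cassandra.ssl.enabled=true
    connect.cassandra.ssl.provider=
    connect.cassandra.ssl.cipher.suites=
    connect.cassandra.ssl.hostname.verification=true
    connect.cassandra.ssl.keystore.path=
    connect.cassandra.ssl.keystore.password=
    connect.cassandra.ssl.truststore.path=
    connect.cassandra.ssl.truststore.password=
    ```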

    hashtag
    User-Password or LDAP authentication

    When the database is configured with user-password or LDAP authentication, configure the following:
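    For example, assuming the PLAIN (username/password) provider:

    ```properties
    connect.cassandra.auth.provider=PLAIN
    connect.cassandra.auth.username=
    connect.cassandra.auth.password=
    ```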

    hashtag
    Kerberos authentication

    When the database cluster has Kerberos authentication enabled set the following settings:
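    For example, using the GSSAPI provider:

    ```properties
    connect.cassandra.auth.provider=GSSAPI
    connect.cassandra.auth.gssapi.keytab=
    connect.cassandra.auth.gssapi.principal=
    connect.cassandra.auth.gssapi.service=dse
    ```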

    hashtag
    KCQL support

    circle-check

    You can specify multiple KCQL statements separated by **;** to have a connector sink multiple topics. The connector properties topics or topics.regex are required to be set to a value that matches the KCQL statements.

    KCQL (Kafka Connect Query Language) is a SQL-like syntax that provides a declarative way to configure connector behavior. It serves as the primary interface for defining how data flows from source topics to target Cassandra tables.

    hashtag
    Purpose and Functionality

    KCQL statements define three core aspects of data pipeline configuration:

    Data Source and Target: Specifies which Kafka topic serves as the data source and which Cassandra table receives the data.

    Field Mapping: Controls how fields from the source topic map to columns in the target table, including transformations and selective field inclusion.

    Operational Parameters: Configures advanced settings such as consistency levels, delete operations, time-to-live (TTL) settings, and timestamp handling.

    hashtag
    Key Capabilities

    • Flexible Field Mapping: Map source fields to target columns with optional transformations

    • Consistency Control: Set read and write consistency levels for Cassandra operations

    • Delete Operations: Enable or disable delete functionality based on message content

    hashtag
    Syntax Template

    The basic KCQL statement follows this structure:
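    In outline:

    ```sql
    INSERT INTO <keyspace>.<your-cassandra-table>
    SELECT <field projection, ...>
    FROM <your-topic>
    PROPERTIES(<a set of keys to control the connector behaviour>)
    ```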

    INSERT INTO: Specifies the target Cassandra keyspace and table where data will be written.

    SELECT: Defines field projection and mapping from the source.

    FROM: Identifies the source Kafka topic containing the data to be processed.

    PROPERTIES: Optional clause for configuring connector behavior such as consistency levels, TTL settings, and operational parameters.

    hashtag
    Examples

    hashtag
    Table mapping

    hashtag
    Kafka to Database Table Mapping

    The connector enables mapping fields from Kafka records to database table columns. A single connector can handle multiple topics, where each topic can map to one or more tables. Kafka messages consist of a Key, Value, and Header.

    Generic Mapping Syntax

    Use the following syntax to map Kafka message fields to database columns:

    • Prefix the field with:

      • _value: For fields from the Kafka message's Value component.

      • _key: For fields from the Kafka message's Key component.

    Note: If no prefix is provided, _value is assumed, mapping to the Kafka record's Value component.

    The record Key and Value can be mapped to specific columns like this, considering a table with row_key and content columns:
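    For example:

    ```sql
    INSERT INTO ${target}
    SELECT
      _key as row_key
      , _value as content
    FROM myTopic
    ```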

    To map database columns with whitespace to KCQL projections, follow this format:
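    For example, quoting both the source field paths and the target column names:

    ```sql
    INSERT INTO ${target}
    SELECT
      `_key.'bigint field'` as 'bigint col',
      `_key.'boolean-field'` as 'boolean-col',
      `_value.'INT FIELD'` as 'INT COL',
      `_value.'TEXT.FIELD'` as 'TEXT.COL'
    FROM myTopic
    ```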

    hashtag
    Fan-out

    You can map a topic's data to multiple database tables using multiple KCQL statements in the same connect.cassandra.kcql configuration. For example:
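    Two KCQL statements, both reading from the stocks topic:

    ```sql
    INSERT INTO stocks_keyspace.stocks_by_symbol
    SELECT _value.symbol AS ticker,
           _value.ts AS ts,
           _value.exchange AS exchange,
           _value.value AS value
    FROM stocks;
    INSERT INTO stocks_keyspace.stocks_by_exchange
    SELECT _value.symbol AS ticker,
           _value.ts AS ts,
           _value.exchange AS exchange,
           _value.value AS value
    FROM stocks;
    ```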

    hashtag
    User defined types

    The connector can map complex types from a Kafka record payload into a user-defined type (UDT) column in the database. The incoming data field names must match the database field names.

    Consider the following Kafka record format:

    key
    value
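    For example, a JSON value like:

    ```json
    {"symbol":"APPL",
    "value":214.4,
    "exchange":"NASDAQ",
    "ts":"2025-07-23T14:56:10.009"}
    ```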

    and database definition:
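    A matching UDT and table definition:

    ```sql
    CREATE TYPE stocks_keyspace.stocks_type (
        symbol text,
        ts timestamp,
        exchange text,
        value double
    );

    CREATE TABLE stocks_keyspace.stocks_table (
        name text PRIMARY KEY,
        stocks FROZEN<stocks_type>
    );
    ```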

    The mapping configuration should be:
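    Mapping the record key to the primary key column and the whole value to the UDT column:

    ```sql
    INSERT INTO stocks_keyspace.stocks_table
    SELECT _key AS name, _value AS stocks
    FROM stocks
    ```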

    hashtag
    Using now() function

    You can leverage the now() function, which returns a TIMEUUID, in the mapping. Here is how to use it in a KCQL projection:
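    For instance, populating a loaded_at column:

    ```sql
    INSERT INTO ${target}
    SELECT
      `now()` as loaded_at
    FROM myTopic
    ```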

    hashtag
    Write Timestamp

    To specify an internal write-time timestamp from the database, choose a numeric field from the Kafka record payload. Use the following mapping syntax:

    Examples:
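    The generic form, followed by header, value and key variants:

    ```sql
    SELECT [_value/_key/_header].{path} AS message_internal_timestamp
    FROM myTopic
    -- You may optionally specify the time unit by using:
    PROPERTIES('timestampTimeUnit'='MICROSECONDS')

    SELECT _header.timestampcolumn AS message_internal_timestamp FROM myTopic
    SELECT _value.timestampcol AS message_internal_timestamp FROM myTopic
    SELECT _key.timestampcol AS message_internal_timestamp FROM myTopic
    ```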

    hashtag
    Row-Level TTL

    To define the time-to-live (TTL) for a database record, you can optionally map a field from the Kafka record payload. Use the following query:

    hashtag
    Examples:
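    The generic form, plus variants using different TTL units:

    ```sql
    SELECT [_value/_key/_header].{path} as message_internal_ttl
    FROM myTopic
    -- Optional: Specify the TTL unit
    PROPERTIES('ttlTimeUnit'='SECONDS')

    SELECT _header.ttl as message_internal_ttl
    FROM myTopic
    PROPERTIES('ttlTimeUnit'='MINUTES')

    SELECT _key.expiry as message_internal_ttl
    FROM myTopic
    PROPERTIES('ttlTimeUnit'='HOURS')

    SELECT _value.ttl_duration as message_internal_ttl
    FROM myTopic
    PROPERTIES('ttlTimeUnit'='DAYS')
    ```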

    These examples demonstrate specifying TTL units such as 'MINUTES', 'HOURS', or 'DAYS', in addition to the default 'SECONDS'.

    hashtag
    Converting date and time for a topic

    To configure date and time conversion properties for a topic, use the following KCQL statement:
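    For example, setting all codec properties explicitly:

    ```sql
    INSERT INTO ... SELECT ... FROM myTopic PROPERTIES(
        'codec.locale' = 'en_US',
        'codec.timeZone' = 'UTC',
        'codec.timestamp' = 'CQL_TIMESTAMP',
        'codec.date' = 'ISO_LOCAL_DATE',
        'codec.time' = 'ISO_LOCAL_TIME',
        'codec.unit' = 'MILLISECONDS'
    )
    ```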

    Codec Parameter Descriptions:

    Parameter
    Description
    Default

    hashtag
    CQL Queries

    hashtag
    Advanced CQL Query Configuration for Kafka Records

    When a new Kafka record arrives, you have the option to run a custom CQL query. This feature is designed for advanced use cases; typically, the standard Kafka mapping suffices without needing a query. If you specify a query in the topic-to-table mapping, it will take precedence over the default action.

    Important: You must include the bound variables used in the query within the mapping column.

    Example
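    Binding my_pk and my_value from the mapping into a custom insert, with deletes enabled:

    ```sql
    INSERT INTO mykeyspace.tableA
    SELECT
        _key.my_pk as my_pk
        , _value.my_value as my_value
    FROM topicA
    PROPERTIES(
     'query'='INSERT INTO mykeyspace.pk_value (my_pk, my_value) VALUES (:my_pk, :my_value)',
     'deletesEnabled'='true'
    )
    ```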

    hashtag
    Extra settings

    Parameter
    Description
    Default

    hashtag
    Java Driver settings

    Using the connector configuration, prefix with connect.cassandra.driver.{driver_setting} any specific setting you would want to set for the Cassandra client. Refer to the DataStax Java driver documentation for more information.
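    For example, forcing the driver's request consistency:

    ```properties
    connect.cassandra.driver.basic.request.consistency=ALL
    ```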

    hashtag
    Configuration Reference

    Setting
    Description
    Default Value
    Type

    hashtag
    Error policies

    The connector supports Error policies.

    io.lenses.streamreactor.connect.cassandra.CassandraSinkConnector

  • Kafka topic to database table mapping: Allows control of which Kafka message fields are written to the database table columns

  • Multi-table mapping: Enables writing a topic to multiple database tables

  • Date/Time/Timestamp formats: Allows control of the date, time, and timestamp format conversion between Kafka messages and database columns, supporting custom parsing and formatting patterns.

  • Consistency Level control: Allows configuring the write consistency on a per table mapping

  • Row Level Time-to-Live: Allows configuring the row-level TTL on a per table mapping

  • Deletes: Allows configuring the deletes on a per table mapping

  • TTL Management: Configure automatic data expiration using time-to-live settings
  • Timestamp Handling: Control how timestamps are processed and stored

  • Conditional Logic: Apply filtering and conditional processing to incoming data

  • _header: For fields from the Kafka message's Header.

    codec.timeZone

    Defines the time zone for conversions without an explicit time zone.

    UTC

    codec.locale

    Specifies the locale for locale-sensitive conversions.

    en_US

    connect.cassandra.connection.pool.size

    The number of connections to maintain in the connection pool.

    2

    String

    connect.cassandra.compression

    Compression algorithm to use for the connection. Defaults to LZ4.

    LZ4

    String

    connect.cassandra.query.timeout.ms

    The Cassandra driver query timeout in milliseconds.

    20000

    Int

    connect.cassandra.max.batch.size

    Number of records to include in a write request to the database table.

    64

    Int

    connect.cassandra.load.balancing.local.dc

    The case-sensitive datacenter name for the driver to use for load balancing.

    (no default)

    String

    connect.cassandra.auth.provider

    Authentication provider

    None

    String

    connect.cassandra.auth.username

    Username for PLAIN (username/password) provider authentication

    ""

    String

    connect.cassandra.auth.password

    Password for PLAIN (username/password) provider authentication

    ""

    String

    connect.cassandra.auth.gssapi.keytab

    Kerberos keytab file for GSSAPI provider authentication

    ""

    String

    connect.cassandra.auth.gssapi.principal

    Kerberos principal for GSSAPI provider authentication

    ""

    String

    connect.cassandra.auth.gssapi.service

    SASL service name to use for GSSAPI provider authentication

    dse

    String

    connect.cassandra.ssl.enabled

    Secure Cassandra driver connection via SSL.

    false

    String

    connect.cassandra.ssl.provider

    The SSL provider to use for the connection. Available values are None, JDK or OpenSSL. Defaults to None.

    None

    String

    connect.cassandra.ssl.truststore.path

    Path to the client Trust Store.

    (no default)

    String

    connect.cassandra.ssl.truststore.password

    Password for the client Trust Store.

    (no default)

    String

    connect.cassandra.ssl.keystore.path

    Path to the client Key Store.

    (no default)

    String

    connect.cassandra.ssl.keystore.password

    Password for the client Key Store

    (no default)

    String

    connect.cassandra.ssl.cipher.suites

    The SSL cipher suites to use for the connection.

    (no default)

    String

    connect.cassandra.ssl.hostname.verification

    Enable hostname verification for the connection.

    true

    String

    connect.cassandra.ssl.openssl.key.cert.chain

    Enable OpenSSL key certificate chain for the connection.

    (no default)

    String

    connect.cassandra.ssl.openssl.private.key

    Enable OpenSSL private key for the connection.

    (no default)

    String

    connect.cassandra.ignore.errors.mode

    Can be one of 'none', 'all' or 'driver'

    none

    String

    connect.cassandra.retry.interval

    The time in milliseconds between retries.

    60000

    String

    connect.cassandra.error.policy

    Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP - the error is swallowed; THROW - the error is allowed to propagate; RETRY - The exception causes the Connect framework to retry the message. The number of retries is set by connect.cassandra.max.retries. All errors will be logged automatically, even if the code swallows them.

    THROW

    String

    connect.cassandra.max.retries

    The maximum number of times to try the write again.

    20

    Int

    connect.cassandra.kcql

    KCQL expression describing field selection and routes.

    (no default)

    String

    connect.cassandra.progress.enabled

    Enables logging of how many records have been processed

    false

    Boolean

    Tweaks the Cassandra driver settings. Refer to the DataStax Java driver documentation.

    codec.timestamp

    Defines the pattern for converting strings to CQL timestamps. Options include specific date-time patterns like yyyy-MM-dd HH:mm:ss or pre-defined formats such as ISO_ZONED_DATE_TIME and ISO_INSTANT. The special formatter CQL_TIMESTAMP supports all CQL timestamp formats.

    CQL_TIMESTAMP

    codec.date

    Specifies the pattern for converting strings to CQL dates. Options include date-time patterns like yyyy-MM-dd and formatters such as ISO_LOCAL_DATE.

    ISO_LOCAL_DATE

    codec.time

    Sets the pattern for converting strings to CQL time, using patterns like HH:mm:ss or formatters such as ISO_LOCAL_TIME.

    ISO_LOCAL_TIME

    codec.unit

    For digit-only inputs not parsed by codec.timestamp, this sets the time unit for conversion. Accepts all TimeUnit enum constants.

    MILLISECONDS

    consistencyLevel

    Query Consistency Level Options:

    • ALL

    • EACH_QUORUM

    • QUORUM

    • LOCAL_QUORUM

    • ONE

    • TWO

    • THREE

    • LOCAL_ONE

    • ANY

    LOCAL_ONE

    ttl

    Specify the number of seconds before data is automatically deleted from the DSE table. When set, all rows in the topic table will share this TTL value.

    -1

    nullToUnset

    When handling nulls in Kafka, it's advisable to treat them as UNSET in DSE. DataStax suggests sticking with the default setting to minimize the creation of unnecessary tombstones.

    true

    deletesEnabled

    Enable this feature to treat records as deletes if, after mapping, only the primary key columns have non-null values. This prevents the insertion or update of nulls in regular columns.

    true

    connect.cassandra.contact.points

    A comma-separated list of host names or IP addresses

    localhost

    String

    connect.cassandra.port

    Cassandra native port.

    9042

    String

    connect.cassandra.max.concurrent.requests

    Maximum number of requests to send to database at the same time.

    100


    HTTP

    This page describes the usage of the Stream Reactor HTTP Sink Connector.

    A Kafka Connect sink connector for writing records from Kafka to HTTP endpoints.

    hashtag
    Features

    • Support for Json/Avro/String/Protobuf messages via Kafka Connect (in conjunction with converters for Schema-Registry based data storage).

    Simple configuration
    name=cassandra-sink
    connector.class=io.lenses.streamreactor.connect.cassandra.CassandraSinkConnector
    tasks.max=1
    topics=orders
    connect.cassandra.kcql=INSERT INTO mykeyspace.orders SELECT _key.bigint as bigintcol, _value.boolean as booleancol, _key.double as doublecol, _value.float as floatcol, _key.int as intcol, _value.smallint as smallintcol, _key.text as textcol, _value.tinyint as tinyintcol FROM orders
    connect.cassandra.port=9042 
    connect.cassandra.contact.points=cassandra
    connect.cassandra.contact.points=[host list]
    connect.cassandra.port=9042
    connect.cassandra.load.balancing.local.dc=datacentre-name
    # optional entries
    # connect.cassandra.max.concurrent.requests = 100
    # connect.cassandra.max.batch.size = 64
    # connect.cassandra.connection.pool.size = 4
    # connect.cassandra.query.timeout.ms = 30
    # connect.cassandra.compression = None
    connect.cassandra.ssl.provider=
    connect.cassandra.ssl.cipher.suites=
    connect.cassandra.ssl.hostname.verification=true
    
    
    connect.cassandra.ssl.keystore.path=
    connect.cassandra.ssl.keystore.password=
    
    connect.cassandra.ssl.truststore.password=
    connect.cassandra.ssl.truststore.path=
    
    # Path to the SSL certificate file, when using OpenSSL.
    connect.cassandra.ssl.openssl.key.cert.chain=
    
    # Path to the private key file, when using OpenSSL.
    connect.cassandra.ssl.openssl.private.key=
    connect.cassandra.auth.provider=
    connect.cassandra.auth.username=
    connect.cassandra.auth.password=
    
    # for SASL authentication which requires connect.cassandra.auth.provider
    # set to GSSAPI 
    connect.cassandra.auth.gssapi.keytab=
    connect.cassandra.auth.gssapi.principal=
    connect.cassandra.auth.gssapi.service=
    connect.cassandra.auth.provider=GSSAPI
    connect.cassandra.auth.gssapi.keytab=
    connect.cassandra.auth.gssapi.principal=
    connect.cassandra.auth.gssapi.service=
    INSERT INTO <keyspace>.<your-cassandra-table>
    SELECT <field projection, ...>
    FROM <your-topic>
    PROPERTIES(<a set of keys to control the connector behaviour>)
    INSERT INTO mykeyspace.types
    SELECT 
     _key.bigint as bigintcol
     , _value.boolean as booleancol
     , _key.double as doublecol
     , _value.float as floatcol
     , _header.int as intcol
    FROM myTopic
    
    INSERT INTO mykeyspace.types
    SELECT 
     _value.bigint as bigintcol
     , _value.double as doublecol
     , _value.ttlcol as message_internal_ttl 
     , _value.timestampcol as message_internal_timestamp
    FROM myTopic
    PROPERTIES('ttlTimeUnit'='MILLISECONDS', 'timestampTimeUnit'='MICROSECONDS')
    
    INSERT INTO mykeyspace.CASE_SENSITIVE
    SELECT
        `_key.'bigint field'` as 'bigint col', 
        `_key.'boolean-field'` as 'boolean-col', 
        `_value.'INT FIELD'` as 'INT COL', 
        `_value.'TEXT.FIELD'` as 'TEXT.COL'
    FROM mytopic 
    
    
    INSERT INTO mykeyspace.tableA 
    SELECT 
        _key.my_pk as my_pk
        , _value.my_value as my_value 
    FROM topicA
    PROPERTIES(
     'query'='INSERT INTO mykeyspace.pk_value (my_pk, my_value) VALUES (:my_pk, :my_value)',
     'deletesEnabled' ='true'
    )
    INSERT INTO ${target}
    SELECT _value/_key/_header.{field_path} AS ${table_column}
    FROM mytopic
    INSERT INTO ${target}
    SELECT 
      _key as row_key
      , _value as content
    INSERT INTO ${target}
    SELECT
      `_key.'bigint field'` as 'bigint col',
      `_key.'boolean-field'` as 'boolean-col',
      `_value.'INT FIELD'` as 'INT COL',
      `_value.'TEXT.FIELD'` as 'TEXT.COL'
    FROM myTopic
    INSERT INTO stocks_keyspace.stocks_by_symbol
    SELECT _value.symbol AS ticker, 
           _value.ts AS ts, 
           _value.exchange AS exchange, 
           _value.value AS value 
    FROM stocks;
    INSERT INTO stocks_keyspace.stocks_by_exchange
    SELECT _value.symbol AS ticker, 
           _value.ts AS ts, 
           _value.exchange AS exchange, 
           _value.value AS value 
    FROM stocks;
    {"symbol":"APPL",
    "value":214.4,
    "exchange":"NASDAQ",
    "ts":"2025-07-23T14:56:10.009"}
    CREATE TYPE stocks_keyspace.stocks_type (
        symbol text,
        ts timestamp,
        exchange text,
        value double
    );
    
    CREATE TABLE stocks_keyspace.stocks_table (
        name text PRIMARY KEY,
        stocks FROZEN<stocks_type>
    );
    INSERT INTO stocks_keyspace.stocks_table
    SELECT _key AS name, _value AS stocks
    FROM stocks
    INSERT INTO ${target}
    SELECT
      `now()` as loaded_at
    FROM myTopic
    SELECT [_value/_key/_header].{path} AS message_internal_timestamp
    FROM myTopic
    --You may optionally specify the time unit by using:
    PROPERTIES('timestampTimeUnit'='MICROSECONDS')
    SELECT _header.timestampcolumn AS message_internal_timestamp FROM myTopic
    SELECT _value.timestampcol AS message_internal_timestamp FROM myTopic
    SELECT _key.timestampcol AS message_internal_timestamp FROM myTopic
    SELECT [_value/_key/_header].{path} as message_internal_ttl
    FROM myTopic
    -- Optional: Specify the TTL unit
    PROPERTIES('ttlTimeUnit'='SECONDS')
    SELECT _header.ttl as message_internal_ttl
    FROM myTopic
    PROPERTIES('ttlTimeUnit'='MINUTES')
    
    SELECT _key.expiry as message_internal_ttl
    FROM myTopic
    PROPERTIES('ttlTimeUnit'='HOURS')
    
    SELECT _value.ttl_duration as message_internal_ttl
    FROM myTopic
    PROPERTIES('ttlTimeUnit'='DAYS')
    INSERT INTO ... SELECT ... FROM myTopic PROPERTIES(
        'codec.locale' = 'en_US',
        'codec.timeZone' = 'UTC',
        'codec.timestamp' = 'CQL_TIMESTAMP',
        'codec.date' = 'ISO_LOCAL_DATE',
        'codec.time' = 'ISO_LOCAL_TIME',
        'codec.unit' = 'MILLISECONDS'
    )
    INSERT INTO ${target}
    SELECT 
        _value.bigint as some_name,
        _value.int as some_name2
    FROM myTopic
    PROPERTIES('query'='INSERT INTO %s.types (bigintCol, intCol) VALUES (:some_name, :some_name2)')
    connect.cassandra.driver.{driver_setting}
    DataStax Java driver documentationarrow-up-right
    connect.cassandra.driver.*
    connect.cassandra.driver.basic.request.consistency=ALL

    • URL, header and content templating ability gives you full control of the HTTP request.

    • Configurable batching of messages, even allowing you to combine them into a single request, selecting which data to send with your HTTP request.

    hashtag
    Connector Class

    hashtag
    Example

    circle-check

    For more examples see the tutorials.

    hashtag
    Content Template

    The Lenses HTTP sink comes with multiple options for content templating of the HTTP request.

    hashtag
    Static Templating

    If you do not wish any part of the key, value, headers or other data to form a part of the message, you can use static templating:
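    A minimal sketch, assuming the request body is supplied via the connect.http.request.content property (the endpoint URL is illustrative; verify property names against the configuration reference for your version):

    ```properties
    # All values below are illustrative
    connect.http.method=POST
    connect.http.endpoint=https://my-endpoint.example.com/events
    connect.http.request.content=Static payload, identical for every request
    ```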

    hashtag
    Single Message Templating

    When you are confident you will be generating a single HTTP request per Kafka message, then you can use the simpler templating.

    In your configuration, in the content property of your config, you can define template substitutions like the following example:
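    For instance (the field names after value. and header. are illustrative):

    ```xml
    <!-- Substitutions are drawn from the single Kafka message; field names are illustrative -->
    <note>
      <to>{{value.recipient}}</to>
      <heading>{{header.correlation-id}}</heading>
      <body>{{value.message}}</body>
    </note>
    ```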

    (please note the XML is only an example; your template can consist of any text format that can be submitted in an HTTP request)

    hashtag
    Multiple Message Templating

    To collapse multiple messages into a single HTTP request, you can use the multiple messaging template. This is automatic if the template has a messages tag. See the below example:
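    A sketch of such a template, assuming a mustache-style {{#message}} section that is repeated once per Kafka message in the batch (the surrounding tag names are illustrative):

    ```xml
    <!-- The messages tag triggers multiple-message batching; other tag names are illustrative -->
    <batch>
      <messages>
        {{#message}}
        <message>
          <topic>{{topic}}</topic>
          <body>{{value.content}}</body>
        </message>
        {{/message}}
      </messages>
    </batch>
    ```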

    Again, this is an XML example, but your message body can consist of anything, including plain text, JSON or YAML.

    Your connector configuration will look like this:

    The final result will be HTTP requests with bodies like this:

    hashtag
    Available Keys

    When using simple and multiple message templating, the following are available:

    Field
    Usage Example

    Header

    {{header.correlation-id}}

    Value

    {{value}}

    {{value.product.id}}

    Key

    {{key}}

    {{key.customer.number}}

    Topic

    {{topic}}

    hashtag
    URL Templating

    URL including protocol (e.g. http://lenses.io). Template variables can be used.

    The URL is also a Content Template so can contain substitutions from the message key/value/headers etc. If you are batching multiple kafka messages into a single request, then the first message will be used for the substitution of the URL.

    hashtag
    Authentication Options

    Currently, the HTTP Sink supports no authentication, BASIC HTTP authentication and OAuth2 authentication.

    hashtag
    No Authentication (Default)

    By default, no authentication is set. This can also be done explicitly by providing a configuration like this:

    hashtag
    BASIC HTTP Authentication

    BASIC auth can be configured by providing a configuration like this:
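    A sketch, assuming authentication is selected via connect.http.authentication.type (property names and values here are assumptions; verify against the configuration reference):

    ```properties
    # Property names below are assumptions; check the configuration reference
    connect.http.authentication.type=BasicAuthentication
    connect.http.authentication.basic.username=user
    connect.http.authentication.basic.password=password
    ```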

    hashtag
    OAuth2 Authentication

    OAuth auth can be configured by providing a configuration like this:

    hashtag
    Headers List

    To customise the headers sent with your HTTP request you can supply a Headers List.

    Each header key and value is also a Content Template so can contain substitutions from the message key/value/headers etc. If you are batching multiple kafka messages into a single request, then the first message will be used for the substitution of the headers.

    Example:
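    A sketch (the property name and list format are assumptions; verify against the configuration reference):

    ```properties
    # Hypothetical headers list: name/value pairs, values may use template substitutions
    connect.http.request.headers=Content-Type=application/json,Correlation-Id={{header.correlation-id}}
    ```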

    hashtag
    SSL Configuration

    Enabling SSL connections between Kafka Connect and HTTP Endpoint ensures that the communication between these services is secure, protecting sensitive data from being intercepted or tampered with. SSL (or TLS) encrypts data in transit, verifying the identity of both parties and ensuring data integrity. Please check out SSL Configuration Properties section in order to set it up.

    hashtag
    Batch Configuration

    The connector offers three distinct flush options for data management:

    • Flush by Count - triggers a flush after a specified number of records have been batched.

    • Flush by Size - initiates a flush once a predetermined batch size (in bytes) has been attained.

    • Flush by Interval - enforces a flush after a defined time interval (in seconds).

    It's worth noting that the interval flush is a continuous process that acts as a fail-safe mechanism, ensuring that batches are periodically flushed, even if the other flush options are not configured or haven't reached their thresholds.

    Consider a scenario where the flush size is set to 10MB, and only 9.8MB of data has been batched, with no new Kafka messages arriving for an extended period of 6 hours. To prevent undue delays, the interval flush guarantees that the batch is flushed after the specified time interval has elapsed. This ensures the timely management of data even in situations where other flush conditions are not met.

    The flush options are configured using the batchCount, batchSize and timeInterval properties. The settings are optional and if not specified the defaults are:

    Field
    Default

    batchCount

    50_000 records

    batchSize

    500000000 (500MB)

    timeInterval

    3_600 seconds (1 hour)

    hashtag
    Configuration Examples

    Some configuration examples follow on how to apply this connector to different message types.

    These include converters, which are required to instruct Kafka Connect on how to read the source content.

    hashtag
    Static string template

    In this case the converters are irrelevant as we are not using the message content to populate our message template.

    hashtag
    Dynamic string template

    The HTTP request body contains the value of the message, which is retained as a string value via the StringConverter.

    hashtag
    Dynamic string template containing json message fields

    Specific fields from the JSON message are substituted into the HTTP request body alongside some static content.