Cassandra

This page describes the usage of the Stream Reactor Cassandra Sink Connector bundled in kafka-connect-cassandra-sink artifact.

The Stream Reactor Cassandra Sink Connector is designed to move data from Apache Kafka to Apache Cassandra. It supports Apache Cassandra 3.0 and later, DataStax Enterprise 4.7 and later, and DataStax Astra databases.

Connector Class

io.lenses.streamreactor.connect.cassandra.CassandraSinkConnector

The release artifact is called kafka-connect-cassandra-sink.

Features

  • Database authentication: User-password, LDAP, Kerberos

  • Input formats: The connector supports Avro, JSON Schema, Protobuf, JSON (schemaless) and primitive data formats. For Schema Registry-based formats, such as Avro, Protobuf and JSON Schema, it needs the Schema Registry settings.

  • At-least-once semantics: The Connect framework stores record offsets in Kafka and resumes from the last committed offset on restart. This ensures reliable delivery but may occasionally result in duplicate record processing during failures.

Example


For more examples see the tutorials.

Connection

The connector requires a connection to the database. To enable it, specify the following configuration entries. See Configuration Reference for details.
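A minimal connection setup might look like the following sketch; the host names and datacenter name are placeholders for your own cluster:

    connect.cassandra.contact.points=cassandra-host1,cassandra-host2
    connect.cassandra.port=9042
    connect.cassandra.load.balancing.local.dc=datacenter1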

Security

SSL connection

When the database cluster has enabled client encryption, configure the SSL Keys and certificates:
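For example, a JDK (JKS-based) setup could be sketched as follows; the paths and passwords are placeholders:

    connect.cassandra.ssl.enabled=true
    connect.cassandra.ssl.provider=JDK
    connect.cassandra.ssl.truststore.path=/path/to/truststore.jks
    connect.cassandra.ssl.truststore.password=<truststore-password>
    connect.cassandra.ssl.keystore.path=/path/to/keystore.jks
    connect.cassandra.ssl.keystore.password=<keystore-password>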

User-Password or LDAP authentication

When the database is configured with user-password or LDAP authentication, configure the following:
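For the PLAIN (username/password) provider this could be sketched as follows, with placeholder credentials:

    connect.cassandra.auth.provider=PLAIN
    connect.cassandra.auth.username=<username>
    connect.cassandra.auth.password=<password>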

Kerberos authentication

When the database cluster has Kerberos authentication enabled, configure the following settings:
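A sketch using the GSSAPI provider; the keytab path and principal are placeholders:

    connect.cassandra.auth.provider=GSSAPI
    connect.cassandra.auth.gssapi.keytab=/path/to/user.keytab
    connect.cassandra.auth.gssapi.principal=user@EXAMPLE.COM
    connect.cassandra.auth.gssapi.service=dse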

KCQL support


You can specify multiple KCQL statements separated by ; to have a connector sink multiple topics. The connector properties topics or topics.regex must be set to a value that matches the KCQL statements.

KCQL (Kafka Connect Query Language) is a SQL-like syntax that provides a declarative way to configure connector behavior. It serves as the primary interface for defining how data flows from source topics to target Cassandra tables.

Purpose and Functionality

KCQL statements define three core aspects of data pipeline configuration:

Data Source and Target: Specifies which Kafka topic serves as the data source and which Cassandra table receives the data.

Field Mapping: Controls how fields from the source topic map to columns in the target table, including transformations and selective field inclusion.

Operational Parameters: Configures advanced settings such as consistency levels, delete operations, time-to-live (TTL) settings, and timestamp handling.

Key Capabilities

  • Flexible Field Mapping: Map source fields to target columns with optional transformations

  • Consistency Control: Set read and write consistency levels for Cassandra operations

  • Delete Operations: Enable or disable delete functionality based on message content

Syntax Template

The basic KCQL statement follows this structure:

INSERT INTO: Specifies the target Cassandra keyspace and table where data will be written.

SELECT: Defines field projection and mapping from the source.

FROM: Identifies the source Kafka topic containing the data to be processed.

PROPERTIES: Optional clause for configuring connector behavior such as consistency levels, TTL settings, and operational parameters.
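Putting the clauses together, the template is:

    INSERT INTO <keyspace>.<your-cassandra-table>
    SELECT <field projection, ...>
    FROM <your-kafka-topic>
    PROPERTIES(<a set of keys to control the connector behaviour>)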

Examples

Table mapping

Kafka to Database Table Mapping

The connector enables mapping fields from Kafka records to database table columns. A single connector can handle multiple topics, where each topic can map to one or more tables. Kafka messages consist of a Key, Value, and Header.

Generic Mapping Syntax

Use the following syntax to map Kafka message fields to database columns:

  • Prefix the field with:

    • _value: For fields from the Kafka message's Value component.

    • _key: For fields from the Kafka message's Key component.

    • _header: For fields from the Kafka message's Header component.

Note: If no prefix is provided, _value is assumed, mapping to the Kafka record's Value component.

The record Key and Value can be mapped to specific columns like this, considering a table with row_key and content columns:
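For instance, assuming a hypothetical table mykeyspace.mytable with those two columns:

    INSERT INTO mykeyspace.mytable
    SELECT
      _key as row_key
      , _value as content
    FROM myTopic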

To map database columns with whitespace to KCQL projections, follow this format:
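A sketch, assuming hypothetical field and column names that contain spaces or special characters:

    INSERT INTO mykeyspace.mytable
    SELECT
      `_key.'bigint field'` as 'bigint col',
      `_value.'INT FIELD'` as 'INT COL'
    FROM myTopic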

Fan-out

You can map a topic's data to multiple database tables using multiple KCQL statements in the same connect.cassandra.kcql configuration. For example:
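Two statements writing the same stocks topic to two tables (the keyspace and table names are illustrative):

    INSERT INTO stocks_keyspace.stocks_by_symbol
    SELECT _value.symbol AS ticker, _value.ts AS ts
    FROM stocks;
    INSERT INTO stocks_keyspace.stocks_by_exchange
    SELECT _value.exchange AS exchange, _value.value AS value
    FROM stocks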

User defined types

The connector can map complex types from a Kafka record payload into a user-defined type (UDT) column in the database. The incoming data field names must match the database field names.

Consider the following Kafka record format:

key
value

and database definition:

The mapping configuration should be:

Using now() function

You can leverage the now() function, which returns a TIMEUUID, in the mapping. Here is how to use it in a KCQL projection:
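For example, to populate a hypothetical loaded_at TIMEUUID column:

    INSERT INTO mykeyspace.mytable
    SELECT
      `now()` as loaded_at
    FROM myTopic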

Write Timestamp

To set the database's internal write-time timestamp, choose a numeric field from the Kafka record payload and map it to the message_internal_timestamp column. Use the following mapping syntax:

Examples:
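The timestamp field can come from the record's Value, Key or Header; the optional timestampTimeUnit property sets its unit (field names here are illustrative):

    SELECT _value.timestampcol AS message_internal_timestamp FROM myTopic
    SELECT _key.timestampcol AS message_internal_timestamp FROM myTopic
    SELECT _header.timestampcolumn AS message_internal_timestamp FROM myTopic
    -- optionally specify the time unit:
    PROPERTIES('timestampTimeUnit'='MICROSECONDS')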

Row-Level TTL

To define the time-to-live (TTL) for a database record, you can optionally map a field from the Kafka record payload. Use the following query:
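For example, mapping a hypothetical ttlcol field from the record Value, with an optional unit:

    SELECT _value.ttlcol as message_internal_ttl
    FROM myTopic
    PROPERTIES('ttlTimeUnit'='SECONDS')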

Examples:

These examples demonstrate specifying TTL units such as 'MINUTES', 'HOURS', or 'DAYS', in addition to the default 'SECONDS'.

Converting date and time for a topic

To configure date and time conversion properties for a topic, use the PROPERTIES clause of the KCQL statement:

Codec Parameter Descriptions:

Parameter
Description
Default

CQL Queries

Advanced CQL Query Configuration for Kafka Records

When a new Kafka record arrives, you have the option to run a custom CQL query. This feature is designed for advanced use cases; typically, the standard Kafka mapping suffices without needing a query. If you specify a query in the topic-to-table mapping, it will take precedence over the default action.

Important: You must include the bound variables used in the query within the mapping column.

Example
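Here the bound variables :my_pk and :my_value match the my_pk and my_value mapping columns (the keyspace, table and field names are illustrative):

    INSERT INTO mykeyspace.tableA
    SELECT
        _key.my_pk as my_pk
        , _value.my_value as my_value
    FROM topicA
    PROPERTIES(
     'query'='INSERT INTO mykeyspace.pk_value (my_pk, my_value) VALUES (:my_pk, :my_value)',
     'deletesEnabled'='true'
    )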

Extra settings

Parameter
Description
Default

Java Driver settings

Using the connector configuration, prefix any setting you want to pass to the Cassandra client with connect.cassandra.driver. (for example, connect.cassandra.driver.basic.request.consistency=ALL). Refer to the DataStax Java driver documentation for more information.

Configuration Reference

Setting
Description
Default Value
Type

Error policies

The connector supports Error policies.

MQTT

This page describes the usage of the Stream Reactor MQTT Sink Connector.

Connector Class

Example


  • Kafka topic to database table mapping: Allows control of which Kafka message fields are written to the database table columns

  • Multi-table mapping: Enables writing a topic to multiple database tables

  • Date/Time/Timestamp formats: Allows control of the date, time, and timestamp format conversion between Kafka messages and database columns, supporting custom parsing and formatting patterns.

  • Consistency Level control: Allows configuring the write consistency on a per table mapping

  • Row Level Time-to-Live: Allows configuring the row-level TTL on a per table mapping

  • Deletes: Allows configuring the deletes on a per table mapping

  • TTL Management: Configure automatic data expiration using time-to-live settings
  • Timestamp Handling: Control how timestamps are processed and stored

  • Conditional Logic: Apply filtering and conditional processing to incoming data

  • _header: For fields from the Kafka message's Header.

    Sets the pattern for converting strings to CQL time, using patterns like HH:mm:ss or formatters such as ISO_LOCAL_TIME.

    ISO_LOCAL_TIME

    codec.unit

    For digit-only inputs not parsed by codec.timestamp, this sets the time unit for conversion. Accepts all TimeUnit enum constants.

    MILLISECONDS

    codec.timeZone

    Defines the time zone for conversions without an explicit time zone.

    UTC

    codec.locale

    Specifies the locale for locale-sensitive conversions.

    en_US

    When handling nulls in Kafka, it's advisable to treat them as UNSET in DSE. DataStax suggests sticking with the default setting to minimize the creation of unnecessary tombstones.

    true

    deletesEnabled

    Enable this feature to treat records as deletes if, after mapping, only the primary key columns have non-null values. This prevents the insertion or update of nulls in regular columns.

    true

    String

    connect.cassandra.max.concurrent.requests

    Maximum number of requests to send to database at the same time.

    100

    String

    connect.cassandra.connection.pool.size

    The number of connections to maintain in the connection pool.

    2

    String

    connect.cassandra.compression

    Compression algorithm to use for the connection. Defaults to LZ4.

    LZ4

    String

    connect.cassandra.query.timeout.ms

    The Cassandra driver query timeout in milliseconds.

    20000

    Int

    connect.cassandra.max.batch.size

    Number of records to include in a write request to the database table.

    64

    Int

    connect.cassandra.load.balancing.local.dc

    The case-sensitive datacenter name for the driver to use for load balancing.

    (no default)

    String

    connect.cassandra.auth.provider

    Authentication provider

    None

    String

    connect.cassandra.auth.username

    Username for PLAIN (username/password) provider authentication

    ""

    String

    connect.cassandra.auth.password

    Password for PLAIN (username/password) provider authentication

    ""

    String

    connect.cassandra.auth.gssapi.keytab

    Kerberos keytab file for GSSAPI provider authentication

    ""

    String

    connect.cassandra.auth.gssapi.principal

    Kerberos principal for GSSAPI provider authentication

    ""

    String

    connect.cassandra.auth.gssapi.service

    SASL service name to use for GSSAPI provider authentication

    dse

    String

    connect.cassandra.ssl.enabled

    Secure Cassandra driver connection via SSL.

    false

    String

    connect.cassandra.ssl.provider

    The SSL provider to use for the connection. Available values are None, JDK or OpenSSL. Defaults to None.

    None

    String

    connect.cassandra.ssl.truststore.path

    Path to the client Trust Store.

    (no default)

    String

    connect.cassandra.ssl.truststore.password

    Password for the client Trust Store.

    (no default)

    String

    connect.cassandra.ssl.keystore.path

    Path to the client Key Store.

    (no default)

    String

    connect.cassandra.ssl.keystore.password

    Password for the client Key Store

    (no default)

    String

    connect.cassandra.ssl.cipher.suites

    The SSL cipher suites to use for the connection.

    (no default)

    String

    connect.cassandra.ssl.hostname.verification

    Enable hostname verification for the connection.

    true

    String

    connect.cassandra.ssl.openssl.key.cert.chain

    Enable OpenSSL key certificate chain for the connection.

    (no default)

    String

    connect.cassandra.ssl.openssl.private.key

    Enable OpenSSL private key for the connection.

    (no default)

    String

    connect.cassandra.ignore.errors.mode

    Can be one of 'none', 'all' or 'driver'

    none

    String

    connect.cassandra.retry.interval

    The time in milliseconds between retries.

    60000

    String

    connect.cassandra.error.policy

    Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP - the error is swallowed; THROW - the error is allowed to propagate; RETRY - The exception causes the Connect framework to retry the message. The number of retries is set by connect.cassandra.max.retries. All errors will be logged automatically, even if the code swallows them.

    THROW

    String

    connect.cassandra.max.retries

    The maximum number of times to try the write again.

    20

    Int

    connect.cassandra.kcql

    KCQL expression describing field selection and routes.

    (no default)

    String

    connect.cassandra.progress.enabled

    Enables the output for how many records have been processed

    false

    Boolean

    Tweaks the Cassandra driver settings.

    Refer to the

    Simple configuration
    name=cassandra-sink
    connector.class=io.lenses.streamreactor.connect.cassandra.CassandraSinkConnector
    tasks.max=1
    topics=orders
    connect.cassandra.kcql=INSERT INTO mykeyspace.orders SELECT _key.bigint as bigintcol, _value.boolean as booleancol, _key.double as doublecol, _value.float as floatcol, _key.int as intcol, _value.smallint as smallintcol, _key.text as textcol, _value.tinyint as tinyintcol FROM orders
    connect.cassandra.port=9042 
    connect.cassandra.contact.points=cassandra
    connect.cassandra.contact.points=[host list]
    connect.cassandra.port=9042
    connect.cassandra.load.balancing.local.dc=datacentre-name
    # optional entries
    # connect.cassandra.max.concurrent.requests = 100
    # connect.cassandra.max.batch.size = 64
    # connect.cassandra.connection.pool.size = 4
    # connect.cassandra.query.timeout.ms = 30
    # connect.cassandra.compression = None
    connect.cassandra.ssl.provider=
    connect.cassandra.ssl.cipher.suites=
    connect.cassandra.ssl.hostname.verification=true
    
    
    connect.cassandra.ssl.keystore.path=
    connect.cassandra.ssl.keystore.password=
    
    connect.cassandra.ssl.truststore.password=
    connect.cassandra.ssl.truststore.path=
    
    # Path to the SSL certificate file, when using OpenSSL.
    connect.cassandra.ssl.openssl.key.cert.chain=
    
    # Path to the private key file, when using OpenSSL.
    connect.cassandra.ssl.openssl.private.key=
    connect.cassandra.auth.provider=
    connect.cassandra.auth.username=
    connect.cassandra.auth.password=
    
    # for SASL authentication which requires connect.cassandra.auth.provider
    # set to GSSAPI 
    connect.cassandra.auth.gssapi.keytab=
    connect.cassandra.auth.gssapi.principal=
    connect.cassandra.auth.gssapi.service=
    connect.cassandra.auth.provider=GSSAPI
    connect.cassandra.auth.gssapi.keytab=
    connect.cassandra.auth.gssapi.principal=
    connect.cassandra.auth.gssapi.service=
    INSERT INTO <keyspace>.<your-cassandra-table>
    SELECT <field projection, ...>
    FROM <your-kafka-topic>
    PROPERTIES(<a set of keys to control the connector behaviour>)
    INSERT INTO mykeyspace.types
    SELECT 
     _key.bigint as bigintcol
     , _value.boolean as booleancol
     , _key.double as doublecol
     , _value.float as floatcol
     , _header.int as intcol
    FROM myTopic
    
    INSERT INTO mykeyspace.types
    SELECT 
     _value.bigint as bigintcol
     , _value.double as doublecol
     , _value.ttlcol as message_internal_ttl 
     , _value.timestampcol as message_internal_timestamp
    FROM myTopic
    PROPERTIES('ttlTimeUnit'='MILLISECONDS', 'timestampTimeUnit'='MICROSECONDS')
    
    INSERT INTO mykeyspace.CASE_SENSITIVE
    SELECT
        `_key.'bigint field'` as 'bigint col', 
        `_key.'boolean-field'` as 'boolean-col', 
        `_value.'INT FIELD'` as 'INT COL', 
        `_value.'TEXT.FIELD'` as 'TEXT.COL'
    FROM mytopic 
    
    
    INSERT INTO mykeyspace.tableA 
    SELECT 
        _key.my_pk as my_pk
        , _value.my_value as my_value 
    FROM topicA
    PROPERTIES(
     'query'='INSERT INTO mykeyspace.pk_value (my_pk, my_value) VALUES (:my_pk, :my_value)',
     'deletesEnabled' ='true'
    )
    INSERT INTO ${target}
    SELECT _value/_key/_header.{field_path} AS ${table_column}
    FROM mytopic
    INSERT INTO ${target}
    SELECT 
      _key as row_key
      , _value as content
    INSERT INTO ${target}
    SELECT
      `_key.'bigint field'` as 'bigint col',
      `_key.'boolean-field'` as 'boolean-col',
      `_value.'INT FIELD'` as 'INT COL',
      `_value.'TEXT.FIELD'` as 'TEXT.COL'
    FROM myTopic
    INSERT INTO stocks_keyspace.stocks_by_symbol
    SELECT _value.symbol AS ticker, 
           _value.ts AS ts, 
           _value.exchange AS exchange, 
           _value.value AS value 
    FROM stocks;
    INSERT INTO stocks_keyspace.stocks_by_exchange
    SELECT _value.symbol AS ticker, 
           _value.ts AS ts, 
           _value.exchange AS exchange, 
           _value.value AS value 
    FROM stocks;

    APPLE

    {"symbol":"APPL",
    "value":214.4,
    "exchange":"NASDAQ",
    "ts":"2025-07-23T14:56:10.009"}
    CREATE TYPE stocks_keyspace.stocks_type (
        symbol text,
        ts timestamp,
        exchange text,
        value double
    );
    
    CREATE TABLE stocks_keyspace.stocks_table (
        name text PRIMARY KEY,
        stocks FROZEN<stocks_type>
    );
    INSERT INTO stocks_keyspace.stocks_table
    SELECT _key AS name, _value AS stocks
    FROM stocks
    INSERT INTO ${target}
    SELECT
      `now()` as loaded_at
    FROM myTopic
    SELECT [_value/_key/_header].{path} AS message_internal_timestamp
    FROM myTopic
    --You may optionally specify the time unit by using:
    PROPERTIES('timestampTimeUnit'='MICROSECONDS')
    SELECT _header.timestampcolumn AS message_internal_timestamp FROM myTopic
    SELECT _value.timestampcol AS message_internal_timestamp FROM myTopic
    SELECT _key.timestampcol AS message_internal_timestamp FROM myTopic
    SELECT [_value/_key/_header].{path} as message_internal_ttl
    FROM myTopic
    -- Optional: Specify the TTL unit
    PROPERTIES('ttlTimeUnit'='SECONDS')
    SELECT _header.ttl as message_internal_ttl
    FROM myTopic
    PROPERTIES('ttlTimeUnit'='MINUTES')
    
    SELECT _key.expiry as message_internal_ttl
    FROM myTopic
    PROPERTIES('ttlTimeUnit'='HOURS')
    
    SELECT _value.ttl_duration as message_internal_ttl
    FROM myTopic
    PROPERTIES('ttlTimeUnit'='DAYS')
    INSERT INTO ... SELECT ... FROM myTopic PROPERTIES(
        'codec.locale' = 'en_US',
        'codec.timeZone' = 'UTC',
        'codec.timestamp' = 'CQL_TIMESTAMP',
        'codec.date' = 'ISO_LOCAL_DATE',
        'codec.time' = 'ISO_LOCAL_TIME',
        'codec.unit' = 'MILLISECONDS'
    )

    codec.timestamp

    Defines the pattern for converting strings to CQL timestamps. Options include specific date-time patterns like yyyy-MM-dd HH:mm:ss or pre-defined formats such as ISO_ZONED_DATE_TIME and ISO_INSTANT. The special formatter CQL_TIMESTAMP supports all CQL timestamp formats.

    CQL_TIMESTAMP

    codec.date

    Specifies the pattern for converting strings to CQL dates. Options include date-time patterns like yyyy-MM-dd and formatters such as ISO_LOCAL_DATE.

    ISO_LOCAL_DATE

    INSERT INTO ${target}
    SELECT 
        _value.bigint as some_name,
        _value.int as some_name2
    FROM myTopic
    PROPERTIES('query'='INSERT INTO %s.types (bigintCol, intCol) VALUES (:some_name, :some_name2)')

    consistencyLevel

    Query Consistency Level Options:

    • ALL

    • EACH_QUORUM

    • QUORUM

    • LOCAL_QUORUM

    • ONE

    • TWO

    • THREE

    • LOCAL_ONE

    • ANY

    CQL_TIMESTAMP

    ttl

    Specify the number of seconds before data is automatically deleted from the DSE table. When set, all rows in the topic table will share this TTL value.

    -1

    connect.cassandra.driver.{driver_setting}

    connect.cassandra.contact.points

    A comma-separated list of host names or IP addresses

    localhost

    String

    connect.cassandra.port

    Cassandra native port.

    tutorials
    DataStax Java driver documentation
    Error policies

    codec.time

    nullToUnset

    9042

    For more examples see the tutorials.

    KCQL Support


    You can specify multiple KCQL statements separated by ; to have a connector sink multiple topics. The connector properties topics or topics.regex are required to be set to a value that matches the KCQL statements.

    The following KCQL is supported:

    Examples:

    Dynamic targets

    The connector can route the messages to specific MQTT targets. These are the possible options:

    1. Given Constant Topic:

      Route messages to a specified MQTT topic using a constant value:

      INSERT INTO lenses-io-demo ...
    2. Using a Field Value for Topic:

      Direct messages to an MQTT topic based on a specified field's value. Example expression: fieldA.fieldB.fieldC.

      INSERT INTO `<path to field>` 
      SELECT * FROM control.boxes.test 
      PROPERTIES('mqtt.target.from.field'='true')
    3. Utilizing Message Key as Topic:

      Determine the MQTT topic by the incoming message key, expected to be a string.

    4. Leveraging Kafka Message Topic:

      Set the MQTT topic using the incoming Kafka message’s original topic.

    Kafka payload support

    The sink publishes each Kafka record to MQTT according to the type of the record’s value:

    Payload type
    Action taken by the sink

    Binary (byte[])

    Forwarded unchanged (“pass-through”).

    String

    Published as its UTF-8 byte sequence.

    Connect Struct produced by Avro, Protobuf, or JSON Schema converters

    Converted to JSON, then published as the JSON string’s UTF-8 bytes.

    In short, non-binary objects are first turned into a JSON string; everything that reaches MQTT is ultimately a sequence of bytes.

    Error policies

    The connector supports Error policies.

    Option Reference

    Name
    Description
    Type
    Default Value

    connect.mqtt.hosts

    Contains the MQTT connection end points.

    string

    connect.mqtt.username

    Contains the Mqtt connection user name

    Migration

    Version 9 introduces two breaking changes that affect how you route data and project fields in KCQL. Review the sections below and update your configurations before restarting the connector.

    WITHTARGET is no longer used

    Before (v < 9.0.0)

    After (v ≥ 9.0.0)

    INSERT INTO SELECT * FROM control.boxes.test WITHTARGET ${path}

    INSERT INTO ${path} SELECT * FROM control.boxes.test PROPERTIES('mqtt.target.from.field'='true')

    Migration step

    1. Delete every WITHTARGET … clause.

    2. Move the placeholder (or literal) that held the target path into the INSERT INTO expression.

    3. Add mqtt.target.from.field=true to the KCQL PROPERTIES list.

    KCQL field projections are ignored

    Before (v < 9.0.0)

    After (v ≥ 9.0.0)

    Handled directly in the KCQL SELECT list: SELECT id, temp AS temperature …

    The connector passes the full record. Any projection, renaming, or value transformation must be done with a Kafka Connect Single Message Transformer (SMT), KStreams, or another preprocessing step.

    Migration step

    • Remove field lists and aliases from KCQL.

    • Attach an SMT such as org.apache.kafka.connect.transforms.ExtractField$Value, org.apache.kafka.connect.transforms.MaskField$Value, or your own custom SMT to perform the same logic.

    io.lenses.streamreactor.connect.mqtt.sink.MqttSinkConnector
    name=mqtt
    connector.class=io.lenses.streamreactor.connect.mqtt.sink.MqttSinkConnector
    tasks.max=1
    topics=orders
    connect.mqtt.hosts=tcp://mqtt:1883
    connect.mqtt.clean=true
    connect.mqtt.timeout=1000
    connect.mqtt.keep.alive=1000
    connect.mqtt.service.quality=1
    connect.mqtt.client.id=dm_sink_id
    connect.mqtt.kcql=INSERT INTO /lenses/orders SELECT * FROM orders
    INSERT
    INTO <mqtt-topic>
    SELECT * //no field projection supported
    FROM <kafka-topic>
    //no WHERE clause supported
    -- Insert into /landoop/demo all fields from kafka_topicA
    INSERT INTO `/landoop/demo` SELECT * FROM kafka_topicA
    
    -- Insert into /landoop/demo all fields from dynamic field
    INSERT INTO `<field path>` SELECT * FROM control.boxes.test PROPERTIES('mqtt.target.from.field'='true')

    java.util.Map or other Java Collection

    Serialized to JSON and published as the JSON string’s UTF-8 bytes.

    string

    connect.mqtt.password

    Contains the Mqtt connection password

    password

    connect.mqtt.service.quality

    Specifies the Mqtt quality of service

    int

    connect.mqtt.timeout

    Provides the time interval to establish the mqtt connection

    int

    3000

    connect.mqtt.clean

    Sets the MQTT clean session flag.

    boolean

    true

    connect.mqtt.keep.alive

    The keep-alive functionality ensures that the connection remains open and that broker and client stay connected. The interval is the longest period of time that broker and client can endure without sending a message.

    int

    5000

    connect.mqtt.client.id

    Contains the Mqtt session client id

    string

    connect.mqtt.error.policy

    Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP - the error is swallowed; THROW - the error is allowed to propagate; RETRY - the exception causes the Connect framework to retry the message. The number of retries is set by connect.mqtt.max.retries. All errors will be logged automatically, even if the code swallows them.

    string

    THROW

    connect.mqtt.retry.interval

    The time in milliseconds between retries.

    int

    60000

    connect.mqtt.max.retries

    The maximum number of times to try the write again.

    int

    20

    connect.mqtt.retained.messages

    Specifies the Mqtt retained flag.

    boolean

    false

    connect.mqtt.converter.throw.on.error

    If set to false, the conversion exception is swallowed and processing carries on, but the message is lost; if set to true, the exception is thrown. Default is false.

    boolean

    false

    connect.converter.avro.schemas

    If the AvroConverter is used you need to provide an Avro schema to be able to read and translate the raw bytes to an Avro record. The format is $MQTT_TOPIC=$PATH_TO_AVRO_SCHEMA_FILE for the source converter, or $KAFKA_TOPIC=$PATH_TO_AVRO_SCHEMA for the sink converter.

    string

    connect.mqtt.kcql

    Contains the Kafka Connect Query Language (KCQL) describing the source Kafka topics and the target MQTT topics.

    string

    connect.progress.enabled

    Enables the output for how many records have been processed

    boolean

    false

    connect.mqtt.ssl.ca.cert

    Provides the path to the CA certificate file to use with the Mqtt connection

    string

    connect.mqtt.ssl.cert

    Provides the path to the certificate file to use with the Mqtt connection

    string

    connect.mqtt.ssl.key

    Certificate private [config] key file path.

    string

    DataStax Java driver documentation
    INSERT INTO `_key` 
    SELECT ...
    INSERT INTO `_topic` 
    SELECT ...
    connect.cassandra.driver.*
    connect.cassandra.driver.basic.request.consistency=ALL

    Azure Event Hubs

    This page describes the usage of the Stream Reactor Azure Event Hubs Sink Connector.

    Coming soon!

    Sinks

    This page details the configuration options for the Stream Reactor Kafka Connect sink connectors.

    Sink connectors read data from Kafka and write to an external system.


    FAQ

    Can the datalakes sinks lose data?

    Kafka topic retention policies determine how long a message is retained in a topic before it is deleted. If the retention period expires and the connector has not processed the messages, possibly due to not running or other issues, the unprocessed Kafka data will be deleted as per the retention policy. This can lead to significant data loss since the messages will no longer be available for the connector to sink to the target system.

    Do the datalake sinks support exactly once semantics?

    Yes, the datalakes connectors natively support exactly-once guarantees.

    How do I escape dots in field names in KCQL?

    Field names in Kafka message headers or values may contain dots (.). To access these correctly, enclose the entire target in backticks (`) and each segment which consists of a field name in single quotes ('):
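For example, accessing a nested field whose segments contain dots (the field names are illustrative):

    INSERT INTO `_value.'customer.name'.'first.name'` SELECT * FROM topicA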

    How do I escape other special characters in field names in KCQL?

    For field names with spaces or special characters, use a similar escaping strategy:

    • Field name with a space: `_value.'full name'`

    • Field name with special characters: `_value.'$special_characters!'`

    This ensures the connector correctly extracts the intended fields and avoids parsing errors.

    GCP Big Query

    Sink data from Kafka to Big Query

    AWS S3

    Sink data from Kafka to AWS S3 including backing up topics and offsets.

    Azure CosmosDB

    Sink data from Kafka to Azure CosmosDB.

    Azure Data Lake Gen 2

    Sink data from Kafka to Azure Data Lake Gen 2

    Azure Event Hubs

    Load data from Azure Event Hubs into Kafka topics.

    Azure Service Bus

    Sink data from Kafka to Azure Service Bus topics and queues.

    Cassandra

    Sink data from Kafka to Cassandra.

    Elasticsearch

    Sink data from Kafka to Elasticsearch.

    GCP Storage

    Sink data from Kafka to GCP Storage.

    HTTP Sink

    Sink data from Kafka to a HTTP endpoint.

    InfluxDB

    Sink data from Kafka to InfluxDB.

    JMS

    Sink data from Kafka to JMS.

    MongoDB

    Sink data from Kafka to MongoDB.

    MQTT

    Sink data from Kafka to MQTT.

    Redis

    Sink data from Kafka to Redis.

    INSERT INTO `_value.'customer.name'.'first.name'` SELECT * FROM topicA

    Azure Service Bus

    This page describes the usage of the Stream Reactor Azure Service Bus Sink Connector.

    The Stream Reactor Azure Service Bus Sink Connector is designed to effortlessly translate Kafka records into your Azure Service Bus cluster. It leverages the Microsoft Azure API to transfer data to Service Bus seamlessly, preserving both payloads and metadata (see Payload support). It supports both types of Service Bus: Queues and Topics. The connector provides an AT-LEAST-ONCE guarantee, as data is committed (marked as read) in the Kafka topic (for the assigned topic and partition) only once the connector verifies it was successfully committed to the designated Service Bus entity.

    Connector Class

    io.lenses.streamreactor.connect.azure.servicebus.sink.AzureServiceBusSinkConnector

    Full Config Example


    For more examples see the tutorials.

    The following example presents all the mandatory configuration properties for the Service Bus connector. Please note there are also optional parameters. Feel free to tweak the configuration to your requirements.

    KCQL support


    You can specify multiple KCQL statements separated by ; to have the connector map between multiple topics.

    The following KCQL is supported:

    It allows you to map a Kafka topic of name <your-kafka-topic> to a Service Bus of name <your-service-bus> using the PROPERTIES specified (see the sections below for the necessary properties).

    The selection of individual fields from the Kafka message is not supported.

    Authentication

    You can connect to an Azure Service Bus by passing your connection string in configuration. The connection string can be found in the Shared access policies section of your Azure Portal.

    Learn more about different methods of connecting to Service Bus in the Azure documentation.

    hashtag
    QUEUE and TOPIC Mappings

    The Azure Service Bus Connector connects to Service Bus via the Microsoft API. To configure your mappings correctly, pay attention to the PROPERTIES part of your KCQL mappings. There are two cases: writing to a Service Bus of type QUEUE and of type TOPIC. Please refer to the relevant sections below. In case of further questions, check the Azure Service Bus documentation to learn more about those mechanisms.

    hashtag
    Writing to QUEUE ServiceBus

    To write to a queue, there is an additional parameter that you need to pass with your KCQL mapping in the PROPERTIES part. This parameter is servicebus.type, and it can take one of two values depending on the type of the service bus: QUEUE or TOPIC. For a queue, set it to QUEUE.

    This is sufficient to enable you to create the mapping with your queue.

    hashtag
    Writing to TOPIC ServiceBus

    To write to a topic, there is an additional parameter that you need to pass with your KCQL mapping in the PROPERTIES part: servicebus.type, which can take one of two values depending on the type of the service bus: QUEUE or TOPIC. For a topic, set it to TOPIC.

    This is sufficient to enable you to create the mapping with your topic.
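    Putting the two cases together, the KCQL mappings look like this (the Service Bus and Kafka topic names are placeholders):

    ```
    connect.servicebus.kcql=INSERT INTO azure-queue SELECT * FROM kafka-topic PROPERTIES('servicebus.type'='QUEUE');
    connect.servicebus.kcql=INSERT INTO azure-topic SELECT * FROM kafka-topic PROPERTIES('servicebus.type'='TOPIC');
    ```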

    hashtag
    Disabling batching

    If the connector is expected to transfer big messages (one megabyte and more), Service Bus may reject a batch of such payloads, failing the Connector Task. To remediate that, set the batch.enabled parameter to false. This sacrifices the ability to send messages in batches (possibly slowing the transfer down) but should allow them to be transferred safely.

    For most usages, we recommend omitting it (it is set to true by default).
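    For example, assuming a queue mapping (the destination and topic names are placeholders), batch.enabled is passed alongside servicebus.type in the PROPERTIES part:

    ```
    connect.servicebus.kcql=INSERT INTO big-payload-queue SELECT * FROM big-payload-topic PROPERTIES('servicebus.type'='QUEUE', 'batch.enabled'='false');
    ```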

    hashtag
    Kafka payload support

    This sink supports the following Kafka payloads:

    • String Schema Key and Binary payload (then MessageId in Service Bus is set with Kafka Key)

    • any other key (or keyless) and Binary payload (this causes Service Bus messages to not have specified MessageId)

    • No Schema and JSON

    hashtag
    Null Payload Transfer

    circle-exclamation

    Azure Service Bus doesn't allow sending messages with null content (payload)

    A null payload (sometimes referred to as a Kafka Tombstone) is a known concept in the Kafka world. However, because of this Service Bus limitation, the connector cannot send messages with a null payload and has to drop them instead.

    Please keep that in mind when using Service Bus and designing business logic around null payloads!
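    The drop behaviour described above can be sketched as follows. This is an illustrative sketch only, not the connector's source; it shows how records with a null payload would be filtered out of a batch before delivery:

    ```python
    # Sketch: records whose payload is null (Kafka tombstones) cannot be
    # delivered to Service Bus, so they are dropped and the rest forwarded.
    def partition_tombstones(records):
        """Split records into deliverable messages and dropped tombstones."""
        to_send, dropped = [], []
        for record in records:
            if record.get("value") is None:  # null payload == tombstone
                dropped.append(record)
            else:
                to_send.append(record)
        return to_send, dropped

    batch = [
        {"key": "order-1", "value": b"created"},
        {"key": "order-1", "value": None},  # tombstone: dropped, never sent
    ]
    to_send, dropped = partition_tombstones(batch)
    ```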

    hashtag
    Option Reference

    hashtag
    KCQL Properties

    Please find below all the necessary KCQL properties:

    Name
    Description
    Type
    Default Value

    hashtag
    Configuration parameters

    Please find below all the relevant configuration parameters:

    Name
    Description
    Type
    Default Value

    InfluxDB

    This page describes the usage of the Stream Reactor InfluxDB Sink Connector.

    circle-info

    hashtag
    This connector has been retired starting with version 11.0.0

    hashtag
    Connector Class

    hashtag
    Example

    circle-check

    For more examples see the tutorials.

    hashtag
    KCQL support

    circle-check

    You can specify multiple KCQL statements separated by ; to have a connector sink multiple topics. The connector properties topics or topics.regex are required to be set to a value that matches the KCQL statements.

    The following KCQL is supported:

    Examples:

    hashtag
    Tags

    The InfluxDB client API allows a set of tags (key-value pairs) to be attached to each point added. The current connector version allows you to provide them via KCQL.

    circle-info

    Only applicable to value fields. No support for nested fields, keys or topic metadata.

    hashtag
    Kafka payload support

    This sink supports the following Kafka payloads:

    • Schema.Struct and Struct (Avro)

    • Schema.Struct and JSON

    • No Schema and JSON

    hashtag
    Error policies

    The connector supports Error policies.

    hashtag
    Option Reference

    Name
    Description
    Type
    Default Value

    true

    connect.servicebus.sink.retries.max

    Number of retries if message has failed to be delivered to Service Bus

    int

    3

    connect.servicebus.sink.retries.timeout

    Timeout (in milliseconds) between retries if message has failed to be delivered to Service Bus

    int

    500

    servicebus.type

    Specifies Service Bus type: QUEUE or TOPIC

    string

    batch.enabled

    Specifies if the Connector can send messages in batch, see Azure Service Bus

    connect.servicebus.connection.string

    Specifies the Connection String to connect to Service Bus

    string

    connect.servicebus.kcql

    Comma-separated output KCQL queries


    boolean

    string

    connect.influx.username

    The user to connect to the influx database

    string

    connect.influx.password

    The password for the influxdb user.

    password

    connect.influx.kcql

    KCQL expression describing field selection and target measurements.

    string

    connect.progress.enabled

    Enables the output for how many records have been processed by the connector

    boolean

    false

    connect.influx.error.policy

    Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP - the error is swallowed; THROW - the error is allowed to propagate; RETRY - the exception causes the Connect framework to retry the message. The number of retries is based on connect.influx.max.retries. The error will be logged automatically

    string

    THROW

    connect.influx.retry.interval

    The time in milliseconds between retries.

    int

    60000

    connect.influx.max.retries

    The maximum number of times to try the write again.

    int

    20

    connect.influx.retention.policy

    Determines how long InfluxDB keeps the data. The DURATION of the retention policy can be specified with the following units: m minutes, h hours, d days, w weeks, INF infinite. Note that the minimum retention period is one hour. The default retention policy is autogen from version 1.0 onwards, or default for any previous version

    string

    autogen

    connect.influx.consistency.level

    Specifies the write consistency. If any write operations do not meet the configured consistency guarantees, an error will occur and the data will not be indexed. The default consistency-level is ALL.

    string

    ALL

    connect.influx.url

    The InfluxDB database url.

    string

    connect.influx.db

    The database to store the values to.


    string

    connector.class=io.lenses.streamreactor.connect.azure.servicebus.sink.AzureServiceBusSinkConnector
    name=AzureServiceBusSinkConnector
    tasks.max=1
    value.converter=org.apache.kafka.connect.storage.StringConverter
    key.converter=org.apache.kafka.connect.storage.StringConverter
    connect.servicebus.connection.string=Endpoint=sb://MYNAMESPACE.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SOME_SHARED_ACCESS_STRING
    connect.servicebus.kcql=INSERT INTO output-servicebus SELECT * FROM input-topic PROPERTIES('servicebus.type'='QUEUE');
    INSERT INTO <your-service-bus>
    SELECT *
    FROM <your-kafka-topic>
    PROPERTIES(...); 
    connect.servicebus.connection.string=Endpoint=sb://YOURNAMESPACE.servicebus.windows.net/;SharedAccessKeyName=YOUR_KEYNAME;SharedAccessKey=YOUR_ACCESS_KEY=
    connect.servicebus.kcql=INSERT INTO azure-queue SELECT * FROM kafka-topic PROPERTIES('servicebus.type'='QUEUE');
    connect.servicebus.kcql=INSERT INTO azure-topic SELECT * FROM kafka-topic PROPERTIES('servicebus.type'='TOPIC');
    io.lenses.streamreactor.connect.influx.InfluxSinkConnector
    name=influxdb
    connector.class=io.lenses.streamreactor.connect.influx.InfluxSinkConnector
    tasks.max=1
    topics=influx
    connect.influx.url=http://influxdb:8086
    connect.influx.db=mydb
    connect.influx.username=admin
    connect.influx.kcql=INSERT INTO influxMeasure SELECT * FROM influx WITHTIMESTAMP sys_time()
    INSERT INTO <your-measure>
    SELECT FIELD, ...
    FROM kafka_topic_name
    [WITHTIMESTAMP FIELD|sys_time]
    [WITHTAG(FIELD|(constant_key=constant_value))]
    -- Insert mode, select all fields from topicA and write to indexA
    INSERT INTO measureA SELECT * FROM topicA
    
    -- Insert mode, select 3 fields and rename from topicB and write to indexB,
    -- use field Y as the point measurement
    INSERT INTO measureB SELECT x AS a, y AS b, c FROM topicB WITHTIMESTAMP y
    
    -- Insert mode, select 3 fields and rename from topicB and write to indexB,
    -- use field Y as the current system time for Point measurement
    INSERT INTO measureB SELECT x AS a, y AS b, z FROM topicB WITHTIMESTAMP sys_time()
    
    -- Tagging using constants
    INSERT INTO measureA SELECT * FROM topicA WITHTAG (DataMountaineer=awesome, Influx=rulz!)
    
    -- Tagging using fields in the payload. Say we have a Payment structure
    -- with these fields: amount, from, to, note
    INSERT INTO measureA SELECT * FROM topicA WITHTAG (from, to)
    
    -- Tagging using a combination of fields in the payload and constants.
    -- Say we have a Payment structure with these fields: amount, from, to, note
    INSERT INTO measureA SELECT * FROM topicA WITHTAG (from, to, provider=DataMountaineer)

    JMS

    This page describes the usage of the Stream Reactor JMS Sink Connector.

    hashtag
    Connector Class

    hashtag
    Example

    circle-check

    For more examples see the tutorials.

    hashtag
    KCQL support

    circle-check

    You can specify multiple KCQL statements separated by ; to have a connector sink multiple topics. The connector properties topics or topics.regex are required to be set to a value that matches the KCQL statements.

    The following KCQL is supported:

    Examples:

    hashtag
    JMS Topics and Queues

    The sink can write to either topics or queues, specified by the WITHTYPE clause.

    hashtag
    JMS Payload

    When a message is sent to a JMS target it can be one of the following:

    • JSON - Send a TextMessage

    • AVRO - Send a BytesMessage

    • MAP - Send a MapMessage

    • OBJECT - Send an ObjectMessage

    This is set by the WITHFORMAT keyword.

    hashtag
    Kafka payload support

    This sink supports the following Kafka payloads:

    • Schema.Struct and Struct (Avro)

    • Schema.Struct and JSON

    • No Schema and JSON

    hashtag
    Error policies

    The connector supports Error policies.

    hashtag
    Option Reference

    Name
    Description
    Type
    Default Value

    connect.jms.url

    Provides the JMS broker url

    string

    connect.jms.initial.context.factory

    Initial Context Factory, e.g: org.apache.activemq.jndi.ActiveMQInitialContextFactory

    io.lenses.streamreactor.connect.jms.sink.JMSSinkConnector
    name=jms
    connector.class=io.lenses.streamreactor.connect.jms.sink.JMSSinkConnector
    tasks.max=1
    topics=orders
    connect.jms.url=tcp://activemq:61616
    connect.jms.initial.context.factory=org.apache.activemq.jndi.ActiveMQInitialContextFactory
    connect.jms.connection.factory=ConnectionFactory
    connect.jms.kcql=INSERT INTO orders SELECT * FROM orders WITHTYPE QUEUE WITHFORMAT JSON
    INSERT INTO <jms-destination>
    SELECT FIELD, ...
    FROM <your-kafka-topic>
    [WITHFORMAT AVRO|JSON|MAP|OBJECT]
    WITHTYPE TOPIC|QUEUE
    -- Select all fields from topicA and write to jmsA queue
    INSERT INTO jmsA SELECT * FROM topicA WITHTYPE QUEUE
    
    -- Select 3 fields and rename from topicB and write
    -- to jmsB topic as JSON in a TextMessage
    INSERT INTO jmsB SELECT x AS a, y, z FROM topicB WITHFORMAT JSON WITHTYPE TOPIC

    MongoDB

    This page describes the usage of the Stream Reactor MongoDB Sink Connector.

    hashtag
    Connector Class

    io.lenses.streamreactor.connect.mongodb.sink.MongoSinkConnector

    hashtag
    Example

    circle-check

    For more examples see the tutorials.

    hashtag
    KCQL support

    circle-check

    You can specify multiple KCQL statements separated by ; to have a connector sink multiple topics. The connector properties topics or topics.regex are required to be set to a value that matches the KCQL statements.

    The following KCQL is supported:

    Examples:

    hashtag
    Insert Mode

    Insert is the default write mode of the sink.

    hashtag
    Upsert Mode

    The connector supports upserts, which replace the existing row if a match is found on the primary keys. If records are delivered with the same field or group of fields that are used as the primary key on the target table, but different values, the existing record in the target table will be updated.

    hashtag
    Batching

    The BATCH clause controls the batching of writes to MongoDB.

    hashtag
    TLS/SSL

    TLS/SSL is supported by setting ?ssl=true in the connect.mongo.connection option. The MongoDB driver will then attempt to load the truststore and keystore using the JVM system properties.

    You need to set JVM system properties to ensure that the client is able to validate the SSL certificate presented by the server:

    • javax.net.ssl.trustStore: the path to a trust store containing the certificate of the signing authority

    • javax.net.ssl.trustStorePassword: the password to access this trust store

    • javax.net.ssl.keyStore: the path to a key store containing the client’s SSL certificates
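    A minimal sketch of supplying these system properties, assuming the Connect worker is started via the standard Kafka scripts (which honour KAFKA_OPTS); the paths and passwords are placeholders:

    ```shell
    # Pass the JSSE system properties to the Connect worker JVM.
    # Paths and passwords below are placeholders.
    export KAFKA_OPTS="-Djavax.net.ssl.trustStore=/var/private/ssl/client.truststore.jks \
     -Djavax.net.ssl.trustStorePassword=changeit \
     -Djavax.net.ssl.keyStore=/var/private/ssl/client.keystore.jks \
     -Djavax.net.ssl.keyStorePassword=changeit"
    ```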

    hashtag
    Authentication Mechanism

    All authentication methods are supported: X.509, LDAP Plain, Kerberos (GSSAPI), MongoDB-CR and SCRAM-SHA-1. The default as of MongoDB version 3.0 is SCRAM-SHA-1. To set the authentication mechanism, set authMechanism in the connect.mongo.connection option.

    The mechanism can be set either in the connection string (but this requires the password to appear in plain text in the connection string) or via the connect.mongo.auth.mechanism option.

    If the username is set, it overrides the username/password set in the connection string, and connect.mongo.auth.mechanism takes precedence.

    e.g.

    hashtag
    JSON Field dates

    List of fields that should be converted to ISO Date on MongoDB insertion (comma-separated field names), for JSON topics only. Field values may be an epoch time or an ISO8601 datetime string with an offset (offset or ‘Z’ required). If the string does not parse to ISO, it will be written as a string instead.

    Subdocument fields can be referred to in the following examples:

    • topLevelFieldName

    • topLevelSubDocument.FieldName

    • topLevelParent.subDocument.subDocument2.FieldName

    If a field is converted to ISODate and that same field is named as a PK, then the PK field is also written as an ISODate.

    This is controlled via the connect.mongo.json_datetime_fields option.
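    The conversion rule above can be sketched as follows. This is an illustrative sketch, not the connector's source: a configured field becomes a date when its value is an epoch number or an ISO8601 string carrying an offset (or 'Z'); anything else is written through as a plain string:

    ```python
    from datetime import datetime, timezone

    def to_iso_date(value):
        """Sketch of the json_datetime_fields conversion rule."""
        if isinstance(value, (int, float)):               # epoch time
            return datetime.fromtimestamp(value, tz=timezone.utc)
        try:
            parsed = datetime.fromisoformat(str(value).replace("Z", "+00:00"))
        except ValueError:
            return value                                  # not ISO: keep as string
        if parsed.tzinfo is None:                         # offset (or 'Z') required
            return value
        return parsed
    ```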

    hashtag
    Kafka payload support

    This sink supports the following Kafka payloads:

    • Schema.Struct and Struct (Avro)

    • Schema.Struct and JSON

    • No Schema and JSON

    hashtag
    Error policies

    The connector supports Error policies.

    hashtag
    Option Reference

    Name
    Description
    Type
    Default Value

    Deprecated Cassandra

    This page describes the usage of the Stream Reactor Cassandra Sink Connector part of the kafka-connect-cassandra-*** artifact.

    circle-exclamation

    The connector converts the value of Kafka messages to JSON and uses the Cassandra JSON insert feature to write records.

    circle-exclamation

    As of Stream Reactor version 10, the sink is deprecated. No further changes or fixes will be provided. Follow the Cassandra link to see the new sink implementation details.

    string

    connect.jms.connection.factory

    Provides the full class name of the ConnectionFactory implementation to use, e.g. org.apache.activemq.ActiveMQConnectionFactory

    string

    ConnectionFactory

    connect.jms.kcql

    KCQL expression describing field selection and routing to the target JMS destination.

    string

    connect.jms.subscription.name

    subscription name to use when subscribing to a topic, specifying this makes a durable subscription for topics

    string

    connect.jms.password

    Provides the password for the JMS connection

    password

    connect.jms.username

    Provides the user for the JMS connection

    string

    connect.jms.error.policy

    Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP - the error is swallowed; THROW - the error is allowed to propagate; RETRY - the exception causes the Connect framework to retry the message. The number of retries is based on connect.jms.max.retries. The error will be logged automatically

    string

    THROW

    connect.jms.retry.interval

    The time in milliseconds between retries.

    int

    60000

    connect.jms.max.retries

    The maximum number of times to try the write again.

    int

    20

    connect.jms.destination.selector

    Selector to use for destination lookup. Either CDI or JNDI.

    string

    CDI

    connect.jms.initial.context.extra.params

    List (comma-separated) of extra properties as key/value pairs with a colon delimiter to supply to the initial context e.g. SOLACE_JMS_VPN:my_solace_vp

    list

    []

    connect.jms.batch.size

    The number of records to poll for on the target JMS destination in each Connect poll.

    int

    100

    connect.jms.polling.timeout

    Provides the timeout to poll incoming messages

    long

    1000

    connect.jms.source.default.converter

    Contains a canonical class name for the default converter of raw JMS message bytes to a SourceRecord. Overrides to the default can still be done per source using connect.jms.source.converters, e.g. io.lenses.streamreactor.connect.converters.source.AvroConverter

    string

    connect.jms.converter.throw.on.error

    If set to false, the conversion exception will be swallowed and processing carries on, BUT the message is lost! If set to true, the exception will be thrown. Default is false.

    boolean

    false

    connect.converter.avro.schemas

    If the AvroConverter is used you need to provide an avro Schema to be able to read and translate the raw bytes to an avro record. The format is $MQTT_TOPIC=$PATH_TO_AVRO_SCHEMA_FILE

    string

    connect.jms.headers

    Contains collection of static JMS headers included in every SinkRecord The format is connect.jms.headers="$MQTT_TOPIC=rmq.jms.message.type:TextMessage,rmq.jms.message.priority:2;$MQTT_TOPIC2=rmq.jms.message.type:JSONMessage"

    string

    connect.progress.enabled

    Enables the output for how many records have been processed

    boolean

    false

    connect.jms.evict.interval.minutes

    Removes the uncommitted messages from the internal cache. Each JMS message is linked to the Kafka record to be published. Failure to publish a record to Kafka will mean the JMS message will not be acknowledged.

    int

    10

    connect.jms.evict.threshold.minutes

    The number of minutes after which an uncommitted entry becomes evictable from the connector cache.

    int

    10

    connect.jms.scale.type

    How the connector tasks parallelization is decided. Available values are kcql and default. If kcql is provided it will be based on the number of KCQL statements written; otherwise it will be driven based on the connector tasks.max

    javax.net.ssl.keyStorePassword: the password to access this key store

    [TLSv1.2, TLSv1.1, TLSv1]

    ssl.keystore.password

    The store password for the key store file. This is optional for client and only needed if ssl.keystore.location is configured.

    password

    ssl.key.password

    The password of the private key in the key store file. This is optional for client.

    password

    ssl.keystore.type

    The file format of the key store file. This is optional for client.

    string

    JKS

    ssl.truststore.location

    The location of the trust store file.

    string

    ssl.endpoint.identification.algorithm

    The endpoint identification algorithm to validate server hostname using server certificate.

    string

    https

    ssl.protocol

    The SSL protocol used to generate the SSLContext. Default setting is TLS, which is fine for most cases. Allowed values in recent JVMs are TLS, TLSv1.1 and TLSv1.2. SSL, SSLv2 and SSLv3 may be supported in older JVMs, but their usage is discouraged due to known security vulnerabilities.

    string

    TLS

    ssl.trustmanager.algorithm

    The algorithm used by trust manager factory for SSL connections. Default value is the trust manager factory algorithm configured for the Java Virtual Machine.

    string

    PKIX

    ssl.secure.random.implementation

    The SecureRandom PRNG implementation to use for SSL cryptography operations.

    string

    ssl.truststore.type

    The file format of the trust store file.

    string

    JKS

    ssl.keymanager.algorithm

    The algorithm used by key manager factory for SSL connections. Default value is the key manager factory algorithm configured for the Java Virtual Machine.

    string

    SunX509

    ssl.provider

    The name of the security provider used for SSL connections. Default value is the default security provider of the JVM.

    string

    ssl.keystore.location

    The location of the key store file. This is optional for client and can be used for two-way authentication for client.

    string

    ssl.truststore.password

    The password for the trust store file. If a password is not set access to the truststore is still available, but integrity checking is disabled.

    password

    connect.mongo.connection

    The mongodb connection in the format mongodb://[username:password@]host1[:port1],host2[:port2],…[,hostN[:portN]]][/[database][?options]].

    string

    connect.mongo.db

    The mongodb target database.

    string

    connect.mongo.username

    The username to use when authenticating

    string

    connect.mongo.password

    The password for the use when authenticating

    password

    connect.mongo.auth.mechanism

    The authentication mechanism to use when connecting to MongoDB.

    string

    SCRAM-SHA-1

    connect.mongo.error.policy

    Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP - the error is swallowed; THROW - the error is allowed to propagate; RETRY - the exception causes the Connect framework to retry the message. The number of retries is based on connect.mongo.max.retries. The error will be logged automatically

    string

    THROW

    connect.mongo.max.retries

    The maximum number of times to try the write again.

    int

    20

    connect.mongo.retry.interval

    The time in milliseconds between retries.

    int

    60000

    connect.mongo.kcql

    KCQL expression describing field selection and data routing to the target mongo db.

    string

    connect.mongo.json_datetime_fields

    List of fields that should be converted to ISODate on Mongodb insertion (comma-separated field names). For JSON topics only. Field values may be an integral epoch time or an ISO8601 datetime string with an offset (offset or ‘Z’ required). If string does not parse to ISO, it will be written as a string instead. Subdocument fields can be referred to as in the following examples: “topLevelFieldName”, “topLevelSubDocument.FieldName”, “topLevelParent.subDocument.subDocument2.FieldName”, (etc.) If a field is converted to ISODate and that same field is named as a PK, then the PK field is also written as an ISODate.

    list

    []

    connect.progress.enabled

    Enables the output for how many records have been processed

    boolean

    false

    ssl.cipher.suites

    A list of cipher suites. This is a named combination of authentication, encryption, MAC and key exchange algorithm used to negotiate the security settings for a network connection using TLS or SSL network protocol. By default all the available cipher suites are supported.

    list

    ssl.enabled.protocols

    The list of protocols enabled for SSL connections.


    list

    name=mongo
    connector.class=io.lenses.streamreactor.connect.mongodb.sink.MongoSinkConnector
    tasks.max=1
    topics=orders
    connect.mongo.kcql=INSERT INTO orders SELECT * FROM orders
    connect.mongo.db=connect
    connect.mongo.connection=mongodb://mongo:27017
    INSERT | UPSERT
    INTO <collection_name>
    SELECT FIELD, ...
    FROM <kafka-topic>
    BATCH = 100
    --  Select all fields from topic fx_prices and insert into the fx collection
    INSERT INTO fx SELECT * FROM fx_prices
    
    --  Select all fields from topic fx_prices and upsert into the fx collection,
    --  The assumption is there will be a ticker field in the incoming json:
    UPSERT INTO fx SELECT * FROM fx_prices PK ticker
    # default of scram
    mongodb://host1/?authSource=db1
    # scram explict
    mongodb://host1/?authSource=db1&authMechanism=SCRAM-SHA-1
    # mongo-cr
    mongodb://host1/?authSource=db1&authMechanism=MONGODB-CR
    # x.509
    mongodb://host1/?authSource=db1&authMechanism=MONGODB-X509
    # kerberos
    mongodb://host1/?authSource=db1&authMechanism=GSSAPI
    # ldap
    mongodb://host1/?authSource=db1&authMechanism=PLAIN

    hashtag
    Connector Class

    hashtag
    Example

    circle-check

    For more examples see the tutorials.

    hashtag
    KCQL support

    circle-check

    You can specify multiple KCQL statements separated by ; to have a connector sink multiple topics. The connector properties topics or topics.regex are required to be set to a value that matches the KCQL statements.

    The following KCQL is supported:

    Examples:

    hashtag
    Deletion in Cassandra

    Compacted topics in Kafka retain the last message per key. Deletion in Kafka occurs by tombstoning: if compaction is enabled on the topic and a message is sent with a null payload, Kafka flags this record for deletion and it is compacted/removed from the topic.

    Deletion in Cassandra is supported based on fields in the key of messages with an empty/null payload. A Cassandra CQL delete statement must be provided, with bind parameters for the field values taken from the key; for example, with the delete statement of:

    If a message was received with an empty/null value and key fields key.id and key.product the final bound Cassandra statement would be:

    Deletion will only occur if a message with an empty payload is received from Kafka.

    circle-info

    Ensure the ordinal position of the fields in connect.cassandra.delete.struct_flds matches the bind-parameter order in the Cassandra delete statement!

    hashtag
    Kafka payload support

    This sink supports the following Kafka payloads:

    • Schema.Struct and Struct (Avro)

    • Schema.Struct and JSON

    • No Schema and JSON

    hashtag
    Error policies

    The connector supports Error policies.

    hashtag
    Option Reference

    Name
    Description
    Type
    Default Value

    connect.cassandra.contact.points

    Initial contact point host for Cassandra including port.

    string

    localhost

    connect.cassandra.port

    Cassandra native port.

    io.lenses.streamreactor.connect.cassandra.sink.CassandraSinkConnector
    name=cassandra-sink
    connector.class=io.lenses.streamreactor.connect.cassandra.sink.CassandraSinkConnector
    tasks.max=1
    topics=orders
    connect.cassandra.kcql=INSERT INTO orders SELECT * FROM orders
    connect.cassandra.port=9042
    connect.cassandra.key.space=demo
    connect.cassandra.contact.points=cassandra
    INSERT INTO <your-cassandra-table>
    SELECT FIELD,...
    FROM <your-kafka-topic>
    [TTL=Time to live]
    -- Insert mode, select all fields from topicA and
    -- write to tableA
    INSERT INTO tableA SELECT * FROM topicA
    
    -- Insert mode, select 3 fields and rename from topicB
    -- and write to tableB
    INSERT INTO tableB SELECT x AS a, y, c FROM topicB
    
    -- Insert mode, select 3 fields and rename from topicB
    -- and write to tableB with TTL
    INSERT INTO tableB SELECT x, y FROM topicB TTL=100000
    DELETE FROM orders WHERE id = ? and product = ?
    # Message
    # "{ "key": { "id" : 999, "product" : "DATAMOUNTAINEER" }, "value" : null }"
    # DELETE FROM orders WHERE id = 999 and product = "DATAMOUNTAINEER"
    
    # connect.cassandra.delete.enabled=true
    # connect.cassandra.delete.statement=DELETE FROM orders WHERE id = ? and product = ?
    # connect.cassandra.delete.struct_flds=id,product

    Redis

    This page describes the usage of the Stream Reactor Redis Sink Connector.

    circle-info

    hashtag
    This connector has been retired starting with version 11.0.0

    hashtag
    Connector Class

    hashtag
    Example

    circle-check

    For more examples see the tutorials.

    hashtag
    KCQL support

    circle-check

    You can specify multiple KCQL statements separated by ; to have a connector sink multiple topics. The connector properties topics or topics.regex are required to be set to a value that matches the KCQL statements.

    The following KCQL is supported:

    hashtag
    Cache mode

    The purpose of this mode is to cache in Redis [Key-Value] pairs. Imagine a Kafka topic with currency foreign exchange rate messages:

    You may want to store in Redis: the symbol as the Key and the price as the Value. This will effectively make Redis a caching system, which multiple other applications can access to get the (latest) value. To achieve that using this particular Kafka Redis Sink Connector, you need to specify the KCQL as:
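    A sketch of such a mapping (the topic name yahoo-fx and the field names symbol and price are assumptions consistent with the surrounding description):

    ```
    connect.redis.kcql=SELECT price FROM yahoo-fx PK symbol
    ```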

    This will update the keys USDGBP , EURGBP with the relevant price using the (default) JSON format:

    Composite keys are supported with the PK clause, a delimiter can be set with the optional configuration property connect.redis.pk.delimiter.

    hashtag
    Sorted Sets

    To insert messages from a Kafka topic into 1 Sorted Set use the following KCQL syntax:
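    A sketch of this syntax (the set name and score field follow the description below; the topic name cpuTopic is an assumption):

    ```
    INSERT INTO cpu_stats SELECT * FROM cpuTopic STOREAS SortedSet(score=timestamp)
    ```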

    This will create and add entries to the (sorted set) named cpu_stats. The entries will be ordered in the Redis set based on the score that we define it to be the value of the timestamp field of the AVRO message from Kafka. In the above example, we are selecting and storing all the fields of the Kafka message.

    The TTL statement allows setting a time to live on the sorted set. If not specified, no TTL is set.

    hashtag
    Multiple Sorted Sets

    The connector can create multiple sorted sets by promoting each value of one field from the Kafka message into its own Sorted Set, and selecting which values to store in the sorted sets. Use the PK (primary key) KCQL clause to define the field.
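    A sketch of such a mapping (the topic and field names are assumptions):

    ```
    SELECT temperature, humidity FROM sensorsTopic PK sensorID STOREAS SortedSet(score=timestamp)
    ```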

    circle-info

    Notice we have dropped the INSERT clause.

    The connector can also prefix the name of the Key using the INSERT statement for Multiple SortedSets:
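    A sketch matching the FX- example described below (the topic and field names are assumptions):

    ```
    INSERT INTO FX- SELECT price FROM yahoo-fx PK symbol STOREAS SortedSet(score=timestamp)
    ```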

    This will create a key with names FX-USDGBP , FX-EURGBP etc.

    The TTL statement allows setting a time to live on the sorted set. If not specified, no TTL is set.

    hashtag
    Geospatial add

    To insert messages from a Kafka topic with GEOADD use the following KCQL syntax:
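    A sketch of this syntax (the topic, key, and coordinate field names are all assumptions):

    ```
    SELECT town, country, longitude, latitude FROM addressTopic PK town STOREAS GeoAdd (longitudeField=longitude, latitudeField=latitude)
    ```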

    hashtag
    Streams

    To insert messages from a Kafka topic to a Redis Stream use the following KCQL syntax:
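    A sketch of this syntax (the stream and topic names are assumptions):

    ```
    INSERT INTO redis-stream SELECT * FROM my-kafka-topic STOREAS STREAM
    ```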

    hashtag
    PubSub

    To insert a message from a Kafka topic to a Redis PubSub use the following KCQL syntax:
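    A sketch of this syntax (the topic name is an assumption; the channel field myfield matches the description below):

    ```
    SELECT * FROM kafka-topic STOREAS PubSub (channel=myfield)
    ```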

    The channel to write to in Redis is determined by a field in the payload of the Kafka message set in the KCQL statement, in this case, a field called myfield.

    hashtag
    Kafka payload support

    This sink supports the following Kafka payloads:

    • Schema.Struct and Struct (Avro)

    • Schema.Struct and JSON

    • No Schema and JSON

    hashtag
    Error policies

    The connector supports error policies.

    hashtag
    Option Reference

    Name
    Description
    Type
    Default Value

    connect.cassandra.port

    Port for the Cassandra native protocol.

    int

    9042

    connect.cassandra.key.space

    Keyspace to write to.

    string

    connect.cassandra.username

    Username to connect to Cassandra with.

    string

    connect.cassandra.password

    Password for the username to connect to Cassandra with.

    password

    connect.cassandra.ssl.enabled

    Secure Cassandra driver connection via SSL.

    boolean

    false

    connect.cassandra.trust.store.path

    Path to the client Trust Store.

    string

    connect.cassandra.trust.store.password

    Password for the client Trust Store.

    password

    connect.cassandra.trust.store.type

    Type of the Trust Store, defaults to JKS

    string

    JKS

    connect.cassandra.key.store.type

    Type of the Key Store, defaults to JKS

    string

    JKS

    connect.cassandra.ssl.client.cert.auth

    Enable client certification authentication by Cassandra. Requires KeyStore options to be set.

    boolean

    false

    connect.cassandra.key.store.path

    Path to the client Key Store.

    string

    connect.cassandra.key.store.password

    Password for the client Key Store

    password

    connect.cassandra.consistency.level

    Consistency refers to how up-to-date and synchronized a row of Cassandra data is on all of its replicas. Cassandra offers tunable consistency. For any given read or write operation, the client application decides how consistent the requested data must be.

    string

    connect.cassandra.fetch.size

    The number of records the Cassandra driver will return at once.

    int

    5000

    connect.cassandra.load.balancing.policy

    Cassandra load balancing policy: ROUND_ROBIN, TOKEN_AWARE, LATENCY_AWARE or DC_AWARE_ROUND_ROBIN. TOKEN_AWARE and LATENCY_AWARE wrap DC_AWARE_ROUND_ROBIN.

    string

    TOKEN_AWARE

    connect.cassandra.error.policy

    Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP - the error is swallowed; THROW - the error is allowed to propagate; RETRY - the exception causes the Connect framework to retry the message. The number of retries is set by connect.cassandra.max.retries. All errors will be logged automatically, even if the code swallows them.

    string

    THROW

    connect.cassandra.max.retries

    The maximum number of times to try the write again.

    int

    20

    connect.cassandra.retry.interval

    The time in milliseconds between retries.

    int

    60000

    connect.cassandra.threadpool.size

    The sink inserts all the data concurrently. To fail fast in case of an error, the sink has its own thread pool. Set the value to zero and the thread pool defaults to 4 × the number of CPUs; set a value greater than 0 to use that as the thread pool size.

    int

    0

    connect.cassandra.delete.struct_flds

    Fields in the key struct data type used in the delete statement. Comma-separated, in the order they appear in connect.cassandra.delete.statement. Keep the default value to use the record key as a primitive type.

    list

    []

    connect.cassandra.delete.statement

    Delete statement for cassandra

    string

    connect.cassandra.kcql

    KCQL expression describing field selection and routes.

    string

    connect.cassandra.default.value

    By default a column omitted from the JSON map will be set to NULL. Alternatively, if set to UNSET, the pre-existing value will be preserved.

    string

    connect.cassandra.delete.enabled

    Enables row deletion from cassandra

    boolean

    false

    connect.progress.enabled

    Enables the output for how many records have been processed

    boolean

    false

    ssl.protocol

    The SSL protocol used to generate the SSLContext. Default setting is TLS, which is fine for most cases. Allowed values in recent JVMs are TLS, TLSv1.1 and TLSv1.2. SSL, SSLv2 and SSLv3 may be supported in older JVMs, but their usage is discouraged due to known security vulnerabilities.

    string

    TLS

    ssl.truststore.location

    The location of the trust store file.

    string

    ssl.keystore.password

    The store password for the key store file. This is optional for client and only needed if ssl.keystore.location is configured.

    password

    ssl.keystore.location

    The location of the key store file. This is optional for client and can be used for two-way authentication for client.

    string

    ssl.truststore.password

    The password for the trust store file. If a password is not set access to the truststore is still available, but integrity checking is disabled.

    password

    ssl.keymanager.algorithm

    The algorithm used by key manager factory for SSL connections. Default value is the key manager factory algorithm configured for the Java Virtual Machine.

    string

    SunX509

    ssl.trustmanager.algorithm

    The algorithm used by trust manager factory for SSL connections. Default value is the trust manager factory algorithm configured for the Java Virtual Machine.

    string

    PKIX

    ssl.keystore.type

    The file format of the key store file. This is optional for client.

    string

    JKS

    ssl.cipher.suites

    A list of cipher suites. This is a named combination of authentication, encryption, MAC and key exchange algorithm used to negotiate the security settings for a network connection using TLS or SSL network protocol. By default all the available cipher suites are supported.

    list

    ssl.endpoint.identification.algorithm

    The endpoint identification algorithm to validate server hostname using server certificate.

    string

    https

    ssl.truststore.type

    The file format of the trust store file.

    string

    JKS

    ssl.enabled.protocols

    The list of protocols enabled for SSL connections.

    list

    [TLSv1.2, TLSv1.1, TLSv1]

    ssl.key.password

    The password of the private key in the key store file. This is optional for client.

    password

    ssl.secure.random.implementation

    The SecureRandom PRNG implementation to use for SSL cryptography operations.

    string

    connect.redis.kcql

    KCQL expression describing field selection and routes.

    string

    connect.redis.host

    Specifies the redis server

    string

    connect.redis.port

    Specifies the redis connection port

    int

    connect.redis.password

    Provides the password for the redis connection.

    password

    connect.redis.ssl.enabled

    Enables ssl for the redis connection

    boolean

    false

    connect.redis.error.policy

    Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP - the error is swallowed; THROW - the error is allowed to propagate; RETRY - the exception causes the Connect framework to retry the message. The number of retries is set by connect.redis.max.retries. The error will be logged automatically.

    string

    THROW

    connect.redis.retry.interval

    The time in milliseconds between retries.

    int

    60000

    connect.redis.max.retries

    The maximum number of times to try the write again.

    int

    20

    connect.progress.enabled

    Enables the output for how many records have been processed

    boolean

    false

    connect.redis.pk.delimiter

    Specifies the redis primary key delimiter

    string

    .

    ssl.provider

    The name of the security provider used for SSL connections. Default value is the default security provider of the JVM.


    string

    io.lenses.streamreactor.connect.redis.sink.RedisSinkConnector
    name=redis
    connector.class=io.lenses.streamreactor.connect.redis.sink.RedisSinkConnector
    tasks.max=1
    topics=redis
    connect.redis.host=redis
    connect.redis.port=6379
    connect.redis.kcql=INSERT INTO lenses SELECT * FROM redis STOREAS STREAM
    [INSERT INTO <redis-cache>]
    SELECT FIELD, ...
    FROM <kafka-topic>
    [PK FIELD]
    [STOREAS SortedSet(key=FIELD)|GEOADD|STREAM]
    { "symbol": "USDGBP" , "price": 0.7943 }
    { "symbol": "EURGBP" , "price": 0.8597 }
    SELECT price from yahoo-fx PK symbol
    Key=EURGBP  Value={ "price": 0.8597 }
    INSERT INTO cpu_stats SELECT * from cpuTopic STOREAS SortedSet(score=timestamp) TTL=60
    SELECT temperature, humidity FROM sensorsTopic PK sensorID STOREAS SortedSet(score=timestamp)
    INSERT INTO FX- SELECT price from yahoo-fx PK symbol STOREAS SortedSet(score=timestamp) TTL=60
    INSERT INTO cpu_stats SELECT * from cpuTopic STOREAS GEOADD
    INSERT INTO redis_stream_name SELECT * FROM my-kafka-topic STOREAS STREAM
    SELECT * FROM topic STOREAS PubSub (channel=myfield)

    Azure CosmosDB

    This page describes the usage of the Stream Reactor Azure CosmosDB Sink Connector.

    circle-exclamation

    Version 10.0.0 introduces breaking changes: the connector has been renamed from DocumentDB, uses the official CosmosDB SDK, and supports new bulk and key strategies.

    circle-check

    A Kafka Connect sink connector for writing records from Kafka to Azure CosmosDB using the SQL API.

    hashtag
    Connector Class

    hashtag
    Example

    circle-check

    For more examples see the .

    hashtag
    KCQL support

    circle-check

    You can specify multiple KCQL statements separated by ; to have a connector sink multiple topics. The connector properties topics or topics.regex are required to be set to a value that matches the KCQL statements.

    The following KCQL is supported:

    Examples:

    hashtag
    Insert Mode

    Insert is the default write mode of the sink. It inserts messages from Kafka topics into CosmosDB.

    hashtag
    Upsert Mode

    The Sink supports CosmosDB upsert functionality which replaces the existing row if a match is found on the primary keys.

    This mode works with at-least-once delivery semantics on Kafka, as ordering is guaranteed within a partition. If the same record is delivered twice to the sink, the write is idempotent: the existing record is simply updated with the same values again.

    If records are delivered with the same field or group of fields that are used as the primary key on the target table, but different values, the existing record in the target table will be updated.

    Since records are delivered in the order they were written per partition the write is idempotent on failure or restart. Redelivery produces the same result.

    hashtag
    Bulk Mode

    Bulk mode enables efficient batching of writes to CosmosDB, reducing API calls and improving throughput. Enable it with:

    When enabled, you can control batching behavior using the KCQL PROPERTIES clause (flush.size, flush.count, flush.interval). If disabled, records are written individually.

    hashtag
    Custom Key Strategy

    You can control how the connector populates the id field in CosmosDB documents using:

    • connect.cosmosdb.key.source (Key, Metadata, KeyPath, ValuePath)

    • connect.cosmosdb.key.path (field path, used with KeyPath or ValuePath)

    This allows you to use the Kafka record key, metadata, or a specific field from the key or value as the document ID.
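    For instance, to populate the document ID from a field inside the record value (the field name orderId is illustrative):

```properties
# Use value.orderId as the CosmosDB document id
connect.cosmosdb.key.source=ValuePath
connect.cosmosdb.key.path=orderId
```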

    hashtag
    Error Handling

    Configure error handling and retry behavior with:

    • connect.cosmosdb.error.policy (NOOP, THROW, RETRY)

    • connect.cosmosdb.max.retries (max retry attempts)

    • connect.cosmosdb.retry.interval (milliseconds between retries)

    These settings control how the connector responds to errors during writes.
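    A sketch of a retry-oriented configuration (the values shown are illustrative):

```properties
# Retry failed writes 10 times, waiting 30 seconds between attempts
connect.cosmosdb.error.policy=RETRY
connect.cosmosdb.max.retries=10
connect.cosmosdb.retry.interval=30000
```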

    hashtag
    Throughput

    You can set the manual throughput (RU/s) for new CosmosDB collections with:

    The default is 400 RU/s, which is the minimum allowed by Azure Cosmos DB and is cost-effective for most workloads.

    hashtag
    Proxy

    If you need to connect via a proxy, specify the proxy details with:

    hashtag
    Progress Reporting

    Enable progress reporting to log how many records have been processed:

    hashtag
    Option Reference

    Name
    Description
    Type
    Default Value

    hashtag
    KCQL Properties

    Note: The following KCQL PROPERTIES options are only available when connect.cosmosdb.bulk.enabled=true. If bulk mode is disabled, records are written individually.

    The CosmosDB Sink Connector supports the following KCQL PROPERTIES options to control file flushing behavior:

    Property
    Description
    Type
    Default Value

    Flush Options Explained:

    • Flush by Count: Triggers a flush after a specified number of records have been written.

    • Flush by Size: Initiates a flush once a predetermined size (in bytes) has been reached.

    • Flush by Interval: Enforces a flush after a defined time interval (in seconds), acting as a fail-safe to ensure timely data management even if other flush conditions are not met.

    You can use these properties in your KCQL statement's PROPERTIES clause, for example:

    hashtag
    Kafka payload support

    This sink supports the following Kafka payloads:

    • Schema.Struct and Struct (Avro)

    • Schema.Struct and JSON

    • No Schema and JSON

    hashtag
    Error policies

    The connector supports error policies.

    hashtag
    Endpoint and Master Key Configuration

    To connect to your Azure CosmosDB instance, the connector requires two essential configuration properties:

    • connect.cosmosdb.endpoint: This specifies the URI of your CosmosDB account. It should point to the connection endpoint provided in the Azure CosmosDB dashboard.

    • connect.cosmosdb.master.key: This is the authentication key used to access the database. Note: Azure often refers to this as the primary key, which can be confusing—the master.key in the connector configuration corresponds to Azure's primary key value.

    Both properties are mandatory and must be set to establish a connection with CosmosDB.

    hashtag
    Key Population Strategy

    The connector offers flexible options for populating the id field of documents in CosmosDB. The behavior is controlled by two configurations:

    connect.cosmosdb.key.source

    • Description: Defines the strategy used to extract or generate the document ID.

    • Valid Values:

      • Key (default): Use the Kafka record key.

    connect.cosmosdb.key.path

    • Description: Specifies the field path to extract the key from, when using KeyPath or ValuePath.

    • Default: id

    • Example: If using ValuePath with connect.cosmosdb.key.path=id, the connector will use value.id as the document ID.

    Elasticsearch

    This page describes the usage of the Stream Reactor Elasticsearch Sink Connector.

    hashtag
    Connector Class

    hashtag
    Elasticsearch 6

    io.lenses.streamreactor.connect.elastic6.ElasticSinkConnector

    hashtag
    Elasticsearch 7

    hashtag
    Example

    circle-check

    For more examples see the .

    hashtag
    KCQL support

    circle-check

    You can specify multiple KCQL statements separated by ; to have a connector sink multiple topics. The connector properties topics or topics.regex are required to be set to a value that matches the KCQL statements.

    The following KCQL is supported:

    Examples:

    hashtag
    Kafka Tombstone Handling

    It is possible to configure how the Connector handles a null value payload (called Kafka tombstones). Please use the behavior.on.null.values property in your KCQL with one of the possible values:

    • IGNORE (ignores tombstones entirely)

    • FAIL (throws Exception if tombstone happens)

    • DELETE (deletes index with specified id)

    Example:

    hashtag
    Primary Keys

    The PK keyword allows you to specify fields that will be used to generate the key value in Elasticsearch. The values of the selected fields are concatenated and separated by a hyphen (-).

    If no fields are defined, the connector defaults to using the topic name, partition, and message offset to construct the key.
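    For example, a key built from two fields (index, topic and field names are illustrative):

```properties
connect.elastic.kcql=INSERT INTO ordersIndex SELECT * FROM orders PK customerId, orderId
```

    With customerId=42 and orderId=7, the resulting document ID is 42-7 (the separator is configurable via connect.elastic.pk.separator, which defaults to -).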

    Field Prefixes

    When defining fields, specific prefixes can be used to determine where the data should be extracted from:

    • _key Prefix Specifies that the value should be extracted from the message key.

      • If a path is provided after _key, it identifies the location within the key where the field value resides.

      • If no path is provided, the entire message key is used as the value.

    hashtag
    Insert and Upsert modes

    INSERT writes new records to Elastic, replacing existing records with the same ID set by the PK (Primary Key) keyword. UPSERT replaces an existing record if a matching record is found, or inserts a new one if none is found.

    hashtag
    Document Type

    WITHDOCTYPE allows you to associate a document type to the document inserted.

    hashtag
    Index Suffix

    WITHINDEXSUFFIX allows you to specify a suffix for your index name; date formats are supported.

    Example:


    hashtag
    Index Names

    hashtag
    Static Index Names

    To use a static index name, define the target index in the KCQL statement without any prefixes:

    This will consistently create an index named index_name for any messages consumed from topicA.

    hashtag
    Extracting Index Names from Headers, Keys, and Values

    hashtag
    Headers

    To extract an index name from a message header, use the _header prefix followed by the header name:

    This statement extracts the value from the gate header field and uses it as the index name.

    For headers with names that include dots, enclose the entire target in backticks (`) and each field-name segment in single quotes ('):

    In this case, the value of the header named prefix.abc.suffix is used to form the index name.

    hashtag
    Keys

    To use the full value of the message key as the index name, use the _key prefix:

    For example, if the message key is "freddie", the resulting index name will be freddie.

    hashtag
    Values

    To extract an index name from a field within the message value, use the _value prefix followed by the field name:

    This example uses the value of the name field from the message's value. If the field contains "jason", the index name will be jason.

    Nested Fields in Values

    To access nested fields within a value, specify the full path using dot notation:

    If the firstName field is nested within the name structure, its value (e.g., "hans") will be used as the index name.

    Fields with Dots in Their Names

    For field names that include dots, enclose the entire target in backticks (`) and each field-name segment in single quotes ('):

    If the value structure contains:

    The extracted index name will be hans.

    hashtag
    Auto Index Creation

    The Sink will automatically create missing indexes at startup.

    Please note that this feature is not compatible with index names extracted from message headers/keys/values.

    hashtag
    Options Reference

    Name
    Description
    Type
    Default Value

    hashtag
    KCQL Properties

    Name
    Description
    Type
    Default Value

    hashtag
    SSL Configuration Properties

    hashtag
    SSL Configuration

    Enabling SSL connections between Kafka Connect and Elasticsearch ensures that the communication between these services is secure, protecting sensitive data from being intercepted or tampered with. SSL (or TLS) encrypts data in transit, verifying the identity of both parties and ensuring data integrity.

    While newer versions of Elasticsearch have SSL enabled by default for internal communication, it’s still necessary to configure SSL for client connections, such as those from Kafka Connect. Even if Elasticsearch has SSL enabled by default, Kafka Connect still needs these configurations to establish a secure connection. By setting up SSL in Kafka Connect, you ensure:

    • Data encryption: Prevents unauthorized access to data being transferred.

    • Authentication: Confirms that Kafka Connect and Elasticsearch are communicating with trusted entities.

    • Compliance: Meets security standards for regulatory requirements (such as GDPR or HIPAA).

    hashtag
    Configuration Example

    hashtag
    Terminology:

    • Truststore: Holds certificates to check if the node’s certificate is valid.

    • Keystore: Contains your client’s private key and certificate to prove your identity to the node.

    • SSL Protocol: Use TLSv1.2 or TLSv1.3 for up-to-date security.


    connect.cosmosdb.master.key

    The connection master key.

    password

    connect.cosmosdb.proxy

    Specifies the connection proxy details.

    string

    Database & Throughput

    connect.cosmosdb.db

    The Azure CosmosDB target database.

    string

    connect.cosmosdb.db.create

    If set to true the connector will create the database if it doesn't exist. With the default (false), an exception is raised if the database is missing.

    boolean

    false

    connect.cosmosdb.collection.throughput

    The manual throughput to provision for new Cosmos DB collections (RU/s). The default is 400 RU/s, which is the minimum allowed by Azure Cosmos DB and is cost-effective for most workloads.

    int

    400

    Key & Partitioning

    connect.cosmosdb.key.source

    The source of the key. There are 4 possible values: Key, Metadata, KeyPath or ValuePath.

    string

    Key

    connect.cosmosdb.key.path

    When used with key.source configurations of KeyPath or ValuePath, this is the path to the field in the object that will be used as the key. Defaults to 'id'.

    string

    id

    Write & Consistency

    connect.cosmosdb.consistency.level

    Determines the write visibility. There are four possible values: Strong, BoundedStaleness, Session or Eventual.

    string

    Session

    connect.cosmosdb.bulk.enabled

    Enable bulk mode to reduce chatter.

    boolean

    false

    connect.cosmosdb.kcql

    KCQL expression describing field selection and data routing to the target CosmosDb.

    string

    Error Handling & Retries

    connect.cosmosdb.error.policy

    Specifies the action to be taken if an error occurs while inserting the data. Options: NOOP (swallow error), THROW (propagate error), RETRY (retry message). The number of retries is based on the error.

    string

    THROW

    connect.cosmosdb.max.retries

    The maximum number of times to try the write again.

    int

    20

    connect.cosmosdb.retry.interval

    The time in milliseconds between retries.

    int

    60000

    connect.cosmosdb.error.threshold

    The number of errors to tolerate before failing the sink.

    int

    5

    Queue & Performance

    connect.cosmosdb.flush.count.enable

    Flush on count can be disabled by setting this property to 'false'.

    boolean

    true

    connect.cosmosdb.upload.sync.period

    The time in milliseconds to wait before sending the request.

    int

    100

    connect.cosmosdb.executor.threads

    The number of threads to use for processing the records.

    int

    1

    connect.cosmosdb.max.queue.size

    The maximum number of records to queue per topic before blocking. If the queue limit is reached the connector will throw RetriableException and the connector settings to handle retries will be used.

    int

    1000000

    connect.cosmosdb.max.queue.offer.timeout.ms

    The maximum time in milliseconds to wait for the queue to accept a record. If the queue does not accept the record within this time, the connector will throw RetriableException and the connector settings to handle retries will be used.

    int

    120000

    Monitoring & Progress

    connect.progress.enabled

    Enables the output for how many records have been processed.

    boolean

    false


    flush.interval

    Specifies the interval (in seconds) for the flush operation.

    Long

    (connector default)

  • Metadata: Use Kafka metadata (topic/partition/offset).

  • KeyPath: Extract from a field within the Kafka key.

  • ValuePath: Extract from a field within the Kafka value.

    Authentication & Connection

    connect.cosmosdb.endpoint

    The Azure CosmosDB end point.

    flush.size

    Specifies the size (in bytes) for the flush operation.

    Long

    (connector default)

    flush.count

    Specifies the number of records for the flush operation.


    string

    Int

  • _value Prefix Specifies that the value should be extracted from the message value.

    • The remainder of the path identifies the specific location within the message value to extract the field.

  • _header Prefix Specifies that the value should be extracted from the message header.

    • The remainder of the path indicates the name of the header to be used for the field value.

  • localhost

    connect.elastic.port

    Port on which Elastic Search node listens on

    string

    9300

    connect.elastic.tableprefix

    Table prefix (optional)

    string

    connect.elastic.cluster.name

    Name of the elastic search cluster, used in local mode for setting the connection

    string

    elasticsearch

    connect.elastic.write.timeout

    The time to wait in millis. Default is 5 minutes.

    int

    300000

    connect.elastic.batch.size

    How many records to process at one time. As records are pulled from Kafka this can be 100k+, which is not feasible to send to Elasticsearch at once

    int

    4000

    connect.elastic.use.http.username

    Username if HTTP Basic Auth required default is null.

    string

    connect.elastic.use.http.password

    Password if HTTP Basic Auth required default is null.

    string

    connect.elastic.error.policy

    Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP - the error is swallowed; THROW - the error is allowed to propagate; RETRY - the exception causes the Connect framework to retry the message. The number of retries is set by connect.elastic.max.retries. The error will be logged automatically.

    string

    THROW

    connect.elastic.max.retries

    The maximum number of times to try the write again.

    int

    20

    connect.elastic.retry.interval

    The time in milliseconds between retries.

    int

    60000

    connect.elastic.kcql

    KCQL expression describing field selection and routes.

    string

    connect.elastic.pk.separator

    Separator used when there is more than one field in the PK

    string

    -

    connect.progress.enabled

    Enables the output for how many records have been processed

    boolean

    false

    Path to the keystore file containing the client’s private key and certificate chain for client authentication.

    ssl.keystore.password

    Password for the keystore to protect the private key.

    ssl.keystore.type

    Type of the keystore (e.g., JKS, PKCS12). Default is JKS.

    ssl.protocol

    The SSL protocol used for secure connections (e.g., TLSv1.2, TLSv1.3). Default is TLS.

    ssl.trustmanager.algorithm

    Algorithm used by the TrustManager to manage certificates. Default value is the trust manager factory algorithm configured for the Java Virtual Machine.

    ssl.keymanager.algorithm

    Algorithm used by the KeyManager to manage certificates. Default value is the key manager factory algorithm configured for the Java Virtual Machine.

    • Password Security: Protect passwords by encrypting them or using secure methods like environment variables or secret managers.

    connect.elastic.protocol

    URL protocol (http, https)

    string

    http

    connect.elastic.hosts

    List of hostnames for Elastic Search cluster node, not including protocol or port.

    behavior.on.null.values

    Specifies behavior on Kafka tombstones: IGNORE , DELETE or FAIL

    String

    IGNORE

    Property Name

    Description

    ssl.truststore.location

    Path to the truststore file containing the trusted CA certificates for verifying broker certificates.

    ssl.truststore.password

    Password for the truststore file to protect its integrity.

    ssl.truststore.type

    Type of the truststore (e.g., JKS, PKCS12). Default is JKS.


    string

    ssl.keystore.location

    io.lenses.streamreactor.connect.azure.cosmosdb.sink.CosmosDbSinkConnector
    name=cosmosdb
    connector.class=io.lenses.streamreactor.connect.azure.cosmosdb.sink.CosmosDbSinkConnector
    tasks.max=1
    topics=orders-string
    connect.cosmosdb.kcql=INSERT INTO orders SELECT * FROM orders-string
    connect.cosmosdb.db=dm
    connect.cosmosdb.endpoint=[YOUR_AZURE_ENDPOINT]
    connect.cosmosdb.db.create=true
    connect.cosmosdb.master.key=[YOUR_MASTER_KEY]
    connect.cosmosdb.batch.size=10
    INSERT | UPSERT
    INTO <your-collection>
    SELECT FIELD, ...
    FROM kafka_topic
    [PK FIELDS,...]
    -- Insert mode, select all fields from topicA
    -- and write to tableA
    INSERT INTO collectionA SELECT * FROM topicA
    
    -- UPSERT mode, select 3 fields and
    -- rename from topicB and write to tableB
    -- with primary key as the field id from the topic
    UPSERT INTO tableB SELECT x AS a, y, z AS c FROM topicB PK id
    connect.cosmosdb.bulk.enabled=true
    connect.cosmosdb.collection.throughput=400
    connect.cosmosdb.proxy=<proxy-uri>
    connect.progress.enabled=true
    INSERT INTO myCollection SELECT * FROM myTopic PROPERTIES('flush.size'=1000000, 'flush.interval'=30, 'flush.count'=5000)
    io.lenses.streamreactor.connect.elastic7.ElasticSinkConnector
    name=elastic
    connector.class=io.lenses.streamreactor.connect.elastic7.ElasticSinkConnector
    tasks.max=1
    topics=orders
    connect.elastic.protocol=http
    connect.elastic.hosts=elastic
    connect.elastic.port=9200
    connect.elastic.cluster.name=elasticsearch
    connect.elastic.kcql=INSERT INTO orders SELECT * FROM orders
    connect.progress.enabled=true
    INSERT | UPSERT
    INTO <elastic_index >
    SELECT FIELD, ...
    FROM kafka_topic
    [PK FIELD,...]
    [WITHDOCTYPE=<your_document_type>]
    [WITHINDEXSUFFIX=<your_suffix>]
    -- Insert mode, select all fields from topicA and write to indexA
    INSERT INTO indexA SELECT * FROM topicA
    
    -- Insert mode, select 3 fields and rename from topicB
    -- and write to indexB
    INSERT INTO indexB SELECT x AS a, y, zc FROM topicB PK y
    
    -- UPSERT
    UPSERT INTO indexC SELECT id, string_field FROM topicC PK id
    INSERT INTO indexA SELECT * FROM topicA PROPERTIES ('behavior.on.null.values'='IGNORE')
    WITHINDEXSUFFIX=_suffix_{YYYY-MM-dd}
    INSERT INTO index_name SELECT * FROM topicA
    INSERT INTO _header.gate SELECT * FROM topicA
    INSERT INTO `_header.'prefix.abc.suffix'` SELECT * FROM topicA
    INSERT INTO _key SELECT * FROM topicA
    INSERT INTO _value.name SELECT * FROM topicA
    INSERT INTO _value.name.firstName SELECT * FROM topicA
    INSERT INTO `_value.'customer.name'.'first.name'` SELECT * FROM topicA
    {
      "customer.name": {
        "first.name": "hans"
      }
    }
    ssl.truststore.location=/path/to/truststore.jks
    ssl.truststore.password=your_truststore_password
    ssl.truststore.type=JKS  # Can also be PKCS12 if applicable
    
    ssl.keystore.location=/path/to/keystore.jks
    ssl.keystore.password=your_keystore_password
    ssl.keystore.type=JKS  # Can also be PKCS12 if applicable
    
    ssl.protocol=TLSv1.2  # Or TLSv1.3 for stronger security
    
    ssl.trustmanager.algorithm=PKIX  # Default algorithm for managing certificates
    ssl.keymanager.algorithm=PKIX  # Default algorithm for managing certificates

    HTTP

    This page describes the usage of the Stream Reactor HTTP Sink Connector.

    A Kafka Connect sink connector for writing records from Kafka to HTTP endpoints.

    hashtag
    Features

    • Support for Json/Avro/String/Protobuf messages via Kafka Connect (in conjunction with converters for Schema-Registry based data storage).

    • URL, header and content templating ability give you full control of the HTTP request.

    • Configurable batching of messages, even allowing you to combine them into a single request selecting which data to send with your HTTP request.

    hashtag
    Connector Class

    hashtag
    Example

    circle-check

    For more examples see the tutorials.

    hashtag
    Content Template

    The Lenses HTTP sink comes with multiple options for content templating of the HTTP request.

    hashtag
    Static Templating

    If you do not wish any part of the key, value, headers or other data to form a part of the message, you can use static templating:

    hashtag
    Single Message Templating

    When you are confident you will be generating a single HTTP request per Kafka message, then you can use the simpler templating.

    In your configuration, in the content property of your config, you can define template substitutions like the following example:

    (please note the XML is only an example; your template can consist of any text format that can be submitted in an HTTP request)

    hashtag
    Multiple Message Templating

    To collapse multiple messages into a single HTTP request, you can use the multiple messaging template. This is automatic if the template has a messages tag. See the below example:

    Again, this is an XML example but your message body can consist of anything including plain text, json or yaml.

    Your connector configuration will look like this:

    The final result will be HTTP requests with bodies like this:

    hashtag
    Available Keys

    When using simple and multiple message templating, the following are available:

    Field
    Usage Example

    hashtag
    URL Templating

    URL including protocol (e.g. http://lenses.io). Template variables can be used.

    The URL is also a Content Template, so it can contain substitutions from the message key/value/headers etc. If you are batching multiple Kafka messages into a single request, then the first message will be used for the substitution of the URL.
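As a sketch (the endpoint path and the customerId field are hypothetical, not part of the connector's defaults), a templated URL could route each record using a field of the message value:

```properties
# Hypothetical endpoint; {{value.customerId}} is substituted from the
# first Kafka message of each batch.
connect.http.endpoint=http://api.example.local/customers/{{value.customerId}}/orders
connect.http.method=POST
```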

    hashtag
    Authentication Options

    Currently, the HTTP Sink supports no authentication, BASIC HTTP authentication, and OAuth2 authentication.

    hashtag
    No Authentication (Default)

    By default, no authentication is set. This can be also done by providing a configuration like this:

    hashtag
    BASIC HTTP Authentication

    BASIC auth can be configured by providing a configuration like this:

    hashtag
    OAuth2 Authentication

    OAuth auth can be configured by providing a configuration like this:

    hashtag
    Headers List

    To customise the headers sent with your HTTP request you can supply a Headers List.

    Each header key and value is also a Content Template, so it can contain substitutions from the message key/value/headers etc. If you are batching multiple Kafka messages into a single request, then the first message will be used for the substitution of the headers.

    Example:

    hashtag
    SSL Configuration

    Enabling SSL connections between Kafka Connect and the HTTP endpoint ensures that the communication between these services is secure, protecting sensitive data from being intercepted or tampered with. SSL (or TLS) encrypts data in transit, verifying the identity of both parties and ensuring data integrity. Please check out the SSL Configuration Properties section in order to set it up.

    hashtag
    Batch Configuration

    The connector offers three distinct flush options for data management:

    • Flush by Count - triggers a flush after a specified number of records have been written.

    • Flush by Size - initiates a flush once a predetermined batch size (in bytes) has been reached.

    • Flush by Interval - enforces a flush after a defined time interval (in seconds).

    It's worth noting that the interval flush is a continuous process that acts as a fail-safe mechanism, ensuring that batches are periodically flushed, even if the other flush options are not configured or haven't reached their thresholds.

    Consider a scenario where the flush size is set to 10MB, and only 9.8MB of data has accumulated, with no new Kafka messages arriving for an extended period of 6 hours. To prevent undue delays, the interval flush guarantees that the batch is flushed after the specified time interval has elapsed. This ensures the timely delivery of data even in situations where other flush conditions are not met.

    The flush options are configured using the batchCount, batchSize and timeInterval properties. The settings are optional and if not specified the defaults are:

    Field
    Default

    hashtag
    Configuration Examples

    The following configuration examples show how to apply this connector to different message types.

    These include converters, which are required to instruct Kafka Connect on how to read the source content.

    hashtag
    Static string template

    In this case the converters are irrelevant as we are not using the message content to populate our message template.

    hashtag
    Dynamic string template

    The HTTP request body contains the value of the message, which is retained as a string value via the StringConverter.

    hashtag
    Dynamic string template containing json message fields

    Specific fields from the JSON message are substituted into the HTTP request body alongside some static content.

    hashtag
    Dynamic string template containing whole json message

    The entirety of the message value is substituted into a placeholder in the message body. The message is treated as a string via the StringConverter.

    hashtag
    Dynamic string template containing avro message fields

    Fields from the AVRO message are substituted into the message body in the following example:

    hashtag
    Error/Success Reporter

    Starting from version 8.1, as a pilot release, we give our customers the ability to use functionality called Reporter which (if enabled) writes Success and Error processing reports to a specified Kafka topic. Reports don't have a key; you can find details about the status in the message headers and value.

    In order to enable this functionality, enable one (or both, if you want full reporting) of the properties below:

    These settings configure the Kafka producer for success and error reports. Full configuration options are available in the Error Reporter Properties and Success Reporter Properties sections. Three examples follow:

    hashtag
    Plain Error Reporting

    This is the most common scenario for on-premises Kafka Clusters used just for monitoring

    hashtag
    Error Reporting using SASL

    Using SASL provides a secure and standardized method for authenticating connections to an external Kafka cluster. It is especially valuable when connecting to clusters that require secure communication, as it supports mechanisms like SCRAM, GSSAPI (Kerberos), and OAuth, ensuring that only authorized clients can access the cluster. Additionally, SASL can help safeguard credentials during transmission, reducing the risk of unauthorized access.

    hashtag
    Error Reporting using SSL

    Using SSL ensures secure communication between clients and the Kafka cluster by encrypting data in transit. This prevents unauthorized parties from intercepting or tampering with sensitive information. SSL also supports mutual authentication, allowing both the client and server to verify each other’s identities, which enhances trust and security in the connection.

    hashtag
    Options

    hashtag
    Configuration parameters

    This sink connector supports the following options as part of its configuration:

    Field
    Type
    Required
    Values (Default)

    hashtag
    SSL Configuration Properties

    hashtag
    Error Reporter Properties

    circle-check

    The error reporter can also be configured with SSL properties. See the SSL Configuration Properties section. In this case all properties should be prefixed with connect.reporting.error.config to ensure they apply to the error reporter.

    hashtag
    Success Reporter Properties

    circle-check

    The success reporter can also be configured with SSL properties. See the SSL Configuration Properties section. In this case all properties should be prefixed with connect.reporting.success.config to ensure they apply to the success reporter.

    Key

    {{key}}

    {{key.customer.number}}

    Topic

    {{topic}}

    Partition

    {{partition}}

    Offset

    {{offset}}

    Timestamp

    {{timestamp}}

    connect.http.request.content

    String

    Yes

    connect.http.authentication.type

    Authentication

    No

    (none)

    connect.http.request.headers

    List[String]

    No

    connect.http.batch.count

    Int

    No

    The number of records to batch before sending the request, see Batch Configuration

    connect.http.batch.size

    Int

    No

    The size of the batch in bytes before sending the request, see Batch Configuration

    connect.http.time.interval

    Int

    No

    The time interval in seconds to wait before sending the request, see Batch Configuration

    connect.http.upload.sync.period

    Int

    No

    The polling time period for uploads, in milliseconds (default 100)

    connect.http.error.threshold

    Int

    No

    The number of errors to tolerate before failing the sink (5)

    connect.http.retry.mode

    String

    No

    The HTTP retry mode. It can be one of: Fixed or Exponential (default)

    connect.http.retries.on.status.codes

    List[String]

    No

    The status codes to retry on (default codes are: 408, 429, 500, 502, 503, 504)

    connect.http.retries.max.retries

    Int

    No

    The maximum number of retries to attempt (default is 5)

    connect.http.retry.fixed.interval.ms

    Int

    No

    The fixed duration to wait before retrying HTTP requests when Fixed retry mode is set. The default is 10000 (10 seconds)

    connect.http.retries.max.timeout.ms

    Int

    No

    The maximum time in milliseconds to retry a request when Exponential retry is set. Backoff is used to increase the time between retries, up to this maximum (30000)

    connect.http.connection.timeout.ms

    Int

    No

    The HTTP connection timeout in milliseconds (10000)

    connect.http.max.queue.size

    Int

    No

    For each processed topic, the connector maintains an internal queue. This value specifies the maximum number of entries allowed in the queue before the enqueue operation blocks. The default is 100,000.

    connect.http.max.queue.offer.timeout.ms

    Int

    No

    The maximum time window, specified in milliseconds, to wait for the internal queue to accept new records.
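To illustrate how the retry and queue settings above combine, here is a sketch of a tuned configuration (the values are illustrative, not recommendations):

```properties
# Exponential backoff, up to 5 attempts, capped at 30s between retries
connect.http.retry.mode=Exponential
connect.http.retries.max.retries=5
connect.http.retries.max.timeout.ms=30000
connect.http.retries.on.status.codes=408,429,500,502,503,504
# Tolerate up to 5 errors before failing the sink
connect.http.error.threshold=5
# Per-topic internal queue: cap at 50,000 entries, wait up to 2s on enqueue
connect.http.max.queue.size=50000
connect.http.max.queue.offer.timeout.ms=2000
```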

    ssl.keystore.location

    Path to the keystore file containing the client’s private key and certificate chain for client authentication.

    ssl.keystore.password

    Password for the keystore to protect the private key.

    ssl.keystore.type

    Type of the keystore (e.g., JKS, PKCS12). Default is JKS.

    ssl.protocol

    The SSL protocol used for secure connections (e.g., TLSv1.2, TLSv1.3). Default is TLSv1.3.

    ssl.keymanager.algorithm

    Algorithm used by the KeyManager to manage certificates. Default value is the key manager factory algorithm configured for the Java Virtual Machine.

    ssl.trustmanager.algorithm

    Algorithm used by the TrustManager to manage certificates. Default value is the trust manager factory algorithm configured for the Java Virtual Machine.

    SASL Mechanism used when connecting.

    connect.reporting.error.config.sasl.jaas.config

    JAAS login context parameters for SASL connections in the format used by JAAS configuration files.

    connect.reporting.error.config.sasl.mechanism

    SASL mechanism used for client connections. This may be any mechanism for which a security provider is available.

    SASL Mechanism used when connecting.

    connect.reporting.success.config.sasl.jaas.config

    JAAS login context parameters for SASL connections in the format used by JAAS configuration files.

    connect.reporting.success.config.sasl.mechanism

    SASL mechanism used for client connections. This may be any mechanism for which a security provider is available.

    io.lenses.streamreactor.connect.http.sink.HttpSinkConnector
    name=lenseshttp
    connector.class=io.lenses.streamreactor.connect.http.sink.HttpSinkConnector
    tasks.max=1
    topics=topicToRead
    value.converter=org.apache.kafka.connect.storage.StringConverter
    key.converter=org.apache.kafka.connect.storage.StringConverter
    connect.http.authentication.type=none
    connect.http.method=POST
    connect.http.endpoint=http://endpoint.local/receive
    connect.http.request.content="My Static Content Template"
    connect.http.batch.count=1
    connect.http.request.content="My Static Content Template"
    connect.http.request.content="<product><id>{{value.name}}</id></product>"
      <messages>
        {{#message}}
            <message>
              <topic>{{topic}}</topic>
              <employee>{{value.employeeId}}</employee>
              <order>{{value.orderNo}}</order>
              <groupDomain>{{value.groupDomain}}</groupDomain>
            </message>
        {{/message}}
      </messages>
    connect.http.request.content="<messages>{{#message}}<message><topic>{{topic}}</topic><employee>{{value.employeeId}}</employee><order>{{value.orderNo}}</order><groupDomain>{{value.groupDomain}}</groupDomain></message>{{/message}}</messages>"
      <messages>
        <message>
          <topic>myTopic</topic>
           <employee>Abcd1234</employee>
           <order>10</order>
           <groupDomain>myExampleGroup.uk</groupDomain>
        </message>
        <message>
           <topic>myTopic</topic>
           <employee>Efgh5678</employee>
           <order>11</order>
           <groupDomain>myExampleGroup.uk</groupDomain>
        </message>
      </messages>

    Header

    {{header.correlation-id}}

    Value

    {{value}}

    {{value.product.id}}

    connect.http.authentication.type=none
    connect.http.authentication.type=basic
    connect.http.authentication.basic.username=user
    connect.http.authentication.basic.password=password
    connect.http.authentication.type=oauth2
    connect.http.authentication.oauth2.token.url=http://myoauth2.local/getToken
    connect.http.authentication.oauth2.client.id=clientId
    connect.http.authentication.oauth2.client.secret=client-secret
    connect.http.authentication.oauth2.token.property=access_token
    connect.http.authentication.oauth2.client.scope=any
    connect.http.authentication.oauth2.client.headers=header:value
    connect.http.request.headers="Content-Type","text/plain","X-User","{{header.kafkauser}}","Product","{{value.product.id}}"

    batchCount

    50,000 records

    batchSize

    500000000 (500MB)

    timeInterval

    3,600 seconds (1 hour)

    connect.http.batch.count=50000
    connect.http.batch.size=500000000
    connect.http.time.interval=3600
    connector.class=io.lenses.streamreactor.connect.http.sink.HttpSinkConnector
    topics=mytopic
    tasks.max=1
    connect.http.method=POST
    connect.http.endpoint="https://my-endpoint.example.com"
    connect.http.request.content="My Static Content Template"
    connect.http.batch.count=1
    connector.class=io.lenses.streamreactor.connect.http.sink.HttpSinkConnector
    topics=mytopic
    tasks.max=1
    connect.http.method=POST
    connect.http.endpoint="https://my-endpoint.example.com"
    connect.http.request.content="{{value}}"
    connect.http.batch.count=1
    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=org.apache.kafka.connect.storage.StringConverter
    connector.class=io.lenses.streamreactor.connect.http.sink.HttpSinkConnector
    topics=mytopic
    tasks.max=1
    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    connect.http.method=POST
    connect.http.endpoint="https://my-endpoint.example.com"
    connect.http.request.content="product: {{value.product}}"
    connect.http.batch.size=1
    value.converter.schemas.enable=false
    connector.class=io.lenses.streamreactor.connect.http.sink.HttpSinkConnector
    topics=mytopic
    tasks.max=1
    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=org.apache.kafka.connect.storage.StringConverter
    connect.http.method=POST
    connect.http.endpoint="https://my-endpoint.example.com"
    connect.http.request.content="whole product message: {{value}}"
    connect.http.time.interval=5
    connector.class=io.lenses.streamreactor.connect.http.sink.HttpSinkConnector
    topics=mytopic
    tasks.max=1
    connect.http.method=POST
    connect.http.endpoint="https://my-endpoint.example.com"
    connect.http.request.content="product: {{value.product}}"
    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=io.confluent.connect.avro.AvroConverter
    value.converter.schemas.enable=true
    value.converter.schema.registry.url=http://schema-registry:8081
    connect.reporting.error.config.enabled=true
    connect.reporting.success.config.enabled=true
    connect.reporting.error.config.enabled=true
    connect.reporting.error.config.bootstrap.servers=localhost:9094
    connect.reporting.error.config.topic=http-monitoring
    connect.reporting.error.config.enabled=true
    connect.reporting.error.config.bootstrap.servers=my-kafka-cluster.com:9093
    connect.reporting.error.config.security.protocol=SASL_SSL
    connect.reporting.error.config.sasl.mechanism=PLAIN
    connect.reporting.error.config.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="MYUSER" password="MYPASSWORD";
    connect.reporting.error.config.enabled=true
    connect.reporting.error.config.bootstrap.servers=SSL://my-ssl-protected-cluster:9094
    connect.reporting.error.config.security.protocol=SSL
    connect.reporting.error.config.ssl.keystore.location=/path/to/my/keystore.p12
    connect.reporting.error.config.ssl.keystore.type=PKCS12
    connect.reporting.error.config.ssl.truststore.location=/path/to/my/truststore.p12
    connect.reporting.error.config.ssl.truststore.password=************
    connect.reporting.error.config.ssl.truststore.type=PKCS12
    connect.reporting.error.config.topic=http-error-topic

    connect.http.method

    HttpMethod

    Yes

    POST, PUT, PATCH

    connect.http.endpoint

    String

    Yes

    Property Name

    Description

    ssl.truststore.location

    Path to the truststore file containing the trusted CA certificates for verifying broker certificates.

    ssl.truststore.password

    Password for the truststore file to protect its integrity.

    ssl.truststore.type

    Type of the truststore (e.g., JKS, PKCS12). Default is JKS.

    Property Name

    Description

    connect.reporting.error.config.enabled

    Specifies whether the reporter is enabled. false by default.

    connect.reporting.error.config.bootstrap.servers

    A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. Required if reporter is enabled.

    connect.reporting.error.config.topic

    Specifies the topic for Reporter to write to.

    Property Name

    Description

    connect.reporting.success.config.enabled

    Specifies whether the reporter is enabled. false by default.

    connect.reporting.success.config.bootstrap.servers

    A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. Required if reporter is enabled.

    connect.reporting.success.config.topic

    Specifies the topic for Reporter to write to.


    GCP BigQuery

    This page describes the usage of the Stream Reactor GCP Big Query Sink Connector.

    The Google BigQuery sink connector is an open-source connector imported from Confluent (originally developed by WePay) that enables you to export data from Apache Kafka® topics to Google BigQuery tables.

    hashtag
    Overview

    The BigQuery sink connector allows you to:

    • Stream data from Kafka topics to BigQuery tables

    • Automatically create tables based on topic data

    • Configure data delivery semantics (at-least-once or exactly-once)

    • Perform schema evolution when topic schemas change

    hashtag
    Prerequisites

    Before using the BigQuery sink connector, ensure you have:

    1. A Google Cloud Platform (GCP) account

    2. A BigQuery project with appropriate permissions

    3. Service account credentials with access to BigQuery

    4. Kafka topics with data to be exported

    hashtag
    Configuration

    hashtag
    Basic Configuration

    Here's a basic configuration for the BigQuery sink connector:
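A minimal sketch, assuming a service-account key file on disk and an existing dataset (project, dataset and path names are placeholders):

```properties
name=bigquery-sink
connector.class=com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
tasks.max=1
topics=mytopic
# GCP project and target dataset (placeholders)
project=my-gcp-project
defaultDataset=my_dataset
# Service account credentials: FILE means keyfile is a path on disk
keySource=FILE
keyfile=/path/to/service-account.json
```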

    hashtag
    Features of Google BigQuery Sink Connector

    • Multiple tasks support: Configure using the tasks.max parameter to scale throughput across multiple tasks

    • InsertAll API features: Supports insert operations with built-in duplicate detection capabilities

    • Real-time streaming: Records are inserted one at a time and available immediately for querying

    hashtag
    Important Configuration Properties

    Property
    Description
    Type
    Default
    Importance

    hashtag
    Data Mapping

    hashtag
    Data Type Conversions

    The connector maps Kafka Connect schema types to BigQuery data types as follows:

    BigQuery Data Type
    Connector Mapping

    hashtag
    Schema Evolution

    When schema evolution is enabled (using allowNewBigQueryFields, allowBigQueryRequiredFieldRelaxation, and allowSchemaUnionization), the connector can handle schema changes:

    • New fields added to the Kafka topic can be added to the BigQuery table

    • Field constraints can be relaxed from REQUIRED to NULLABLE

    • Schemas can be unionized when records in the same batch have different schemas
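The three flags can be combined as follows; per the configuration table below, this sketch reproduces the behaviour of autoUpdateSchemas=true from older connector versions:

```properties
# Equivalent to autoUpdateSchemas=true in pre-2.x versions
allowNewBigQueryFields=true
allowBigQueryRequiredFieldRelaxation=true
allowSchemaUnionization=false
```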

    hashtag
    Usage Examples

    hashtag
    Basic Example

    hashtag
    Example with Batch Loading
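A sketch enabling the beta GCS batch-load path for a topic, built from the properties in the configuration table (the bucket name is a placeholder):

```properties
# Batch-load this topic through Google Cloud Storage instead of streaming
enableBatchLoad=mytopic
gcsBucketName=my-staging-bucket
autoCreateBucket=true
# Run GCS-to-BigQuery load jobs every 120 seconds
batchLoadIntervalSec=120
```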

    hashtag
    Example with Upsert Functionality
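A sketch of the upsert-related settings from the configuration table (values are illustrative; record keys must be present for row matching):

```properties
# Upsert via record keys, intermediate tables and periodic merge flushes
upsertEnabled=true
kafkaKeyFieldName=kafkaKey
intermediateTableSuffix=tmp
# Merge every 60s; disable record-count-based flushing
mergeIntervalMs=60000
mergeRecordsThreshold=-1
```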

    hashtag
    Troubleshooting

    hashtag
    Common Issues

    1. Authentication errors: Ensure your service account key file is correct and has appropriate permissions.

    2. Schema compatibility issues: When schema updates are enabled, existing data might not be compatible with new schemas.

    3. Quota limitations: BigQuery has quotas for API requests; consider adjusting threadPoolSize and queueSize.

    hashtag
    Logging

    To enable detailed logging for troubleshooting:
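A sketch for a log4j-based Connect worker, assuming the connector classes live under com.wepay.kafka.connect.bigquery (as the default schemaRetriever class suggests):

```properties
# Add to the Connect worker's log4j configuration
log4j.logger.com.wepay.kafka.connect.bigquery=DEBUG
```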

    hashtag
    Limitations

    The BigQuery Sink connector has the following limitations:

    • The connector does not support schemas with recursion.

    • The connector does not support schemas having float fields with NaN or +Infinity values.

    • Auto schema update does not support removing columns.

    • Auto schema update does not support recursive schemas.

    hashtag
    Upgrading to 2.x.x

    The following changes aren’t backward compatible in the BigQuery connector:

    • datasets was removed and defaultDataset has been introduced. The connector now infers the dataset from the topic name if the topic is in the form <dataset>:<tableName>. If the topic name is in the form <tablename>, the connector defaults to defaultDataset.

    • topicsToTables was removed. You should use SMT RegexRouter to route topics to tables.
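For example, routing with the standard RegexRouter SMT could be sketched as (the regex and replacement are illustrative):

```properties
transforms=route
transforms.route.type=org.apache.kafka.connect.transforms.RegexRouter
# Strip a "kafka_" prefix so topic kafka_orders writes to table orders
transforms.route.regex=kafka_(.*)
transforms.route.replacement=$1
```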


  • Multi-topic support: Can stream from multiple topics to corresponding BigQuery tables

  • Parallel processing: Uses an internal thread pool (default: 10 threads, configurable) for scalable record streaming

    project

    The BigQuery project to write to.

    string

    -

    high

    topics

    A list of Kafka topics to read from.

    list

    -

    high

    autoCreateTables

    Create BigQuery tables if they don't already exist. This property should only be enabled for Schema Registry-based inputs: Avro, Protobuf, or JSON Schema (JSON_SR). Table creation is not supported for JSON input.

    boolean

    false

    high

    gcsBucketName

    The name of the bucket where Google Cloud Storage (GCS) blobs are located. These blobs are used to batch-load to BigQuery. This is applicable only if enableBatchLoad is configured.

    string

    ""

    high

    queueSize

    The maximum size (or -1 for no maximum size) of the worker queue for BigQuery write requests before all topics are paused. This is a soft limit; the size of the queue can go over this before topics are paused. All topics resume once a flush is triggered or the size of the queue drops under half of the maximum size.

    long

    -1

    high

    bigQueryMessageTimePartitioning

    Whether or not to use the message time when inserting records. Default uses the connector processing time.

    boolean

    false

    high

    bigQueryPartitionDecorator

    Whether or not to append partition decorator to BigQuery table name when inserting records. Setting this to true appends partition decorator to table name (e.g. table$yyyyMMdd depending on the configuration). Setting this to false bypasses the logic to append the partition decorator and uses raw table name for inserts.

    boolean

    true

    high

    keySource

    Determines whether the keyfile configuration is the path to the credentials JSON file or to the JSON itself. Available values are FILE and JSON. This property is available in BigQuery sink connector version 1.3 (and later).

    string

    FILE

    medium

    keyfile

    Keyfile can be either a string representation of the Google credentials file or the path to the Google credentials file itself. The string representation of the Google credentials file is supported in BigQuery sink connector version 1.3 (and later).

    string

    null

    medium

    bigQueryRetry

    The number of retry attempts made for a BigQuery request that fails with a backend error or a quota exceeded error.

    int

    0

    medium

    bigQueryRetryWait

    The minimum amount of time, in milliseconds, to wait between retry attempts for a BigQuery backend or quota exceeded error.

    long

    1000

    medium

    sanitizeTopics

    Designates whether to automatically sanitize topic names before using them as table names. If not enabled, topic names are used as table names.

    boolean

    false

    medium

    schemaRetriever

    A class that can be used for automatically creating tables and/or updating schemas. Note that in version 2.0.0, the SchemaRetriever API changed to retrieve the schema from each SinkRecord, which will help support multiple schemas per topic.

    class

    com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever

    medium

    threadPoolSize

    The size of the BigQuery write thread pool. This establishes the maximum number of concurrent writes to BigQuery.

    int

    10

    medium

    autoCreateBucket

    Whether to automatically create the given bucket, if it does not exist.

    boolean

    true

    medium

    allowNewBigQueryFields

    If true, new fields can be added to BigQuery tables during subsequent schema updates.

    boolean

    false

    medium

    allowBigQueryRequiredFieldRelaxation

    If true, fields in BigQuery Schema can be changed from REQUIRED to NULLABLE. Note that allowNewBigQueryFields and allowBigQueryRequiredFieldRelaxation replaced the autoUpdateSchemas parameter of older versions of this connector.

    boolean

    false

    medium

    allowSchemaUnionization

    If true, the existing table schema (if one is present) will be unionized with new record schemas during schema updates. If false, the record of the last schema in a batch will be used for any necessary table creation and schema update attempts. Note that setting allowSchemaUnionization to false and allowNewBigQueryFields and allowBigQueryRequiredFieldRelaxation to true is equivalent to setting autoUpdateSchemas to true in older versions.

    boolean

    false

    medium

    auto.register.schemas

    Specifies if the Serializer should attempt to register the Schema with Schema Registry.

    boolean

    true

    medium

    use.latest.version

    Only applies when auto.register.schemas is set to false. If use.latest.version is set to true, then Schema Registry uses the latest version of the schema in the subject for serialization.

    boolean

    true

    medium

    timestampPartitionFieldName

    The name of the field in the value that contains the timestamp to partition by in BigQuery and enable timestamp partitioning for each table. Leave blank to enable ingestion time partitioning for each table.

    string

    null

    low

    clusteringPartitionFieldNames

    Comma-separated list of fields where data is clustered in BigQuery.

    list

    null

    low

    timePartitioningType

    The time partitioning type to use when creating tables. Existing tables will not be altered to use this partitioning type.

    string

    DAY

    low

    allBQFieldsNullable

    If true, no fields in any produced BigQuery schema are REQUIRED. All non-nullable Avro fields are translated as NULLABLE (or REPEATED, if arrays).

    boolean

    false

    low

| Name | Description | Type | Default | Importance |
|---|---|---|---|---|
| avroDataCacheSize | The size of the cache to use when converting schemas from Avro to Kafka Connect. | int | 100 | low |
| batchLoadIntervalSec | The interval, in seconds, in which to attempt to run GCS to BigQuery load jobs. Only relevant if enableBatchLoad is configured. | int | 120 | low |
| convertDoubleSpecialValues | Designates whether +Infinity is converted to Double.MAX_VALUE and whether -Infinity and NaN are converted to Double.MIN_VALUE to ensure successful delivery to BigQuery. | boolean | false | low |
| enableBatchLoad | Beta feature - use with caution. The sublist of topics to be batch loaded through GCS. | list | "" | low |
| includeKafkaData | Whether to include an extra block containing the Kafka source topic, offset, and partition information in the resulting BigQuery rows. | boolean | false | low |
| upsertEnabled | Enable upsert functionality on the connector through the use of record keys, intermediate tables, and periodic merge flushes. Row-matching is performed based on the contents of record keys. This feature does not work with SMTs that change the name of the topic and does not support JSON input. | boolean | false | low |
| deleteEnabled | Enable delete functionality on the connector through the use of record keys, intermediate tables, and periodic merge flushes. A delete is performed when a record with a null value (that is, a tombstone record) is read. This feature does not work with SMTs that change the name of the topic and does not support JSON input. | boolean | false | low |
| intermediateTableSuffix | A suffix appended to the names of destination tables to create the names of the corresponding intermediate tables. Multiple intermediate tables may be created for a single destination table. | string | "tmp" | low |
| mergeIntervalMs | How often (in milliseconds) to perform a merge flush, if upsert/delete is enabled. Can be set to -1 to disable periodic flushing. | long | 60000 | low |
| mergeRecordsThreshold | How many records to write to an intermediate table before performing a merge flush, if upsert/delete is enabled. Can be set to -1 to disable record count-based flushing. | long | -1 | low |
| kafkaDataFieldName | The Kafka data field name. The default value is null, which means the Kafka Data field will not be included. | string | null | low |
| kafkaKeyFieldName | The Kafka key field name. The default value is null, which means the Kafka Key field will not be included. | string | null | low |
| topic2TableMap | Map of topics to tables (optional). Format: comma-separated tuples, e.g. <topic-1>:<table-1>,<topic-2>:<table-2>,... Note that the topic name should not be modified using a regex SMT while using this option. Also note that SANITIZE_TOPICS_CONFIG is ignored if this config is set. | string | "" | low |
| csfle.enabled | CSFLE is enabled for the connector if set to true. | boolean | false | low |
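For example, topic2TableMap can route two topics to differently named tables (topic and table names here are illustrative):

```properties
topic2TableMap=orders:orders_v1,customers:crm_customers
```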

| Kafka Connect Schema Type | BigQuery Data Type |
|---|---|
| INT32 | INTEGER |
| INT64 | INTEGER |
| FLOAT32 | FLOAT |
| FLOAT64 | FLOAT |
| BOOLEAN | BOOLEAN |
| BYTES | BYTES |
| Logical TIMESTAMP | TIMESTAMP |
| Logical TIME | TIME |
| Logical DATE | DATE |
| Logical Decimal | FLOAT |
| Debezium Date | DATE |
| Debezium MicroTime | TIME |
| Debezium Time | TIME |
| Debezium MicroTimestamp | TIMESTAMP |
| Debezium Timestamp | TIMESTAMP |
| Debezium ZonedTimestamp | TIMESTAMP |

  • Table creation failures: Ensure autoCreateTables is only used with Schema Registry-based inputs (Avro, Protobuf, or JSON Schema).

  • Performance issues: For high-volume data, consider using batch loading via GCS instead of streaming inserts.

  • When the connector is configured with upsertEnabled or deleteEnabled, it does not support Single Message Transformations (SMTs) that modify the topic name. Additionally, the following transformations are not allowed:

    • io.debezium.transforms.ByLogicalTableRouter

    • io.debezium.transforms.outbox.EventRouter

    • org.apache.kafka.connect.transforms.RegexRouter

    • org.apache.kafka.connect.transforms.TimestampRouter

    • io.confluent.connect.transforms.MessageTimestampRouter

    • io.confluent.connect.transforms.ExtractTopic$Key

  • autoUpdateSchemas was replaced by allowNewBigQueryFields and allowBigQueryRequiredFieldRelaxation.

  • value.converter.enhanced.avro.schema.support should be set to false or removed. If this property is not removed or set to false, you may receive the following error:

    Invalid field name
    "com.examples.project-super-important.v1.MyData". Fields must
    contain only letters, numbers, and underscores, start with a letter or
    underscore, and be at most 300 characters long.

| Name | Description | Type | Default | Importance |
|---|---|---|---|---|
| defaultDataset | The default dataset to be used. Replaced the datasets parameter of older versions of this connector. | string | - | high |
| project | The BigQuery project to write to. | string | - | high |

Additional schema type mappings:

| Kafka Connect Schema Type | BigQuery Data Type |
|---|---|
| INT8 | INTEGER |
| INT16 | INTEGER |
| STRING | STRING |
    name = kcbq-connect1
    connector.class = com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
    tasks.max = 1
    topics = quickstart
    sanitizeTopics = true
    autoCreateTables = true
    allowNewBigQueryFields = true
    allowBigQueryRequiredFieldRelaxation = true
    schemaRetriever = com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever
    project = lenses-123
    defaultDataset = ConfluentDataSet
    keyfile = <path to json file>
    transforms = RegexTransformation
    transforms.RegexTransformation.type = org.apache.kafka.connect.transforms.RegexRouter
    transforms.RegexTransformation.regex = (kcbq_)(.*)
    transforms.RegexTransformation.replacement = $2
    A basic streaming-insert configuration:

    name=bigquery-sink
    connector.class=com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
    tasks.max=1
    topics=orders,customers
    project=my-gcp-project
    defaultDataset=kafka_data
    keyfile=/path/to/keyfile.json
    autoCreateTables=true

    Batch loading through GCS:

    name=bigquery-sink
    connector.class=com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
    tasks.max=1
    topics=orders,customers
    project=my-gcp-project
    defaultDataset=kafka_data
    keyfile=/path/to/keyfile.json
    enableBatchLoad=orders,customers
    gcsBucketName=my-gcs-bucket
    autoCreateBucket=true

    Upsert with periodic merge flushes:

    name=bigquery-sink
    connector.class=com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
    tasks.max=1
    topics=orders,customers
    project=my-gcp-project
    defaultDataset=kafka_data
    keyfile=/path/to/keyfile.json
    upsertEnabled=true
    mergeIntervalMs=30000
    mergeRecordsThreshold=1000

    To enable debug logging for the connector:

    log4j.logger.com.wepay.kafka.connect.bigquery=DEBUG

    AWS S3

    This page describes the usage of the Stream Reactor AWS S3 Sink Connector.

    This Kafka Connect sink connector facilitates the seamless transfer of records from Kafka to AWS S3 Buckets. It offers robust support for various data formats, including AVRO, Parquet, JSON, CSV, and Text, making it a versatile choice for data storage. Additionally, it ensures the reliability of data transfer with built-in support for exactly-once semantics.

    hashtag
    Connector Class

    io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector

    hashtag
    Example

    circle-check

    For more examples see the tutorials.

    This example writes to a bucket called demo, partitioning by a field called ts, stored as JSON.

    hashtag
    KCQL Support

    circle-check

    You can specify multiple KCQL statements separated by ; to have a connector sink multiple topics. The connector properties topics or topics.regex are required to be set to a value that matches the KCQL statements.
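For example, a single connector instance can sink two topics with two statements (bucket and topic names here are illustrative):

```properties
topics=orders,customers
connect.s3.kcql=INSERT INTO mybucket:orders SELECT * FROM orders; INSERT INTO mybucket:customers SELECT * FROM customers
```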

    The connector uses KCQL to map topics to S3 buckets and paths. The full KCQL syntax is:

    Please note that you can employ escaping within KCQL for the INSERT INTO, SELECT * FROM, and PARTITIONBY clauses when necessary. For example, an incoming Kafka message stored as JSON can use fields containing .:

    In this case, you can use the following KCQL statement:

    hashtag
    Target Bucket and Path

    The target bucket and path are specified in the INSERT INTO clause. The path is optional and if not specified, the connector will write to the root of the bucket and append the topic name to the path.

    Here are a few examples:

    hashtag
    SQL Projection

    Currently, the connector does not offer support for SQL projection; consequently, anything other than a SELECT * query is disregarded. The connector will faithfully write all fields from Kafka exactly as they are.

    hashtag
    Source Topic

    To avoid runtime errors, make sure the topics or topics.regex setting matches your KCQL statements. If the connector receives data for a topic without matching KCQL, it will throw an error. When using a regex to select topics, follow this KCQL pattern:

    In this case the topic name will be appended to the $target destination.

    hashtag
    Partitioning & Object Keys

    The object key serves as the filename used to store data in S3. There are two options for configuring the object key:

    • Default: The object key is automatically generated by the connector and follows the Kafka topic-partition structure. The format is $bucket/[$prefix]/$topic/$partition/offset.extension. The extension is determined by the chosen storage format.

    • Custom: The object key is driven by the PARTITIONBY clause. The format is either $bucket/[$prefix]/$topic/customKey1=customValue1/customKey2=customValue2/topic(partition_offset).extension (AWS Athena naming style mimicking Hive-like data partitioning) or $bucket/[$prefix]/customValue/topic(partition_offset).ext. The extension is determined by the selected storage format.

    Custom keys and values can be extracted from the Kafka message key, message value, or message headers, as long as the headers are of types that can be converted to strings. There is no fixed limit to the number of elements that can form the object key, but you should be aware of AWS S3 key length restrictions.

    circle-exclamation

    The Connector automatically adds the topic name to the partition. There is no need to add it to the partition clause. If you want to explicitly add the topic or partition you can do so by using _topic and _partition.

    The partition clause works on header, key and values fields of the Kafka message.

    To extract fields from the message values, simply use the field names in the PARTITIONBY clause. For example:

    However, note that the message fields must be of primitive types (e.g., string, int, long) to be used for partitioning.

    You can also use the entire message key as long as it can be coerced into a primitive type:

    In cases where the Kafka message Key is not a primitive but a complex object, you can use individual fields within the message Key to create the S3 object key name:

    Kafka message headers can also be used in the S3 object key definition, provided the header values are of primitive types easily convertible to strings:

    Customizing the object key can leverage various components of the Kafka message. For example:

    This flexibility allows you to tailor the object key to your specific needs, extracting meaningful information from Kafka messages to structure S3 object keys effectively.
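As an illustrative sketch (not the connector's actual code), the Athena-style custom key described above can be assembled like this, assuming the default zero-padded offset of length 12:

```python
# Illustrative sketch of the Athena-style custom object key layout:
# $bucket/[$prefix]/$topic/key1=value1/.../topic(partition_offset).extension
def object_key(bucket, topic, partition, offset, partitions, ext,
               prefix=None, pad=12):
    segments = [bucket] + ([prefix] if prefix else []) + [topic]
    # each custom key/value pair becomes a Hive-style path segment
    segments += [f"{key}={value}" for key, value in partitions]
    # the file name embeds the Kafka partition and the zero-padded offset
    segments.append(f"{topic}({partition}_{str(offset).zfill(pad)}).{ext}")
    return "/".join(segments)

print(object_key("demo", "orders", 0, 42, [("region", "eu")], "json"))
```

which yields demo/orders/region=eu/orders(0_000000000042).json.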

    To enable Athena-like partitioning, use the following syntax:

    hashtag
    Rolling Windows

    Storing data in Amazon S3 and partitioning it by time is a common practice in data management. For instance, you may want to organize your S3 data in hourly intervals. This partitioning can be seamlessly achieved using the PARTITIONBY clause in combination with specifying the relevant time field. However, it’s worth noting that the time field typically doesn’t adjust automatically.

    To address this, we offer a Kafka Connect Single Message Transformer (SMT) designed to streamline this process.

    Let’s consider an example where you need the object key to include the wallclock time (the time when the message was processed) and create an hourly window based on a field called timestamp. Here’s the connector configuration to achieve this:

    In this example, the incoming Kafka message’s Value content includes a field called timestamp, represented as a long value indicating the epoch time in milliseconds. The TimestampConverter SMT will expertly convert this into a string value according to the format specified in the format.to.pattern property. Additionally, the insertWallclock SMT will incorporate the current wallclock time in the format you specify in the format property.

    The PARTITIONBY clause then leverages both the timestamp field and the wallclock header to craft the object key, providing you with precise control over data partitioning.

    hashtag
    Data Storage Format

    While the STOREAS clause is optional, it plays a pivotal role in determining the storage format within AWS S3. It’s crucial to understand that this format is entirely independent of the data format stored in Kafka. The connector maintains its neutrality towards the storage format at the topic level and relies on the key.converter and value.converter settings to interpret the data.

    Supported storage formats encompass:

    • AVRO

    • Parquet

    • JSON

    • CSV (including headers)

    • Text

    • BYTES

    Opting for BYTES ensures that each record is stored in its own separate file. This feature proves particularly valuable for scenarios involving the storage of images or other binary data in S3. For cases where you prefer to consolidate multiple records into a single binary file, AVRO or Parquet are the recommended choices.

    By default, the connector exclusively stores the Kafka message value. However, you can expand storage to encompass the entire message, including the key, headers, and metadata, by configuring the store.envelope property as true. This property operates as a boolean switch, with the default value being false. When the envelope is enabled, the data structure follows this format:

    circle-exclamation

    Not supported with a custom partition strategy.

    Utilizing the envelope is particularly advantageous in scenarios such as backup and restore or replication, where comprehensive storage of the entire message in S3 is desired.

    hashtag
    Examples

    Storing the message Value Avro data as Parquet in S3:

    The converter also facilitates seamless JSON to AVRO/Parquet conversion, eliminating the need for an additional processing step before the data is stored in S3.

    Enabling the full message stored as JSON in S3:

    Enabling the full message stored as AVRO in S3:

    If the restore (see the S3 Source documentation) happens on the same cluster, then the most performant way is to use the ByteConverter for both Key and Value and store as AVRO or Parquet:

    hashtag
    Flush Options

    The connector offers three distinct flush options for data management:

    • Flush by Count - triggers a file flush after a specified number of records have been written to it.

    • Flush by Size - initiates a file flush once a predetermined size (in bytes) has been attained.

    • Flush by Interval - enforces a file flush after a defined time interval (in seconds).

    It’s worth noting that the interval flush is a continuous process that acts as a fail-safe mechanism, ensuring that files are periodically flushed, even if the other flush options are not configured or haven’t reached their thresholds.

    Consider a scenario where the flush size is set to 10MB, and only 9.8MB of data has been written to the object, with no new Kafka messages arriving for an extended period of 6 hours. To prevent undue delays, the interval flush guarantees that the object is flushed after the specified time interval has elapsed. This ensures the timely management of data even in situations where other flush conditions are not met.

    The flush options are configured using the flush.count, flush.size, and flush.interval properties. The settings are optional and if not specified the defaults are:

    • flush.count = 50,000

    • flush.size = 500,000,000 (500MB)

    • flush.interval = 3,600 (1 hour)
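The three thresholds combine with OR semantics. A minimal sketch of the decision, using the default thresholds above (illustrative, not the connector's implementation):

```python
# Illustrative sketch: a flush fires when ANY configured threshold is reached.
def should_flush(record_count, bytes_written, seconds_since_flush,
                 flush_count=50_000, flush_size=500_000_000,
                 flush_interval=3_600):
    return (record_count >= flush_count
            or bytes_written >= flush_size
            or seconds_since_flush >= flush_interval)

# 9.8MB written, no new records for 6 hours: the interval threshold
# guarantees the object is still flushed.
print(should_flush(100, 9_800_000, 6 * 3_600))
```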

    circle-check

    A connector instance can simultaneously operate on multiple topic partitions. When one partition triggers a flush, it will initiate a flush operation for all of them, even if the other partitions are not yet ready to flush.

    When connect.s3.latest.schema.optimization.enabled is set to true, it reduces unnecessary data flushes when writing to Avro or Parquet formats. Specifically, it leverages schema compatibility to avoid flushing data when messages with older but backward-compatible schemas are encountered. Consider the following sequence of messages and their associated schemas:

    hashtag
    Flushing By Interval

    The next flush time is calculated based on the time the previous flush completed (the last modified time of the object written to S3). Therefore, by design, the sink connector’s behaviour will have a slight drift based on the time it takes to flush records and whether records are present or not. If Kafka Connect makes no calls to put records, the logic for flushing won't be executed. This ensures a more consistent number of records per object.

    hashtag
    Properties

    The PROPERTIES clause is optional and adds a layer of configuration to the connector. It enhances versatility by permitting the application of multiple configurations (delimited by ‘,’). The following properties are supported:

    Name
    Description
    Type
    Available Values
    Default Value

    The sink connector optimizes performance by padding the output objects. This proves beneficial when using the S3 Source connector to restore data. This object name padding ensures that objects are ordered lexicographically, allowing the S3 Source connector to skip the need for reading, sorting, and processing all objects, thereby enhancing efficiency.
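A quick Python illustration of why the padding matters: unpadded numeric offsets sort out of numeric order lexicographically, while zero-padded ones (default length 12) sort correctly:

```python
offsets = [9, 10, 100]

# Unpadded offsets sort lexicographically, not numerically.
unpadded = sorted(str(o) for o in offsets)
print(unpadded)  # ['10', '100', '9']

# Zero-padding to a fixed width restores numeric ordering.
padded = sorted(str(o).zfill(12) for o in offsets)
print(padded)  # ['000000000009', '000000000010', '000000000100']
```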

    hashtag
    Compression

    AVRO and Parquet offer the capability to compress files as they are written. The S3 Sink connector provides advanced users with the flexibility to configure compression options.

    Here are the available options for connect.s3.compression.codec, along with indications of their support by the Avro, Parquet and JSON writers:

| Compression | Avro Support | Avro (requires Level) | Parquet Support | JSON |
|---|---|---|---|---|
| UNCOMPRESSED | ✅ | | ✅ | ✅ |
| SNAPPY | ✅ | | ✅ | |
| GZIP | | | ✅ | ✅ |
| LZ0 | | | ✅ | |
| LZ4 | | | ✅ | |
| BROTLI | | | ✅ | |
| BZIP2 | ✅ | | | |
| ZSTD | ✅ | ⚙️ | ✅ | |
| DEFLATE | ✅ | ⚙️ | | |
| XZ | ✅ | ⚙️ | | |

    Please note that not all compression libraries are bundled with the S3 connector. Therefore, you may need to manually add certain libraries to the classpath to ensure they function correctly.
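For example, to write Parquet objects compressed with ZSTD (codec choice illustrative):

```properties
connect.s3.compression.codec=ZSTD
```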

    hashtag
    Authentication

    The connector offers two distinct authentication modes:

    • Default: This mode relies on the default AWS authentication chain, simplifying the authentication process.

    • Credentials: In this mode, explicit configuration of AWS Access Key and Secret Key is required for authentication.

    Here’s an example configuration for the Credentials mode:

    For enhanced security and flexibility when using the Credentials mode, it is highly advisable to utilize Connect Secret Providers.

    hashtag
    Error policies

    The connector supports Error policies.

    hashtag
    API Compatible systems

    The connector can also be used against API compatible systems provided they implement the following:

    hashtag
    Indexes Directory

    The connector uses the concept of index objects that it writes to in order to store information about the latest offsets for Kafka topics and partitions as they are being processed. This allows the connector to quickly resume from the correct position when restarting and provides flexibility in naming the index objects.

    By default, the index objects are grouped within a prefix named .indexes for all connectors. However, each connector will create and store its index objects within its own nested prefix inside this .indexes prefix.

    You can configure the prefix for these index objects using the property connect.s3.indexes.name. This property specifies the path from the root of the S3 bucket. Note that even if you configure this property, the connector will still place the indexes within a nested prefix of the specified prefix.

    hashtag
    Examples

    hashtag
    Option Reference

    Name
    Description
    Type
    Available Values
    Default Value

    Azure Data Lake Gen2

    This page describes the usage of the Stream Reactor Azure Datalake Gen 2 Sink Connector.

    This Kafka Connect sink connector facilitates the seamless transfer of records from Kafka to Azure Data Lake Buckets. It offers robust support for various data formats, including AVRO, Parquet, JSON, CSV, and Text, making it a versatile choice for data storage. Additionally, it ensures the reliability of data transfer with built-in support for exactly-once semantics.

    hashtag
    Connector Class


| Name | Description | Type | Default Value |
|---|---|---|---|
| padding.char | Defines the character used for padding. | Char | '0' |
| padding.length.partition | Sets the padding length for the partition. | Int | 0 |
| padding.length.offset | Sets the padding length for the offset. | Int | 12 |
| partition.include.keys | Specifies whether partition keys are included. | Boolean | false (Custom Partitioning: true) |
| store.envelope | Indicates whether to store the entire Kafka message. | Boolean | |
| store.envelope.fields.key | Indicates whether to store the envelope's key. | Boolean | |
| store.envelope.fields.headers | Indicates whether to store the envelope's headers. | Boolean | |
| store.envelope.fields.value | Indicates whether to store the envelope's value. | Boolean | |
| store.envelope.fields.metadata | Indicates whether to store the envelope's metadata. | Boolean | |
| flush.size | Specifies the size (in bytes) for the flush operation. | Long | 500000000 (500MB) |
| flush.count | Specifies the number of records for the flush operation. | Int | 50000 |
| flush.interval | Specifies the interval (in seconds) for the flush operation. | Long | 3600 (1h) |
| key.suffix | When specified it appends the given value to the resulting object key before the "extension" (avro, json, etc.) is added. | String | <empty> |


| Name | Description | Type | Available Values | Default Value |
|---|---|---|---|---|
| connect.s3.aws.auth.mode | Specifies the AWS authentication mode for connecting to S3. | string | "Credentials", "Default" | "Default" |
| connect.s3.aws.access.key | The AWS Access Key used for authentication. | string | | (Empty) |
| connect.s3.aws.secret.key | The AWS Secret Key used for authentication. | string | | (Empty) |
| connect.s3.aws.region | The AWS Region where the S3 bucket is located. | string | | (Empty) |
| connect.s3.pool.max.connections | Specifies the maximum number of connections allowed in the AWS Client's HTTP connection pool when interacting with S3. | int | -1 (undefined) | 50 |
| connect.s3.custom.endpoint | Allows for the specification of a custom S3 endpoint URL if needed. | string | | (Empty) |
| connect.s3.vhost.bucket [Deprecated] | Enables the use of Vhost Buckets for S3 connections. Always set to true when custom endpoints are used. Deprecation: this setting maps directly to the AWS SDK's pathStyleAccessEnabled() method, but the two concepts are semantically opposite; use connect.s3.path.style.access instead (available since version 11.3.0). | boolean | true, false | false |
| connect.s3.path.style.access | When set to true, enables path-style access (matches AWS SDK semantics); when set to false, enables virtual-hosted style access. | boolean | true, false | |
| connect.s3.error.policy | Defines the error handling policy when errors occur during data transfer to or from S3. | string | "NOOP", "THROW", "RETRY" | "THROW" |
| connect.s3.max.retries | Sets the maximum number of retries the connector will attempt before reporting an error to the Connect Framework. | int | | 20 |
| connect.s3.retry.interval | Specifies the interval (in milliseconds) between retry attempts by the connector. | int | | 60000 |
| connect.s3.http.max.retries | Sets the maximum number of retries for the underlying HTTP client when interacting with S3. | long | | 5 |
| connect.s3.http.retry.interval | Specifies the retry interval (in milliseconds) for the underlying HTTP client. An exponential backoff strategy is employed. | long | | 50 |
| connect.s3.local.tmp.directory | Enables the use of a local folder as a staging area for data transfer operations. | string | | (Empty) |
| connect.s3.kcql | A SQL-like configuration that defines the behavior of the connector. Refer to the KCQL section for details. | string | | (Empty) |
| connect.s3.compression.codec | Sets the Parquet compression codec to be used when writing data to S3. | string | "UNCOMPRESSED", "SNAPPY", "GZIP", "LZ0", "LZ4", "BROTLI", "BZIP2", "ZSTD", "DEFLATE", "XZ" | "UNCOMPRESSED" |
| connect.s3.compression.level | Sets the compression level when compression is enabled for data transfer to S3. | int | 1-9 | (Empty) |
| connect.s3.seek.max.files | Specifies the maximum threshold for the number of files the connector uses to ensure exactly-once processing of data. | int | | 5 |
| connect.s3.indexes.name | Configure the indexes prefix for this connector. | string | | ".indexes" |
| connect.s3.exactly.once.enable | Set to false to disable exactly-once semantics, opting instead for Kafka Connect's native at-least-once offset management. | boolean | true, false | true |
| connect.s3.schema.change.detector | Configures how the file rolls over upon receiving a record with a schema different from the accumulated ones: default (object equality), version (version field comparison), or compatibility (Avro compatibility checking). | string | default, version, compatibility | default |
| connect.s3.skip.null.values | Skip records with null values (i.e., tombstone records). | boolean | true, false | false |
| connect.s3.latest.schema.optimization.enabled | When set to true, reduces unnecessary data flushes when writing to Avro or Parquet formats by leveraging schema compatibility to avoid flushing data when messages with older but backward-compatible schemas are encountered. | boolean | true, false | false |

The padding.type KCQL property:

| Name | Description | Type | Available Values | Default Value |
|---|---|---|---|---|
| padding.type | Specifies the type of padding to be applied. | LeftPad, RightPad, NoOp | LeftPad, RightPad, NoOp | LeftPad |

Index prefix examples:

| Index Name (connect.s3.indexes.name) | Resulting Indexes Prefix Structure | Description |
|---|---|---|
| .indexes (default) | .indexes/<connector_name>/ | The default setup, where each connector uses its own subdirectory within .indexes. |
| custom-indexes | custom-indexes/<connector_name>/ | Custom root directory custom-indexes, with a subdirectory for each connector. |
| indexes/s3-connector-logs | indexes/s3-connector-logs/<connector_name>/ | Uses a custom subdirectory s3-connector-logs within indexes, with a subdirectory for each connector. |
| logs/indexes | logs/indexes/<connector_name>/ | Indexes are stored under logs/indexes, with a subdirectory for each connector. |

    connector.class=io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector
    connect.s3.kcql=insert into lensesio:demo select * from demo PARTITIONBY _value.ts STOREAS `JSON` PROPERTIES ('flush.size'=1000000, 'flush.interval'=30, 'flush.count'=5000)
    topics=demo
    name=demo
    INSERT INTO bucketAddress[:pathPrefix]
    SELECT *
    FROM kafka-topic
    [[PARTITIONBY (partition[, partition] ...)] | NOPARTITION]
    [STOREAS storage_format]
    [PROPERTIES(
      'property.1'=x,
      'property.2'=x,
    )]
    {
      ...
      "a.b": "value",
      ...
    }
    INSERT INTO `bucket-name`:`prefix` SELECT * FROM `kafka-topic` PARTITIONBY `a.b`
    INSERT INTO testbucket:pathToWriteTo SELECT * FROM topicA;
    INSERT INTO testbucket SELECT * FROM topicA;
    INSERT INTO testbucket:path/To/Write/To SELECT * FROM topicA PARTITIONBY fieldA;
    topics.regex = ^sensor_data_\d+$
    connect.s3.kcql= INSERT INTO $target SELECT * FROM  `*` ....
    PARTITIONBY fieldA, fieldB
    PARTITIONBY _key
    PARTITIONBY _key.fieldA, _key.fieldB
    PARTITIONBY _header.<header_key1>[, _header.<header_key2>]
    PARTITIONBY fieldA, _key.fieldB, _headers.fieldC
    INSERT INTO $bucket[:$prefix]
    SELECT * FROM $topic
    PARTITIONBY fieldA, _key.fieldB, _headers.fieldC
    STOREAS `AVRO`
    PROPERTIES (
        'partition.include.keys'=true,
    )
    connector.class=io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector
    connect.s3.kcql=insert into lensesio:demo select * from demo PARTITIONBY _value.metadata_id, _value.customer_id, _header.ts, _header.wallclock STOREAS `JSON` PROPERTIES ('flush.size'=1000000, 'flush.interval'=30, 'flush.count'=5000)
    topics=demo
    name=demo
    value.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter=org.apache.kafka.connect.storage.StringConverter
    transforms=insertFormattedTs,insertWallclock
    transforms.insertFormattedTs.type=io.lenses.connect.smt.header.TimestampConverter
    transforms.insertFormattedTs.header.name=ts
    transforms.insertFormattedTs.field=timestamp
    transforms.insertFormattedTs.target.type=string
    transforms.insertFormattedTs.format.to.pattern=yyyy-MM-dd-HH
    transforms.insertWallclock.type=io.lenses.connect.smt.header.InsertWallclock
    transforms.insertWallclock.header.name=wallclock
    transforms.insertWallclock.value.type=format
    transforms.insertWallclock.format=yyyy-MM-dd-HH
    {
      "key": <the message Key, which can be a primitive or a complex object>,
      "value": <the message Value, which can be a primitive or a complex object>,
      "headers": {
        "header1": "value1",
        "header2": "value2"
      },
      "metadata": {
        "offset": 0,
        "partition": 0,
        "timestamp": 0,
        "topic": "topic"
      }
    }
    ...
    connect.s3.kcql=INSERT INTO lensesioaws:car_speed SELECT * FROM car_speed_events STOREAS `PARQUET` 
    value.converter=io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url=http://localhost:8081
    key.converter=org.apache.kafka.connect.storage.StringConverter
    ...
    ...
    connect.s3.kcql=INSERT INTO lensesioaws:car_speed SELECT * FROM car_speed_events STOREAS `PARQUET` 
    value.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter=org.apache.kafka.connect.storage.StringConverter
    ...
    ...
    connect.s3.kcql=INSERT INTO lensesioaws:car_speed SELECT * FROM car_speed_events STOREAS `JSON` PROPERTIES('store.envelope'=true)
    value.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter=org.apache.kafka.connect.storage.StringConverter
    ...
    ...
    connect.s3.kcql=INSERT INTO lensesioaws:car_speed SELECT * FROM car_speed_events STOREAS `AVRO` PROPERTIES('store.envelope'=true)
    value.converter=io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url=http://localhost:8081
    key.converter=org.apache.kafka.connect.storage.StringConverter
    ...
    ...
    connect.s3.kcql=INSERT INTO lensesioaws:car_speed SELECT * FROM car_speed_events STOREAS `AVRO` PROPERTIES('store.envelope'=true)
    value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
    key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
    ...
    message1 -> schema1  
    message2 -> schema1  
      (No flush needed – same schema)
    
    message3 -> schema2  
      (Flush occurs – new schema introduced)
    
    message4 -> schema2  
      (No flush needed – same schema)
    
    message5 -> schema1  
      Without optimization: would trigger a flush  
      With optimization: no flush – schema1 is backward-compatible with schema2
    
    message6 -> schema2  
    message7 -> schema2  
      (No flush needed – same schema, it would happen based on the flush thresholds)
    ...
    connect.s3.aws.auth.mode=Credentials
    connect.s3.aws.region=eu-west-2
    connect.s3.aws.access.key=$AWS_ACCESS_KEY
    connect.s3.aws.secret.key=$AWS_SECRET_KEY
    ...
    listObjectsV2
    listObjectsV2Paginator
    putObject
    getObject
    headObject
    deleteObjects
    deleteObject
    hashtag
    Example
    circle-check

    For more examples see the tutorials.

    hashtag
    KCQL Support

    circle-check

    You can specify multiple KCQL statements separated by ; to have a connector sink multiple topics. The connector properties topics or topics.regex are required to be set to a value that matches the KCQL statements.

    The connector uses KCQL to map topics to Datalake buckets and paths. The full KCQL syntax is:

    Please note that you can employ escaping within KCQL for the INSERT INTO, SELECT * FROM, and PARTITIONBY clauses when necessary. For example, an incoming Kafka message stored as JSON can use fields containing .:

    In this case, you can use the following KCQL statement:

    hashtag
    Target Bucket and Path

    The target bucket and path are specified in the INSERT INTO clause. The path is optional and if not specified, the connector will write to the root of the bucket and append the topic name to the path.

    Here are a few examples:

    hashtag
    SQL Projection

    Currently, the connector does not offer support for SQL projection; consequently, anything other than a SELECT * query is disregarded. The connector will faithfully write all fields from Kafka exactly as they are.

    hashtag
    Source Topic

    To avoid runtime errors, make sure the topics or topics.regex setting matches your KCQL statements. If the connector receives data for a topic without matching KCQL, it will throw an error. When using a regex to select topics, follow this KCQL pattern:

    In this case the topic name will be appended to the $target destination.
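As a concrete illustration, a topics.regex such as the ^sensor_data_\d+$ example shown on this page selects every numbered sensor topic; a quick Python sketch of the matching behaviour:

```python
# Sketch: a topics.regex such as ^sensor_data_\d+$ routes every numbered
# sensor topic to the sink; non-matching topics are not consumed.
import re

pattern = re.compile(r"^sensor_data_\d+$")

matched = [t for t in ["sensor_data_1", "sensor_data_42", "sensor_metadata"]
           if pattern.match(t)]
# matched == ["sensor_data_1", "sensor_data_42"]
```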

    hashtag
    KCQL Properties

    The PROPERTIES clause is optional and adds a layer of configuration to the connector. It enhances versatility by permitting the application of multiple configurations (delimited by ‘,’). The following properties are supported:

    Name
    Description
    Type
    Available Values
    Default Value

    padding.type

    Specifies the type of padding to be applied.

    LeftPad, RightPad, NoOp

    LeftPad, RightPad, NoOp

    LeftPad

    The sink connector optimizes performance by padding the output files, a practice that proves beneficial when using the Datalake Source connector to restore data. This file padding ensures that files are ordered lexicographically, allowing the Datalake Source connector to skip the need for reading, sorting, and processing all files, thereby enhancing efficiency.

    hashtag
    Partitioning & File names

    The object key serves as the filename used to store data in Datalake. There are two options for configuring the object key:

    • Default: The object key is automatically generated by the connector and follows the Kafka topic-partition structure. The format is $container/[$prefix]/$topic/$partition/offset.extension. The extension is determined by the chosen storage format.

    • Custom: The object key is driven by the PARTITIONBY clause. The format is either $container/[$prefix]/$topic/customKey1=customValue1/customKey2=customValue2/topic(partition_offset).extension (naming style mimicking Hive-like data partitioning) or $container/[$prefix]/customValue/topic(partition_offset).ext. The extension is determined by the selected storage format.

    circle-exclamation

    The Connector automatically adds the topic name to the partition. There is no need to add it to the partition clause. If you want to explicitly add the topic or partition you can do so by using _topic and _partition.

    The partition clause works on Header, Key and Values fields of the Kafka message.

    Custom keys and values can be extracted from the Kafka message key, message value, or message headers, as long as the headers are of types that can be converted to strings. There is no fixed limit to the number of elements that can form the object key, but you should be aware of Azure Datalake key length restrictions.

    To extract fields from the message values, simply use the field names in the PARTITIONBY clause. For example:

    However, note that the message fields must be of primitive types (e.g., string, int, long) to be used for partitioning.

    You can also use the entire message key as long as it can be coerced into a primitive type:

    In cases where the Kafka message Key is not a primitive but a complex object, you can use individual fields within the message Key to create the Datalake object key name:

    Kafka message headers can also be used in the Datalake object key definition, provided the header values are of primitive types easily convertible to strings:

    Customizing the object key can leverage various components of the Kafka message. For example:

    This flexibility allows you to tailor the object key to your specific needs, extracting meaningful information from Kafka messages to structure Datalake object keys effectively.
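To make the key layout concrete, here is a minimal sketch of how a Hive-style custom object key could be assembled from extracted partition values. The function and its parameters are illustrative, not the connector's internals:

```python
# Hypothetical helper mirroring the documented custom-key layout:
# $container/[$prefix]/key1=value1/key2=value2/topic(partition_offset).extension
def object_key(container, prefix, topic, partition, offset, parts, ext):
    partition_path = "/".join(f"{k}={v}" for k, v in parts)
    prefix_part = f"{prefix}/" if prefix else ""
    return (f"{container}/{prefix_part}{partition_path}/"
            f"{topic}({partition}_{offset}).{ext}")

key = object_key("testcontainer", "data", "orders", 3, 1042,
                 [("region", "eu"), ("customer_id", "42")], "avro")
# key == "testcontainer/data/region=eu/customer_id=42/orders(3_1042).avro"
```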

    To enable Athena-like partitioning, use the following syntax:

    hashtag
    Rolling Windows

    Storing data in Azure Datalake and partitioning it by time is a common practice in data management. For instance, you may want to organize your Datalake data in hourly intervals. This partitioning can be seamlessly achieved using the PARTITIONBY clause in combination with specifying the relevant time field. However, it’s worth noting that the time field typically doesn’t adjust automatically.

    To address this, we offer a Kafka Connect Single Message Transformer (SMT) designed to streamline this process. You can find the transformer plugin and documentation here.

    Let’s consider an example where you need the object key to include the wallclock time (the time when the message was processed) and create an hourly window based on a field called timestamp. Here’s the connector configuration to achieve this:

    In this example, the incoming Kafka message’s Value content includes a field called timestamp, represented as a long value indicating the epoch time in milliseconds. The TimestampConverter SMT will expertly convert this into a string value according to the format specified in the format.to.pattern property. Additionally, the insertWallclock SMT will incorporate the current wallclock time in the format you specify in the format property.

    The PARTITIONBY clause then leverages both the timestamp field and the wallclock header to craft the object key, providing you with precise control over data partitioning.

    hashtag
    Data Storage Format

    While the STOREAS clause is optional, it plays a pivotal role in determining the storage format within Azure Datalake. It’s crucial to understand that this format is entirely independent of the data format stored in Kafka. The connector maintains its neutrality towards the storage format at the topic level and relies on the key.converter and value.converter settings to interpret the data.

    Supported storage formats encompass:

    • AVRO

    • Parquet

    • JSON

    • CSV (including headers)

    • Text

    • BYTES

    Opting for BYTES ensures that each record is stored in its own separate file. This feature proves particularly valuable for scenarios involving the storage of images or other binary data in Datalake. For cases where you prefer to consolidate multiple records into a single binary file, AVRO or Parquet are the recommended choices.

    By default, the connector exclusively stores the Kafka message value. However, you can expand storage to encompass the entire message, including the key, headers, and metadata, by configuring the store.envelope property as true. This property operates as a boolean switch, with the default value being false. When the envelope is enabled, the data structure follows this format:

    circle-exclamation

    Not supported with a custom partition strategy.

    Utilizing the envelope is particularly advantageous in scenarios such as backup and restore or replication, where comprehensive storage of the entire message in Datalake is desired.

    hashtag
    Examples

    Storing the message Value Avro data as Parquet in Datalake:

    The converter also facilitates seamless JSON to AVRO/Parquet conversion, eliminating the need for an additional processing step before the data is stored in Datalake.

    Enabling the full message stored as JSON in Datalake:

    Enabling the full message stored as AVRO in Datalake:

    If the restore (see the Datalake Source documentation) happens on the same cluster, then the most performant way is to use the ByteConverter for both Key and Value and store as AVRO or Parquet:

    hashtag
    Flush Options

    The connector offers three distinct flush options for data management:

    • Flush by Count - triggers a file flush after a specified number of records have been written to it.

    • Flush by Size - initiates a file flush once a predetermined size (in bytes) has been attained.

    • Flush by Interval - enforces a file flush after a defined time interval (in seconds).

    It’s worth noting that the interval flush is a continuous process that acts as a fail-safe mechanism, ensuring that files are periodically flushed, even if the other flush options are not configured or haven’t reached their thresholds.

    Consider a scenario where the flush size is set to 10MB, and only 9.8MB of data has been written to the file, with no new Kafka messages arriving for an extended period of 6 hours. To prevent undue delays, the interval flush guarantees that the file is flushed after the specified time interval has elapsed. This ensures the timely management of data even in situations where other flush conditions are not met.

    The flush options are configured using the flush.count, flush.size, and flush.interval KCQL Properties (see KCQL Properties section). The settings are optional and if not specified the defaults are:

    • flush.count = 50_000

    • flush.size = 500000000 (500MB)

    • flush.interval = 3600 (1 hour)

    circle-check

    A connector instance can simultaneously operate on multiple topic partitions. When one partition triggers a flush, it will initiate a flush operation for all of them, even if the other partitions are not yet ready to flush.
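Putting the three triggers and their defaults together, here is an illustrative model of the flush decision (not the connector's actual code):

```python
import time

# Illustrative predicate combining the three flush triggers with the
# documented defaults: flush.count, flush.size (bytes), flush.interval (s).
def should_flush(records, bytes_written, last_flush_ts, now=None,
                 flush_count=50_000, flush_size=500_000_000,
                 flush_interval=3600):
    now = time.time() if now is None else now
    return (records >= flush_count
            or bytes_written >= flush_size
            or now - last_flush_ts >= flush_interval)

# 9.8MB written and well under the count threshold, but 6 hours elapsed:
# the interval trigger forces the flush.
assert should_flush(1_000, 9_800_000, last_flush_ts=0, now=6 * 3600)
```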

    When connect.datalake.latest.schema.optimization.enabled is set to true, it reduces unnecessary data flushes when writing to Avro or Parquet formats. Specifically, it leverages schema compatibility to avoid flushing data when messages with older but backward-compatible schemas are encountered. Consider the following sequence of messages and their associated schemas:
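A minimal model of that optimization, assuming some is_backward_compatible(current, incoming) predicate (in practice the connector derives compatibility from the Avro/Parquet schemas; the names here are hypothetical):

```python
# Flush on a schema change only when the incoming schema cannot be read
# with the latest schema seen so far.
def needs_schema_flush(current_schema, incoming_schema, is_backward_compatible):
    if incoming_schema == current_schema:
        return False
    return not is_backward_compatible(current_schema, incoming_schema)

# schema1 is readable with schema2, so a regression from schema2 back to
# schema1 (message5 in the sequence below) does not force a flush:
compat = lambda current, incoming: (current, incoming) == ("schema2", "schema1")
assert not needs_schema_flush("schema2", "schema1", compat)
assert needs_schema_flush("schema1", "schema2", compat)
```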

    hashtag
    Flushing By Interval

    The next flush time is calculated based on the time the previous flush completed (the last modified time of the file written to Data Lake). Therefore, by design, the sink connector’s behaviour will have a slight drift based on the time it takes to flush records and whether records are present or not. If Kafka Connect makes no calls to put records, the logic for flushing won't be executed. This ensures a more consistent number of records per file.
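The drift can be illustrated with a small calculation: because each interval is measured from when the previous flush completed, every flush slips by however long the previous flush took (numbers here are illustrative):

```python
flush_interval = 3600   # seconds, from flush.interval
flush_duration = 20     # hypothetical time spent writing each file

completed_at = 0.0
flush_completion_times = []
for _ in range(3):
    # the next flush fires flush_interval after the previous one *completed*
    completed_at += flush_interval + flush_duration
    flush_completion_times.append(completed_at)

# flush_completion_times == [3620.0, 7240.0, 10860.0]
# i.e. 20s of drift accumulates per flush versus a fixed hourly schedule
```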


    hashtag
    Compression

    AVRO and Parquet offer the capability to compress files as they are written. The Datalake Sink connector provides advanced users with the flexibility to configure compression options.

    Here are the available options for the connect.datalake.compression.codec, along with indications of their support by Avro, Parquet and JSON writers:

    Compression
    Avro Support
    Avro (requires Level)
    Parquet Support
    JSON

    UNCOMPRESSED

    ✅

    ✅

    ✅

    Please note that not all compression libraries are bundled with the Datalake connector. Therefore, you may need to manually add certain libraries to the classpath to ensure they function correctly.

    hashtag
    Authentication

    The connector offers three distinct authentication modes:

    • Default: This mode relies on the default Azure authentication chain, simplifying the authentication process.

    • Connection String: This mode enables simpler configuration by relying on the connection string to authenticate with Azure.

    • Credentials: In this mode, explicit configuration of Azure Access Key and Secret Key is required for authentication.

    When selecting the “Credentials” mode, it is essential to provide the necessary access key and secret key properties. Alternatively, if you prefer not to configure these properties explicitly, the connector will follow the credentials retrieval order as described here.

    Here’s an example configuration for the “Credentials” mode:

    And here is an example configuration using the “Connection String” mode:

    For enhanced security and flexibility when using either the “Credentials” or “Connection String” modes, it is highly advisable to utilize Connect Secret Providers.

    hashtag
    Error policies

    The connector supports Error policies.

    hashtag
    Indexes Directory

    The connector uses the concept of index files that it writes to in order to store information about the latest offsets for Kafka topics and partitions as they are being processed. This allows the connector to quickly resume from the correct position when restarting and provides flexibility in naming the index files.

    By default, the root directory for these index files is named .indexes for all connectors. However, each connector will create and store its index files within its own subdirectory inside this .indexes directory.

    You can configure the root directory for these index files using the property connect.datalake.indexes.name. This property specifies the path from the root of the data lake filesystem. Note that even if you configure this property, the connector will still create a subdirectory within the specified root directory.

    hashtag
    Examples

    Index Name (connect.datalake.indexes.name)

    Resulting Indexes Directory Structure

    Description

    .indexes (default)

    .indexes/<connector_name>/

    The default setup, where each connector uses its own subdirectory within .indexes.

    custom-indexes

    custom-indexes/<connector_name>/

    Custom root directory custom-indexes, with a subdirectory for each connector.

    hashtag
    Option Reference

    Name
    Description
    Type
    Available Values
    Default Value

    connect.datalake.azure.auth.mode

    Specifies the Azure authentication mode for connecting to Datalake.

    string

    “Credentials”, “ConnectionString” or “Default”

    “Default”

    io.lenses.streamreactor.connect.datalake.sink.DatalakeSinkConnector
    connector.class=io.lenses.streamreactor.connect.datalake.sink.DatalakeSinkConnector
    connect.datalake.kcql=insert into lensesio:demo select * from demo PARTITIONBY _value.metadata_id, _value.customer_id, _header.ts, _header.wallclock STOREAS `JSON` PROPERTIES('flush.interval'=600, 'flush.size'=1000000, 'flush.count'=5000)
    topics=demo
    name=demo
    value.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter=org.apache.kafka.connect.storage.StringConverter
    transforms=insertFormattedTs,insertWallclock
    transforms.insertFormattedTs.type=io.lenses.connect.smt.header.TimestampConverter
    transforms.insertFormattedTs.header.name=ts
    transforms.insertFormattedTs.field=timestamp
    transforms.insertFormattedTs.target.type=string
    transforms.insertFormattedTs.format.to.pattern=yyyy-MM-dd-HH
    transforms.insertWallclock.type=io.lenses.connect.smt.header.InsertWallclock
    transforms.insertWallclock.header.name=wallclock
    transforms.insertWallclock.value.type=format
    transforms.insertWallclock.format=yyyy-MM-dd-HH
    INSERT INTO bucketAddress[:pathPrefix]
    SELECT *
    FROM kafka-topic
    [[PARTITIONBY (partition[, partition] ...)] | NOPARTITION]
    [STOREAS storage_format]
    [PROPERTIES(
      'property.1'=x,
      'property.2'=x,
    )]
    {
      ...
      "a.b": "value",
      ...
    }
    INSERT INTO `container-name`:`prefix` SELECT * FROM `kafka-topic` PARTITIONBY `a.b`
    INSERT INTO testcontainer:pathToWriteTo SELECT * FROM topicA;
    INSERT INTO testcontainer SELECT * FROM topicA;
    INSERT INTO testcontainer:path/To/Write/To SELECT * FROM topicA PARTITIONBY fieldA;
    topics.regex = ^sensor_data_\d+$
    connect.datalake.kcql= INSERT INTO $target SELECT * FROM  `*` ....
    PARTITIONBY fieldA, fieldB
    PARTITIONBY _key
    PARTITIONBY _key.fieldA, _key.fieldB
    PARTITIONBY _header.<header_key1>[, _header.<header_key2>]
    PARTITIONBY fieldA, _key.fieldB, _header.fieldC
    INSERT INTO $container[:$prefix]
    SELECT * FROM $topic
    PARTITIONBY fieldA, _key.fieldB, _header.fieldC
    STOREAS `AVRO`
    PROPERTIES (
        'partition.include.keys'=true,
    )
    connector.class=io.lenses.streamreactor.connect.azure.datalake.sink.DatalakeSinkConnector
    connect.datalake.kcql=insert into lensesio:demo select * from demo PARTITIONBY _value.metadata_id, _value.customer_id, _header.ts, _header.wallclock STOREAS `JSON` PROPERTIES('flush.interval'=30, 'flush.size'=1000000, 'flush.count'=5000)
    topics=demo
    name=demo
    value.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter=org.apache.kafka.connect.storage.StringConverter
    transforms=insertFormattedTs,insertWallclock
    transforms.insertFormattedTs.type=io.lenses.connect.smt.header.TimestampConverter
    transforms.insertFormattedTs.header.name=ts
    transforms.insertFormattedTs.field=timestamp
    transforms.insertFormattedTs.target.type=string
    transforms.insertFormattedTs.format.to.pattern=yyyy-MM-dd-HH
    transforms.insertWallclock.type=io.lenses.connect.smt.header.InsertWallclock
    transforms.insertWallclock.header.name=wallclock
    transforms.insertWallclock.value.type=format
    transforms.insertWallclock.format=yyyy-MM-dd-HH
    {
      "key": <the message Key, which can be a primitive or a complex object>,
      "value": <the message Value, which can be a primitive or a complex object>,
      "headers": {
        "header1": "value1",
        "header2": "value2"
      },
      "metadata": {
        "offset": 0,
        "partition": 0,
        "timestamp": 0,
        "topic": "topic"
      }
    }
    ...
    connect.datalake.kcql=INSERT INTO lensesioazure:car_speed SELECT * FROM car_speed_events STOREAS `PARQUET` 
    value.converter=io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url=http://localhost:8081
    key.converter=org.apache.kafka.connect.storage.StringConverter
    ...
    ...
    connect.datalake.kcql=INSERT INTO lensesioazure:car_speed SELECT * FROM car_speed_events STOREAS `PARQUET` 
    value.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter=org.apache.kafka.connect.storage.StringConverter
    ...
      ...
      connect.datalake.kcql=INSERT INTO lensesioazure:car_speed SELECT * FROM car_speed_events STOREAS `JSON` PROPERTIES('store.envelope'=true)
      value.converter=org.apache.kafka.connect.json.JsonConverter
      key.converter=org.apache.kafka.connect.storage.StringConverter
      ...
    ...
    connect.datalake.kcql=INSERT INTO lensesioazure:car_speed SELECT * FROM car_speed_events STOREAS `AVRO` PROPERTIES('store.envelope'=true)
    value.converter=io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url=http://localhost:8081
    key.converter=org.apache.kafka.connect.storage.StringConverter
    ...
    ...
    connect.datalake.kcql=INSERT INTO lensesioazure:car_speed SELECT * FROM car_speed_events STOREAS `AVRO` PROPERTIES('store.envelope'=true)
    value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
    key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
    ...
    message1 -> schema1
    message2 -> schema1  
      (No flush needed – same schema)
    
    message3 -> schema2  
      (Flush occurs – new schema introduced)
    
    message4 -> schema2  
      (No flush needed – same schema)
    
    message5 -> schema1  
      Without optimization: would trigger a flush  
      With optimization: no flush – schema1 is backward-compatible with schema2
    
    message6 -> schema2  
    message7 -> schema2  
      (No flush needed – same schema; a flush would still occur once the flush thresholds are reached)
    ...
    connect.datalake.azure.auth.mode=Credentials
    connect.datalake.azure.account.name=$AZURE_ACCOUNT_NAME
    connect.datalake.azure.account.key=$AZURE_ACCOUNT_KEY
    ...
    ...
    connect.datalake.azure.auth.mode=ConnectionString
    connect.datalake.azure.connection.string=$AZURE_CONNECTION_STRING
    ...

    GCP Storage

    This page describes the usage of the Stream Reactor GCP Storage Sink Connector.

    circle-check

    You can specify multiple KCQL statements separated by ; to have a connector sink multiple topics. The connector properties topics or topics.regex are required to be set to a value that matches the KCQL statements.

    hashtag
    Connector Class

    hashtag
    Example

    circle-check

    For more examples see the tutorials.

    hashtag
    KCQL Support

    The connector uses KCQL to map topics to GCP Storage buckets and paths. The full KCQL syntax is:

    Please note that you can employ escaping within KCQL for the INSERT INTO, SELECT * FROM, and PARTITIONBY clauses when necessary. For example, an incoming Kafka message stored as JSON can use fields containing .:

    In this case you can use the following KCQL statement:

    hashtag
    Target Bucket and Path

    The target bucket and path are specified in the INSERT INTO clause. The path is optional and if not specified, the connector will write to the root of the bucket and append the topic name to the path.

    Here are a few examples:

    hashtag
    SQL Projection

    Currently, the connector does not offer support for SQL projection; consequently, anything other than a SELECT * query is disregarded. The connector will faithfully write all fields from Kafka exactly as they are.

    hashtag
    Source Topic

    To avoid runtime errors, make sure the topics or topics.regex setting matches your KCQL statements. If the connector receives data for a topic without matching KCQL, it will throw an error. When using a regex to select topics, follow this KCQL pattern:

    In this case the topic name will be appended to the $target destination.

    hashtag
    KCQL Properties

    The PROPERTIES clause is optional and adds a layer of configurability to the connector. It enhances versatility by permitting the application of multiple configurations (delimited by ‘,’). The following properties are supported:

    Name
    Description
    Type
    Available Values
    Default Value

    The sink connector optimizes performance by padding the output object names, a practice that proves beneficial when using the GCP Storage Source connector to restore data. This object name padding ensures that objects are ordered lexicographically, allowing the GCP Storage Source connector to skip the need for reading, sorting, and processing all objects, thereby enhancing efficiency.

    hashtag
    Partitioning & Object Keys

    The object key serves as the filename used to store data in GCP Storage. There are two options for configuring the object key:

    • Default: The object key is automatically generated by the connector and follows the Kafka topic-partition structure. The format is $container/[$prefix]/$topic/$partition/offset.extension. The extension is determined by the chosen storage format.

    • Custom: The object key is driven by the PARTITIONBY clause. The format is either $container/[$prefix]/$topic/customKey1=customValue1/customKey2=customValue2/topic(partition_offset).extension (naming style mimicking Hive-like data partitioning) or $container/[$prefix]/customValue/topic(partition_offset).ext. The extension is determined by the selected storage format.

    circle-exclamation

    The Connector automatically adds the topic name to the partition. There is no need to add it to the partition clause. If you want to explicitly add the topic or partition you can do so by using _topic and _partition.

    The partition clause works on header, key and values fields of the Kafka message.

    Custom keys and values can be extracted from the Kafka message key, message value, or message headers, as long as the headers are of types that can be converted to strings. There is no fixed limit to the number of elements that can form the object key, but you should be aware of GCP Storage key length restrictions.

    To extract fields from the message values, simply use the field names in the PARTITIONBY clause. For example:

    However, note that the message fields must be of primitive types (e.g., string, int, long) to be used for partitioning.

    You can also use the entire message key as long as it can be coerced into a primitive type:

    In cases where the Kafka message Key is not a primitive but a complex object, you can use individual fields within the message Key to create the GCP Storage object key name:

    Kafka message headers can also be used in the GCP Storage object key definition, provided the header values are of primitive types easily convertible to strings:

    Customizing the object key can leverage various components of the Kafka message. For example:

    This flexibility allows you to tailor the object key to your specific needs, extracting meaningful information from Kafka messages to structure GCP Storage object keys effectively.

    To enable Athena-like partitioning, use the following syntax:

    hashtag
    Rolling Windows

    Storing data in GCP Storage and partitioning it by time is a common practice in data management. For instance, you may want to organize your GCP Storage data in hourly intervals. This partitioning can be seamlessly achieved using the PARTITIONBY clause in combination with specifying the relevant time field. However, it’s worth noting that the time field typically doesn’t adjust automatically.

    To address this, we offer a Kafka Connect Single Message Transformer (SMT) designed to streamline this process. You can find the transformer plugin and documentation here.

    Let’s consider an example where you need the object key to include the wallclock time (the time when the message was processed) and create an hourly window based on a field called timestamp. Here’s the connector configuration to achieve this:

    In this example, the incoming Kafka message’s Value content includes a field called timestamp, represented as a long value indicating the epoch time in milliseconds. The TimestampConverter SMT will expertly convert this into a string value according to the format specified in the format.to.pattern property. Additionally, the insertWallclock SMT will incorporate the current wallclock time in the format you specify in the format property.

    The PARTITIONBY clause then leverages both the timestamp field and the wallclock header to craft the object key, providing you with precise control over data partitioning.

    hashtag
    Data Storage Format

    While the STOREAS clause is optional, it plays a pivotal role in determining the storage format within GCP Storage. It’s crucial to understand that this format is entirely independent of the data format stored in Kafka. The connector maintains its neutrality towards the storage format at the topic level and relies on the key.converter and value.converter settings to interpret the data.

    Supported storage formats encompass:

    • AVRO

    • Parquet

    • JSON

    • CSV (including headers)

    • Text

    • BYTES

    Opting for BYTES ensures that each record is stored in its own separate object. This feature proves particularly valuable for scenarios involving the storage of images or other binary data in GCP Storage. For cases where you prefer to consolidate multiple records into a single binary object, AVRO or Parquet are the recommended choices.

    By default, the connector exclusively stores the Kafka message value. However, you can expand storage to encompass the entire message, including the key, headers, and metadata, by configuring the store.envelope property as true. This property operates as a boolean switch, with the default value being false. When the envelope is enabled, the data structure follows this format:

    circle-exclamation

    Not supported with a custom partition strategy.

    Utilizing the envelope is particularly advantageous in scenarios such as backup and restore or replication, where comprehensive storage of the entire message in GCP Storage is desired.

    hashtag
    Examples

    Storing the message Value Avro data as Parquet in GCP Storage:

    The converter also facilitates seamless JSON to AVRO/Parquet conversion, eliminating the need for an additional processing step before the data is stored in GCP Storage.

    Enabling the full message stored as JSON in GCP Storage:

    Enabling the full message stored as AVRO in GCP Storage:

    If the restore (see the GCP Storage Source documentation) happens on the same cluster, then the most performant way is to use the ByteConverter for both Key and Value and store as AVRO or Parquet:

    hashtag
    Flush Options

    The connector offers three distinct flush options for data management:

    • Flush by Count - triggers an object flush after a specified number of records have been written to it.

    • Flush by Size - initiates an object flush once a predetermined size (in bytes) has been attained.

    • Flush by Interval - enforces an object flush after a defined time interval (in seconds).

    It’s worth noting that the interval flush is a continuous process that acts as a fail-safe mechanism, ensuring that objects are periodically flushed, even if the other flush options are not configured or haven’t reached their thresholds.

    Consider a scenario where the flush size is set to 10MB, and only 9.8MB of data has been written to the object, with no new Kafka messages arriving for an extended period of 6 hours. To prevent undue delays, the interval flush guarantees that the object is flushed after the specified time interval has elapsed. This ensures the timely management of data even in situations where other flush conditions are not met.

    The flush options are configured using the flush.count, flush.size, and flush.interval KCQL Properties (see the KCQL Properties section). The settings are optional and if not specified the defaults are:

    • flush.count = 50_000

    • flush.size = 500000000 (500MB)

    • flush.interval = 3600 (1 hour)

    circle-check

    A connector instance can simultaneously operate on multiple topic partitions. When one partition triggers a flush, it will initiate a flush operation for all of them, even if the other partitions are not yet ready to flush.

    When connect.gcpstorage.latest.schema.optimization.enabled is set to true, it reduces unnecessary data flushes when writing to Avro or Parquet formats. Specifically, it leverages schema compatibility to avoid flushing data when messages with older but backward-compatible schemas are encountered. Consider the following sequence of messages and their associated schemas:

    hashtag
    Flushing By Interval

    The next flush time is calculated based on the time the previous flush completed (the last modified time of the object written to GCP Storage). Therefore, by design, the sink connector’s behaviour will have a slight drift based on the time it takes to flush records and whether records are present or not. If Kafka Connect makes no calls to put records, the logic for flushing won't be executed. This ensures a more consistent number of records per object.

    hashtag
    Compression

    AVRO and Parquet offer the capability to compress files as they are written. The GCP Storage Sink connector provides advanced users with the flexibility to configure compression options.

    Here are the available options for the connect.gcpstorage.compression.codec, along with indications of their support by Avro, Parquet and JSON writers:

| Compression | Avro Support | Avro (requires Level) | Parquet Support | JSON |
|---|---|---|---|---|
| UNCOMPRESSED | ✅ | | ✅ | ✅ |
| SNAPPY | ✅ | | ✅ | |
| GZIP | ✅ | | ✅ | |
| LZ0 | ✅ | | | |
| LZ4 | ✅ | | | |
| BROTLI | | | ✅ | |
| BZIP2 | ✅ | | | |
| ZSTD | ✅ | ⚙️ | ✅ | |
| DEFLATE | ✅ | ⚙️ | | |
| XZ | ✅ | ⚙️ | | |

    Please note that not all compression libraries are bundled with the GCP Storage connector. Therefore, you may need to manually add certain libraries to the classpath to ensure they function correctly.

    hashtag
    Authentication

    The connector offers two distinct authentication modes:

    • Default: This mode relies on the default GCP authentication chain, simplifying the authentication process.

    • File: This mode uses a local (to the connect worker) path for a file containing GCP authentication credentials.

    • Credentials: In this mode, explicit configuration of a GCP Credentials string is required for authentication.

The simplest mode to configure is "Default", as it requires no additional configuration and is, as its name suggests, used when no mode is specified.

    When selecting the "Credentials" mode, it is essential to provide the necessary credentials. Alternatively, if you prefer not to configure these properties explicitly, the connector will follow the credentials retrieval order as described .

    Here's an example configuration for the "Credentials" mode:

    And here is an example configuration using the "File" mode:

Remember that when using "File" mode, the file must exist on every worker node in your Kafka Connect cluster and be readable by the Kafka Connect process.

    For enhanced security and flexibility when using the “Credentials” mode, it is highly advisable to utilize Connect Secret Providers. You can find detailed information on how to use the Connect Secret Providers . This approach ensures robust security practices while handling access credentials.

    hashtag
    Error policies

The connector supports Error policies.

    hashtag
    Indexes Prefix

    The connector uses the concept of index objects that it writes to in order to store information about the latest offsets for Kafka topics and partitions as they are being processed. This allows the connector to quickly resume from the correct position when restarting and provides flexibility in naming the index objects.

    By default, the prefix for these index objects is named .indexes for all connectors. However, each connector will create and store its index objects within its own nested prefix inside this .indexes prefix.

    You can configure the root prefix for these index objects using the property connect.gcpstorage.indexes.name. This property specifies the path from the root of the GCS bucket. Note that even if you configure this property, the connector will still create a nested prefix within the specified prefix.
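For example (the prefix name here is illustrative), to store this connector's index objects under a custom root prefix:

```properties
# Index objects will be written under custom-indexes/<connector_name>/
connect.gcpstorage.indexes.name=custom-indexes
```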

    hashtag
    Examples

    hashtag
    Option Reference

| Name | Description | Type | Available Values | Default Value |
|---|---|---|---|---|
| padding.char | Defines the character used for padding. | Char | | '0' |
| padding.length.partition | Sets the padding length for the partition. | Int | | 0 |
| padding.length.offset | Sets the padding length for the offset. | Int | | 12 |
| partition.include.keys | Specifies whether partition keys are included. | Boolean | | false (Custom Partitioning: true) |
| store.envelope | Indicates whether to store the entire Kafka message. | Boolean | | |
| store.envelope.fields.key | Indicates whether to store the envelope's key. | Boolean | | |
| store.envelope.fields.headers | Indicates whether to store the envelope's headers. | Boolean | | |
| store.envelope.fields.value | Indicates whether to store the envelope's value. | Boolean | | |
| store.envelope.fields.metadata | Indicates whether to store the envelope's metadata. | Boolean | | |
| flush.size | Specifies the size (in bytes) for the flush operation. | Long | | 500000000 (500MB) |
| flush.count | Specifies the number of records for the flush operation. | Int | | 50000 |
| flush.interval | Specifies the interval (in seconds) for the flush operation. | Long | | 3600 (1 hour) |
| key.suffix | When specified, appends the given value to the resulting object key before the "extension" (avro, json, etc.) is added. | String | | <empty> |

| Compression | Avro Support | Avro (requires Level) | Parquet Support | JSON |
|---|---|---|---|---|
| SNAPPY | ✅ | | ✅ | |
| GZIP | ✅ | | ✅ | |
| LZ0 | ✅ | | | |
| LZ4 | ✅ | | | |
| BROTLI | | | ✅ | |
| BZIP2 | ✅ | | | |
| ZSTD | ✅ | ⚙️ | ✅ | |
| DEFLATE | ✅ | ⚙️ | | |
| XZ | ✅ | ⚙️ | | |

| Index Name (connect.datalake.indexes.name) | Resulting Indexes Directory Structure | Description |
|---|---|---|
| indexes/datalake-connector-logs | indexes/datalake-connector-logs/<connector_name>/ | Uses a custom subdirectory datalake-connector-logs within indexes, with a subdirectory for each connector. |
| logs/indexes | logs/indexes/<connector_name>/ | Indexes are stored under logs/indexes, with a subdirectory for each connector. |

| Name | Description | Type | Available Values | Default Value |
|---|---|---|---|---|
| connect.datalake.azure.account.key | The Azure Account Key used for authentication. | string | | (Empty) |
| connect.datalake.azure.account.name | The Azure Account Name used for authentication. | string | | (Empty) |
| connect.datalake.pool.max.connections | Specifies the maximum number of connections allowed in the Azure Client's HTTP connection pool when interacting with Datalake. | int | -1 (undefined) | 50 |
| connect.datalake.endpoint | Datalake endpoint URL. | string | | (Empty) |
| connect.datalake.error.policy | Defines the error handling policy when errors occur during data transfer to or from Datalake. | string | "NOOP", "THROW", "RETRY" | "THROW" |
| connect.datalake.max.retries | Sets the maximum number of retries the connector will attempt before reporting an error to the Connect Framework. | int | | 20 |
| connect.datalake.retry.interval | Specifies the interval (in milliseconds) between retry attempts by the connector. | int | | 60000 |
| connect.datalake.http.max.retries | Sets the maximum number of retries for the underlying HTTP client when interacting with Datalake. | long | | 5 |
| connect.datalake.http.retry.interval | Specifies the retry interval (in milliseconds) for the underlying HTTP client. An exponential backoff strategy is employed. | long | | 50 |
| connect.datalake.local.tmp.directory | Enables the use of a local folder as a staging area for data transfer operations. | string | | (Empty) |
| connect.datalake.kcql | A SQL-like configuration that defines the behavior of the connector. Refer to the KCQL section below for details. | string | | (Empty) |
| connect.datalake.compression.codec | Sets the Parquet compression codec to be used when writing data to Datalake. | string | "UNCOMPRESSED", "SNAPPY", "GZIP", "LZ0", "LZ4", "BROTLI", "BZIP2", "ZSTD", "DEFLATE", "XZ" | "UNCOMPRESSED" |
| connect.datalake.compression.level | Sets the compression level when compression is enabled for data transfer to Datalake. | int | 1-9 | (Empty) |
| connect.datalake.seek.max.files | Specifies the maximum threshold for the number of files the connector uses to ensure exactly-once processing of data. | int | | 5 |
| connect.datalake.indexes.name | Configure the indexes root directory for this connector. | string | | ".indexes" |
| connect.datalake.exactly.once.enable | Set to 'false' to disable exactly-once semantics and rely instead on Kafka Connect's native at-least-once offset management. | boolean | true, false | true |
| connect.datalake.schema.change.detector | Configures how the file rolls over upon receiving a record with a schema different from the accumulated ones: default (object equality), version (version field comparison), or compatibility (Avro compatibility checking). | string | default, version, compatibility | default |
| connect.datalake.skip.null.values | Skip records with null values (a.k.a. tombstone records). | boolean | true, false | false |
| connect.datalake.latest.schema.optimization.enabled | When set to true, reduces unnecessary data flushes when writing Avro or Parquet by leveraging schema compatibility: data is not flushed when messages arrive with older but backward-compatible schemas. | boolean | true, false | false |

| Name | Description | Type | Available Values | Default Value |
|---|---|---|---|---|
| padding.char | Defines the character used for padding. | Char | | '0' |
| padding.length.partition | Sets the padding length for the partition. | Int | | 0 |
| padding.length.offset | Sets the padding length for the offset. | Int | | 12 |
| partition.include.keys | Specifies whether partition keys are included. | Boolean | | false (Custom Partitioning: true) |
| store.envelope | Indicates whether to store the entire Kafka message. | Boolean | | |
| store.envelope.fields.key | Indicates whether to store the envelope's key. | Boolean | | |
| store.envelope.fields.headers | Indicates whether to store the envelope's headers. | Boolean | | |
| store.envelope.fields.value | Indicates whether to store the envelope's value. | Boolean | | |
| store.envelope.fields.metadata | Indicates whether to store the envelope's metadata. | Boolean | | |
| flush.size | Specifies the size (in bytes) for the flush operation. | Long | | 500000000 (500MB) |
| flush.count | Specifies the number of records for the flush operation. | Int | | 50000 |
| flush.interval | Specifies the interval (in seconds) for the flush operation. | Long | | 3600 (1 hour) |
| key.suffix | When specified, appends the given value to the resulting object key before the "extension" (avro, json, etc.) is added. | String | | <empty> |



| Index Name (connect.gcpstorage.indexes.name) | Resulting Indexes Prefix Structure | Description |
|---|---|---|
| indexes/gcs-connector-logs | indexes/gcs-connector-logs/<connector_name>/ | Uses a custom nested prefix gcs-connector-logs within indexes, with a nested prefix for each connector. |
| logs/indexes | logs/indexes/<connector_name>/ | Indexes are stored under logs/indexes, with a nested prefix for each connector. |

| Name | Description | Type | Available Values | Default Value |
|---|---|---|---|---|
| connect.gcpstorage.gcp.credentials | For "auth.mode" credentials: GCP Authentication credentials string. | string | | (Empty) |
| connect.gcpstorage.gcp.file | For "auth.mode" file: Local file path for file containing GCP authentication credentials. | string | | (Empty) |
| connect.gcpstorage.gcp.project.id | GCP Project ID. | string | | (Empty) |
| connect.gcpstorage.gcp.quota.project.id | GCP Quota Project ID. | string | | (Empty) |
| connect.gcpstorage.endpoint | Endpoint for GCP Storage. | string | | (Empty) |
| connect.gcpstorage.error.policy | Defines the error handling policy when errors occur during data transfer to or from GCP Storage. | string | "NOOP", "THROW", "RETRY" | "THROW" |
| connect.gcpstorage.max.retries | Sets the maximum number of retries the connector will attempt before reporting an error. | int | | 20 |
| connect.gcpstorage.retry.interval | Specifies the interval (in milliseconds) between retry attempts by the connector. | int | | 60000 |
| connect.gcpstorage.http.max.retries | Sets the maximum number of retries for the underlying HTTP client when interacting with GCP. | long | | 5 |
| connect.gcpstorage.http.retry.interval | Specifies the retry interval (in milliseconds) for the underlying HTTP client. | long | | 50 |
| connect.gcpstorage.http.retry.timeout.multiplier | Specifies the change in delay before the next retry or poll. | double | | 3.0 |
| connect.gcpstorage.local.tmp.directory | Enables the use of a local folder as a staging area for data transfer operations. | string | | (Empty) |
| connect.gcpstorage.kcql | A SQL-like configuration that defines the behavior of the connector. | string | | (Empty) |
| connect.gcpstorage.compression.codec | Sets the Parquet compression codec to be used when writing data to GCP Storage. | string | "UNCOMPRESSED", "SNAPPY", "GZIP", "LZ0", "LZ4", "BROTLI", "BZIP2", "ZSTD", "DEFLATE", "XZ" | "UNCOMPRESSED" |
| connect.gcpstorage.compression.level | Sets the compression level when compression is enabled for data transfer to GCP Storage. | int | 1-9 | (Empty) |
| connect.gcpstorage.seek.max.files | Specifies the maximum threshold for the number of files the connector uses. | int | | 5 |
| connect.gcpstorage.indexes.name | Configure the indexes prefix for this connector. | string | | ".indexes" |
| connect.gcpstorage.exactly.once.enable | Set to 'false' to disable exactly-once semantics and rely instead on Kafka Connect's native at-least-once offset management. | boolean | true, false | true |
| connect.gcpstorage.schema.change.detector | Configures how the file rolls over upon receiving a record with a schema different from the accumulated ones: default (object equality), version (version field comparison), or compatibility (Avro compatibility checking). | string | default, version, compatibility | default |
| connect.gcpstorage.skip.null.values | Skip records with null values (a.k.a. tombstone records). | boolean | true, false | false |
| connect.gcpstorage.latest.schema.optimization.enabled | When set to true, reduces unnecessary data flushes when writing Avro or Parquet by leveraging schema compatibility: data is not flushed when messages arrive with older but backward-compatible schemas. | boolean | true, false | false |

| Name | Description | Type | Available Values | Default Value |
|---|---|---|---|---|
| padding.type | Specifies the type of padding to be applied. | LeftPad, RightPad, NoOp | LeftPad, RightPad, NoOp | LeftPad |

| Compression | Avro Support | Avro (requires Level) | Parquet Support | JSON |
|---|---|---|---|---|
| UNCOMPRESSED | ✅ | | ✅ | ✅ |

| Index Name (connect.gcpstorage.indexes.name) | Resulting Indexes Prefix Structure | Description |
|---|---|---|
| .indexes (default) | .indexes/<connector_name>/ | The default setup, where each connector uses its own nested prefix within .indexes. |
| custom-indexes | custom-indexes/<connector_name>/ | Custom prefix custom-indexes, with a nested prefix for each connector. |

| Name | Description | Type | Available Values | Default Value |
|---|---|---|---|---|
| connect.gcpstorage.gcp.auth.mode | Specifies the authentication mode for connecting to GCP. | string | "Credentials", "File" or "Default" | "Default" |


    io.lenses.streamreactor.connect.gcp.storage.sink.GCPStorageSinkConnector
    connector.class=io.lenses.streamreactor.connect.gcp.storage.sink.GCPStorageSinkConnector
    connect.gcpstorage.kcql=insert into lensesio:demo select * from demo PARTITIONBY _value.metadata_id, _value.customer_id, _header.ts, _header.wallclock STOREAS `JSON` PROPERTIES('flush.interval'=600, 'flush.size'=1000000, 'flush.count'=5000)
    topics=demo
    name=demo
    value.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter=org.apache.kafka.connect.storage.StringConverter
    transforms=insertFormattedTs,insertWallclock
    transforms.insertFormattedTs.type=io.lenses.connect.smt.header.TimestampConverter
    transforms.insertFormattedTs.header.name=ts
    transforms.insertFormattedTs.field=timestamp
    transforms.insertFormattedTs.target.type=string
    transforms.insertFormattedTs.format.to.pattern=yyyy-MM-dd-HH
    transforms.insertWallclock.type=io.lenses.connect.smt.header.InsertWallclock
    transforms.insertWallclock.header.name=wallclock
    transforms.insertWallclock.value.type=format
    transforms.insertWallclock.format=yyyy-MM-dd-HH
    INSERT INTO bucketAddress[:pathPrefix]
    SELECT *
    FROM kafka-topic
    [[PARTITIONBY (partition[, partition] ...)] | NOPARTITION]
    [STOREAS storage_format]
[PROPERTIES(
  'property.1'=x,
  'property.2'=x
)]
    {
      ...
      "a.b": "value",
      ...
    }
    INSERT INTO `container-name`:`prefix` SELECT * FROM `kafka-topic` PARTITIONBY `a.b`
    INSERT INTO testcontainer:pathToWriteTo SELECT * FROM topicA;
    INSERT INTO testcontainer SELECT * FROM topicA;
    INSERT INTO testcontainer:path/To/Write/To SELECT * FROM topicA PARTITIONBY fieldA;
    topics.regex = ^sensor_data_\d+$
    connect.gcpstorage.kcql= INSERT INTO $target SELECT * FROM  `*` ....
    PARTITIONBY fieldA, fieldB
    PARTITIONBY _key
    PARTITIONBY _key.fieldA, _key.fieldB
    PARTITIONBY _header.<header_key1>[, _header.<header_key2>]
    PARTITIONBY fieldA, _key.fieldB, _headers.fieldC
    INSERT INTO $container[:$prefix]
    SELECT * FROM $topic
    PARTITIONBY fieldA, _key.fieldB, _headers.fieldC
    STOREAS `AVRO`
    PROPERTIES (
    'partition.include.keys'=true
    )
    connector.class=io.lenses.streamreactor.connect.gcp.storage.sink.GCPStorageSinkConnector
    connect.gcpstorage.kcql=insert into lensesio:demo select * from demo PARTITIONBY _value.metadata_id, _value.customer_id, _header.ts, _header.wallclock STOREAS `JSON` PROPERTIES('flush.interval'=30, 'flush.size'=1000000, 'flush.count'=5000)
    topics=demo
    name=demo
    value.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter=org.apache.kafka.connect.storage.StringConverter
    transforms=insertFormattedTs,insertWallclock
    transforms.insertFormattedTs.type=io.lenses.connect.smt.header.TimestampConverter
    transforms.insertFormattedTs.header.name=ts
    transforms.insertFormattedTs.field=timestamp
    transforms.insertFormattedTs.target.type=string
    transforms.insertFormattedTs.format.to.pattern=yyyy-MM-dd-HH
    transforms.insertWallclock.type=io.lenses.connect.smt.header.InsertWallclock
    transforms.insertWallclock.header.name=wallclock
    transforms.insertWallclock.value.type=format
    transforms.insertWallclock.format=yyyy-MM-dd-HH
    {
      "key": <the message Key, which can be a primitive or a complex object>,
  "value": <the message Value, which can be a primitive or a complex object>,
      "headers": {
        "header1": "value1",
        "header2": "value2"
      },
      "metadata": {
        "offset": 0,
        "partition": 0,
        "timestamp": 0,
        "topic": "topic"
      }
    }
    ...
    connect.gcpstorage.kcql=INSERT INTO lensesiogcpstorage:car_speed SELECT * FROM car_speed_events STOREAS `PARQUET` 
    value.converter=io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url=http://localhost:8081
    key.converter=org.apache.kafka.connect.storage.StringConverter
    ...
    ...
    connect.gcpstorage.kcql=INSERT INTO lensesiogcpstorage:car_speed SELECT * FROM car_speed_events STOREAS `PARQUET` 
    value.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter=org.apache.kafka.connect.storage.StringConverter
    ...
    ...
    connect.gcpstorage.kcql=INSERT INTO lensesiogcpstorage:car_speed SELECT * FROM car_speed_events STOREAS `JSON` PROPERTIES('store.envelope'=true)
    value.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter=org.apache.kafka.connect.storage.StringConverter
    ...
    ...
    connect.gcpstorage.kcql=INSERT INTO lensesiogcpstorage:car_speed SELECT * FROM car_speed_events STOREAS `AVRO` PROPERTIES('store.envelope'=true)
    value.converter=io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url=http://localhost:8081
    key.converter=org.apache.kafka.connect.storage.StringConverter
    ...
    ...
    connect.gcpstorage.kcql=INSERT INTO lensesiogcpstorage:car_speed SELECT * FROM car_speed_events STOREAS `AVRO` PROPERTIES('store.envelope'=true)
    value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
    key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
    ...
message1 -> schema1  
    message2 -> schema1  
      (No flush needed – same schema)
    
    message3 -> schema2  
      (Flush occurs – new schema introduced)
    
    message4 -> schema2  
      (No flush needed – same schema)
    
    message5 -> schema1  
      Without optimization: would trigger a flush  
      With optimization: no flush – schema1 is backward-compatible with schema2
    
    message6 -> schema2  
    message7 -> schema2  
      (No flush needed – same schema, it would happen based on the flush thresholds)
    ...
    connect.gcpstorage.gcp.auth.mode=Credentials
    connect.gcpstorage.gcp.credentials=$GCP_CREDENTIALS
    connect.gcpstorage.gcp.project.id=$GCP_PROJECT_ID
    ...
    ...
    connect.gcpstorage.gcp.auth.mode=File
    connect.gcpstorage.gcp.file=/home/secure-stuff/gcp-write-credential.txt
    ...