This page describes the usage of the Stream Reactor Cassandra Sink Connector, bundled in the kafka-connect-cassandra-sink artifact.
io.lenses.streamreactor.connect.cassandra.CassandraSinkConnector
name=cassandra-sink
connector.class=io.lenses.streamreactor.connect.cassandra.CassandraSinkConnector
tasks.max=1
topics=orders
connect.cassandra.kcql=INSERT INTO mykeyspace.orders SELECT _key.bigint as bigintcol, _value.boolean as booleancol, _key.double as doublecol, _value.float as floatcol, _key.int as intcol, _value.smallint as smallintcol, _key.text as textcol, _value.tinyint as tinyintcol FROM orders
connect.cassandra.port=9042
connect.cassandra.contact.points=cassandra
connect.cassandra.contact.points=[host list]
connect.cassandra.port=9042
connect.cassandra.load.balancing.local.dc=datacentre-name
# optional entries
# connect.cassandra.max.concurrent.requests = 100
# connect.cassandra.max.batch.size = 64
# connect.cassandra.connection.pool.size = 4
# connect.cassandra.query.timeout.ms = 30
# connect.cassandra.compression = None
connect.cassandra.ssl.provider=
connect.cassandra.ssl.cipher.suites=
connect.cassandra.ssl.hostname.verification=true
connect.cassandra.ssl.keystore.path=
connect.cassandra.ssl.keystore.password=
connect.cassandra.ssl.truststore.password=
connect.cassandra.ssl.truststore.path=
# Path to the SSL certificate file, when using OpenSSL.
connect.cassandra.ssl.openssl.key.cert.chain=
# Path to the private key file, when using OpenSSL.
connect.cassandra.ssl.openssl.private.key=
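For example, a minimal TLS setup using JKS stores might look like the following sketch (the JDK provider value and all paths and passwords are illustrative assumptions, not defaults from this page):
connect.cassandra.ssl.provider=JDK
connect.cassandra.ssl.hostname.verification=true
connect.cassandra.ssl.keystore.path=/var/private/ssl/client.keystore.jks
connect.cassandra.ssl.keystore.password=changeit
connect.cassandra.ssl.truststore.path=/var/private/ssl/client.truststore.jks
connect.cassandra.ssl.truststore.password=changeit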
connect.cassandra.auth.provider=
connect.cassandra.auth.username=
connect.cassandra.auth.password=
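A username/password sketch (the PLAIN provider value is an assumption; only GSSAPI is confirmed on this page):
connect.cassandra.auth.provider=PLAIN
connect.cassandra.auth.username=cassandra_user
connect.cassandra.auth.password=cassandra_pass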
# for SASL authentication, which requires connect.cassandra.auth.provider
# set to GSSAPI
connect.cassandra.auth.gssapi.keytab=
connect.cassandra.auth.gssapi.principal=
connect.cassandra.auth.gssapi.service=
connect.cassandra.auth.provider=GSSAPI
connect.cassandra.auth.gssapi.keytab=
connect.cassandra.auth.gssapi.principal=
connect.cassandra.auth.gssapi.service=
INSERT INTO <keyspace>.<your-cassandra-table>
SELECT <field projection>, ...
FROM <your-kafka-topic>
PROPERTIES(<a set of keys to control the connector behaviour>)
INSERT INTO mykeyspace.types
SELECT
_key.bigint as bigintcol
, _value.boolean as booleancol
, _key.double as doublecol
, _value.float as floatcol
, _header.int as intcol
FROM myTopic
INSERT INTO mykeyspace.types
SELECT
_value.bigint as bigintcol
, _value.double as doublecol
, _value.ttlcol as message_internal_ttl
, _value.timestampcol as message_internal_timestamp
FROM myTopic
PROPERTIES('ttlTimeUnit'='MILLISECONDS', 'timestampTimeUnit'='MICROSECONDS')
INSERT INTO mykeyspace.CASE_SENSITIVE
SELECT
`_key.'bigint field'` as 'bigint col',
`_key.'boolean-field'` as 'boolean-col',
`_value.'INT FIELD'` as 'INT COL',
`_value.'TEXT.FIELD'` as 'TEXT.COL'
FROM mytopic
INSERT INTO mykeyspace.tableA
SELECT
_key.my_pk as my_pk
, _value.my_value as my_value
FROM topicA
PROPERTIES(
'query'='INSERT INTO mykeyspace.pk_value (my_pk, my_value) VALUES (:my_pk, :my_value)',
'deletesEnabled' ='true'
)
INSERT INTO ${target}
SELECT _value/_key/_header.{field_path} AS ${table_column}
FROM mytopic
INSERT INTO ${target}
SELECT
_key as row_key
, _value as content
INSERT INTO ${target}
SELECT
`_key.'bigint field'` as 'bigint col',
`_key.'boolean-field'` as 'boolean-col',
`_value.'INT FIELD'` as 'INT COL',
`_value.'TEXT.FIELD'` as 'TEXT.COL'
FROM myTopic
INSERT INTO stocks_keyspace.stocks_by_symbol
SELECT _value.symbol AS ticker,
_value.ts AS ts,
_value.exchange AS exchange,
_value.value AS value
FROM stocks;
INSERT INTO stocks_keyspace.stocks_by_exchange
SELECT _value.symbol AS ticker,
_value.ts AS ts,
_value.exchange AS exchange,
_value.value AS value
FROM stocks;
{"symbol":"APPL",
"value":214.4,
"exchange":"NASDAQ",
"ts":"2025-07-23T14:56:10.009"}CREATE TYPE stocks_keyspace.stocks_type (
symbol text,
ts timestamp,
exchange text,
value double
);
CREATE TABLE stocks_keyspace.stocks_table (
name text PRIMARY KEY,
stocks FROZEN<stocks_type>
);
INSERT INTO stocks_keyspace.stocks_table
SELECT _key AS name, _value AS stocks
FROM stocks
INSERT INTO ${target}
SELECT
`now()` as loaded_at
FROM myTopic
SELECT [_value/_key/_header].{path} AS message_internal_timestamp
FROM myTopic
-- You may optionally specify the time unit by using:
PROPERTIES('timestampTimeUnit'='MICROSECONDS')
SELECT _header.timestampcolumn AS message_internal_timestamp FROM myTopic
SELECT _value.timestampcol AS message_internal_timestamp FROM myTopic
SELECT _key.timestampcol AS message_internal_timestamp FROM myTopic
SELECT [_value/_key/_header].{path} as message_internal_ttl
FROM myTopic
-- Optional: Specify the TTL unit
PROPERTIES('ttlTimeUnit'='SECONDS')
SELECT _header.ttl as message_internal_ttl
FROM myTopic
PROPERTIES('ttlTimeUnit'='MINUTES')
SELECT _key.expiry as message_internal_ttl
FROM myTopic
PROPERTIES('ttlTimeUnit'='HOURS')
SELECT _value.ttl_duration as message_internal_ttl
FROM myTopic
PROPERTIES('ttlTimeUnit'='DAYS')
INSERT INTO ... SELECT ... FROM myTopic PROPERTIES(
'codec.locale' = 'en_US',
'codec.timeZone' = 'UTC',
'codec.timestamp' = 'CQL_TIMESTAMP',
'codec.date' = 'ISO_LOCAL_DATE',
'codec.time' = 'ISO_LOCAL_TIME',
'codec.unit' = 'MILLISECONDS'
)
INSERT INTO ${target}
SELECT
_value.bigint as some_name,
_value.int as some_name2
FROM myTopic
PROPERTIES('query'='INSERT INTO %s.types (bigintCol, intCol) VALUES (:some_name, :some_name2)')
connect.cassandra.driver.{driver_setting}
INSERT INTO lenses-io-demo ...
INSERT INTO `<path to field>`
SELECT * FROM control.boxes.test
PROPERTIES('mqtt.target.from.field'='true')
io.lenses.streamreactor.connect.mqtt.sink.MqttSinkConnector
name=mqtt
connector.class=io.lenses.streamreactor.connect.mqtt.sink.MqttSinkConnector
tasks.max=1
topics=orders
connect.mqtt.hosts=tcp://mqtt:1883
connect.mqtt.clean=true
connect.mqtt.timeout=1000
connect.mqtt.keep.alive=1000
connect.mqtt.service.quality=1
connect.mqtt.client.id=dm_sink_id
connect.mqtt.kcql=INSERT INTO /lenses/orders SELECT * FROM orders
INSERT
INTO <mqtt-topic>
SELECT * -- no field projection supported
FROM <kafka-topic>
-- no WHERE clause supported
-- Insert into /landoop/demo all fields from kafka_topicA
INSERT INTO `/landoop/demo` SELECT * FROM kafka_topicA
-- Insert into /landoop/demo all fields from dynamic field
INSERT INTO `<field path>` SELECT * FROM control.boxes.test PROPERTIES('mqtt.target.from.field'='true')
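For example, assuming a record value carries a hypothetical field named targetTopic holding the destination:
INSERT INTO `targetTopic` SELECT * FROM control.boxes.test PROPERTIES('mqtt.target.from.field'='true')
-- a record value of {"targetTopic": "/lenses/boxes", "temperature": 21.3}
-- would be published to the MQTT topic /lenses/boxes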
INSERT INTO `_key`
SELECT ...
INSERT INTO `_topic`
SELECT ...
connect.cassandra.driver.*
connect.cassandra.driver.basic.request.consistency=ALL
This page describes the usage of the Stream Reactor Azure Event Hubs Sink Connector.
This page details the configuration options for the Stream Reactor Kafka Connect sink connectors.














INSERT INTO `_value.'customer.name'.'first.name'` SELECT * FROM topicA
This page describes the usage of the Stream Reactor Azure Service Bus Sink Connector.
io.lenses.streamreactor.connect.azure.servicebus.sink.AzureServiceBusSinkConnector
connector.class=io.lenses.streamreactor.connect.azure.servicebus.sink.AzureServiceBusSinkConnector
name=AzureServiceBusSinkConnector
tasks.max=1
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
connect.servicebus.connection.string="Endpoint=sb://MYNAMESPACE.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SOME_SHARED_ACCESS_STRING";
connect.servicebus.kcql=INSERT INTO output-servicebus SELECT * FROM input-topic PROPERTIES('servicebus.type'='QUEUE');
INSERT INTO <your-service-bus>
SELECT *
FROM <your-kafka-topic>
PROPERTIES(...);
connect.servicebus.connection.string=Endpoint=sb://YOURNAMESPACE.servicebus.windows.net/;SharedAccessKeyName=YOUR_KEYNAME;SharedAccessKey=YOUR_ACCESS_KEY=
connect.servicebus.kcql=INSERT INTO azure-queue SELECT * FROM kafka-topic PROPERTIES('servicebus.type'='QUEUE');
connect.servicebus.kcql=INSERT INTO azure-topic SELECT * FROM kafka-topic PROPERTIES('servicebus.type'='TOPIC');
io.lenses.streamreactor.connect.influx.InfluxSinkConnector
name=influxdb
connector.class=io.lenses.streamreactor.connect.influx.InfluxSinkConnector
tasks.max=1
topics=influx
connect.influx.url=http://influxdb:8086
connect.influx.db=mydb
connect.influx.username=admin
connect.influx.kcql=INSERT INTO influxMeasure SELECT * FROM influx WITHTIMESTAMP sys_time()
INSERT INTO <your-measure>
SELECT FIELD, ...
FROM kafka_topic_name
[WITHTIMESTAMP FIELD|sys_time()]
[WITHTAG (FIELD|(constant_key=constant_value))]
-- Insert mode, select all fields from topicA and write to measureA
INSERT INTO measureA SELECT * FROM topicA
-- Insert mode, select 3 fields and rename from topicB and write to indexB,
-- use field Y as the point measurement
INSERT INTO measureB SELECT x AS a, y AS b, c FROM topicB WITHTIMESTAMP y
-- Insert mode, select 3 fields and rename from topicB and write to indexB,
-- use field Y as the current system time for Point measurement
INSERT INTO measureB SELECT x AS a, y AS b, z FROM topicB WITHTIMESTAMP sys_time()
-- Tagging using constants
INSERT INTO measureA SELECT * FROM topicA WITHTAG (DataMountaineer=awesome, Influx=rulz!)
-- Tagging using fields in the payload. Say we have a Payment structure
-- with these fields: amount, from, to, note
INSERT INTO measureA SELECT * FROM topicA WITHTAG (from, to)
-- Tagging using a combination of fields in the payload and constants.
-- Say we have a Payment structure with these fields: amount, from, to, note
INSERT INTO measureA SELECT * FROM topicA WITHTAG (from, to, provider=DataMountaineer)
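Given the Payment structure above, a record such as the following (illustrative values) written with WITHTAG (from, to) would produce a point tagged from=alice and to=bob:
{ "amount": 100.0, "from": "alice", "to": "bob", "note": "invoice 42" }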
io.lenses.streamreactor.connect.jms.sink.JMSSinkConnector
name=jms
connector.class=io.lenses.streamreactor.connect.jms.sink.JMSSinkConnector
tasks.max=1
topics=orders
connect.jms.url=tcp://activemq:61616
connect.jms.initial.context.factory=org.apache.activemq.jndi.ActiveMQInitialContextFactory
connect.jms.connection.factory=ConnectionFactory
connect.jms.kcql=INSERT INTO orders SELECT * FROM orders WITHTYPE QUEUE WITHFORMAT JSON
INSERT INTO <jms-destination>
SELECT FIELD, ...
FROM <your-kafka-topic>
[WITHFORMAT AVRO|JSON|MAP|OBJECT]
WITHTYPE TOPIC|QUEUE
-- Select all fields from topicA and write to jmsA queue
INSERT INTO jmsA SELECT * FROM topicA WITHTYPE QUEUE
-- Select 3 fields and rename from topicB and write
-- to jmsB topic as JSON in a TextMessage
INSERT INTO jmsB SELECT x AS a, y, z FROM topicB WITHFORMAT JSON WITHTYPE TOPIC
This page describes the usage of the Stream Reactor MongoDB Sink Connector.
io.lenses.streamreactor.connect.mongodb.sink.MongoSinkConnector
name=mongo
connector.class=io.lenses.streamreactor.connect.mongodb.sink.MongoSinkConnector
tasks.max=1
topics=orders
connect.mongo.kcql=INSERT INTO orders SELECT * FROM orders
connect.mongo.db=connect
connect.mongo.connection=mongodb://mongo:27017
INSERT | UPSERT
INTO <collection_name>
SELECT FIELD, ...
FROM <kafka-topic>
BATCH = 100
-- Select all fields from topic fx_prices and insert into the fx collection
INSERT INTO fx SELECT * FROM fx_prices
-- Select all fields from topic fx_prices and upsert into the fx collection,
-- The assumption is there will be a ticker field in the incoming json:
UPSERT INTO fx SELECT * FROM fx_prices PK ticker
# default of scram
mongodb://host1/?authSource=db1
# scram explicit
mongodb://host1/?authSource=db1&authMechanism=SCRAM-SHA-1
# mongo-cr
mongodb://host1/?authSource=db1&authMechanism=MONGODB-CR
# x.509
mongodb://host1/?authSource=db1&authMechanism=MONGODB-X509
# kerberos
mongodb://host1/?authSource=db1&authMechanism=GSSAPI
# ldap
mongodb://host1/?authSource=db1&authMechanism=PLAIN
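Putting this together, a sketch of a sink configuration using explicit SCRAM authentication (host, database and credentials are placeholders, embedded in the connection string for brevity):
name=mongo
connector.class=io.lenses.streamreactor.connect.mongodb.sink.MongoSinkConnector
tasks.max=1
topics=fx_prices
connect.mongo.kcql=UPSERT INTO fx SELECT * FROM fx_prices PK ticker
connect.mongo.db=db1
connect.mongo.connection=mongodb://myuser:mypassword@host1/?authSource=db1&authMechanism=SCRAM-SHA-1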
io.lenses.streamreactor.connect.cassandra.sink.CassandraSinkConnector
name=cassandra-sink
connector.class=io.lenses.streamreactor.connect.cassandra.sink.CassandraSinkConnector
tasks.max=1
topics=orders
connect.cassandra.kcql=INSERT INTO orders SELECT * FROM orders
connect.cassandra.port=9042
connect.cassandra.key.space=demo
connect.cassandra.contact.points=cassandra
INSERT INTO <your-cassandra-table>
SELECT FIELD,...
FROM <your-table>
[TTL=Time to live]
-- Insert mode, select all fields from topicA and
-- write to tableA
INSERT INTO tableA SELECT * FROM topicA
-- Insert mode, select 3 fields and rename from topicB
-- and write to tableB
INSERT INTO tableB SELECT x AS a, y, c FROM topicB
-- Insert mode, select 3 fields and rename from topicB
-- and write to tableB with TTL
INSERT INTO tableB SELECT x, y FROM topicB TTL=100000
DELETE FROM orders WHERE id = ? and product = ?
# Message
# "{ "key": { "id" : 999, "product" : "DATAMOUNTAINEER" }, "value" : null }"
# DELETE FROM orders WHERE id = 999 and product = "DATAMOUNTAINEER"
# connect.cassandra.delete.enabled=true
# connect.cassandra.delete.statement=DELETE FROM orders WHERE id = ? and product = ?
# connect.cassandra.delete.struct_flds=id,product
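Uncommented, a working delete configuration for the example message above is simply the settings listed:
connect.cassandra.delete.enabled=true
connect.cassandra.delete.statement=DELETE FROM orders WHERE id = ? and product = ?
connect.cassandra.delete.struct_flds=id,product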
This page describes the usage of the Stream Reactor Redis Sink Connector.
io.lenses.streamreactor.connect.redis.sink.RedisSinkConnector
name=redis
connector.class=io.lenses.streamreactor.connect.redis.sink.RedisSinkConnector
tasks.max=1
topics=redis
connect.redis.host=redis
connect.redis.port=6379
connect.redis.kcql=INSERT INTO lenses SELECT * FROM redis STOREAS STREAM
[INSERT INTO <redis-cache>]
SELECT FIELD, ...
FROM <kafka-topic>
[PK FIELD]
[STOREAS SortedSet(key=FIELD)|GEOADD|STREAM]
{ "symbol": "USDGBP" , "price": 0.7943 }
{ "symbol": "EURGBP" , "price": 0.8597 }
SELECT price from yahoo-fx PK symbol
Key=EURGBP Value={ "price": 0.8597 }
INSERT INTO cpu_stats SELECT * from cpuTopic STOREAS SortedSet(score=timestamp) TTL=60
SELECT temperature, humidity FROM sensorsTopic PK sensorID STOREAS SortedSet(score=timestamp)
INSERT INTO FX- SELECT price from yahoo-fx PK symbol STOREAS SortedSet(score=timestamp) TTL=60
INSERT INTO cpu_stats SELECT * from cpuTopic STOREAS GEOADD
INSERT INTO redis_stream_name SELECT * FROM my-kafka-topic STOREAS STREAM
SELECT * FROM topic STOREAS PubSub (channel=myfield)
This page describes the usage of the Stream Reactor Azure CosmosDB Sink Connector.
io.lenses.streamreactor.connect.azure.cosmosdb.sink.CosmosDbSinkConnector
name=cosmosdb
connector.class=io.lenses.streamreactor.connect.azure.cosmosdb.sink.CosmosDbSinkConnector
tasks.max=1
topics=orders-string
connect.cosmosdb.kcql=INSERT INTO orders SELECT * FROM orders-string
connect.cosmosdb.db=dm
connect.cosmosdb.endpoint=[YOUR_AZURE_ENDPOINT]
connect.cosmosdb.db.create=true
connect.cosmosdb.master.key=[YOUR_MASTER_KEY]
connect.cosmosdb.batch.size=10
INSERT | UPSERT
INTO <your-collection>
SELECT FIELD, ...
FROM kafka_topic
[PK FIELDS,...]
-- Insert mode, select all fields from topicA
-- and write to tableA
INSERT INTO collectionA SELECT * FROM topicA
-- UPSERT mode, select 3 fields and
-- rename from topicB and write to tableB
-- with primary key as the field id from the topic
UPSERT INTO tableB SELECT x AS a, y, z AS c FROM topicB PK id
connect.cosmosdb.bulk.enabled=true
connect.cosmosdb.collection.throughput=400
connect.cosmosdb.proxy=<proxy-uri>
connect.progress.enabled=true
INSERT INTO myCollection SELECT * FROM myTopic PROPERTIES('flush.size'=1000000, 'flush.interval'=30, 'flush.count'=5000)
This page describes the usage of the Stream Reactor Elasticsearch Sink Connector.
io.lenses.streamreactor.connect.elastic6.ElasticSinkConnector
io.lenses.streamreactor.connect.elastic7.ElasticSinkConnector
name=elastic
connector.class=io.lenses.streamreactor.connect.elastic7.ElasticSinkConnector
tasks.max=1
topics=orders
connect.elastic.protocol=http
connect.elastic.hosts=elastic
connect.elastic.port=9200
connect.elastic.cluster.name=elasticsearch
connect.elastic.kcql=INSERT INTO orders SELECT * FROM orders
connect.progress.enabled=true
INSERT | UPSERT
INTO <elastic_index>
SELECT FIELD, ...
FROM kafka_topic
[PK FIELD,...]
[WITHDOCTYPE=<your_document_type>]
[WITHINDEXSUFFIX=<your_suffix>]
-- Insert mode, select all fields from topicA and write to indexA
INSERT INTO indexA SELECT * FROM topicA
-- Insert mode, select 3 fields and rename from topicB
-- and write to indexB
INSERT INTO indexB SELECT x AS a, y, z AS c FROM topicB PK y
-- UPSERT
UPSERT INTO indexC SELECT id, string_field FROM topicC PK id
INSERT INTO indexA SELECT * FROM topicA PROPERTIES ('behavior.on.null.values'='IGNORE')
WITHINDEXSUFFIX=_suffix_{YYYY-MM-dd}
INSERT INTO index_name SELECT * FROM topicA
INSERT INTO _header.gate SELECT * FROM topicA
INSERT INTO `_header.'prefix.abc.suffix'` SELECT * FROM topicA
INSERT INTO _key SELECT * FROM topicA
INSERT INTO _value.name SELECT * FROM topicA
INSERT INTO _value.name.firstName SELECT * FROM topicA
INSERT INTO `_value.'customer.name'.'first.name'` SELECT * FROM topicA
{
"customer.name": {
"first.name": "hans"
}
}
ssl.truststore.location=/path/to/truststore.jks
ssl.truststore.password=your_truststore_password
# Can also be PKCS12 if applicable
ssl.truststore.type=JKS
ssl.keystore.location=/path/to/keystore.jks
ssl.keystore.password=your_keystore_password
# Can also be PKCS12 if applicable
ssl.keystore.type=JKS
# Or TLSv1.3 for stronger security
ssl.protocol=TLSv1.2
# Default algorithm for managing certificates
ssl.trustmanager.algorithm=PKIX
# Default algorithm for managing certificates
ssl.keymanager.algorithm=PKIX
This page describes the usage of the Stream Reactor HTTP Sink Connector.
io.lenses.streamreactor.connect.http.sink.HttpSinkConnector
name=lenseshttp
connector.class=io.lenses.streamreactor.connect.http.sink.HttpSinkConnector
tasks.max=1
topics=topicToRead
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
connect.http.authentication.type=none
connect.http.method=POST
connect.http.endpoint=http://endpoint.local/receive
connect.http.request.content="My Static Content Template"
connect.http.batch.count=1
connect.http.request.content="My Static Content Template"
connect.http.request.content="<product><id>{{value.name}}</id></product>"
<messages>
{{#message}}
<message>
<topic>{{topic}}</topic>
<employee>{{value.employeeId}}</employee>
<order>{{value.orderNo}}</order>
<groupDomain>{{value.groupDomain}}</groupDomain>
</message>
{{/message}}
</messages>
connect.http.request.content="<messages>{{#message}}<message><topic>{{topic}}</topic><employee>{{value.employeeId}}</employee><order>{{value.orderNo}}</order><groupDomain>{{value.groupDomain}}</groupDomain></message>{{/message}}</messages>"
<messages>
<message>
<topic>myTopic</topic>
<employee>Abcd1234</employee>
<order>10</order>
<groupDomain>myExampleGroup.uk</groupDomain>
</message>
<message>
<topic>myTopic</topic>
<employee>Efgh5678</employee>
<order>11</order>
<groupDomain>myExampleGroup.uk</groupDomain>
</message>
</messages>
connect.http.authentication.type=none
connect.http.authentication.type=basic
connect.http.authentication.basic.username=user
connect.http.authentication.basic.password=password
connect.http.authentication.type=oauth2
connect.http.authentication.oauth2.token.url=http://myoauth2.local/getToken
connect.http.authentication.oauth2.client.id=clientId
connect.http.authentication.oauth2.client.secret=client-secret
connect.http.authentication.oauth2.token.property=access_token
connect.http.authentication.oauth2.client.scope=any
connect.http.authentication.oauth2.client.headers=header:value
connect.http.request.headers="Content-Type","text/plain","X-User","{{header.kafkauser}}","Product","{{value.product.id}}"
connect.http.batch.count=50000
connect.http.batch.size=500000000
connect.http.time.interval=3600
connector.class=io.lenses.streamreactor.connect.http.sink.HttpSinkConnector
topics=mytopic
tasks.max=1
connect.http.method=POST
connect.http.endpoint="https://my-endpoint.example.com"
connect.http.request.content="My Static Content Template"
connect.http.batch.count=1
connector.class=io.lenses.streamreactor.connect.http.sink.HttpSinkConnector
topics=mytopic
tasks.max=1
connect.http.method=POST
connect.http.endpoint="https://my-endpoint.example.com"
connect.http.request.content="{{value}}"
connect.http.batch.count=1
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
connector.class=io.lenses.streamreactor.connect.http.sink.HttpSinkConnector
topics=mytopic
tasks.max=1
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
connect.http.method=POST
connect.http.endpoint="https://my-endpoint.example.com"
connect.http.request.content="product: {{value.product}}"
connect.http.batch.size=1
value.converter.schemas.enable=false
connector.class=io.lenses.streamreactor.connect.http.sink.HttpSinkConnector
topics=mytopic
tasks.max=1
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
connect.http.method=POST
connect.http.endpoint="https://my-endpoint.example.com"
connect.http.request.content="whole product message: {{value}}"
connect.http.time.interval=5
connector.class=io.lenses.streamreactor.connect.http.sink.HttpSinkConnector
topics=mytopic
tasks.max=1
connect.http.method=POST
connect.http.endpoint="https://my-endpoint.example.com"
connect.http.request.content="product: {{value.product}}"
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schemas.enable=true
value.converter.schema.registry.url=http://schema-registry:8081
connect.reporting.error.config.enabled=true
connect.reporting.success.config.enabled=true
connect.reporting.error.config.enabled=true
connect.reporting.error.config.bootstrap.servers=localhost:9094
connect.reporting.error.config.topic=http-monitoring
connect.reporting.error.config.enabled=true
connect.reporting.error.config.bootstrap.servers=my-kafka-cluster.com:9093
connect.reporting.error.config.security.protocol=SASL_SSL
connect.reporting.error.config.sasl.mechanism=PLAIN
connect.reporting.error.config.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="MYUSER" password="MYPASSWORD";
connect.reporting.error.config.enabled=true
connect.reporting.error.config.bootstrap.servers=SSL://my-ssl-protected-cluster:9094
connect.reporting.error.config.security.protocol=SSL
connect.reporting.error.config.ssl.keystore.location=/path/to/my/keystore.p12
connect.reporting.error.config.ssl.keystore.type=PKCS12
connect.reporting.error.config.ssl.truststore.location=/path/to/my/truststore.p12
connect.reporting.error.config.ssl.truststore.password=************
connect.reporting.error.config.ssl.truststore.type=PKCS12
connect.reporting.error.config.topic=http-error-topic
This page describes the usage of the Stream Reactor GCP BigQuery Sink Connector.
Invalid field name
"com.examples.project-super-important.v1.MyData". Fields must
contain only letters, numbers, and underscores, start with a letter or
underscore, and be at most 300 characters long.
name = kcbq-connect1
connector.class = com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
tasks.max = 1
topics = quickstart
sanitizeTopics = true
autoCreateTables = true
allowNewBigQueryFields = true
allowBigQueryRequiredFieldRelaxation = true
schemaRetriever = com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever
project = lenses-123
defaultDataset = ConfluentDataSet
keyfile = <path to json file>
transforms = RegexTransformation
transforms.RegexTransformation.type = org.apache.kafka.connect.transforms.RegexRouter
transforms.RegexTransformation.regex = (kcbq_)(.*)
transforms.RegexTransformation.replacement = $2
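With this RegexRouter SMT in place, a prefixed topic (the names below are illustrative) is routed to a table name without the prefix:
# kcbq_quickstart -> quickstart
# kcbq_orders     -> orders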
name=bigquery-sink
connector.class=com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
tasks.max=1
topics=orders,customers
project=my-gcp-project
defaultDataset=kafka_data
keyfile=/path/to/keyfile.json
autoCreateTables=true
name=bigquery-sink
connector.class=com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
tasks.max=1
topics=orders,customers
project=my-gcp-project
defaultDataset=kafka_data
keyfile=/path/to/keyfile.json
enableBatchLoad=orders,customers
gcsBucketName=my-gcs-bucket
autoCreateBucket=true
name=bigquery-sink
connector.class=com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
tasks.max=1
topics=orders,customers
project=my-gcp-project
defaultDataset=kafka_data
keyfile=/path/to/keyfile.json
upsertEnabled=true
mergeIntervalMs=30000
mergeRecordsThreshold=1000
log4j.logger.com.wepay.kafka.connect.bigquery=DEBUG
This page describes the usage of the Stream Reactor AWS S3 Sink Connector.
io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector
connector.class=io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector
connect.s3.kcql=insert into lensesio:demo select * from demo PARTITIONBY _value.ts STOREAS `JSON` PROPERTIES ('flush.size'=1000000, 'flush.interval'=30, 'flush.count'=5000)
topics=demo
name=demo
INSERT INTO bucketAddress[:pathPrefix]
SELECT *
FROM kafka-topic
[[PARTITIONBY (partition[, partition] ...)] | NOPARTITION]
[STOREAS storage_format]
[PROPERTIES(
'property.1'=x,
'property.2'=x
)]
{
...
"a.b": "value",
...
}
INSERT INTO `bucket-name`:`prefix` SELECT * FROM `kafka-topic` PARTITIONBY `a.b`
INSERT INTO testbucket:pathToWriteTo SELECT * FROM topicA;
INSERT INTO testbucket SELECT * FROM topicA;
INSERT INTO testbucket:path/To/Write/To SELECT * FROM topicA PARTITIONBY fieldA;
topics.regex = ^sensor_data_\d+$
connect.s3.kcql= INSERT INTO $target SELECT * FROM `*` ...
PARTITIONBY fieldA, fieldB
PARTITIONBY _key
PARTITIONBY _key.fieldA, _key.fieldB
PARTITIONBY _header.<header_key1>[, _header.<header_key2>]
PARTITIONBY fieldA, _key.fieldB, _headers.fieldC
INSERT INTO $bucket[:$prefix]
SELECT * FROM $topic
PARTITIONBY fieldA, _key.fieldB, _headers.fieldC
STOREAS `AVRO`
PROPERTIES (
'partition.include.keys'=true
)
connector.class=io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector
connect.s3.kcql=insert into lensesio:demo select * from demo PARTITIONBY _value.metadata_id, _value.customer_id, _header.ts, _header.wallclock STOREAS `JSON` PROPERTIES ('flush.size'=1000000, 'flush.interval'=30, 'flush.count'=5000)
topics=demo
name=demo
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
transforms=insertFormattedTs,insertWallclock
transforms.insertFormattedTs.type=io.lenses.connect.smt.header.TimestampConverter
transforms.insertFormattedTs.header.name=ts
transforms.insertFormattedTs.field=timestamp
transforms.insertFormattedTs.target.type=string
transforms.insertFormattedTs.format.to.pattern=yyyy-MM-dd-HH
transforms.insertWallclock.type=io.lenses.connect.smt.header.InsertWallclock
transforms.insertWallclock.header.name=wallclock
transforms.insertWallclock.value.type=format
transforms.insertWallclock.format=yyyy-MM-dd-HH
{
"key": <the message Key, which can be a primitive or a complex object>,
"value": <the message Key, which can be a primitive or a complex object>,
"headers": {
"header1": "value1",
"header2": "value2"
},
"metadata": {
"offset": 0,
"partition": 0,
"timestamp": 0,
"topic": "topic"
}
}
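A concrete envelope for a single record might look like this (all values are illustrative):
{
  "key": "car-42",
  "value": { "speed": 88, "unit": "mph" },
  "headers": { "header1": "value1" },
  "metadata": { "offset": 12, "partition": 0, "timestamp": 1714000000000, "topic": "car_speed_events" }
}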
...
connect.s3.kcql=INSERT INTO lensesioaws:car_speed SELECT * FROM car_speed_events STOREAS `PARQUET`
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
key.converter=org.apache.kafka.connect.storage.StringConverter
...
...
connect.s3.kcql=INSERT INTO lensesioaws:car_speed SELECT * FROM car_speed_events STOREAS `PARQUET`
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
...
...
connect.s3.kcql=INSERT INTO lensesioaws:car_speed SELECT * FROM car_speed_events STOREAS `JSON` PROPERTIES('store.envelope'=true)
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
...
...
connect.s3.kcql=INSERT INTO lensesioaws:car_speed SELECT * FROM car_speed_events STOREAS `AVRO` PROPERTIES('store.envelope'=true)
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
key.converter=org.apache.kafka.connect.storage.StringConverter
......
connect.s3.kcql=INSERT INTO lensesioaws:car_speed SELECT * FROM car_speed_events STOREAS `AVRO` PROPERTIES('store.envelope'=true)
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
...
message1 -> schema1
message2 -> schema1
(No flush needed – same schema)
message3 -> schema2
(Flush occurs – new schema introduced)
message4 -> schema2
(No flush needed – same schema)
message5 -> schema1
Without optimization: would trigger a flush
With optimization: no flush – schema1 is backward-compatible with schema2
message6 -> schema2
message7 -> schema2
(No flush needed – same schema, it would happen based on the flush thresholds)
...
connect.s3.aws.auth.mode=Credentials
connect.s3.aws.region=eu-west-2
connect.s3.aws.access.key=$AWS_ACCESS_KEY
connect.s3.aws.secret.key=$AWS_SECRET_KEY
...
listObjectsV2
listObjectsV2Paginator
putObject
getObject
headObject
deleteObjects
deleteObject
io.lenses.streamreactor.connect.datalake.sink.DatalakeSinkConnector
connector.class=io.lenses.streamreactor.connect.datalake.sink.DatalakeSinkConnector
connect.datalake.kcql=insert into lensesio:demo select * from demo PARTITIONBY _value.metadata_id, _value.customer_id, _header.ts, _header.wallclock STOREAS `JSON` PROPERTIES('flush.interval'=600, 'flush.size'=1000000, 'flush.count'=5000)
topics=demo
name=demo
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
transforms=insertFormattedTs,insertWallclock
transforms.insertFormattedTs.type=io.lenses.connect.smt.header.TimestampConverter
transforms.insertFormattedTs.header.name=ts
transforms.insertFormattedTs.field=timestamp
transforms.insertFormattedTs.target.type=string
transforms.insertFormattedTs.format.to.pattern=yyyy-MM-dd-HH
transforms.insertWallclock.type=io.lenses.connect.smt.header.InsertWallclock
transforms.insertWallclock.header.name=wallclock
transforms.insertWallclock.value.type=format
transforms.insertWallclock.format=yyyy-MM-dd-HH
INSERT INTO bucketAddress[:pathPrefix]
SELECT *
FROM kafka-topic
[[PARTITIONBY (partition[, partition] ...)] | NOPARTITION]
[STOREAS storage_format]
[PROPERTIES(
'property.1'=x,
'property.2'=x
)]
{
...
"a.b": "value",
...
}
INSERT INTO `container-name`:`prefix` SELECT * FROM `kafka-topic` PARTITIONBY `a.b`
INSERT INTO testcontainer:pathToWriteTo SELECT * FROM topicA;
INSERT INTO testcontainer SELECT * FROM topicA;
INSERT INTO testcontainer:path/To/Write/To SELECT * FROM topicA PARTITIONBY fieldA;
topics.regex = ^sensor_data_\d+$
connect.datalake.kcql= INSERT INTO $target SELECT * FROM `*` ...
PARTITIONBY fieldA, fieldB
PARTITIONBY _key
PARTITIONBY _key.fieldA, _key.fieldB
PARTITIONBY _header.<header_key1>[, _header.<header_key2>]
PARTITIONBY fieldA, _key.fieldB, _headers.fieldC
INSERT INTO $container[:$prefix]
SELECT * FROM $topic
PARTITIONBY fieldA, _key.fieldB, _headers.fieldC
STOREAS `AVRO`
PROPERTIES (
'partition.include.keys'=true
)
connector.class=io.lenses.streamreactor.connect.azure.datalake.sink.DatalakeSinkConnector
connect.datalake.kcql=insert into lensesio:demo select * from demo PARTITIONBY _value.metadata_id, _value.customer_id, _header.ts, _header.wallclock STOREAS `JSON` PROPERTIES('flush.interval'=30, 'flush.size'=1000000, 'flush.count'=5000)
topics=demo
name=demo
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
transforms=insertFormattedTs,insertWallclock
transforms.insertFormattedTs.type=io.lenses.connect.smt.header.TimestampConverter
transforms.insertFormattedTs.header.name=ts
transforms.insertFormattedTs.field=timestamp
transforms.insertFormattedTs.target.type=string
transforms.insertFormattedTs.format.to.pattern=yyyy-MM-dd-HH
transforms.insertWallclock.type=io.lenses.connect.smt.header.InsertWallclock
transforms.insertWallclock.header.name=wallclock
transforms.insertWallclock.value.type=format
transforms.insertWallclock.format=yyyy-MM-dd-HH
{
"key": <the message Key, which can be a primitive or a complex object>,
"value": <the message Key, which can be a primitive or a complex object>,
"headers": {
"header1": "value1",
"header2": "value2"
},
"metadata": {
"offset": 0,
"partition": 0,
"timestamp": 0,
"topic": "topic"
}
}
...
connect.datalake.kcql=INSERT INTO lensesioazure:car_speed SELECT * FROM car_speed_events STOREAS `PARQUET`
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
key.converter=org.apache.kafka.connect.storage.StringConverter
...
...
connect.datalake.kcql=INSERT INTO lensesioazure:car_speed SELECT * FROM car_speed_events STOREAS `PARQUET`
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
...
...
connect.datalake.kcql=INSERT INTO lensesioazure:car_speed SELECT * FROM car_speed_events STOREAS `JSON` PROPERTIES('store.envelope'=true)
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
...
...
connect.datalake.kcql=INSERT INTO lensesioazure:car_speed SELECT * FROM car_speed_events STOREAS `AVRO` PROPERTIES('store.envelope'=true)
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
key.converter=org.apache.kafka.connect.storage.StringConverter
...
...
connect.datalake.kcql=INSERT INTO lensesioazure:car_speed SELECT * FROM car_speed_events STOREAS `AVRO` PROPERTIES('store.envelope'=true)
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
...
message1 -> schema1
message2 -> schema1
(No flush needed – same schema)
message3 -> schema2
(Flush occurs – new schema introduced)
message4 -> schema2
(No flush needed – same schema)
message5 -> schema1
Without optimization: would trigger a flush
With optimization: no flush – schema1 is backward-compatible with schema2
message6 -> schema2
message7 -> schema2
(No flush needed – same schema, it would happen based on the flush thresholds)
...
connect.datalake.azure.auth.mode=Credentials
connect.datalake.azure.account.name=$AZURE_ACCOUNT_NAME
connect.datalake.azure.account.key=$AZURE_ACCOUNT_KEY
...
...
connect.datalake.azure.auth.mode=ConnectionString
connect.datalake.azure.connection.string=$AZURE_CONNECTION_STRING
...
This page describes the usage of the Stream Reactor GCP Storage Sink Connector.


io.lenses.streamreactor.connect.gcp.storage.sink.GCPStorageSinkConnector
connector.class=io.lenses.streamreactor.connect.gcp.storage.sink.GCPStorageSinkConnector
connect.gcpstorage.kcql=insert into lensesio:demo select * from demo PARTITIONBY _value.metadata_id, _value.customer_id, _header.ts, _header.wallclock STOREAS `JSON` PROPERTIES('flush.interval'=600, 'flush.size'=1000000, 'flush.count'=5000)
topics=demo
name=demo
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
transforms=insertFormattedTs,insertWallclock
transforms.insertFormattedTs.type=io.lenses.connect.smt.header.TimestampConverter
transforms.insertFormattedTs.header.name=ts
transforms.insertFormattedTs.field=timestamp
transforms.insertFormattedTs.target.type=string
transforms.insertFormattedTs.format.to.pattern=yyyy-MM-dd-HH
transforms.insertWallclock.type=io.lenses.connect.smt.header.InsertWallclock
transforms.insertWallclock.header.name=wallclock
transforms.insertWallclock.value.type=format
transforms.insertWallclock.format=yyyy-MM-dd-HH
INSERT INTO bucketAddress[:pathPrefix]
SELECT *
FROM kafka-topic
[[PARTITIONBY (partition[, partition] ...)] | NOPARTITION]
[STOREAS storage_format]
[PROPERTIES(
'property.1'=x,
'property.2'=x
)]
{
...
"a.b": "value",
...
}
INSERT INTO `bucket-name`:`prefix` SELECT * FROM `kafka-topic` PARTITIONBY `a.b`
INSERT INTO testbucket:pathToWriteTo SELECT * FROM topicA;
INSERT INTO testbucket SELECT * FROM topicA;
INSERT INTO testbucket:path/To/Write/To SELECT * FROM topicA PARTITIONBY fieldA;
topics.regex = ^sensor_data_\d+$
connect.gcpstorage.kcql= INSERT INTO $target SELECT * FROM `*` ...
PARTITIONBY fieldA, fieldB
PARTITIONBY _key
PARTITIONBY _key.fieldA, _key.fieldB
PARTITIONBY _header.<header_key1>[, _header.<header_key2>]
PARTITIONBY fieldA, _key.fieldB, _headers.fieldC
INSERT INTO $bucket[:$prefix]
SELECT * FROM $topic
PARTITIONBY fieldA, _key.fieldB, _headers.fieldC
STOREAS `AVRO`
PROPERTIES (
'partition.include.keys'=true
)
connector.class=io.lenses.streamreactor.connect.gcp.storage.sink.GCPStorageSinkConnector
connect.gcpstorage.kcql=insert into lensesio:demo select * from demo PARTITIONBY _value.metadata_id, _value.customer_id, _header.ts, _header.wallclock STOREAS `JSON` PROPERTIES('flush.interval'=30, 'flush.size'=1000000, 'flush.count'=5000)
topics=demo
name=demo
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
transforms=insertFormattedTs,insertWallclock
transforms.insertFormattedTs.type=io.lenses.connect.smt.header.TimestampConverter
transforms.insertFormattedTs.header.name=ts
transforms.insertFormattedTs.field=timestamp
transforms.insertFormattedTs.target.type=string
transforms.insertFormattedTs.format.to.pattern=yyyy-MM-dd-HH
transforms.insertWallclock.type=io.lenses.connect.smt.header.InsertWallclock
transforms.insertWallclock.header.name=wallclock
transforms.insertWallclock.value.type=format
transforms.insertWallclock.format=yyyy-MM-dd-HH
{
"key": <the message Key, which can be a primitive or a complex object>,
"value": <the message Key, which can be a primitive or a complex object>,
"headers": {
"header1": "value1",
"header2": "value2"
},
"metadata": {
"offset": 0,
"partition": 0,
"timestamp": 0,
"topic": "topic"
}
}
...
connect.gcpstorage.kcql=INSERT INTO lensesiogcpstorage:car_speed SELECT * FROM car_speed_events STOREAS `PARQUET`
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
key.converter=org.apache.kafka.connect.storage.StringConverter
...
...
connect.gcpstorage.kcql=INSERT INTO lensesiogcpstorage:car_speed SELECT * FROM car_speed_events STOREAS `PARQUET`
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
...
...
connect.gcpstorage.kcql=INSERT INTO lensesiogcpstorage:car_speed SELECT * FROM car_speed_events STOREAS `JSON` PROPERTIES('store.envelope'=true)
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
...
...
connect.gcpstorage.kcql=INSERT INTO lensesiogcpstorage:car_speed SELECT * FROM car_speed_events STOREAS `AVRO` PROPERTIES('store.envelope'=true)
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
key.converter=org.apache.kafka.connect.storage.StringConverter
...
...
connect.gcpstorage.kcql=INSERT INTO lensesiogcpstorage:car_speed SELECT * FROM car_speed_events STOREAS `AVRO` PROPERTIES('store.envelope'=true)
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
...
message1 -> schema1
message2 -> schema1
(No flush needed – same schema)
message3 -> schema2
(Flush occurs – new schema introduced)
message4 -> schema2
(No flush needed – same schema)
message5 -> schema1
Without optimization: would trigger a flush
With optimization: no flush – schema1 is backward-compatible with schema2
message6 -> schema2
message7 -> schema2
(No flush needed – same schema, it would happen based on the flush thresholds)
...
connect.gcpstorage.gcp.auth.mode=Credentials
connect.gcpstorage.gcp.credentials=$GCP_CREDENTIALS
connect.gcpstorage.gcp.project.id=$GCP_PROJECT_ID
...
...
connect.gcpstorage.gcp.auth.mode=File
connect.gcpstorage.gcp.file=/home/secure-stuff/gcp-write-credential.txt
...