Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
This page describes the usage of the Stream Reactor Azure Event Hubs Sink Connector.
This page details the configuration options for the Stream Reactor Kafka Connect sink connectors.













INSERT INTO `_value.'customer.name'.'first.name'` SELECT * FROM topicAThis page describes the usage of the Stream Reactor Azure CosmosDB Sink Connector.
This page describes the usage of the Stream Reactor Azure Service Bus Sink Connector.
io.lenses.streamreactor.connect.azure.servicebus.sink.AzureServiceBusSinkConnectorio.lenses.streamreactor.connect.azure.cosmosdb.sink.CosmosDbSinkConnectorname=cosmosdb
connector.class=io.lenses.streamreactor.connect.azure.cosmosdb.sink.CosmosDbSinkConnector
tasks.max=1
topics=orders-string
connect.cosmosdb.kcql=INSERT INTO orders SELECT * FROM orders-string
connect.cosmosdb.db=dm
connect.cosmosdb.endpoint=[YOUR_AZURE_ENDPOINT]
connect.cosmosdb.db.create=true
connect.cosmosdb.master.key=[YOUR_MASTER_KEY]
connect.cosmosdb.batch.size=10INSERT | UPSERT
INTO <your-collection>
SELECT FIELD, ...
FROM kafka_topic
[PK FIELDS,...]-- Insert mode, select all fields from topicA
-- and write to collectionA
INSERT INTO collectionA SELECT * FROM topicA
-- UPSERT mode, select 3 fields and
-- rename from topicB and write to tableB
-- with primary key as the field id from the topic
UPSERT INTO tableB SELECT x AS a, y, z AS c FROM topicB PK idconnect.cosmosdb.bulk.enabled=trueconnect.cosmosdb.collection.throughput=400connect.cosmosdb.proxy=<proxy-uri>connect.progress.enabled=trueINSERT INTO myCollection SELECT * FROM myTopic PROPERTIES('flush.size'=1000000, 'flush.interval'=30, 'flush.count'=5000)connector.class=io.lenses.streamreactor.connect.azure.servicebus.sink.AzureServiceBusSinkConnector
name=AzureServiceBusSinkConnector
tasks.max=1
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
connect.servicebus.connection.string="Endpoint=sb://MYNAMESPACE.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SOME_SHARED_ACCESS_STRING";
connect.servicebus.kcql=INSERT INTO output-servicebus SELECT * FROM input-topic PROPERTIES('servicebus.type'='QUEUE');INSERT INTO <your-service-bus>
SELECT *
FROM <your-kafka-topic>
PROPERTIES(...); connect.servicebus.connection.string=Endpoint=sb://YOURNAMESPACE.servicebus.windows.net/;SharedAccessKeyName=YOUR_KEYNAME;SharedAccessKey=YOUR_ACCESS_KEY=connect.servicebus.kcql=INSERT INTO azure-queue SELECT * FROM kafka-topic PROPERTIES('servicebus.type'='QUEUE');connect.servicebus.kcql=INSERT INTO azure-topic SELECT * FROM kafka-topic PROPERTIES('servicebus.type'='TOPIC');This page describes the usage of the Stream Reactor Cassandra Sink Connector part of the kafka-connect-cassandra-*** artifact.
io.lenses.streamreactor.connect.jms.sink.JMSSinkConnectorname=jms
connector.class=io.lenses.streamreactor.connect.jms.sink.JMSSinkConnector
tasks.max=1
topics=orders
connect.jms.url=tcp://activemq:61616
connect.jms.initial.context.factory=org.apache.activemq.jndi.ActiveMQInitialContextFactory
connect.jms.connection.factory=ConnectionFactory
connect.jms.kcql=INSERT INTO orders SELECT * FROM orders WITHTYPE QUEUE WITHFORMAT JSONINSERT INTO <jms-destination>
SELECT FIELD, ...
FROM <your-kafka-topic>
[WITHFORMAT AVRO|JSON|MAP|OBJECT]
WITHTYPE TOPIC|QUEUE-- Select all fields from topicA and write to jmsA queue
INSERT INTO jmsA SELECT * FROM topicA WITHTYPE QUEUE
-- Select 3 fields and rename from topicB and write
-- to jmsB topic as JSON in a TextMessage
INSERT INTO jmsB SELECT x AS a, y, z FROM topicB WITHFORMAT JSON WITHTYPE TOPICio.lenses.streamreactor.connect.cassandra.sink.CassandraSinkConnectorname=cassandra-sink
connector.class=io.lenses.streamreactor.connect.cassandra.sink.CassandraSinkConnector
tasks.max=1
topics=orders
connect.cassandra.kcql=INSERT INTO orders SELECT * FROM orders
connect.cassandra.port=9042
connect.cassandra.key.space=demo
connect.cassandra.contact.points=cassandraINSERT INTO <your-cassandra-table>
SELECT FIELD,...
FROM <your-kafka-topic>
[TTL=Time to live]-- Insert mode, select all fields from topicA and
-- write to tableA
INSERT INTO tableA SELECT * FROM topicA
-- Insert mode, select 3 fields and rename from topicB
-- and write to tableB
INSERT INTO tableB SELECT x AS a, y, c FROM topicB
-- Insert mode, select 2 fields from topicB
-- and write to tableB with TTL
INSERT INTO tableB SELECT x, y FROM topicB TTL=100000DELETE FROM orders WHERE id = ? and product = ?# Message
# "{ "key": { "id" : 999, "product" : "DATAMOUNTAINEER" }, "value" : null }"
# DELETE FROM orders WHERE id = 999 and product = "DATAMOUNTAINEER"
# connect.cassandra.delete.enabled=true
# connect.cassandra.delete.statement=DELETE FROM orders WHERE id = ? and product = ?
# connect.cassandra.delete.struct_flds=id,productThis page describes the usage of the Stream Reactor AWS S3 Sink Connector.
io.lenses.streamreactor.connect.redis.sink.RedisSinkConnectorname=redis
connector.class=io.lenses.streamreactor.connect.redis.sink.RedisSinkConnector
tasks.max=1
topics=redis
connect.redis.host=redis
connect.redis.port=6379
connect.redis.kcql=INSERT INTO lenses SELECT * FROM redis STOREAS STREAM[INSERT INTO <redis-cache>]
SELECT FIELD, ...
FROM <kafka-topic>
[PK FIELD]
[STOREAS SortedSet(key=FIELD)|GEOADD|STREAM]{ "symbol": "USDGBP" , "price": 0.7943 }
{ "symbol": "EURGBP" , "price": 0.8597 }SELECT price from yahoo-fx PK symbolKey=EURGBP Value={ "price": 0.8597 }INSERT INTO cpu_stats SELECT * from cpuTopic STOREAS SortedSet(score=timestamp) TTL=60SELECT temperature, humidity FROM sensorsTopic PK sensorID STOREAS SortedSet(score=timestamp)INSERT INTO FX- SELECT price from yahoo-fx PK symbol STOREAS SortedSet(score=timestamp) TTL=60INSERT INTO cpu_stats SELECT * from cpuTopic STOREAS GEOADDINSERT INTO redis_stream_name SELECT * FROM my-kafka-topic STOREAS STREAMSELECT * FROM topic STOREAS PubSub (channel=myfield)io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector
connector.class=io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector
connect.s3.kcql=insert into lensesio:demo select * from demo PARTITIONBY _value.ts STOREAS `JSON` PROPERTIES ('flush.size'=1000000, 'flush.interval'=30, 'flush.count'=5000)
topics=demo
name=demoINSERT INTO bucketAddress[:pathPrefix]
SELECT *
FROM kafka-topic
[[PARTITIONBY (partition[, partition] ...)] | NOPARTITION]
[STOREAS storage_format]
[PROPERTIES(
'property.1'=x,
'property.2'=x,
)]{
...
"a.b": "value",
...
}INSERT INTO `bucket-name`:`prefix` SELECT * FROM `kafka-topic` PARTITIONBY `a.b`INSERT INTO testbucket:pathToWriteTo SELECT * FROM topicA;
INSERT INTO testbucket SELECT * FROM topicA;
INSERT INTO testbucket:path/To/Write/To SELECT * FROM topicA PARTITIONBY fieldA;topics.regex = ^sensor_data_\d+$
connect.s3.kcql= INSERT INTO $target SELECT * FROM `*` ....PARTITIONBY fieldA, fieldBPARTITIONBY _keyPARTITIONBY _key.fieldA, _key.fieldBPARTITIONBY _header.<header_key1>[, _header.<header_key2>]PARTITIONBY fieldA, _key.fieldB, _header.fieldCINSERT INTO $bucket[:$prefix]
SELECT * FROM $topic
PARTITIONBY fieldA, _key.fieldB, _header.fieldC
STOREAS `AVRO`
PROPERTIES (
'partition.include.keys'=true,
)connector.class=io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector
connect.s3.kcql=insert into lensesio:demo select * from demo PARTITIONBY _value.metadata_id, _value.customer_id, _header.ts, _header.wallclock STOREAS `JSON` PROPERTIES ('flush.size'=1000000, 'flush.interval'=30, 'flush.count'=5000)
topics=demo
name=demo
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
transforms=insertFormattedTs,insertWallclock
transforms.insertFormattedTs.type=io.lenses.connect.smt.header.TimestampConverter
transforms.insertFormattedTs.header.name=ts
transforms.insertFormattedTs.field=timestamp
transforms.insertFormattedTs.target.type=string
transforms.insertFormattedTs.format.to.pattern=yyyy-MM-dd-HH
transforms.insertWallclock.type=io.lenses.connect.smt.header.InsertWallclock
transforms.insertWallclock.header.name=wallclock
transforms.insertWallclock.value.type=format
transforms.insertWallclock.format=yyyy-MM-dd-HH{
"key": <the message Key, which can be a primitive or a complex object>,
"value": <the message Value, which can be a primitive or a complex object>,
"headers": {
"header1": "value1",
"header2": "value2"
},
"metadata": {
"offset": 0,
"partition": 0,
"timestamp": 0,
"topic": "topic"
}
}...
connect.s3.kcql=INSERT INTO lensesioaws:car_speed SELECT * FROM car_speed_events STOREAS `PARQUET`
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
key.converter=org.apache.kafka.connect.storage.StringConverter
......
connect.s3.kcql=INSERT INTO lensesioaws:car_speed SELECT * FROM car_speed_events STOREAS `PARQUET`
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
......
connect.s3.kcql=INSERT INTO lensesioaws:car_speed SELECT * FROM car_speed_events STOREAS `JSON` PROPERTIES('store.envelope'=true)
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
......
connect.s3.kcql=INSERT INTO lensesioaws:car_speed SELECT * FROM car_speed_events STOREAS `AVRO` PROPERTIES('store.envelope'=true)
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
key.converter=org.apache.kafka.connect.storage.StringConverter
......
connect.s3.kcql=INSERT INTO lensesioaws:car_speed SELECT * FROM car_speed_events STOREAS `AVRO` PROPERTIES('store.envelope'=true)
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
...message1 -> schema1
message2 -> schema1
(No flush needed – same schema)
message3 -> schema2
(Flush occurs – new schema introduced)
message4 -> schema2
(No flush needed – same schema)
message5 -> schema1
Without optimization: would trigger a flush
With optimization: no flush – schema1 is backward-compatible with schema2
message6 -> schema2
message7 -> schema2
(No flush needed – same schema, it would happen based on the flush thresholds)...
connect.s3.aws.auth.mode=Credentials
connect.s3.aws.region=eu-west-2
connect.s3.aws.access.key=$AWS_ACCESS_KEY
connect.s3.aws.secret.key=$AWS_SECRET_KEY
...listObjectsV2
listObjectsV2Paginator
putObject
getObject
headObject
deleteObjects
deleteObjectio.lenses.streamreactor.connect.influx.InfluxSinkConnectorname=influxdb
connector.class=io.lenses.streamreactor.connect.influx.InfluxSinkConnector
tasks.max=1
topics=influx
connect.influx.url=http://influxdb:8086
connect.influx.db=mydb
connect.influx.username=admin
connect.influx.kcql=INSERT INTO influxMeasure SELECT * FROM influx WITHTIMESTAMP sys_time()INSERT INTO <your-measure>
SELECT FIELD, ...
FROM kafka_topic_name
[WITHTIMESTAMP FIELD|sys_time]
[WITHTAG (FIELD|constant_key=constant_value)]-- Insert mode, select all fields from topicA and write to measureA
INSERT INTO measureA SELECT * FROM topicA
-- Insert mode, select 3 fields and rename from topicB and write to measureB,
-- use field y as the point timestamp
INSERT INTO measureB SELECT x AS a, y AS b, c FROM topicB WITHTIMESTAMP y
-- Insert mode, select 3 fields and rename from topicB and write to measureB,
-- use the current system time as the point timestamp
INSERT INTO measureB SELECT x AS a, y AS b, z FROM topicB WITHTIMESTAMP sys_time()
-- Tagging using constants
INSERT INTO measureA SELECT * FROM topicA WITHTAG (DataMountaineer=awesome, Influx=rulz!)
-- Tagging using fields in the payload. Say we have a Payment structure
-- with these fields: amount, from, to, note
INSERT INTO measureA SELECT * FROM topicA WITHTAG (from, to)
-- Tagging using a combination of fields in the payload and constants.
-- Say we have a Payment structure with these fields: amount, from, to, note
INSERT INTO measureA SELECT * FROM topicA WITHTAG (from, to, provider=DataMountaineer)io.lenses.streamreactor.connect.elastic6.ElasticSinkConnectorio.lenses.streamreactor.connect.elastic7.ElasticSinkConnectorname=elastic
connector.class=io.lenses.streamreactor.connect.elastic7.ElasticSinkConnector
tasks.max=1
topics=orders
connect.elastic.protocol=http
connect.elastic.hosts=elastic
connect.elastic.port=9200
connect.elastic.cluster.name=elasticsearch
connect.elastic.kcql=INSERT INTO orders SELECT * FROM orders
connect.progress.enabled=trueINSERT | UPSERT
INTO <elastic_index >
SELECT FIELD, ...
FROM kafka_topic
[PK FIELD,...]
[WITHDOCTYPE=<your_document_type>]
[WITHINDEXSUFFIX=<your_suffix>]-- Insert mode, select all fields from topicA and write to indexA
INSERT INTO indexA SELECT * FROM topicA
-- Insert mode, select 3 fields and rename from topicB
-- and write to indexB
INSERT INTO indexB SELECT x AS a, y, zc FROM topicB PK y
-- UPSERT
UPSERT INTO indexC SELECT id, string_field FROM topicC PK idINSERT INTO indexA SELECT * FROM topicA PROPERTIES ('behavior.on.null.values'='IGNORE')WITHINDEXSUFFIX=_suffix_{YYYY-MM-dd}INSERT INTO index_name SELECT * FROM topicAINSERT INTO _header.gate SELECT * FROM topicAINSERT INTO `_header.'prefix.abc.suffix'` SELECT * FROM topicAINSERT INTO _key SELECT * FROM topicAINSERT INTO _value.name SELECT * FROM topicAINSERT INTO _value.name.firstName SELECT * FROM topicAINSERT INTO `_value.'customer.name'.'first.name'` SELECT * FROM topicA{
"customer.name": {
"first.name": "hans"
}
}ssl.truststore.location=/path/to/truststore.jks
ssl.truststore.password=your_truststore_password
# Can also be PKCS12 if applicable
ssl.truststore.type=JKS
ssl.keystore.location=/path/to/keystore.jks
ssl.keystore.password=your_keystore_password
# Can also be PKCS12 if applicable
ssl.keystore.type=JKS
# Or TLSv1.3 for stronger security
ssl.protocol=TLSv1.2
# Default algorithm for managing certificates
ssl.trustmanager.algorithm=PKIX
# Default algorithm for managing certificates
ssl.keymanager.algorithm=PKIXThis page describes the usage of the Stream Reactor Azure Datalake Gen 2 Sink Connector.
INSERT INTO lenses-io-demo ...INSERT INTO `<path to field>`
SELECT * FROM control.boxes.test
PROPERTIES('mqtt.target.from.field'='true')io.lenses.streamreactor.connect.mqtt.sink.MqttSinkConnectorname=mqtt
connector.class=io.lenses.streamreactor.connect.mqtt.sink.MqttSinkConnector
tasks.max=1
topics=orders
connect.mqtt.hosts=tcp://mqtt:1883
connect.mqtt.clean=true
connect.mqtt.timeout=1000
connect.mqtt.keep.alive=1000
connect.mqtt.service.quality=1
connect.mqtt.client.id=dm_sink_id
connect.mqtt.kcql=INSERT INTO /lenses/orders SELECT * FROM ordersINSERT
INTO <mqtt-topic>
SELECT * //no field projection supported
FROM <kafka-topic>
//no WHERE clause supported-- Insert into /landoop/demo all fields from kafka_topicA
INSERT INTO `/landoop/demo` SELECT * FROM kafka_topicA
-- Insert into /landoop/demo all fields from dynamic field
INSERT INTO `<field path>` SELECT * FROM control.boxes.test PROPERTIES('mqtt.target.from.field'='true')io.lenses.streamreactor.connect.datalake.sink.DatalakeSinkConnector
INSERT INTO `_key`
SELECT ...INSERT INTO `_topic`
SELECT ...connector.class=io.lenses.streamreactor.connect.datalake.sink.DatalakeSinkConnector
connect.datalake.kcql=insert into lensesio:demo select * from demo PARTITIONBY _value.metadata_id, _value.customer_id, _header.ts, _header.wallclock STOREAS `JSON` PROPERTIES('flush.interval'=600, 'flush.size'=1000000, 'flush.count'=5000)
topics=demo
name=demo
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
transforms=insertFormattedTs,insertWallclock
transforms.insertFormattedTs.type=io.lenses.connect.smt.header.TimestampConverter
transforms.insertFormattedTs.header.name=ts
transforms.insertFormattedTs.field=timestamp
transforms.insertFormattedTs.target.type=string
transforms.insertFormattedTs.format.to.pattern=yyyy-MM-dd-HH
transforms.insertWallclock.type=io.lenses.connect.smt.header.InsertWallclock
transforms.insertWallclock.header.name=wallclock
transforms.insertWallclock.value.type=format
transforms.insertWallclock.format=yyyy-MM-dd-HH
topics=demo
name=demo
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
transforms=insertFormattedTs,insertWallclock
transforms.insertFormattedTs.type=io.lenses.connect.smt.header.TimestampConverter
transforms.insertFormattedTs.header.name=ts
transforms.insertFormattedTs.field=timestamp
transforms.insertFormattedTs.target.type=string
transforms.insertFormattedTs.format.to.pattern=yyyy-MM-dd-HH
transforms.insertWallclock.type=io.lenses.connect.smt.header.InsertWallclock
transforms.insertWallclock.header.name=wallclock
transforms.insertWallclock.value.type=format
transforms.insertWallclock.format=yyyy-MM-dd-HHINSERT INTO bucketAddress[:pathPrefix]
SELECT *
FROM kafka-topic
[[PARTITIONBY (partition[, partition] ...)] | NOPARTITION]
[STOREAS storage_format]
[PROPERTIES(
'property.1'=x,
'property.2'=x,
)]{
...
"a.b": "value",
...
}INSERT INTO `container-name`:`prefix` SELECT * FROM `kafka-topic` PARTITIONBY `a.b`INSERT INTO testcontainer:pathToWriteTo SELECT * FROM topicA;
INSERT INTO testcontainer SELECT * FROM topicA;
INSERT INTO testcontainer:path/To/Write/To SELECT * FROM topicA PARTITIONBY fieldA;topics.regex = ^sensor_data_\d+$
connect.datalake.kcql= INSERT INTO $target SELECT * FROM `*` ....PARTITIONBY fieldA, fieldBPARTITIONBY _keyPARTITIONBY _key.fieldA, _key.fieldBPARTITIONBY _header.<header_key1>[, _header.<header_key2>]PARTITIONBY fieldA, _key.fieldB, _header.fieldCINSERT INTO $container[:$prefix]
SELECT * FROM $topic
PARTITIONBY fieldA, _key.fieldB, _header.fieldC
STOREAS `AVRO`
PROPERTIES (
'partition.include.keys'=true,
)connector.class=io.lenses.streamreactor.connect.datalake.sink.DatalakeSinkConnector
connect.datalake.kcql=insert into lensesio:demo select * from demo PARTITIONBY _value.metadata_id, _value.customer_id, _header.ts, _header.wallclock STOREAS `JSON` PROPERTIES('flush.interval'=30, 'flush.size'=1000000, 'flush.count'=5000)
topics=demo
name=demo
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
transforms=insertFormattedTs,insertWallclock
transforms.insertFormattedTs.type=io.lenses.connect.smt.header.TimestampConverter
transforms.insertFormattedTs.header.name=ts
transforms.insertFormattedTs.field=timestamp
transforms.insertFormattedTs.target.type=string
transforms.insertFormattedTs.format.to.pattern=yyyy-MM-dd-HH
transforms.insertWallclock.type=io.lenses.connect.smt.header.InsertWallclock
transforms.insertWallclock.header.name=wallclock
transforms.insertWallclock.value.type=format
transforms.insertWallclock.format=yyyy-MM-dd-HH
topics=demo
name=demo
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
transforms=insertFormattedTs,insertWallclock
transforms.insertFormattedTs.type=io.lenses.connect.smt.header.TimestampConverter
transforms.insertFormattedTs.header.name=ts
transforms.insertFormattedTs.field=timestamp
transforms.insertFormattedTs.target.type=string
transforms.insertFormattedTs.format.to.pattern=yyyy-MM-dd-HH
transforms.insertWallclock.type=io.lenses.connect.smt.header.InsertWallclock
transforms.insertWallclock.header.name=wallclock
transforms.insertWallclock.value.type=format
transforms.insertWallclock.format=yyyy-MM-dd-HH{
"key": <the message Key, which can be a primitive or a complex object>,
"value": <the message Value, which can be a primitive or a complex object>,
"headers": {
"header1": "value1",
"header2": "value2"
},
"metadata": {
"offset": 0,
"partition": 0,
"timestamp": 0,
"topic": "topic"
}
}...
connect.datalake.kcql=INSERT INTO lensesioazure:car_speed SELECT * FROM car_speed_events STOREAS `PARQUET`
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
key.converter=org.apache.kafka.connect.storage.StringConverter
......
connect.datalake.kcql=INSERT INTO lensesioazure:car_speed SELECT * FROM car_speed_events STOREAS `PARQUET`
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
... ...
connect.datalake.kcql=INSERT INTO lensesioazure:car_speed SELECT * FROM car_speed_events STOREAS `JSON` PROPERTIES('store.envelope'=true)
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
......
connect.datalake.kcql=INSERT INTO lensesioazure:car_speed SELECT * FROM car_speed_events STOREAS `AVRO` PROPERTIES('store.envelope'=true)
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
key.converter=org.apache.kafka.connect.storage.StringConverter
......
connect.datalake.kcql=INSERT INTO lensesioazure:car_speed SELECT * FROM car_speed_events STOREAS `AVRO` PROPERTIES('store.envelope'=true)
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
...message1 -> schema1
message2 -> schema1
(No flush needed – same schema)
message3 -> schema2
(Flush occurs – new schema introduced)
message4 -> schema2
(No flush needed – same schema)
message5 -> schema1
Without optimization: would trigger a flush
With optimization: no flush – schema1 is backward-compatible with schema2
message6 -> schema2
message7 -> schema2
(No flush needed – same schema, it would happen based on the flush thresholds)...
connect.datalake.azure.auth.mode=Credentials
connect.datalake.azure.account.name=$AZURE_ACCOUNT_NAME
connect.datalake.azure.account.key=$AZURE_ACCOUNT_KEY
......
connect.datalake.azure.auth.mode=ConnectionString
connect.datalake.azure.connection.string=$AZURE_CONNECTION_STRING
...io.lenses.streamreactor.connect.mongodb.sink.MongoSinkConnectorname=mongo
connector.class=io.lenses.streamreactor.connect.mongodb.sink.MongoSinkConnector
tasks.max=1
topics=orders
connect.mongo.kcql=INSERT INTO orders SELECT * FROM orders
connect.mongo.db=connect
connect.mongo.connection=mongodb://mongo:27017INSERT | UPSERT
INTO <collection_name>
SELECT FIELD, ...
FROM <kafka-topic>
BATCH = 100-- Select all fields from topic fx_prices and insert into the fx collection
INSERT INTO fx SELECT * FROM fx_prices
-- Select all fields from topic fx_prices and upsert into the fx collection,
-- The assumption is there will be a ticker field in the incoming json:
UPSERT INTO fx SELECT * FROM fx_prices PK ticker# default of scram
mongodb://host1/?authSource=db1
# scram explicit
mongodb://host1/?authSource=db1&authMechanism=SCRAM-SHA-1
# mongo-cr
mongodb://host1/?authSource=db1&authMechanism=MONGODB-CR
# x.509
mongodb://host1/?authSource=db1&authMechanism=MONGODB-X509
# kerberos
mongodb://host1/?authSource=db1&authMechanism=GSSAPI
# ldap
mongodb://host1/?authSource=db1&authMechanism=PLAINThis page describes the usage of the Stream Reactor Cassandra Sink Connector bundled in kafka-connect-cassandra-sink artifact.
io.lenses.streamreactor.connect.cassandra.CassandraSinkConnectorname=cassandra-sink
connector.class=io.lenses.streamreactor.connect.cassandra.CassandraSinkConnector
tasks.max=1
topics=orders
connect.cassandra.kcql=INSERT INTO mykeyspace.orders SELECT _key.bigint as bigintcol, _value.boolean as booleancol, _key.double as doublecol, _value.float as floatcol, _key.int as intcol, _value.smallint as smallintcol, _key.text as textcol, _value.tinyint as tinyintcol FROM orders
connect.cassandra.port=9042
connect.cassandra.contact.points=cassandraconnect.cassandra.contact.points=[host list]
connect.cassandra.port=9042
connect.cassandra.load.balancing.local.dc=datacentre-name
# optional entries
# connect.cassandra.max.concurrent.requests = 100
# connect.cassandra.max.batch.size = 64
# connect.cassandra.connection.pool.size = 4
# connect.cassandra.query.timeout.ms = 30
# connect.cassandra.compression = Noneconnect.cassandra.ssl.provider=
connect.cassandra.ssl.cipher.suites=
connect.cassandra.ssl.hostname.verification=true
connect.cassandra.ssl.keystore.path=
connect.cassandra.ssl.keystore.password=
connect.cassandra.ssl.truststore.password=
connect.cassandra.ssl.truststore.path=
# Path to the SSL certificate file, when using OpenSSL.
connect.cassandra.ssl.openssl.key.cert.chain=
# Path to the private key file, when using OpenSSL.
connect.cassandra.ssl.openssl.private.key=connect.cassandra.auth.provider=
connect.cassandra.auth.username=
connect.cassandra.auth.password=
# for SASL authentication which requires connect.cassandra.auth.provider
# set to GSSAPI
connect.cassandra.auth.gssapi.keytab=
connect.cassandra.auth.gssapi.principal=
connect.cassandra.auth.gssapi.service=connect.cassandra.auth.provider=GSSAPI
connect.cassandra.auth.gssapi.keytab=
connect.cassandra.auth.gssapi.principal=
connect.cassandra.auth.gssapi.service=INSERT INTO <keyspace>.<your-cassandra-table>
SELECT <field projection>,...
FROM <your-kafka-topic>
PROPERTIES(< a set of keys to control the connector behaviour>)INSERT INTO mykeyspace.types
SELECT
_key.bigint as bigintcol
, _value.boolean as booleancol
, _key.double as doublecol
, _value.float as floatcol
, _header.int as intcol
FROM myTopic
INSERT INTO mykeyspace.types
SELECT
_value.bigint as bigintcol
, _value.double as doublecol
, _value.ttlcol as message_internal_ttl
, _value.timestampcol as message_internal_timestamp
FROM myTopic
PROPERTIES('ttlTimeUnit'='MILLISECONDS', 'timestampTimeUnit'='MICROSECONDS')
INSERT INTO mykeyspace.CASE_SENSITIVE
SELECT
`_key.'bigint field'` as 'bigint col',
`_key.'boolean-field'` as 'boolean-col',
`_value.'INT FIELD'` as 'INT COL',
`_value.'TEXT.FIELD'` as 'TEXT.COL'
FROM mytopic
INSERT INTO mykeyspace.tableA
SELECT
_key.my_pk as my_pk
, _value.my_value as my_value
FROM topicA
PROPERTIES(
'query'='INSERT INTO mykeyspace.pk_value (my_pk, my_value) VALUES (:my_pk, :my_value)',
'deletesEnabled' ='true'
)INSERT INTO ${target}
SELECT _value/_key/_header.{field_path} AS ${table_column}
FROM mytopicINSERT INTO ${target}
SELECT
_key as row_key
, _value as contentINSERT INTO ${target}
SELECT
`_key.'bigint field'` as 'bigint col',
`_key.'boolean-field'` as 'boolean-col',
`_value.'INT FIELD'` as 'INT COL',
`_value.'TEXT.FIELD'` as 'TEXT.COL'
FROM myTopicINSERT INTO stocks_keyspace.stocks_by_symbol
SELECT _value.symbol AS ticker,
_value.ts AS ts,
_value.exchange AS exchange,
_value.value AS value
FROM stocks;
INSERT INTO stocks_keyspace.stocks_by_exchange
SELECT _value.symbol AS ticker,
_value.ts AS ts,
_value.exchange AS exchange,
_value.value AS value
FROM stocks;{"symbol":"APPL",
"value":214.4,
"exchange":"NASDAQ",
"ts":"2025-07-23T14:56:10.009"}CREATE TYPE stocks_keyspace.stocks_type (
symbol text,
ts timestamp,
exchange text,
value double
);
CREATE TABLE stocks_keyspace.stocks_table (
name text PRIMARY KEY,
stocks FROZEN<stocks_type>
);INSERT INTO stocks_keyspace.stocks_table
SELECT _key AS name, _value AS stocks
FROM stocksINSERT INTO ${target}
SELECT
`now()` as loaded_at
FROM myTopicSELECT [_value/_key/_header].{path} AS message_internal_timestamp
FROM myTopic
--You may optionally specify the time unit by using:
PROPERTIES('timestampTimeUnit'='MICROSECONDS')SELECT _header.timestampcolumn AS message_internal_timestamp FROM myTopic
SELECT _value.timestampcol AS message_internal_timestamp FROM myTopic
SELECT _key.timestampcol AS message_internal_timestamp FROM myTopicSELECT [_value/_key/_header].{path} as message_internal_ttl
FROM myTopic
-- Optional: Specify the TTL unit
PROPERTIES('ttlTimeUnit'='SECONDS')SELECT _header.ttl as message_internal_ttl
FROM myTopic
PROPERTIES('ttlTimeUnit'='MINUTES')
SELECT _key.expiry as message_internal_ttl
FROM myTopic
PROPERTIES('ttlTimeUnit'='HOURS')
SELECT _value.ttl_duration as message_internal_ttl
FROM myTopic
PROPERTIES('ttlTimeUnit'='DAYS')INSERT INTO ... SELECT ... FROM myTopic PROPERTIES(
'codec.locale' = 'en_US',
'codec.timeZone' = 'UTC',
'codec.timestamp' = 'CQL_TIMESTAMP',
'codec.date' = 'ISO_LOCAL_DATE',
'codec.time' = 'ISO_LOCAL_TIME',
'codec.unit' = 'MILLISECONDS'
)INSERT INTO ${target}
SELECT
_value.bigint as some_name,
_value.int as some_name2
FROM myTopic
PROPERTIES('query'='INSERT INTO %s.types (bigintCol, intCol) VALUES (:some_name, :some_name2)')connect.cassandra.driver.{driver_setting}connect.cassandra.driver.*connect.cassandra.driver.basic.request.consistency=ALL