Lenses Kafka Connectors are a collection of open-source, Apache 2.0 licensed Kafka Connect connectors, maintained by Lenses.io for the community.
This page describes the usage of the Stream Reactor Azure Event Hubs Sink Connector.
This page describes how to use SMTs in your Kafka Connect Clusters.
This page contains tutorials for common Kafka Connect use cases.
Coming soon!
This page describes the release notes for the Stream Reactor.
This page describes installing the Lenses Kafka Connectors.
/opt/stream-reactor-x.x.x-x.x.x
├── bin
├── conf
├── libs
├── LICENSE
This page details the configuration options for the Stream Reactor Kafka Connect source connectors.
This page gives an overview of the Lenses SMTs for Kafka Connect.
GitHub Release downloads for Stream Reactor Connectors, Secret Providers and SMTs.




key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
connector.class=io.lenses.streamreactor.connect.http.sink.HttpSinkConnector
topics=mytopic
tasks.max=1
connect.http.config={"method":"Post","endpoint":"https://my-endpoint.example.com","content":"My Static Content Template","batch":{"batchCount":1}}
connector.class=io.lenses.streamreactor.connect.http.sink.HttpSinkConnector
topics=mytopic
tasks.max=1
connect.http.config={"method":"Post","endpoint":"https://my-endpoint.example.com","content":"{{value}}","batch":{"batchCount":1}}
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
connector.class=io.lenses.streamreactor.connect.http.sink.HttpSinkConnector
topics=mytopic
tasks.max=1
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
connect.http.config={"method":"Post","endpoint":"https://my-endpoint.example.com","content":"product: {{value.product}}","batch":{"batchSize":1}}
value.converter.schemas.enable=false
connector.class=io.lenses.streamreactor.connect.http.sink.HttpSinkConnector
topics=mytopic
tasks.max=1
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
connect.http.config={"method":"Post","endpoint":"https://my-endpoint.example.com","content":"whole product message: {{value}}","batch":{"timeInterval":5}}
connector.class=io.lenses.streamreactor.connect.http.sink.HttpSinkConnector
topics=mytopic
tasks.max=1
connect.http.config={"method":"Post","endpoint":"https://my-endpoint.example.com","content":"product: {{value.product}}"}
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schemas.enable=true
value.converter.schema.registry.url=http://schema-registry:8081
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
plugin.path=/usr/share/connectors,/opt/smt
plugin.path=/usr/share/connectors,/opt/stream-reactor-x.x.x-x.x.x/libs
Inserts the system clock as a message header.
A Kafka Connect Single Message Transform (SMT) that inserts the system clock year, month, day, hour, minute, or second as a message header, with a value of type STRING.
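As a rough illustration of what extracting one date-time part involves, here is a minimal Python sketch. It is not the SMT's implementation: the function name `date_time_part` is hypothetical, the string formatting (no zero padding) is an assumption, and a fixed +05:30 offset stands in for the `Asia/Kolkata` timezone used in the configuration examples.

```python
from datetime import datetime, timedelta, timezone

# Parts named in the SMT description and its configuration examples.
PARTS = ("year", "month", "day", "hour", "minute", "second")

def date_time_part(ts: datetime, part: str, tz: timezone = timezone.utc) -> str:
    """Return one part of an aware timestamp as a string (hypothetical helper)."""
    if part not in PARTS:
        raise ValueError(f"unsupported date time part: {part}")
    # Convert to the target timezone first: the same instant can fall on a
    # different hour, or even a different day, depending on the zone.
    return str(getattr(ts.astimezone(tz), part))

instant = datetime(2024, 1, 1, 20, 0, 0, tzinfo=timezone.utc)
ist = timezone(timedelta(hours=5, minutes=30))  # stand-in for Asia/Kolkata

print(date_time_part(instant, "hour"))       # 20
print(date_time_part(instant, "hour", ist))  # 1
print(date_time_part(instant, "day", ist))   # 2
```

The timezone matters because the header value is derived from local calendar fields, not from the epoch value itself.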
This page describes how to use Error policies in Stream Reactor sink connectors.
config.providers=env
config.providers.env.class=io.lenses.connect.secrets.providers.ENVSecretProvider
config.providers.env.param.file.dir=my-secret-dir
name=my-sink
class=my-class
topics=mytopic
username=${env::MY_ENV_VAR_USERNAME}
password=${env::MY_ENV_VAR_PASSWORD}
name=my-sink
class=my-class
topics=mytopic
username=lenses
password=my-secret-password
io.lenses.streamreactor.connect.converters.source.AvroConverter
io.lenses.streamreactor.connect.converters.source.JsonPassThroughConverter
io.lenses.streamreactor.connect.converters.source.JsonSimpleConverter
io.lenses.streamreactor.connect.converters.source.BytesConverter
format.from.pattern=yyyy-MM-dd'T'HH:mm:ss,SSS
format.from.pattern="yyyy-MM-dd'T'HH:mm:ss,SSS"
format.from.pattern=yyyyMMddHHmmssSSS,"yyyy-MM-dd'T'HH:mm:ss,SSS"
yyyy-MM-dd HH:mm:ss.SSS.
io.lenses.connect.smt.header.InsertWallclock
transforms=InsertWallclock
transforms.InsertWallclock.type=io.lenses.connect.smt.header.InsertWallclock
transforms.InsertWallclock.header.name=wallclock
transforms.InsertWallclock.value.type=epoch
transforms=InsertWallclock
transforms.InsertWallclock.type=io.lenses.connect.smt.header.InsertWallclock
transforms.InsertWallclock.header.name=wallclock
transforms.InsertWallclock.value.type=format
transforms.InsertWallclock.format=yyyy-MM-dd HH:mm:ss.SSS
transforms=InsertWallclock
transforms.InsertWallclock.type=io.lenses.connect.smt.header.InsertWallclock
transforms.InsertWallclock.header.name=wallclock
transforms.InsertWallclock.value.type=format
transforms.InsertWallclock.format=yyyy-MM-dd HH:mm:ss.SSS
transforms.InsertWallclock.timezone=Asia/Kolkata
io.lenses.connect.smt.header.InsertWallclockDateTimePart
transforms=InsertWallclockDateTimePart
transforms.InsertWallclockDateTimePart.type=io.lenses.connect.smt.header.InsertWallclockDateTimePart
transforms.InsertWallclockDateTimePart.header.name=wallclock
transforms.InsertWallclockDateTimePart.date.time.part=year
transforms=InsertWallclockDateTimePart
transforms.InsertWallclockDateTimePart.type=io.lenses.connect.smt.header.InsertWallclockDateTimePart
transforms.InsertWallclockDateTimePart.header.name=wallclock
transforms.InsertWallclockDateTimePart.date.time.part=month
transforms=InsertWallclockDateTimePart
transforms.InsertWallclockDateTimePart.type=io.lenses.connect.smt.header.InsertWallclockDateTimePart
transforms.InsertWallclockDateTimePart.header.name=wallclock
transforms.InsertWallclockDateTimePart.date.time.part=day
transforms=InsertWallclockDateTimePart
transforms.InsertWallclockDateTimePart.type=io.lenses.connect.smt.header.InsertWallclockDateTimePart
transforms.InsertWallclockDateTimePart.header.name=wallclock
transforms.InsertWallclockDateTimePart.date.time.part=hour
transforms=InsertWallclockDateTimePart
transforms.InsertWallclockDateTimePart.type=io.lenses.connect.smt.header.InsertWallclockDateTimePart
transforms.InsertWallclockDateTimePart.header.name=wallclock
transforms.InsertWallclockDateTimePart.date.time.part=hour
transforms.InsertWallclockDateTimePart.timezone=Asia/Kolkata
transforms=InsertWallclockDateTimePart
transforms.InsertWallclockDateTimePart.type=io.lenses.connect.smt.header.InsertWallclockDateTimePart
transforms.InsertWallclockDateTimePart.header.name=wallclock
transforms.InsertWallclockDateTimePart.date.time.part=minute
transforms=InsertWallclockDateTimePart
transforms.InsertWallclockDateTimePart.type=io.lenses.connect.smt.header.InsertWallclockDateTimePart
transforms.InsertWallclockDateTimePart.header.name=wallclock
transforms.InsertWallclockDateTimePart.date.time.part=second
This page details the configuration options for the Stream Reactor Kafka Connect sink connectors.
SMT that inserts the system clock value as a message header, a value adapted to a specified time window boundary, for example every 15 minutes, or one hour.
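The idea of adapting a timestamp to a window boundary can be sketched in a few lines of Python. This is only an illustration under the assumption that the boundary is the start of the window containing the timestamp; the function name `roll_to_window` is hypothetical and the SMT's actual rounding rules may differ.

```python
from datetime import datetime, timezone

def roll_to_window(ts: datetime, window_minutes: int) -> datetime:
    """Floor an aware timestamp to the start of its window (assumed semantics)."""
    window_s = window_minutes * 60
    epoch_s = int(ts.timestamp())
    # Drop the remainder so that every timestamp inside the same window
    # maps to the same boundary value.
    return datetime.fromtimestamp(epoch_s - epoch_s % window_s, tz=timezone.utc)

now = datetime(2024, 1, 1, 10, 37, 12, tzinfo=timezone.utc)
print(roll_to_window(now, 15).isoformat())  # 2024-01-01T10:30:00+00:00
print(roll_to_window(now, 60).isoformat())  # 2024-01-01T10:00:00+00:00
```

With a 15-minute window, all records processed between 10:30:00 and 10:44:59 would carry the same header value, which is what makes the header usable as a partitioning key.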
The InsertSourcePartitionOrOffsetValue transformation in Kafka Connect allows you to insert headers into SourceRecords based on partition or offset values.
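To make the transformation concrete, here is a small Python sketch of the mapping from source partition and offset values to headers. It is an assumption-laden illustration, not the SMT's code: the function `source_headers` is hypothetical, the header name is assumed to be the configured prefix followed by the field name, and the sample values are invented. The field names (`container`, `prefix`, `path`, `line`, `ts`) and the prefixes (`partition.`, `offset.`) are taken from the configuration examples in this documentation.

```python
def source_headers(source_partition, source_offset,
                   partition_fields, offset_fields,
                   partition_prefix="partition.", offset_prefix="offset."):
    """Build header name/value pairs from selected partition and offset fields."""
    headers = {}
    for field in partition_fields:
        if field in source_partition:  # skip fields the record does not carry
            headers[partition_prefix + field] = str(source_partition[field])
    for field in offset_fields:
        if field in source_offset:
            headers[offset_prefix + field] = str(source_offset[field])
    return headers

# Invented sample values for a record read from object storage.
partition = {"container": "my-bucket", "prefix": "events/"}
offset = {"path": "events/0001.json", "line": 42, "ts": 1704105432000}

print(source_headers(partition, offset,
                     ["container", "prefix"], ["path", "line", "ts"]))
```

Separate prefixes keep partition-derived and offset-derived headers from colliding when both maps contain a field with the same name.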
Examples for AWS S3 Source Kafka Connector.
# the connector name can be anything
name=aws-s3SourceConnectorParquet
connector.class=io.lenses.streamreactor.connect.aws.s3.source.S3SourceConnector
tasks.max=1
connect.s3.kcql=insert into $TOPIC_NAME select * from $BUCKET_NAME:$PREFIX_NAME STOREAS `parquet`
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8089
connect.s3.aws.region=eu-west-2
connect.s3.aws.secret.key=SECRET_KEY
connect.s3.aws.access.key=ACCESS_KEY
connect.s3.aws.auth.mode=Credentials
Examples for GCP Source Kafka Connector.
# the connector name can be anything
name=gcp-storageSourceConnectorParquet
connector.class=io.lenses.streamreactor.connect.gcp.storage.source.GCPStorageSourceConnector
tasks.max=1
connect.gcpstorage.kcql=insert into $TOPIC_NAME select * from $BUCKET_NAME:$PREFIX_NAME STOREAS `parquet`
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8089
connect.gcpstorage.gcp.auth.mode=Credentials
connect.gcpstorage.gcp.credentials=$GCP_CREDENTIALS
connect.gcpstorage.gcp.project.id=$GCP_PROJECT_ID
SMT that inserts date, year, month, day, hour, minute and second headers using the record timestamp. If the record timestamp is null, the SMT uses the current system time.
A Kafka Connect Single Message Transform (SMT) that inserts date, year, month, day, hour, minute and second headers using the system clock.
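The set of headers such an SMT produces can be pictured with the following Python sketch. It is a hedged illustration only: the function `wallclock_headers` is hypothetical, the `strftime` formats are stand-ins chosen for the example (the SMT itself is configured with Java-style patterns such as `yyyy-MM-dd`), and the optional prefix mirrors the `header.prefix.name=wallclock_` setting shown in the configuration examples.

```python
from datetime import datetime, timezone

# Illustrative formats only; these are assumptions, not the SMT's defaults.
FORMATS = {
    "date": "%Y-%m-%d",
    "year": "%Y",
    "month": "%m",
    "day": "%d",
    "hour": "%H",
    "minute": "%M",
    "second": "%S",
}

def wallclock_headers(now: datetime, prefix: str = "") -> dict:
    """Return one string header per date-time component, optionally prefixed."""
    return {prefix + name: now.strftime(fmt) for name, fmt in FORMATS.items()}

now = datetime(2024, 3, 5, 7, 8, 9, tzinfo=timezone.utc)
for name, value in wallclock_headers(now, prefix="wallclock_").items():
    print(name, "=", value)
```

A sink connector can then reference these headers by name, for example in a `PARTITIONBY` clause.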














initialisation-vector
path,line,ts
connect.s3.aws.secret.key is the secret key to authenticate into the AWS S3 bucket.


connect.s3.error.policy=THROW) in case of errors during data ingestion, ensuring that any issues are immediately surfaced for resolution.
connect.gcpstorage.error.policy=THROW) in case of errors during data ingestion, ensuring that any issues are immediately surfaced for resolution.
io.lenses.streamreactor.connect.azure.eventhubs.source.AzureEventHubsSourceConnector
name=AzureEventHubsSourceConnector
connector.class=io.lenses.streamreactor.connect.azure.eventhubs.source.AzureEventHubsSourceConnector
tasks.max=1
connect.eventhubs.kcql=INSERT INTO azureoutput SELECT * FROM inputhub;
connect.eventhubs.source.connection.settings.bootstrap.servers=MYNAMESPACE.servicebus.windows.net:9093
connect.eventhubs.source.connection.settings.sasl.mechanism=PLAIN
connect.eventhubs.source.connection.settings.security.protocol=SASL_SSL
connect.eventhubs.source.connection.settings.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://MYNAMESPACE.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SOME_SHARED_ACCESS_STRING;EntityPath=inputhub";
INSERT INTO <your-kafka-topic>
SELECT *
FROM <your-event-hub>;
connect.eventhubs.connection.settings.bootstrap.servers=NAMESPACENAME.servicebus.windows.net:9093
connect.eventhubs.connection.settings.sasl.mechanism=PLAIN
connect.eventhubs.connection.settings.security.protocol=SASL_SSL
connect.eventhubs.connection.settings.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
connect.eventhubs.connection.settings.max.poll.records = 100
INSERT INTO `_value.'customer.name'.'first.name'` SELECT * FROM topicA
config.providers=aes256
config.providers.aes256.class=io.lenses.connect.secrets.providers.Aes256DecodingProvider
config.providers.aes256.param.aes256.key=aaaaaaaaaabbbbbbbbbbccccccccccdd
config.providers.aes256.param.file.dir=/tmp/aes256
name=my-sink
class=my-class
topics=mytopic
greeting=${aes256::xyxyxy}
name=my-sink
class=my-class
topics=mytopic
greeting=hello
name=my-sink
class=my-class
topics=mytopic
greeting=${aes256:utf8_file:xyxyxy}
name=my-sink
class=my-class
topics=mytopic
greeting=/store-root/secrets/abc-def-ghi
transforms=InsertRollingWallclock
transforms.InsertRollingWallclock.type=io.lenses.connect.smt.header.InsertRollingWallclock
transforms.InsertRollingWallclock.header.name=wallclock
transforms.InsertRollingWallclock.value.type=epoch
transforms.InsertRollingWallclock.rolling.window.type=minutes
transforms.InsertRollingWallclock.rolling.window.size=15
transforms=InsertRollingWallclock
transforms.InsertRollingWallclock.type=io.lenses.connect.smt.header.InsertRollingWallclock
transforms.InsertRollingWallclock.header.name=wallclock
transforms.InsertRollingWallclock.value.type=format
transforms.InsertRollingWallclock.format=yyyy-MM-dd HH:mm:ss.SSS
transforms.InsertRollingWallclock.rolling.window.type=minutes
transforms.InsertRollingWallclock.rolling.window.size=15
transforms=InsertRollingWallclock
transforms.InsertRollingWallclock.type=io.lenses.connect.smt.header.InsertRollingWallclock
transforms.InsertRollingWallclock.header.name=wallclock
transforms.InsertRollingWallclock.value.type=format
transforms.InsertRollingWallclock.format=yyyy-MM-dd HH:mm:ss.SSS
transforms.InsertRollingWallclock.rolling.window.type=minutes
transforms.InsertRollingWallclock.rolling.window.size=15
transforms.InsertRollingWallclock.timezone=Asia/Kolkata
transforms=InsertSourcePartitionOrOffsetValue
transforms.InsertSourcePartitionOrOffsetValue.type=io.lenses.connect.smt.header.InsertSourcePartitionOrOffsetValue
transforms.InsertSourcePartitionOrOffsetValue.offset.fields=path,line,ts
transforms.InsertSourcePartitionOrOffsetValue.partition.fields=container,prefix
transforms=InsertSourcePartitionOrOffsetValue
transforms.InsertSourcePartitionOrOffsetValue.type=io.lenses.connect.smt.header.InsertSourcePartitionOrOffsetValue
transforms.InsertSourcePartitionOrOffsetValue.offset.fields=path,line,ts
transforms.InsertSourcePartitionOrOffsetValue.partition.fields=container,prefix
transforms=InsertSourcePartitionOrOffsetValue
transforms.InsertSourcePartitionOrOffsetValue.type=io.lenses.connect.smt.header.InsertSourcePartitionOrOffsetValue
transforms.InsertSourcePartitionOrOffsetValue.offset.fields=path,line,ts
transforms.InsertSourcePartitionOrOffsetValue.offset.prefix=offset.
transforms.InsertSourcePartitionOrOffsetValue.partition.fields=container,prefix
transforms.InsertSourcePartitionOrOffsetValue.partition.prefix=partition.
# connector configuration
connector.class=io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector
tasks.max=3
name=sink-s3-orders-stream-reactor
topics=orders
# converter configuration
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
# specific connector configuration for S3
connect.s3.aws.auth.mode=Credentials
connect.s3.aws.access.key=****
connect.s3.aws.secret.key=****
connect.s3.aws.region=****
connect.s3.kcql=insert into `owshq-topics:stream` select * from orders STOREAS `json` WITH_FLUSH_COUNT = 200 WITH_FLUSH_SIZE = 200 WITH_FLUSH_INTERVAL = 1000
# connector configuration
connector.class=io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector
tasks.max=1
name=sink-s3-orders-stream-reactor
topics=orders
# converter configuration
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
# specific connector configuration for S3
connect.s3.aws.auth.mode=Credentials
connect.s3.aws.access.key=****
connect.s3.aws.secret.key=****
connect.s3.aws.region=****
connect.s3.kcql=insert into `owshq-topics:stream` select * from orders STOREAS `json` WITH_FLUSH_COUNT = 200 WITH_FLUSH_SIZE = 200 WITH_FLUSH_INTERVAL = 1000
connector.class=io.lenses.streamreactor.connect.aws.s3.source.S3SourceConnector
tasks.max=4
connect.s3.error.policy=THROW
connect.s3.partition.search.recurse.levels=0
connect.partition.search.continuous=true
connect.s3.source.partition.extractor.type=hierarchical
connect.s3.kcql=insert into food-restored select * from YOUR_BUCKET:YOUR_PREFIX BATCH=2000 STOREAS `JSON` LIMIT 10000 PROPERTIES('store.envelope'=true)
name=aws-s3SourceEnvelope
value.converter=org.apache.kafka.connect.storage.StringConverter
errors.log.enable=true
key.converter=org.apache.kafka.connect.storage.StringConverter
connect.s3.aws.region=eu-west-2
connect.s3.aws.secret.key=SECRET_KEY
connect.s3.aws.access.key=ACCESS_KEY
connect.s3.aws.auth.mode=Credentials
name=aws-s3AvroSourceEnvelope
connector.class=io.lenses.streamreactor.connect.aws.s3.source.S3SourceConnector
tasks.max=1
connect.s3.error.policy=THROW
connect.s3.partition.search.recurse.levels=0
connect.partition.search.continuous=true
connect.s3.kcql=insert into car_speed_events_replicated select * from YOUR_BUCKET:YOUR_PREFIX STOREAS `AVRO` PROPERTIES('store.envelope' = true)
errors.log.enable=true
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
key.converter=org.apache.kafka.connect.storage.StringConverter
connect.s3.aws.region=eu-west-2
connect.s3.aws.secret.key=SECRET_KEY
connect.s3.aws.access.key=ACCESS_KEY
connect.s3.aws.auth.mode=Credentials
connector.class=io.lenses.streamreactor.connect.gcp.storage.source.GCPStorageSourceConnector
tasks.max=4
connect.gcpstorage.error.policy=THROW
connect.gcpstorage.partition.search.recurse.levels=0
connect.partition.search.continuous=true
connect.gcpstorage.source.partition.extractor.type=hierarchical
connect.gcpstorage.kcql=insert into food-restored select * from YOUR_BUCKET:YOUR_PREFIX BATCH=2000 STOREAS `JSON` LIMIT 10000 PROPERTIES('store.envelope'=true)
name=gcp-storageSourceEnvelope
value.converter=org.apache.kafka.connect.storage.StringConverter
errors.log.enable=true
key.converter=org.apache.kafka.connect.storage.StringConverter
connect.gcpstorage.gcp.auth.mode=Credentials
connect.gcpstorage.gcp.credentials=$GCP_CREDENTIALS
connect.gcpstorage.gcp.project.id=$GCP_PROJECT_ID
name=gcp-storageAvroSourceEnvelope
connector.class=io.lenses.streamreactor.connect.gcp.storage.source.GCPStorageSourceConnector
tasks.max=1
connect.gcpstorage.error.policy=THROW
connect.gcpstorage.partition.search.recurse.levels=0
connect.partition.search.continuous=true
connect.gcpstorage.kcql=insert into car_speed_events_replicated select * from YOUR_BUCKET:YOUR_PREFIX STOREAS `AVRO` PROPERTIES('store.envelope' = true)
errors.log.enable=true
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
key.converter=org.apache.kafka.connect.storage.StringConverter
connect.gcpstorage.gcp.auth.mode=Credentials
connect.gcpstorage.gcp.credentials=$GCP_CREDENTIALS
connect.gcpstorage.gcp.project.id=$GCP_PROJECT_ID
<prefix>/<topic>/<encodedPartition>/<topic>+<kafkaPartition>+<startOffset>.<format>
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="yyyy-MM-dd-HH"
connect.s3.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date STORE AS X
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.year.format="'year='yyyy"
transforms.partition.month.format="'month='MM"
transforms.partition.day.format="'day='dd"
transforms.partition.hour.format="'hour='HH"
connect.s3.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.year, _header.month, _header.day, _header.hour
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.year.format="'year='yyyy"
transforms.partition.month.format="'month='MM"
transforms.partition.day.format="'day='dd"
connect.s3.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.year, _header.month, _header.day
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.year.format="'year='yyyy"
transforms.partition.month.format="'month='MM"
transforms.partition.day.format="'day='dd"
connect.s3.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.year, _header.month, _header.day
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.year.format="yyyy"
transforms.partition.month.format="MM"
transforms.partition.day.format="dd"
transforms.partition.hour.format="HH"
transforms.partition.minute.format="mm"
connect.s3.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.year, _header.month, _header.day, _header.hour, _header.minute
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'date='yyyy-MM-dd"
transforms.partition.hour.format="'time='HHmm"
connect.s3.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date, _header.time
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'data_date='yyyy-MM-dd"
transforms.partition.hour.format="'hour='HH"
connect.s3.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date, _header.hour
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'dt='yyyy-MM-dd"
transforms.partition.hour.format="'hour='HH"
connect.s3.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date, _header.hour
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'raw_cre_dt='yyyy-MM-dd"
connect.s3.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'creation-ts='yyyy-MM-dd"
connect.s3.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'createdAt='yyyy-MM-dd"
connect.s3.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'createdAt='yyyyMMddHH"
connect.s3.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'created_at='yyyy-MM-dd"
connect.s3.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'creation_ds='yyyy-MM-dd"
connect.s3.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'data_date='yyyy-MM-dd"
connect.s3.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'date_hour='yyyy-MM-dd-HH"
connect.s3.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'data_date='yyyy-MM-dd-HH"
connect.s3.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="yyyy-MM-dd-HH"
connect.s3.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date STORE AS X
{
"yaml.schemas": {
"file:///path/to/your/schema.json": ["*.yaml", "*.yml"]
},
"json.schemas": [
{
"fileMatch": ["*.json"],
"url": "file:///path/to/your/schema.json"
}
],
"yaml.format.enable": true,
"yaml.validate": true,
"yaml.hover": true,
"yaml.completion": true,
"yaml.format.singleQuote": false,
"yaml.format.bracketSpacing": true,
"yaml.format.proseWrap": "preserve"
}
{
"yaml.schemas": {
"file://${workspaceFolder}/schema.json": ["*.yaml", "*.yml"]
},
"json.schemas": [
{
"fileMatch": ["*.json"],
"url": "file://${workspaceFolder}/schema.json"
}
]
}
io.lenses.streamreactor.connect.influx.InfluxSinkConnector
name=influxdb
connector.class=io.lenses.streamreactor.connect.influx.InfluxSinkConnector
tasks.max=1
topics=influx
connect.influx.url=http://influxdb:8086
connect.influx.db=mydb
connect.influx.username=admin
connect.influx.kcql=INSERT INTO influxMeasure SELECT * FROM influx WITHTIMESTAMP sys_time()
INSERT INTO <your-measure>
SELECT FIELD, ...
FROM kafka_topic_name
[WITHTIMESTAMP FIELD|sys_time]
[WITHTAG (FIELD|(constant_key=constant_value))]
-- Insert mode, select all fields from topicA and write to measureA
INSERT INTO measureA SELECT * FROM topicA
-- Insert mode, select 3 fields and rename from topicB and write to measureB,
-- use field y as the point timestamp
INSERT INTO measureB SELECT x AS a, y AS b, c FROM topicB WITHTIMESTAMP y
-- Insert mode, select 3 fields and rename from topicB and write to measureB,
-- use the current system time as the point timestamp
INSERT INTO measureB SELECT x AS a, y AS b, z FROM topicB WITHTIMESTAMP sys_time()
-- Tagging using constants
INSERT INTO measureA SELECT * FROM topicA WITHTAG (DataMountaineer=awesome, Influx=rulz!)
-- Tagging using fields in the payload. Say we have a Payment structure
-- with these fields: amount, from, to, note
INSERT INTO measureA SELECT * FROM topicA WITHTAG (from, to)
-- Tagging using a combination of fields in the payload and constants.
-- Say we have a Payment structure with these fields: amount, from, to, note
INSERT INTO measureA SELECT * FROM topicA WITHTAG (from, to, provider=DataMountaineer)
config.providers=azure
config.providers.azure.class=io.lenses.connect.secrets.providers.AzureSecretProvider
config.providers.azure.param.azure.auth.method=credentials
config.providers.azure.param.azure.client.id=your-client-id
config.providers.azure.param.azure.secret.id=your-secret-id
config.providers.azure.param.azure.tenant.id=your-tenant-id
config.providers.azure.param.file.dir=/connector-files/azure
name=my-sink
class=my-class
topics=mytopic
username=${azure:my-azure-key-vault.vault.azure.net:my_username}
password=${azure:my-azure-key-vault.vault.azure.net:my_password}
name=my-sink
class=my-class
topics=mytopic
username=lenses
password=my-secret-password
connect.s3.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header._date, _header._hour
connect.s3.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header._year, _header._month, _header._day, _header._hour
io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms=InsertWallclock
transforms.InsertWallclock.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms=InsertWallclock
transforms.InsertWallclock.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.InsertWallclock.header.prefix.name=wallclock_
transforms=InsertWallclock
transforms.InsertWallclock.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.InsertWallclock.date.format=yyyy-MM-dd
transforms=InsertWallclock
transforms.InsertWallclock.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.InsertWallclock.timezone=Asia/Kolkata
transforms=InsertWallclock
transforms.InsertWallclock.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.InsertWallclock.date.format="date=yyyy-MM-dd"
transforms.InsertWallclock.hour.format="hour=yyyy"
connect.s3.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date, _header.year
connect.s3.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header._date, _header._hour
connect.s3.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header._year, _header._month, _header._day, _header._hour
io.lenses.connect.smt.header.InsertWallclockHeaders
transforms=InsertWallclock
transforms.InsertWallclock.type=io.lenses.connect.smt.header.InsertWallclockHeaders
transforms=InsertWallclock
transforms.InsertWallclock.type=io.lenses.connect.smt.header.InsertWallclockHeaders
transforms.InsertWallclock.header.prefix.name=wallclock_
transforms=InsertWallclock
transforms.InsertWallclock.type=io.lenses.connect.smt.header.InsertWallclockHeaders
transforms.InsertWallclock.date.format=yyyy-MM-dd
transforms=InsertWallclock
transforms.InsertWallclock.type=io.lenses.connect.smt.header.InsertWallclockHeaders
transforms.InsertWallclock.timezone=Asia/Kolkata
transforms=InsertWallclock
transforms.InsertWallclock.type=io.lenses.connect.smt.header.InsertWallclockHeaders
transforms.InsertWallclock.date.format="date=yyyy-MM-dd"
transforms.InsertWallclock.hour.format="hour=HH"
connect.s3.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY date, year
This page describes the usage of the Stream Reactor Azure Service Bus Sink Connector.
io.lenses.streamreactor.connect.azure.servicebus.sink.AzureServiceBusSinkConnector
Examples for GCP Sink Kafka Connector time based partitioning.
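To visualise how header-based partitioning might shape object names, here is a small Python sketch. It assumes that each header listed after `PARTITIONBY` contributes one path segment, in order, below the configured prefix; that layout, the function `partition_path`, and the sample header values are illustrative assumptions rather than the connector's documented behaviour.

```python
def partition_path(prefix: str, headers: dict, partition_by: list) -> str:
    """Join the prefix and the selected header values into an object path."""
    missing = [name for name in partition_by if name not in headers]
    if missing:
        raise KeyError(f"record is missing partition headers: {missing}")
    return "/".join([prefix, *(headers[name] for name in partition_by)])

# Header values as they could look with formats such as 'year='yyyy.
headers = {"year": "year=2024", "month": "month=01", "day": "day=15"}

print(partition_path("prefix", headers, ["year", "month", "day"]))
# prefix/year=2024/month=01/day=15
```

Putting the `key=value` text into the header format itself is what yields Hive-style directory names without any extra connector settings.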
Inserts the datetime as a message header from a value field.
Inserts date, year, month, day, hour, minute and second headers using the record timestamp and a rolling time window configuration. If the record timestamp is null, the SMT uses the system time.
Kafka SMT that inserts date, year, month, day, hour, minute and second headers using the system timestamp and a rolling time window configuration.
SMT that allows the user to specify the format of the timestamp inserted as a header. It also avoids the synchronization block requirement for converting to a string representation of the timestamp.
connector.class=io.lenses.streamreactor.connect.azure.servicebus.sink.AzureServiceBusSinkConnector
name=AzureEventHubsSinkConnector
tasks.max=1
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
connect.servicebus.connection.string="Endpoint=sb://MYNAMESPACE.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SOME_SHARED_ACCESS_STRING";
connect.servicebus.kcql=INSERT INTO output-servicebus SELECT * FROM input-topic PROPERTIES('servicebus.type'='QUEUE');
INSERT INTO <your-service-bus>
SELECT *
FROM <your-kafka-topic>
PROPERTIES(...);
connect.servicebus.connection.string=Endpoint=sb://YOURNAMESPACE.servicebus.windows.net/;SharedAccessKeyName=YOUR_KEYNAME;SharedAccessKey=YOUR_ACCESS_KEY=
connect.servicebus.kcql=INSERT INTO azure-queue SELECT * FROM kafka-topic PROPERTIES('servicebus.type'='QUEUE');
connect.servicebus.kcql=INSERT INTO azure-topic SELECT * FROM kafka-topic PROPERTIES('servicebus.type'='TOPIC');
<prefix>/<topic>/<encodedPartition>/<topic>+<kafkaPartition>+<startOffset>.<format>
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="yyyy-MM-dd-HH"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date STORE AS X
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.year.format="'year='yyyy"
transforms.partition.month.format="'month='MM"
transforms.partition.day.format="'day='dd"
transforms.partition.hour.format="'hour='HH"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.year, _header.month, _header.day, _header.hour

transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.year.format="'year='yyyy"
transforms.partition.month.format="'month='MM"
transforms.partition.day.format="'day='dd"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.year, _header.month, _header.day

transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.year.format="'year='yyyy"
transforms.partition.month.format="'month='MM"
transforms.partition.day.format="'day='dd"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.year, _header.month, _header.day

transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.year.format="yyyy"
transforms.partition.month.format="MM"
transforms.partition.day.format="dd"
transforms.partition.hour.format="HH"
transforms.partition.minute.format="mm"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.year, _header.month, _header.day, _header.hour, _header.minute

transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'date='yyyy-MM-dd"
transforms.partition.hour.format="'time='HHmm"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date, _header.time

transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'data_date='yyyy-MM-dd"
transforms.partition.hour.format="'hour='HH"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date, _header.hour

transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'dt='yyyy-MM-dd"
transforms.partition.hour.format="'hour='HH"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date, _header.hour

transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'raw_cre_dt='yyyy-MM-dd"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date

transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'creation-ts='yyyy-MM-dd"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date

transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'createdAt='yyyy-MM-dd"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date

transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'createdAt='yyyyMMddHH"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date

transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'created_at='yyyy-MM-dd"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date

transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'creation_ds='yyyy-MM-dd"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date

transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'data_date='yyyy-MM-dd"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date

transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'date_hour='yyyy-MM-dd-HH"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date

transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'data_date='yyyy-MM-dd-HH"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date

transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="yyyy-MM-dd-HH"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date STORE AS X

{
  "key": <the message Key, which can be a primitive or a complex object>,
  "value": <the message Value, which can be a primitive or a complex object>,
  "headers": {
    "header1": "value1",
    "header2": "value2"
  },
  "metadata": {
    "offset": 821122,
    "partition": 3,
    "timestamp": 1695645345,
    "topic": "source_topic"
  }
}

INSERT INTO lensesioaws
SELECT * FROM payments
STOREAS AVRO
PROPERTIES (
  'store.envelope'=true
);

connect.s3.kcql=INSERT INTO lensesioaws SELECT * FROM payments STOREAS AVRO PROPERTIES ('store.envelope'=true)

s3://lensesioaws/payments/0/000000001234.avro
s3://lensesioaws/payments/1/00000000555.avro
s3://lensesioaws/payments/2/0000000014566.avro

INSERT INTO my-s3-bucket
SELECT *
FROM sales
PARTITIONBY _key.year, _value.product_category, _headers.region
STOREAS AVRO

s3://my-s3-bucket/2023/Electronics/EMEA/000000001.avro

INSERT INTO my-s3-bucket
SELECT *
FROM sales
PARTITIONBY _key.year, _value.product_category, _headers.region
STOREAS AVRO
PROPERTIES('partition.include.keys' = true)

s3://my-s3-bucket/year=2023/product_category=Electronics/region=EMEA/000000001.avro

connect.s3.kcql=insert into lensesio:demo select * from demo PARTITIONBY _value.metadata_id, _value.customer_id, _header.ts, _header.wallclock STOREAS `JSON` WITH_FLUSH_SIZE=1000000 WITH_FLUSH_INTERVAL=30 WITH_FLUSH_COUNT=5000
topics=demo
name=demo
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
transforms=insertFormattedTs,insertWallclock
transforms.insertFormattedTs.type=io.lenses.connect.smt.header.TimestampConverter
transforms.insertFormattedTs.header.name=ts
transforms.insertFormattedTs.field=timestamp
transforms.insertFormattedTs.target.type=string
transforms.insertFormattedTs.format.to.pattern=yyyy-MM-dd-HH
transforms.insertWallclock.type=io.lenses.connect.smt.header.InsertWallclock
transforms.insertWallclock.header.name=wallclock
transforms.insertWallclock.value.type=format
transforms.insertWallclock.format=yyyy-MM-dd-HH

io.lenses.streamreactor.connect.ftp.source.FtpSourceConnector

name=ftp-source
connector.class=io.lenses.streamreactor.connect.ftp.source.FtpSourceConnector
tasks.max=1
#server settings
connect.ftp.address=localhost:21
connect.ftp.user=ftp
connect.ftp.password=ftp
#refresh rate, every minute
connect.ftp.refresh=PT1M
#ignore files older than 14 days.
connect.ftp.file.maxage=P14D
#monitor /forecasts/weather/ and /logs/ for appends to files.
#any updates go to the topics `weather` and `error-logs` respectively.
connect.ftp.monitor.tail=/forecasts/weather/:weather,/logs/:error-logs
#keep an eye on /statuses/, files are retrieved as a whole and sent to topic `status`
connect.ftp.monitor.update=/statuses/:status
#keystyle controls the format of the key and can be string or struct.
#string only provides the file name
#struct provides a structure with the filename and offset
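connect.ftp.keystyle=struct

package io.lenses.streamreactor.connect.ftp

import java.util
import org.apache.kafka.common.Configurable
import org.apache.kafka.connect.source.SourceRecord
import scala.jdk.CollectionConverters._ // Scala 2.13+; on 2.12 use scala.collection.JavaConverters._

// Converts each SourceRecord read from FTP into zero or more records to publish.
trait SourceRecordConverter extends Configurable {
  def convert(in: SourceRecord): java.util.List[SourceRecord]
}

// Default converter: passes the record through unchanged.
class NopSourceRecordConverter extends SourceRecordConverter {
  override def configure(props: util.Map[String, _]): Unit = {}
  override def convert(in: SourceRecord): util.List[SourceRecord] = Seq(in).asJava
}

connect.ftp.sourcerecordconverter=your.name.space.YourConverter

connect.s3.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header._date, _header._hour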
connect.s3.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header._year, _header._month, _header._day, _header._hour

io.lenses.connect.smt.header.InsertFieldTimestampHeaders

transforms=fieldTs
transforms.fieldTs.field=_value.created_at
transforms.fieldTs.type=io.lenses.connect.smt.header.InsertFieldTimestampHeaders

transforms=fieldTs
transforms.fieldTs.field=_key.created_at
transforms.fieldTs.type=io.lenses.connect.smt.header.InsertFieldTimestampHeaders

transforms=fieldTs
transforms.fieldTs.field=created_at
transforms.fieldTs.type=io.lenses.connect.smt.header.InsertFieldTimestampHeaders
transforms.fieldTs.header.prefix.name=wallclock_

transforms=fieldTs
transforms.fieldTs.field=created_at
transforms.fieldTs.type=io.lenses.connect.smt.header.InsertFieldTimestampHeaders
transforms.fieldTs.date.format=yyyy-MM-dd

transforms=fieldTs
transforms.fieldTs.field=created_at
transforms.fieldTs.type=io.lenses.connect.smt.header.InsertFieldTimestampHeaders
transforms.fieldTs.timezone=Asia/Kolkata

transforms=fieldTs
transforms.fieldTs.field=created_at
transforms.fieldTs.type=io.lenses.connect.smt.header.InsertFieldTimestampHeaders
transforms.fieldTs.date.format="date=yyyy-MM-dd"
transforms.fieldTs.hour.format="hour=yyyy"

connect.s3.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date, _header.year

format.from.pattern=yyyyMMddHHmmssSSS,"yyyy-MM-dd'T'HH:mm:ss,SSS"

connect.s3.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header._date, _header._hour
connect.s3.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header._year, _header._month, _header._day, _header._hour

io.lenses.connect.smt.header.InsertRollingRecordTimestampHeaders

transforms=rollingWindow
transforms.rollingWindow.type=io.lenses.connect.smt.header.InsertRollingRecordTimestampHeaders
transforms.rollingWindow.rolling.window.type=minutes
transforms.rollingWindow.rolling.window.size=15

transforms=rollingWindow
transforms.rollingWindow.type=io.lenses.connect.smt.header.InsertRollingRecordTimestampHeaders
transforms.rollingWindow.header.prefix.name=wallclock_
transforms.rollingWindow.rolling.window.type=minutes
transforms.rollingWindow.rolling.window.size=15

transforms=rollingWindow
transforms.rollingWindow.type=io.lenses.connect.smt.header.InsertRollingRecordTimestampHeaders
transforms.rollingWindow.header.prefix.name=wallclock_
transforms.rollingWindow.rolling.window.type=minutes
transforms.rollingWindow.rolling.window.size=15
transforms.rollingWindow.date.format="date=yyyy-MM-dd"

transforms=rollingWindow
transforms.rollingWindow.type=io.lenses.connect.smt.header.InsertRollingRecordTimestampHeaders
transforms.rollingWindow.header.prefix.name=wallclock_
transforms.rollingWindow.rolling.window.type=minutes
transforms.rollingWindow.rolling.window.size=15
transforms.rollingWindow.timezone=Asia/Kolkata

transforms=rollingWindow
transforms.rollingWindow.type=io.lenses.connect.smt.header.InsertRollingRecordTimestampHeaders
transforms.rollingWindow.rolling.window.type=minutes
transforms.rollingWindow.rolling.window.size=15
transforms.rollingWindow.timezone=Asia/Kolkata
transforms.rollingWindow.date.format="date=yyyy-MM-dd"
transforms.rollingWindow.hour.format="hour=yyyy"

connect.s3.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date, _header.year

connect.s3.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header._date, _header._hour
connect.s3.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header._year, _header._month, _header._day, _header._hour

io.lenses.connect.smt.header.InsertRollingWallclockHeaders

transforms=rollingWindow
transforms.rollingWindow.type=io.lenses.connect.smt.header.InsertRollingWallclockHeaders
transforms.rollingWindow.rolling.window.type=minutes
transforms.rollingWindow.rolling.window.size=15

transforms=rollingWindow
transforms.rollingWindow.type=io.lenses.connect.smt.header.InsertRollingWallclockHeaders
transforms.rollingWindow.header.prefix.name=wallclock_
transforms.rollingWindow.rolling.window.type=minutes
transforms.rollingWindow.rolling.window.size=15

transforms=rollingWindow
transforms.rollingWindow.type=io.lenses.connect.smt.header.InsertRollingWallclockHeaders
transforms.rollingWindow.header.prefix.name=wallclock_
transforms.rollingWindow.rolling.window.type=minutes
transforms.rollingWindow.rolling.window.size=15
transforms.rollingWindow.date.format="date=yyyy-MM-dd"

transforms=rollingWindow
transforms.rollingWindow.type=io.lenses.connect.smt.header.InsertRollingWallclockHeaders
transforms.rollingWindow.header.prefix.name=wallclock_
transforms.rollingWindow.rolling.window.type=minutes
transforms.rollingWindow.rolling.window.size=15
transforms.rollingWindow.timezone=Asia/Kolkata

transforms=rollingWindow
transforms.rollingWindow.type=io.lenses.connect.smt.header.InsertRollingWallclockHeaders
transforms.rollingWindow.rolling.window.type=minutes
transforms.rollingWindow.rolling.window.size=15
transforms.rollingWindow.timezone=Asia/Kolkata
transforms.rollingWindow.date.format="date=yyyy-MM-dd"
transforms.rollingWindow.hour.format="hour=yyyy"

connect.s3.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date, _header.year

io.lenses.connect.smt.header.TimestampConverter

transforms=TimestampConverter
transforms.TimestampConverter.type=io.lenses.connect.smt.header.TimestampConverter
transforms.TimestampConverter.header.name=wallclock
transforms.TimestampConverter.field=_value.ts
transforms.TimestampConverter.target.type=string
transforms.TimestampConverter.format.from.pattern=yyyyMMddHHmmssSSS
transforms.TimestampConverter.format.to.pattern=yyyy-MM-dd HH:mm:ss.SSS

transforms=TimestampConverter
transforms.TimestampConverter.type=io.lenses.connect.smt.header.TimestampConverter
transforms.TimestampConverter.header.name=wallclock
transforms.TimestampConverter.field=_value.ts
transforms.TimestampConverter.target.type=string
transforms.TimestampConverter.format.from.pattern=yyyyMMddHHmmssSSS
transforms.TimestampConverter.format.to.pattern=yyyy-MM-dd-HH
transforms.TimestampConverter.rolling.window.type=hours
transforms.TimestampConverter.rolling.window.size=1

transforms=TimestampConverter
transforms.TimestampConverter.type=io.lenses.connect.smt.header.TimestampConverter
transforms.TimestampConverter.header.name=wallclock
transforms.TimestampConverter.field=_value.ts
transforms.TimestampConverter.target.type=string
transforms.TimestampConverter.format.from.pattern=yyyyMMddHHmmssSSS
transforms.TimestampConverter.format.to.pattern=yyyy-MM-dd-HH
transforms.TimestampConverter.rolling.window.type=hours
transforms.TimestampConverter.rolling.window.size=1
transforms.TimestampConverter.timezone=Asia/Kolkata

transforms=TimestampConverter
transforms.TimestampConverter.type=io.lenses.connect.smt.header.TimestampConverter
transforms.TimestampConverter.header.name=wallclock
transforms.TimestampConverter.field=_value.ts
transforms.TimestampConverter.target.type=string
transforms.TimestampConverter.format.from.pattern=yyyyMMddHHmmssSSS
transforms.TimestampConverter.format.to.pattern=yyyy-MM-dd-HH-mm
transforms.TimestampConverter.rolling.window.type=minutes
transforms.TimestampConverter.rolling.window.size=15

transforms=TimestampConverter
transforms.TimestampConverter.type=io.lenses.connect.smt.header.TimestampConverter
transforms.TimestampConverter.header.name=wallclock
transforms.TimestampConverter.field=_key.ts
transforms.TimestampConverter.target.type=unix
transforms.TimestampConverter.unix.precision=milliseconds

transforms=TimestampConverter
transforms.TimestampConverter.type=io.lenses.connect.smt.header.TimestampConverter
transforms.TimestampConverter.header.name=wallclock
transforms.TimestampConverter.field=_timestamp
transforms.TimestampConverter.target.type=unix
transforms.TimestampConverter.unix.precision=milliseconds

format.from.pattern=yyyyMMddHHmmssSSS,"yyyy-MM-dd'T'HH:mm:ss,SSS"

This page describes the usage of the Stream Reactor MQTT Source Connector.
name=mqtt-source
connector.class=io.lenses.streamreactor.connect.mqtt.source.MqttSourceConnector
tasks.max=1
connect.mqtt.kcql=INSERT INTO mqtt SELECT * FROM /mjson WITHCONVERTER=`io.lenses.streamreactor.connect.converters.source.JsonSimpleConverter`
connect.mqtt.client.id=dm_source_id
connect.mqtt.hosts=tcp://mqtt:1883
connect.mqtt.service.quality=1

INSERT INTO <your-kafka-topic>
SELECT *
FROM <your-mqtt-topic>
[WITHCONVERTER=`myclass`]

-- Insert mode, select all fields from /mqttTopicA
-- and write to the Kafka topic `topic` with converter myclass
INSERT INTO topic SELECT * FROM /mqttTopicA [WITHCONVERTER=myclass]
-- wildcard
INSERT INTO topic SELECT * FROM /mqttTopicA/+/sensors [WITHCONVERTER=`myclass`]

// `[` enclosed by `]` denotes optional values
WITHKEY(field1 [, field2.A , field3]) [KEYDELIMITER='.']

-- wildcard
INSERT INTO kafkaTopic1 SELECT * FROM /mqttTopicA/+/sensors WITHCONVERTER=`myclass`

INSERT INTO `$` SELECT * FROM /mqttTopicA/+/sensors

config.providers=aws
config.providers.aws.class=io.lenses.connect.secrets.providers.AWSSecretProvider
config.providers.aws.param.aws.auth.method=credentials
config.providers.aws.param.aws.access.key=your-client-key
config.providers.aws.param.aws.secret.key=your-secret-key
config.providers.aws.param.aws.region=your-region
config.providers.aws.param.file.dir=/connector-files/aws

name=my-sink
class=my-class
topics=mytopic
username=${aws:my-aws-secret:my_username_key}
password=${aws:my-aws-secret:my_password_key}

name=my-sink
class=my-class
topics=mytopic
username=lenses
password=my-secret-password

{
  "username": "xxx",
  "password": "xxx",
  "engine": "postgres",
  "host": "xxx",
  "port": 5432,
  "dbname": "xxx",
  "dbInstanceIdentifier": "xxxx"
}

connect.s3.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header._date, _header._hour
connect.s3.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header._year, _header._month, _header._day, _header._hour

io.lenses.connect.smt.header.InsertRollingFieldTimestampHeaders

transforms=rollingWindow
transforms.rollingWindow.type=io.lenses.connect.smt.header.InsertRollingFieldTimestampHeaders
transforms.rollingWindow.field=created_at
transforms.rollingWindow.rolling.window.type=minutes
transforms.rollingWindow.rolling.window.size=15

transforms=rollingWindow
transforms.rollingWindow.type=io.lenses.connect.smt.header.InsertRollingFieldTimestampHeaders
transforms.rollingWindow.field=created_at
transforms.rollingWindow.header.prefix.name=wallclock_
transforms.rollingWindow.rolling.window.type=minutes
transforms.rollingWindow.rolling.window.size=15

transforms=rollingWindow
transforms.rollingWindow.type=io.lenses.connect.smt.header.InsertRollingFieldTimestampHeaders
transforms.rollingWindow.field=created_at
transforms.rollingWindow.header.prefix.name=wallclock_
transforms.rollingWindow.rolling.window.type=minutes
transforms.rollingWindow.rolling.window.size=15
transforms.rollingWindow.date.format="date=yyyy-MM-dd"

transforms=rollingWindow
transforms.rollingWindow.type=io.lenses.connect.smt.header.InsertRollingFieldTimestampHeaders
transforms.rollingWindow.field=created_at
transforms.rollingWindow.header.prefix.name=wallclock_
transforms.rollingWindow.rolling.window.type=minutes
transforms.rollingWindow.rolling.window.size=15
transforms.rollingWindow.timezone=Asia/Kolkata

transforms=rollingWindow
transforms.rollingWindow.type=io.lenses.connect.smt.header.InsertRollingFieldTimestampHeaders
transforms.rollingWindow.field=created_at
transforms.rollingWindow.rolling.window.type=minutes
transforms.rollingWindow.rolling.window.size=15
transforms.rollingWindow.timezone=Asia/Kolkata
transforms.rollingWindow.date.format="date=yyyy-MM-dd"
transforms.rollingWindow.hour.format="hour=yyyy"

connect.s3.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date, _header.year

format.from.pattern=yyyyMMddHHmmssSSS,"yyyy-MM-dd'T'HH:mm:ss,SSS"

This page describes the usage of the Stream Reactor Azure Service Bus Source Connector.

io.lenses.streamreactor.connect.azure.servicebus.source.AzureServiceBusSourceConnector

This page describes the usage of the Stream Reactor Google PubSub Source Connector.

io.lenses.streamreactor.connect.gcp.pubsub.source.GCPPubSubSourceConnector

This page describes the usage of the Stream Reactor Redis Sink Connector.

connector.class=io.lenses.streamreactor.connect.azure.servicebus.source.AzureServiceBusSourceConnector
name=AzureServiceBusSourceConnector
tasks.max=1
connect.servicebus.connection.string="Endpoint=sb://MYNAMESPACE.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SOME_SHARED_ACCESS_STRING";
connect.servicebus.kcql=INSERT INTO output-topic SELECT * FROM servicebus-queue PROPERTIES('servicebus.type'='QUEUE');

INSERT INTO <your-kafka-topic>
SELECT *
FROM <your-service-bus>
PROPERTIES(...);

connect.servicebus.connection.string=Endpoint=sb://YOURNAMESPACE.servicebus.windows.net/;SharedAccessKeyName=YOUR_KEYNAME;SharedAccessKey=YOUR_ACCESS_KEY=

connect.servicebus.kcql=INSERT INTO kafka-topic SELECT * FROM azure-queue PROPERTIES('servicebus.type'='QUEUE');

connect.servicebus.kcql=INSERT INTO kafka-topic SELECT * FROM azure-topic PROPERTIES('servicebus.type'='TOPIC','subscription.name'='subscription1');

io.lenses.streamreactor.connect.cassandra.sink.CassandraSinkConnector

name=cassandra-sink
connector.class=io.lenses.streamreactor.connect.cassandra.sink.CassandraSinkConnector
tasks.max=1
topics=orders
connect.cassandra.kcql=INSERT INTO orders SELECT * FROM orders
connect.cassandra.port=9042
connect.cassandra.key.space=demo
connect.cassandra.contact.points=cassandra

INSERT INTO <your-cassandra-table>
SELECT FIELD,...
FROM <your-table>
[TTL=Time to live]

-- Insert mode, select all fields from topicA and
-- write to tableA
INSERT INTO tableA SELECT * FROM topicA
-- Insert mode, select 3 fields and rename from topicB
-- and write to tableB
INSERT INTO tableB SELECT x AS a, y, c FROM topicB
-- Insert mode, select 2 fields from topicB
-- and write to tableB with TTL
INSERT INTO tableB SELECT x, y FROM topicB TTL=100000

DELETE FROM orders WHERE id = ? and product = ?

# Message
# "{ "key": { "id" : 999, "product" : "DATAMOUNTAINEER" }, "value" : null }"
# DELETE FROM orders WHERE id = 999 and product = "DATAMOUNTAINEER"
# connect.cassandra.delete.enabled=true
# connect.cassandra.delete.statement=DELETE FROM orders WHERE id = ? and product = ?
# connect.cassandra.delete.struct_flds=id,product

io.lenses.streamreactor.connect.redis.sink.RedisSinkConnector

name=redis
connector.class=io.lenses.streamreactor.connect.redis.sink.RedisSinkConnector
tasks.max=1
topics=redis
connect.redis.host=redis
connect.redis.port=6379
connect.redis.kcql=INSERT INTO lenses SELECT * FROM redis STOREAS STREAM

[INSERT INTO <redis-cache>]
SELECT FIELD, ...
FROM <kafka-topic>
[PK FIELD]
[STOREAS SortedSet(key=FIELD)|GEOADD|STREAM]

{ "symbol": "USDGBP" , "price": 0.7943 }
{ "symbol": "EURGBP" , "price": 0.8597 }

SELECT price from yahoo-fx PK symbol

Key=EURGBP Value={ "price": 0.8597 }

INSERT INTO cpu_stats SELECT * from cpuTopic STOREAS SortedSet(score=timestamp) TTL=60

SELECT temperature, humidity FROM sensorsTopic PK sensorID STOREAS SortedSet(score=timestamp)

INSERT INTO FX- SELECT price from yahoo-fx PK symbol STOREAS SortedSet(score=timestamp) TTL=60

INSERT INTO cpu_stats SELECT * from cpuTopic STOREAS GEOADD

INSERT INTO redis_stream_name SELECT * FROM my-kafka-topic STOREAS STREAM

SELECT * FROM topic STOREAS PubSub (channel=myfield)

name=GcpPubSubSourceDemo
connector.class=io.lenses.streamreactor.connect.gcp.pubsub.source.GCPPubSubSourceConnector
topics=kafka_topic_to_write_to
tasks.max=1
connect.pubsub.gcp.auth.mode=File
connect.pubsub.gcp.file=/path/to/gcp-service-account-key.json
connect.pubsub.gcp.project.id=gcp-project-id
connect.pubsub.kcql=insert into `kafka_topic_to_write_to` select * from `gcp-subscription-id`

INSERT INTO kafka-topic
SELECT *
FROM subscriptionId
[PROPERTIES(
  'property.1'=x,
  'property.2'=x
)]

INSERT INTO `my-topic-with-hyphen`
SELECT *
FROM bucketAddress:pathPrefix

INSERT INTO my-topic SELECT * FROM subscriptionId;

connect.pubsub.gcp.auth.mode=Default

connect.pubsub.gcp.auth.mode=Credentials
connect.pubsub.gcp.credentials=$GCP_CREDENTIALS
connect.pubsub.gcp.project.id=$GCP_PROJECT_ID

connect.pubsub.gcp.auth.mode=File
connect.pubsub.gcp.file=/home/secure-stuff/gcp-read-credential.txt

connect.pubsub.output.mode=DEFAULT

connect.pubsub.output.mode=COMPATIBILITY

io.lenses.streamreactor.connect.mongodb.sink.MongoSinkConnector

name=mongo
connector.class=io.lenses.streamreactor.connect.mongodb.sink.MongoSinkConnector
tasks.max=1
topics=orders
connect.mongo.kcql=INSERT INTO orders SELECT * FROM orders
connect.mongo.db=connect
connect.mongo.connection=mongodb://mongo:27017

INSERT | UPSERT
INTO <collection_name>
SELECT FIELD, ...
FROM <kafka-topic>
BATCH = 100

-- Select all fields from topic fx_prices and insert into the fx collection
INSERT INTO fx SELECT * FROM fx_prices
-- Select all fields from topic fx_prices and upsert into the fx collection,
-- The assumption is there will be a ticker field in the incoming json:
UPSERT INTO fx SELECT * FROM fx_prices PK ticker

# default of scram
mongodb://host1/?authSource=db1
# scram explicit
mongodb://host1/?authSource=db1&authMechanism=SCRAM-SHA-1
# mongo-cr
mongodb://host1/?authSource=db1&authMechanism=MONGODB-CR
# x.509
mongodb://host1/?authSource=db1&authMechanism=MONGODB-X509
# kerberos
mongodb://host1/?authSource=db1&authMechanism=GSSAPI
# ldap
mongodb://host1/?authSource=db1&authMechanism=PLAIN

INSERT INTO lenses-io-demo ...

INSERT INTO `<path to field>`
SELECT * FROM control.boxes.test
PROPERTIES('mqtt.target.from.field'='true')

io.lenses.streamreactor.connect.mqtt.sink.MqttSinkConnector

name=mqtt
connector.class=io.lenses.streamreactor.connect.mqtt.sink.MqttSinkConnector
tasks.max=1
topics=orders
connect.mqtt.hosts=tcp://mqtt:1883
connect.mqtt.clean=true
connect.mqtt.timeout=1000
connect.mqtt.keep.alive=1000
connect.mqtt.service.quality=1
connect.mqtt.client.id=dm_sink_id
connect.mqtt.kcql=INSERT INTO /lenses/orders SELECT * FROM orders

INSERT
INTO <mqtt-topic>
SELECT * //no field projection supported
FROM <kafka-topic>
//no WHERE clause supported

-- Insert into /landoop/demo all fields from kafka_topicA
INSERT INTO `/landoop/demo` SELECT * FROM kafka_topicA
-- Insert all fields into the MQTT topic named by a field of the record (dynamic target)
INSERT INTO `<field path>` SELECT * FROM control.boxes.test PROPERTIES('mqtt.target.from.field'='true')

This page describes how to retrieve secrets from HashiCorp Vault for use in Kafka Connect.

INSERT INTO `_key`
SELECT ...

INSERT INTO `_topic`
SELECT ...

object version {
....
val azureServiceBusVersion = "7.14.7"
...
lazy val azureServiceBus: ModuleID = "com.azure" % "azure-messaging-servicebus" % azureServiceBusVersion

val kafkaConnectAzureServiceBusDeps: Seq[ModuleID] = Seq(azureServiceBus)

lazy val subProjects: Seq[Project] = Seq(
`query-language`,
common,
`cloud-common`,
`aws-s3`,
`azure-documentdb`,
`azure-datalake`,
`azure-servicebus`,
`azure-storage`,
cassandra,
elastic6,
elastic7,
ftp,
`gcp-storage`,
influxdb,
jms,
mongodb,
mqtt,
redis,
)
-----
lazy val `azure-servicebus` = (project in file("kafka-connect-azure-servicebus"))
.dependsOn(common)
.settings(
settings ++
Seq(
name := "kafka-connect-azure-servicebus",
description := "Kafka Connect compatible connectors to move data between Kafka and popular data stores",
libraryDependencies ++= baseDeps ++ kafkaConnectAzureServiceBusDeps,
publish / skip := true,
packExcludeJars := Seq(
"scala-.*\\.jar",
"zookeeper-.*\\.jar",
),
),
)
.configureAssembly(true)
.configureTests(baseTestDeps)
.enablePlugins(PackPlugin)

file.write=true

config.providers=vault
config.providers.vault.class=io.lenses.connect.secrets.providers.VaultSecretProvider
config.providers.vault.param.vault.addr=http://localhost:8200
config.providers.vault.param.vault.auth.method=token
config.providers.vault.param.vault.token=my-token
config.providers.vault.param.file.dir=/connector-files/vault

name=my-sink
class=my-class
topics=mytopic
username=${vault:secret/my-vault-secret:my_username_key}
password=${vault:secret/my-vault-secret:my_password_key}

name=my-sink
class=my-class
topics=mytopic
username=lenses
password=my-secret-password

This page describes the usage of the Stream Reactor Cassandra Source Connector.
This page describes the usage of the Stream Reactor Elasticsearch Sink Connector.
io.lenses.streamreactor.connect.elastic6.ElasticSinkConnector

name=cassandra
connector.class=io.lenses.streamreactor.connect.cassandra.source.CassandraSourceConnector
connect.cassandra.key.space=demo
connect.cassandra.kcql=INSERT INTO orders-topic SELECT * FROM orders PK created INCREMENTALMODE=TIMEUUID
connect.cassandra.contact.points=cassandra

INSERT INTO <your-topic>
SELECT FIELD,...
FROM <your-cassandra-table>
[PK FIELD]
[WITHFORMAT JSON]
[INCREMENTALMODE=TIMESTAMP|TIMEUUID|TOKEN|DSESEARCHTIMESTAMP]
[WITHKEY(<your-key-field>)]

-- Select all columns from table orders and insert into a topic
-- called orders-topic, use column created to track new rows.
-- Incremental mode set to TIMEUUID
INSERT INTO orders-topic SELECT * FROM orders PK created INCREMENTALMODE=TIMEUUID
-- Select created, product, price from table orders and insert
-- into a topic called orders-topic, use column created to track new rows.
INSERT INTO orders-topic SELECT created, product, price FROM orders PK created

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

INSERT INTO <topic>
SELECT <fields>
FROM <column_family>
PK <PK_field>
WITHFORMAT JSON
WITHUNWRAP INCREMENTALMODE=<mode>
WITHKEY(<key_field>)

// `[` enclosed by `]` denotes optional values
WITHKEY(field1 [, field2.A , field3]) [KEYDELIMITER='.']

INSERT INTO <topic>
SELECT a, b, c, d
FROM keyspace.table
WHERE solr_query= 'pkCol:{2020-03-23T15:02:21Z TO 2020-03-23T15:30:12.989Z]}'
INCREMENTALMODE=DSESEARCHTIMESTAMP

io.lenses.streamreactor.connect.elastic7.ElasticSinkConnector

name=elastic
connector.class=io.lenses.streamreactor.connect.elastic7.ElasticSinkConnector
tasks.max=1
topics=orders
connect.elastic.protocol=http
connect.elastic.hosts=elastic
connect.elastic.port=9200
connect.elastic.cluster.name=elasticsearch
connect.elastic.kcql=INSERT INTO orders SELECT * FROM orders
connect.progress.enabled=true

INSERT | UPSERT
INTO <elastic_index>
SELECT FIELD, ...
FROM kafka_topic
[PK FIELD,...]
[WITHDOCTYPE=<your_document_type>]
[WITHINDEXSUFFIX=<your_suffix>]

-- Insert mode, select all fields from topicA and write to indexA
INSERT INTO indexA SELECT * FROM topicA
-- Insert mode, select 3 fields and rename from topicB
-- and write to indexB
INSERT INTO indexB SELECT x AS a, y, zc FROM topicB PK y
-- UPSERT
UPSERT INTO indexC SELECT id, string_field FROM topicC PK id

INSERT INTO indexA SELECT * FROM topicA PROPERTIES ('behavior.on.null.values'='IGNORE')

WITHINDEXSUFFIX=_suffix_{YYYY-MM-dd}

INSERT INTO index_name SELECT * FROM topicA

INSERT INTO _header.gate SELECT * FROM topicA

INSERT INTO `_header.'prefix.abc.suffix'` SELECT * FROM topicA

INSERT INTO _key SELECT * FROM topicA

INSERT INTO _value.name SELECT * FROM topicA

INSERT INTO _value.name.firstName SELECT * FROM topicA

INSERT INTO `_value.'customer.name'.'first.name'` SELECT * FROM topicA

{
  "customer.name": {
    "first.name": "hans"
  }
}

ssl.truststore.location=/path/to/truststore.jks
ssl.truststore.password=your_truststore_password
ssl.truststore.type=JKS # Can also be PKCS12 if applicable
ssl.keystore.location=/path/to/keystore.jks
ssl.keystore.password=your_keystore_password
ssl.keystore.type=JKS # Can also be PKCS12 if applicable
ssl.protocol=TLSv1.2 # Or TLSv1.3 for stronger security
ssl.trustmanager.algorithm=PKIX # Default algorithm for managing certificates
ssl.keymanager.algorithm=PKIX # Default algorithm for managing certificates

This page describes the usage of the Stream Reactor Azure CosmosDB Sink Connector.

io.lenses.streamreactor.connect.azure.cosmosdb.sink.CosmosDbSinkConnector

name=cosmosdb
connector.class=io.lenses.streamreactor.connect.azure.cosmosdb.sink.CosmosDbSinkConnector
tasks.max=1
topics=orders-string
connect.cosmosdb.kcql=INSERT INTO orders SELECT * FROM orders-string
connect.cosmosdb.db=dm
connect.cosmosdb.endpoint=[YOUR_AZURE_ENDPOINT]
connect.cosmosdb.db.create=true
connect.cosmosdb.master.key=[YOUR_MASTER_KEY]
connect.cosmosdb.batch.size=10

INSERT | UPSERT
INTO <your-collection>
SELECT FIELD, ...
FROM kafka_topic
[PK FIELDS,...]

-- Insert mode, select all fields from topicA
-- and write to collectionA
INSERT INTO collectionA SELECT * FROM topicA
-- UPSERT mode, select 3 fields and
-- rename from topicB and write to tableB
-- with primary key as the field id from the topic
UPSERT INTO tableB SELECT x AS a, y, z AS c FROM topicB PK id

connect.cosmosdb.bulk.enabled=true

connect.cosmosdb.collection.throughput=400

connect.cosmosdb.proxy=<proxy-uri>

connect.progress.enabled=true

INSERT INTO myCollection SELECT * FROM myTopic PROPERTIES('flush.size'=1000000, 'flush.interval'=30, 'flush.count'=5000)

name=jms
connector.class=io.lenses.streamreactor.connect.jms.sink.JMSSinkConnector
tasks.max=1
topics=orders
connect.jms.url=tcp://activemq:61616
connect.jms.initial.context.factory=org.apache.activemq.jndi.ActiveMQInitialContextFactory
connect.jms.connection.factory=ConnectionFactory
connect.jms.kcql=INSERT INTO orders SELECT * FROM orders WITHTYPE QUEUE WITHFORMAT JSON

INSERT INTO <jms-destination>
SELECT FIELD, ...
FROM <your-kafka-topic>
[WITHFORMAT AVRO|JSON|MAP|OBJECT]
WITHTYPE TOPIC|QUEUE

-- Select all fields from topicA and write to jmsA queue
INSERT INTO jmsA SELECT * FROM topicA WITHTYPE QUEUE
-- Select 3 fields and rename from topicB and write
-- to jmsB topic as JSON in a TextMessage
INSERT INTO jmsB SELECT x AS a, y, z FROM topicB WITHFORMAT JSON WITHTYPE TOPIC

This page describes the usage of the Stream Reactor HTTP Sink Connector.

io.lenses.streamreactor.connect.http.sink.HttpSinkConnector

name=lenseshttp
connector.class=io.lenses.streamreactor.connect.http.sink.HttpSinkConnector
tasks.max=1
topics=topicToRead
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
connect.http.authentication.type=none
connect.http.method=POST
connect.http.endpoint=http://endpoint.local/receive
connect.http.request.content="My Static Content Template"
connect.http.batch.count=1

connect.http.request.content="My Static Content Template"

connect.http.request.content="<product><id>{{value.name}}</id></product>"

<messages>
{{#message}}
<message>
<topic>{{topic}}</topic>
<employee>{{value.employeeId}}</employee>
<order>{{value.orderNo}}</order>
<groupDomain>{{value.groupDomain}}</groupDomain>
</message>
{{/message}}
</messages>

connect.http.request.content="<messages>{{#message}}<message><topic>{{topic}}</topic><employee>{{value.employeeId}}</employee><order>{{value.orderNo}}</order><groupDomain>{{value.groupDomain}}</groupDomain></message>{{/message}}</messages>"

<messages>
<message>
<topic>myTopic</topic>
<employee>Abcd1234</employee>
<order>10</order>
<groupDomain>myExampleGroup.uk</groupDomain>
</message>
<message>
<topic>myTopic</topic>
<employee>Efgh5678</employee>
<order>11</order>
<groupDomain>myExampleGroup.uk</groupDomain>
</message>
</messages>

connect.http.authentication.type=none

connect.http.authentication.type=basic
connect.http.authentication.basic.username=user
connect.http.authentication.basic.password=password

connect.http.authentication.type=oauth2
connect.http.authentication.oauth2.token.url=http://myoauth2.local/getToken
connect.http.authentication.oauth2.client.id=clientId
connect.http.authentication.oauth2.client.secret=client-secret
connect.http.authentication.oauth2.token.property=access_token
connect.http.authentication.oauth2.client.scope=any
connect.http.authentication.oauth2.client.headers=header:value

connect.http.request.headers="Content-Type","text/plain","X-User","{{header.kafkauser}}","Product","{{value.product.id}}"

connect.http.batch.count=50000
connect.http.batch.size=500000000
connect.http.time.interval=3600

connector.class=io.lenses.streamreactor.connect.http.sink.HttpSinkConnector
topics=mytopic
tasks.max=1
connect.http.method=POST
connect.http.endpoint="https://my-endpoint.example.com"
connect.http.request.content="My Static Content Template"
connect.http.batch.count=1

connector.class=io.lenses.streamreactor.connect.http.sink.HttpSinkConnector
topics=mytopic
tasks.max=1
connect.http.method=POST
connect.http.endpoint="https://my-endpoint.example.com"
connect.http.request.content="{{value}}"
connect.http.batch.count=1
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

connector.class=io.lenses.streamreactor.connect.http.sink.HttpSinkConnector
topics=mytopic
tasks.max=1
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
connect.http.method=POST
connect.http.endpoint="https://my-endpoint.example.com"
connect.http.request.content="product: {{value.product}}"
connect.http.batch.size=1
value.converter.schemas.enable=false

connector.class=io.lenses.streamreactor.connect.http.sink.HttpSinkConnector
topics=mytopic
tasks.max=1
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
connect.http.method=POST
connect.http.endpoint="https://my-endpoint.example.com"
connect.http.request.content="whole product message: {{value}}"
connect.http.time.interval=5

connector.class=io.lenses.streamreactor.connect.http.sink.HttpSinkConnector
topics=mytopic
tasks.max=1
connect.http.method=POST
connect.http.endpoint="https://my-endpoint.example.com"
connect.http.request.content="product: {{value.product}}"
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schemas.enable=true
value.converter.schema.registry.url=http://schema-registry:8081

connect.reporting.error.config.enabled=true
connect.reporting.success.config.enabled=true

connect.reporting.error.config.enabled=true
connect.reporting.error.config.bootstrap.servers=localhost:9094
connect.reporting.error.config.topic=http-monitoring

connect.reporting.error.config.enabled=true
connect.reporting.error.config.bootstrap.servers=my-kafka-cluster.com:9093
connect.reporting.error.config.security.protocol=SASL_SSL
connect.reporting.error.config.sasl.mechanism=PLAIN
connect.reporting.error.config.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="MYUSER" password="MYPASSWORD";

connect.reporting.error.config.enabled=true
connect.reporting.error.config.bootstrap.servers=SSL://my-ssl-protected-cluster:9094
connect.reporting.error.config.security.protocol=SSL
connect.reporting.error.config.ssl.keystore.location=/path/to/my/keystore.p12
connect.reporting.error.config.ssl.keystore.type=PKCS12
connect.reporting.error.config.ssl.truststore.location=/path/to/my/truststore.p12
connect.reporting.error.config.ssl.truststore.password=************
connect.reporting.error.config.ssl.truststore.type=PKCS12
connect.reporting.error.config.topic=http-error-topic

This page describes the usage of the Stream Reactor GCP Big Query Sink Connector.
Invalid field name
"com.examples.project-super-important.v1.MyData". Fields must
contain only letters, numbers, and underscores, start with a letter or
underscore, and be at most 300 characters long.

name = kcbq-connect1
connector.class = com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
tasks.max = 1
topics = quickstart
sanitizeTopics = true
autoCreateTables = true
allowNewBigQueryFields = true
allowBigQueryRequiredFieldRelaxation = true
schemaRetriever = com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever
project = lenses-123
defaultDataset = ConfluentDataSet
keyfile = <path to json file>
transforms = RegexTransformation
transforms.RegexTransformation.type = org.apache.kafka.connect.transforms.RegexRouter
transforms.RegexTransformation.regex = (kcbq_)(.*)
transforms.RegexTransformation.replacement = $2

name=bigquery-sink
connector.class=com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
tasks.max=1
topics=orders,customers
project=my-gcp-project
defaultDataset=kafka_data
keyfile=/path/to/keyfile.json
autoCreateTables=true

name=bigquery-sink
connector.class=com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
tasks.max=1
topics=orders,customers
project=my-gcp-project
defaultDataset=kafka_data
keyfile=/path/to/keyfile.json
enableBatchLoad=orders,customers
gcsBucketName=my-gcs-bucket
autoCreateBucket=true

name=bigquery-sink
connector.class=com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
tasks.max=1
topics=orders,customers
project=my-gcp-project
defaultDataset=kafka_data
keyfile=/path/to/keyfile.json
upsertEnabled=true
mergeIntervalMs=30000
mergeRecordsThreshold=1000

log4j.logger.com.wepay.kafka.connect.bigquery=DEBUG

INSERT INTO `my-topic`
SELECT * FROM `my-s3-bucket:my-prefix`
PROPERTIES (
'post.process.action'=`DELETE`
)

io.lenses.streamreactor.connect.aws.s3.source.S3SourceConnector

name=aws-s3SourceConnectorParquet
connector.class=io.lenses.streamreactor.connect.aws.s3.source.S3SourceConnector
tasks.max=1
connect.s3.kcql=insert into $TOPIC_NAME select * from $BUCKET_NAME:$PREFIX_NAME STOREAS `parquet`
connect.s3.aws.region=eu-west-2
connect.s3.aws.secret.key=SECRET_KEY
connect.s3.aws.access.key=ACCESS_KEY
connect.s3.aws.auth.mode=Credentials

INSERT INTO $kafka-topic
SELECT *
FROM bucketAddress:pathPrefix
[BATCH=batch]
[STOREAS storage_format]
[LIMIT limit]
[PROPERTIES(
'property.1'=x,
'property.2'=x,
)]

INSERT INTO `my-topic-with-hyphen`
SELECT *
FROM bucketAddress:pathPrefix

FROM [bucketname]:pathprefix
//my-bucket-called-pears:my-folder-called-apples

INSERT INTO my-apples-topic SELECT * FROM my-bucket-called-pears:my-folder-called-apples

STOREAS `JSON`
STOREAS `Avro`
STOREAS `Parquet`
STOREAS `Text`
STOREAS `CSV`
STOREAS `CSV_WithHeaders`
STOREAS `Bytes`

connect.s3.kcql=insert into $kafka-topic select * from lensesio:regex STOREAS `text` PROPERTIES('read.text.mode'='regex', 'read.text.regex'='^[1-9].*')

connect.s3.kcql=insert into $kafka-topic select * from lensesio:multi_line STOREAS `text` PROPERTIES('read.text.mode'='startEndLine', 'read.text.start.line'='SSM', 'read.text.end.line'='')

connect.s3.kcql=insert into $kafka-topic select * from lensesio:multi_line STOREAS `text` PROPERTIES('read.text.mode'='startEndLine', 'read.text.start.line'='SSM', 'read.text.end.line'='', 'read.text.trim'='true')

connect.s3.kcql=insert into $kafka-topic select * from lensesio:xml STOREAS `text` PROPERTIES('read.text.mode'='startEndTag', 'read.text.start.tag'='<SSM>', 'read.text.end.tag'='</SSM>')

BATCH = 100

LIMIT 10000

connect.s3.source.extension.excludes=txt,csv

connect.s3.source.extension.includes=json,xml

connect.s3.aws.auth.mode=Credentials
connect.s3.aws.region=eu-west-2
connect.s3.aws.access.key=$AWS_ACCESS_KEY
connect.s3.aws.secret.key=$AWS_SECRET_KEY

listObjectsV2
listObjectsV2Paginator
putObject
getObject
headObject
deleteObjects
deleteObject

INSERT INTO `my-topic`
SELECT * FROM `my-gcp-storage-bucket:my-prefix`
PROPERTIES (
'post.process.action'=`DELETE`
)

io.lenses.streamreactor.connect.gcp.storage.source.GCPStorageSourceConnector

name=gcp-storageSourceConnectorParquet # this can be anything
connector.class=io.lenses.streamreactor.connect.gcp.storage.source.GCPStorageSourceConnector
tasks.max=1
connect.gcpstorage.kcql=insert into $TOPIC_NAME select * from $BUCKET_NAME:$PREFIX_NAME STOREAS `parquet`
connect.gcpstorage.gcp.auth.mode=Credentials
connect.gcpstorage.gcp.credentials=$GCP_CREDENTIALS
connect.gcpstorage.gcp.project.id=$GCP_PROJECT_ID

INSERT INTO $kafka-topic
SELECT *
FROM bucketAddress:pathPrefix
[BATCH=batch]
[STOREAS storage_format]
[LIMIT limit]
[PROPERTIES(
'property.1'=x,
'property.2'=x,
)]

INSERT INTO `my-topic-with-hyphen`
SELECT *
FROM bucketAddress:pathPrefix

FROM [bucketname]:pathprefix
//my-bucket-called-pears:my-folder-called-apples

INSERT INTO my-apples-topic SELECT * FROM my-bucket-called-pears:my-folder-called-apples

STOREAS `JSON`
STOREAS `Avro`
STOREAS `Parquet`
STOREAS `Text`
STOREAS `CSV`
STOREAS `CSV_WithHeaders`
STOREAS `Bytes`

connect.gcpstorage.kcql=insert into $kafka-topic select * from lensesio:regex STOREAS `text` PROPERTIES('read.text.mode'='regex', 'read.text.regex'='^[1-9].*')

connect.gcpstorage.kcql=insert into $kafka-topic select * from lensesio:multi_line STOREAS `text` PROPERTIES('read.text.mode'='startEndLine', 'read.text.start.line'='SSM', 'read.text.end.line'='')

connect.gcpstorage.kcql=insert into $kafka-topic select * from lensesio:multi_line STOREAS `text` PROPERTIES('read.text.mode'='startEndLine', 'read.text.start.line'='SSM', 'read.text.end.line'='', 'read.text.trim'='true')

connect.gcpstorage.kcql=insert into $kafka-topic select * from lensesio:xml STOREAS `text` PROPERTIES('read.text.mode'='startEndTag', 'read.text.start.tag'='<SSM>', 'read.text.end.tag'='</SSM>')

BATCH = 100

LIMIT 10000

connect.gcpstorage.source.extension.excludes=txt,csv

connect.gcpstorage.source.extension.includes=json,xml

connect.gcpstorage.gcp.auth.mode=Default

connect.gcpstorage.gcp.auth.mode=Credentials
connect.gcpstorage.gcp.credentials=$GCP_CREDENTIALS
connect.gcpstorage.gcp.project.id=$GCP_PROJECT_ID

connect.gcpstorage.gcp.auth.mode=File
connect.gcpstorage.gcp.file=/home/secure-stuff/gcp-read-credential.txt

{
"key": <the message Key, which can be a primitive or a complex object>,
"value": <the message Value, which can be a primitive or a complex object>,
"headers": {
"header1": "value1",
"header2": "value2"
},
"metadata": {
"offset": 0,
"partition": 0,
"timestamp": 0,
"topic": "topic"
}
}

(?i)^(?:.*)\/([0-9]*)\/(?:[0-9]*)[.](?:Json|Avro|Parquet|Text|Csv|Bytes)$

INSERT INTO `my-topic`
SELECT * FROM `my-s3-bucket:my-prefix`
PROPERTIES (
'post.process.action'=`MOVE`,
'post.process.action.bucket'=`archive-bucket`,
'post.process.action.prefix'=`processed/`
)

INSERT INTO `my-topic`
SELECT * FROM `my-gcp-storage-bucket:my-prefix`
PROPERTIES (
'post.process.action'=`MOVE`,
'post.process.action.bucket'=`archive-bucket`,
'post.process.action.prefix'=`processed/`
)

This page describes the usage of the Stream Reactor Azure Datalake Gen 2 Sink Connector.

io.lenses.streamreactor.connect.datalake.sink.DatalakeSinkConnector

This page describes the usage of the Stream Reactor GCP Storage Sink Connector.


connector.class=io.lenses.streamreactor.connect.datalake.sink.DatalakeSinkConnector
connect.datalake.kcql=insert into lensesio:demo select * from demo PARTITIONBY _value.metadata_id, _value.customer_id, _header.ts, _header.wallclock STOREAS `JSON` PROPERTIES('flush.interval'=600, 'flush.size'=1000000, 'flush.count'=5000)
topics=demo
name=demo
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
transforms=insertFormattedTs,insertWallclock
transforms.insertFormattedTs.type=io.lenses.connect.smt.header.TimestampConverter
transforms.insertFormattedTs.header.name=ts
transforms.insertFormattedTs.field=timestamp
transforms.insertFormattedTs.target.type=string
transforms.insertFormattedTs.format.to.pattern=yyyy-MM-dd-HH
transforms.insertWallclock.type=io.lenses.connect.smt.header.InsertWallclock
transforms.insertWallclock.header.name=wallclock
transforms.insertWallclock.value.type=format
transforms.insertWallclock.format=yyyy-MM-dd-HH
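The configuration above formats the `ts` and `wallclock` headers with the pattern `yyyy-MM-dd-HH`, so every record from the same hour carries the same header value and falls under the same `PARTITIONBY` value. As a rough illustration of what such a value looks like, here is a small Python sketch. The function name `hour_bucket` is hypothetical, UTC is an assumption, and the code only mimics the date pattern; it is not the SMTs' implementation.

```python
from datetime import datetime, timezone


def hour_bucket(epoch_millis: int) -> str:
    """Render an epoch-millisecond timestamp in the shape of the
    Java pattern yyyy-MM-dd-HH (UTC is assumed for this sketch)."""
    moment = datetime.fromtimestamp(epoch_millis / 1000, tz=timezone.utc)
    # %Y-%m-%d-%H is the strftime counterpart of yyyy-MM-dd-HH
    return moment.strftime("%Y-%m-%d-%H")


if __name__ == "__main__":
    # Two timestamps 13 minutes apart within the same hour share one bucket.
    print(hour_bucket(1_700_000_000_000))  # 2023-11-14-22
    print(hour_bucket(1_700_000_780_000))  # 2023-11-14-22
```

An hour-granularity pattern keeps the number of distinct partition values small; a finer pattern would produce more, smaller partitions.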
INSERT INTO bucketAddress[:pathPrefix]
SELECT *
FROM kafka-topic
[[PARTITIONBY (partition[, partition] ...)] | NOPARTITION]
[STOREAS storage_format]
[PROPERTIES(
'property.1'=x,
'property.2'=x,
)]

{
...
"a.b": "value",
...
}

INSERT INTO `container-name`:`prefix` SELECT * FROM `kafka-topic` PARTITIONBY `a.b`

INSERT INTO testcontainer:pathToWriteTo SELECT * FROM topicA;
INSERT INTO testcontainer SELECT * FROM topicA;
INSERT INTO testcontainer:path/To/Write/To SELECT * FROM topicA PARTITIONBY fieldA;

topics.regex = ^sensor_data_\d+$
connect.datalake.kcql= INSERT INTO $target SELECT * FROM `*` ....

PARTITIONBY fieldA, fieldB

PARTITIONBY _key

PARTITIONBY _key.fieldA, _key.fieldB

PARTITIONBY _header.<header_key1>[, _header.<header_key2>]

PARTITIONBY fieldA, _key.fieldB, _headers.fieldC

INSERT INTO $container[:$prefix]
SELECT * FROM $topic
PARTITIONBY fieldA, _key.fieldB, _headers.fieldC
STOREAS `AVRO`
PROPERTIES (
'partition.include.keys'=true,
)

connector.class=io.lenses.streamreactor.connect.azure.datalake.sink.DatalakeSinkConnector
connect.datalake.kcql=insert into lensesio:demo select * from demo PARTITIONBY _value.metadata_id, _value.customer_id, _header.ts, _header.wallclock STOREAS `JSON` PROPERTIES('flush.interval'=30, 'flush.size'=1000000, 'flush.count'=5000)
topics=demo
name=demo
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
transforms=insertFormattedTs,insertWallclock
transforms.insertFormattedTs.type=io.lenses.connect.smt.header.TimestampConverter
transforms.insertFormattedTs.header.name=ts
transforms.insertFormattedTs.field=timestamp
transforms.insertFormattedTs.target.type=string
transforms.insertFormattedTs.format.to.pattern=yyyy-MM-dd-HH
transforms.insertWallclock.type=io.lenses.connect.smt.header.InsertWallclock
transforms.insertWallclock.header.name=wallclock
transforms.insertWallclock.value.type=format
transforms.insertWallclock.format=yyyy-MM-dd-HH
{
"key": <the message Key, which can be a primitive or a complex object>,
"value": <the message Value, which can be a primitive or a complex object>,
"headers": {
"header1": "value1",
"header2": "value2"
},
"metadata": {
"offset": 0,
"partition": 0,
"timestamp": 0,
"topic": "topic"
}
}

...
connect.datalake.kcql=INSERT INTO lensesioazure:car_speed SELECT * FROM car_speed_events STOREAS `PARQUET`
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
key.converter=org.apache.kafka.connect.storage.StringConverter
...

...
connect.datalake.kcql=INSERT INTO lensesioazure:car_speed SELECT * FROM car_speed_events STOREAS `PARQUET`
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
...

...
connect.datalake.kcql=INSERT INTO lensesioazure:car_speed SELECT * FROM car_speed_events STOREAS `JSON` PROPERTIES('store.envelope'=true)
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
...

...
connect.datalake.kcql=INSERT INTO lensesioazure:car_speed SELECT * FROM car_speed_events STOREAS `AVRO` PROPERTIES('store.envelope'=true)
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
key.converter=org.apache.kafka.connect.storage.StringConverter
...

...
connect.datalake.kcql=INSERT INTO lensesioazure:car_speed SELECT * FROM car_speed_events STOREAS `AVRO` PROPERTIES('store.envelope'=true)
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
...

message1 -> schema1
message2 -> schema1
(No flush needed – same schema)
message3 -> schema2
(Flush occurs – new schema introduced)
message4 -> schema2
(No flush needed – same schema)
message5 -> schema1
Without optimization: would trigger a flush
With optimization: no flush – schema1 is backward-compatible with schema2
message6 -> schema2
message7 -> schema2
(No flush needed – same schema, it would happen based on the flush thresholds)

...
connect.datalake.azure.auth.mode=Credentials
connect.datalake.azure.account.name=$AZURE_ACCOUNT_NAME
connect.datalake.azure.account.key=$AZURE_ACCOUNT_KEY
...

...
connect.datalake.azure.auth.mode=ConnectionString
connect.datalake.azure.connection.string=$AZURE_CONNECTION_STRING
...

io.lenses.streamreactor.connect.gcp.storage.sink.GCPStorageSinkConnector

connector.class=io.lenses.streamreactor.connect.gcp.storage.sink.GCPStorageSinkConnector
connect.gcpstorage.kcql=insert into lensesio:demo select * from demo PARTITIONBY _value.metadata_id, _value.customer_id, _header.ts, _header.wallclock STOREAS `JSON` PROPERTIES('flush.interval'=600, 'flush.size'=1000000, 'flush.count'=5000)
topics=demo
name=demo
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
transforms=insertFormattedTs,insertWallclock
transforms.insertFormattedTs.type=io.lenses.connect.smt.header.TimestampConverter
transforms.insertFormattedTs.header.name=ts
transforms.insertFormattedTs.field=timestamp
transforms.insertFormattedTs.target.type=string
transforms.insertFormattedTs.format.to.pattern=yyyy-MM-dd-HH
transforms.insertWallclock.type=io.lenses.connect.smt.header.InsertWallclock
transforms.insertWallclock.header.name=wallclock
transforms.insertWallclock.value.type=format
transforms.insertWallclock.format=yyyy-MM-dd-HH

INSERT INTO bucketAddress[:pathPrefix]
SELECT *
FROM kafka-topic
[[PARTITIONBY (partition[, partition] ...)] | NOPARTITION]
[STOREAS storage_format]
[PROPERTIES(
'property.1'=x,
'property.2'=x,
)]

{
...
"a.b": "value",
...
}

INSERT INTO `container-name`:`prefix` SELECT * FROM `kafka-topic` PARTITIONBY `a.b`

INSERT INTO testcontainer:pathToWriteTo SELECT * FROM topicA;
INSERT INTO testcontainer SELECT * FROM topicA;
INSERT INTO testcontainer:path/To/Write/To SELECT * FROM topicA PARTITIONBY fieldA;

topics.regex = ^sensor_data_\d+$
connect.gcpstorage.kcql= INSERT INTO $target SELECT * FROM `*` ....

PARTITIONBY fieldA, fieldB

PARTITIONBY _key

PARTITIONBY _key.fieldA, _key.fieldB

PARTITIONBY _header.<header_key1>[, _header.<header_key2>]

PARTITIONBY fieldA, _key.fieldB, _headers.fieldC

INSERT INTO $container[:$prefix]
SELECT * FROM $topic
PARTITIONBY fieldA, _key.fieldB, _headers.fieldC
STOREAS `AVRO`
PROPERTIES (
'partition.include.keys'=true,
)

connector.class=io.lenses.streamreactor.connect.gcp.storage.sink.GCPStorageSinkConnector
connect.gcpstorage.kcql=insert into lensesio:demo select * from demo PARTITIONBY _value.metadata_id, _value.customer_id, _header.ts, _header.wallclock STOREAS `JSON` PROPERTIES('flush.interval'=30, 'flush.size'=1000000, 'flush.count'=5000)
topics=demo
name=demo
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
transforms=insertFormattedTs,insertWallclock
transforms.insertFormattedTs.type=io.lenses.connect.smt.header.TimestampConverter
transforms.insertFormattedTs.header.name=ts
transforms.insertFormattedTs.field=timestamp
transforms.insertFormattedTs.target.type=string
transforms.insertFormattedTs.format.to.pattern=yyyy-MM-dd-HH
transforms.insertWallclock.type=io.lenses.connect.smt.header.InsertWallclock
transforms.insertWallclock.header.name=wallclock
transforms.insertWallclock.value.type=format
transforms.insertWallclock.format=yyyy-MM-dd-HH

{
"key": <the message Key, which can be a primitive or a complex object>,
"value": <the message Value, which can be a primitive or a complex object>,
"headers": {
"header1": "value1",
"header2": "value2"
},
"metadata": {
"offset": 0,
"partition": 0,
"timestamp": 0,
"topic": "topic"
}
}

...
connect.gcpstorage.kcql=INSERT INTO lensesiogcpstorage:car_speed SELECT * FROM car_speed_events STOREAS `PARQUET`
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
key.converter=org.apache.kafka.connect.storage.StringConverter
...

...
connect.gcpstorage.kcql=INSERT INTO lensesiogcpstorage:car_speed SELECT * FROM car_speed_events STOREAS `PARQUET`
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
...

...
connect.gcpstorage.kcql=INSERT INTO lensesiogcpstorage:car_speed SELECT * FROM car_speed_events STOREAS `JSON` PROPERTIES('store.envelope'=true)
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
...

...
connect.gcpstorage.kcql=INSERT INTO lensesiogcpstorage:car_speed SELECT * FROM car_speed_events STOREAS `AVRO` PROPERTIES('store.envelope'=true)
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
key.converter=org.apache.kafka.connect.storage.StringConverter
...

...
connect.gcpstorage.kcql=INSERT INTO lensesiogcpstorage:car_speed SELECT * FROM car_speed_events STOREAS `AVRO` PROPERTIES('store.envelope'=true)
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
...

message1 -> schema1
message2 -> schema1
(No flush needed – same schema)
message3 -> schema2
(Flush occurs – new schema introduced)
message4 -> schema2
(No flush needed – same schema)
message5 -> schema1
Without optimization: would trigger a flush
With optimization: no flush – schema1 is backward-compatible with schema2
message6 -> schema2
message7 -> schema2
(No flush needed – same schema, it would happen based on the flush thresholds)

...
connect.gcpstorage.gcp.auth.mode=Credentials
connect.gcpstorage.gcp.credentials=$GCP_CREDENTIALS
connect.gcpstorage.gcp.project.id=$GCP_PROJECT_ID
...

...
connect.gcpstorage.gcp.auth.mode=File
connect.gcpstorage.gcp.file=/home/secure-stuff/gcp-write-credential.txt
...

This page describes the usage of the Stream Reactor Cassandra Sink Connector, bundled in the kafka-connect-cassandra-sink artifact.

io.lenses.streamreactor.connect.cassandra.CassandraSinkConnector

name=cassandra-sink
connector.class=io.lenses.streamreactor.connect.cassandra.CassandraSinkConnector
tasks.max=1
topics=orders
connect.cassandra.kcql=INSERT INTO mykeyspace.orders SELECT _key.bigint as bigintcol, _value.boolean as booleancol, _key.double as doublecol, _value.float as floatcol, _key.int as intcol, _value.smallint as smallintcol, _key.text as textcol, _value.tinyint as tinyintcol FROM orders
connect.cassandra.port=9042
connect.cassandra.contact.points=cassandra

connect.cassandra.contact.points=[host list]
connect.cassandra.port=9042
connect.cassandra.load.balancing.local.dc=datacentre-name
# optional entries
# connect.cassandra.max.concurrent.requests = 100
# connect.cassandra.max.batch.size = 64
# connect.cassandra.connection.pool.size = 4
# connect.cassandra.query.timeout.ms = 30
# connect.cassandra.compression = None

connect.cassandra.ssl.provider=
connect.cassandra.ssl.cipher.suites=
connect.cassandra.ssl.hostname.verification=true
connect.cassandra.ssl.keystore.path=
connect.cassandra.ssl.keystore.password=
connect.cassandra.ssl.truststore.password=
connect.cassandra.ssl.truststore.path=
# Path to the SSL certificate file, when using OpenSSL.
connect.cassandra.ssl.openssl.key.cert.chain=
# Path to the private key file, when using OpenSSL.
connect.cassandra.ssl.openssl.private.key=

connect.cassandra.auth.provider=
connect.cassandra.auth.username=
connect.cassandra.auth.password=
# for SASL authentication which requires connect.cassandra.auth.provider
# set to GSSAPI
connect.cassandra.auth.gssapi.keytab=
connect.cassandra.auth.gssapi.principal=
connect.cassandra.auth.gssapi.service=

connect.cassandra.auth.provider=GSSAPI
connect.cassandra.auth.gssapi.keytab=
connect.cassandra.auth.gssapi.principal=
connect.cassandra.auth.gssapi.service=

INSERT INTO <keyspace>.<your-cassandra-table>
SELECT <field projection>, ...
FROM <your-kafka-topic>
PROPERTIES(<a set of keys to control the connector behaviour>)

INSERT INTO mykeyspace.types
SELECT
_key.bigint as bigintcol
, _value.boolean as booleancol
, _key.double as doublecol
, _value.float as floatcol
, _header.int as intcol
FROM myTopic
INSERT INTO mykeyspace.types
SELECT
_value.bigint as bigintcol
, _value.double as doublecol
, _value.ttlcol as message_internal_ttl
, _value.timestampcol as message_internal_timestamp
FROM myTopic
PROPERTIES('ttlTimeUnit'='MILLISECONDS', 'timestampTimeUnit'='MICROSECONDS')
INSERT INTO mykeyspace.CASE_SENSITIVE
SELECT
`_key.'bigint field'` as 'bigint col',
`_key.'boolean-field'` as 'boolean-col',
`_value.'INT FIELD'` as 'INT COL',
`_value.'TEXT.FIELD'` as 'TEXT.COL'
FROM mytopic
INSERT INTO mykeyspace.tableA
SELECT
_key.my_pk as my_pk
, _value.my_value as my_value
FROM topicA
PROPERTIES(
'query'='INSERT INTO mykeyspace.pk_value (my_pk, my_value) VALUES (:my_pk, :my_value)',
'deletesEnabled' ='true'
)

INSERT INTO ${target}
SELECT _value/_key/_header.{field_path} AS ${table_column}
FROM mytopic

INSERT INTO ${target}
SELECT
_key as row_key
, _value as content

INSERT INTO ${target}
SELECT
`_key.'bigint field'` as 'bigint col',
`_key.'boolean-field'` as 'boolean-col',
`_value.'INT FIELD'` as 'INT COL',
`_value.'TEXT.FIELD'` as 'TEXT.COL'
FROM myTopic

INSERT INTO stocks_keyspace.stocks_by_symbol
SELECT _value.symbol AS ticker,
_value.ts AS ts,
_value.exchange AS exchange,
_value.value AS value
FROM stocks;
INSERT INTO stocks_keyspace.stocks_by_exchange
SELECT _value.symbol AS ticker,
_value.ts AS ts,
_value.exchange AS exchange,
_value.value AS value
FROM stocks;

{"symbol":"APPL",
"value":214.4,
"exchange":"NASDAQ",
"ts":"2025-07-23T14:56:10.009"}

CREATE TYPE stocks_keyspace.stocks_type (
symbol text,
ts timestamp,
exchange text,
value double
);
CREATE TABLE stocks_keyspace.stocks_table (
name text PRIMARY KEY,
stocks FROZEN<stocks_type>
);

INSERT INTO stocks_keyspace.stocks_table
SELECT _key AS name, _value AS stocks
FROM stocks

INSERT INTO ${target}
SELECT
`now()` as loaded_at
FROM myTopic

SELECT [_value/_key/_header].{path} AS message_internal_timestamp
FROM myTopic
-- You may optionally specify the time unit by using:
PROPERTIES('timestampTimeUnit'='MICROSECONDS')

SELECT _header.timestampcolumn AS message_internal_timestamp FROM myTopic
SELECT _value.timestampcol AS message_internal_timestamp FROM myTopic
SELECT _key.timestampcol AS message_internal_timestamp FROM myTopic

SELECT [_value/_key/_header].{path} as message_internal_ttl
FROM myTopic
-- Optional: Specify the TTL unit
PROPERTIES('ttlTimeUnit'='SECONDS')

SELECT _header.ttl as message_internal_ttl
FROM myTopic
PROPERTIES('ttlTimeUnit'='MINUTES')
SELECT _key.expiry as message_internal_ttl
FROM myTopic
PROPERTIES('ttlTimeUnit'='HOURS')
SELECT _value.ttl_duration as message_internal_ttl
FROM myTopic
PROPERTIES('ttlTimeUnit'='DAYS')

INSERT INTO ... SELECT ... FROM myTopic PROPERTIES(
'codec.locale' = 'en_US',
'codec.timeZone' = 'UTC',
'codec.timestamp' = 'CQL_TIMESTAMP',
'codec.date' = 'ISO_LOCAL_DATE',
'codec.time' = 'ISO_LOCAL_TIME',
'codec.unit' = 'MILLISECONDS'
)

INSERT INTO ${target}
SELECT
_value.bigint as some_name,
_value.int as some_name2
FROM myTopic
PROPERTIES('query'='INSERT INTO %s.types (bigintCol, intCol) VALUES (:some_name, :some_name2)')

connect.cassandra.driver.{driver_setting}

connect.cassandra.driver.*

connect.cassandra.driver.basic.request.consistency=ALL

This page describes the usage of the Stream Reactor AWS S3 Sink Connector.
io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector
connector.class=io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector
connect.s3.kcql=insert into lensesio:demo select * from demo PARTITIONBY _value.ts STOREAS `JSON` PROPERTIES ('flush.size'=1000000, 'flush.interval'=30, 'flush.count'=5000)
topics=demo
name=demo

INSERT INTO bucketAddress[:pathPrefix]
SELECT *
FROM kafka-topic
[[PARTITIONBY (partition[, partition] ...)] | NOPARTITION]
[STOREAS storage_format]
[PROPERTIES(
'property.1'=x,
'property.2'=x,
)]

{
...
"a.b": "value",
...
}

INSERT INTO `bucket-name`:`prefix` SELECT * FROM `kafka-topic` PARTITIONBY `a.b`

INSERT INTO testbucket:pathToWriteTo SELECT * FROM topicA;
INSERT INTO testbucket SELECT * FROM topicA;
INSERT INTO testbucket:path/To/Write/To SELECT * FROM topicA PARTITIONBY fieldA;

topics.regex = ^sensor_data_\d+$
connect.s3.kcql= INSERT INTO $target SELECT * FROM `*` ....

PARTITIONBY fieldA, fieldB

PARTITIONBY _key

PARTITIONBY _key.fieldA, _key.fieldB

PARTITIONBY _header.<header_key1>[, _header.<header_key2>]

PARTITIONBY fieldA, _key.fieldB, _header.fieldC

INSERT INTO $bucket[:$prefix]
SELECT * FROM $topic
PARTITIONBY fieldA, _key.fieldB, _header.fieldC
STOREAS `AVRO`
PROPERTIES (
'partition.include.keys'=true
)

connector.class=io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector
connect.s3.kcql=insert into lensesio:demo select * from demo PARTITIONBY _value.metadata_id, _value.customer_id, _header.ts, _header.wallclock STOREAS `JSON` PROPERTIES ('flush.size'=1000000, 'flush.interval'=30, 'flush.count'=5000)
topics=demo
name=demo
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
transforms=insertFormattedTs,insertWallclock
transforms.insertFormattedTs.type=io.lenses.connect.smt.header.TimestampConverter
transforms.insertFormattedTs.header.name=ts
transforms.insertFormattedTs.field=timestamp
transforms.insertFormattedTs.target.type=string
transforms.insertFormattedTs.format.to.pattern=yyyy-MM-dd-HH
transforms.insertWallclock.type=io.lenses.connect.smt.header.InsertWallclock
transforms.insertWallclock.header.name=wallclock
transforms.insertWallclock.value.type=format
transforms.insertWallclock.format=yyyy-MM-dd-HH

{
"key": <the message Key, which can be a primitive or a complex object>,
"value": <the message Value, which can be a primitive or a complex object>,
"headers": {
"header1": "value1",
"header2": "value2"
},
"metadata": {
"offset": 0,
"partition": 0,
"timestamp": 0,
"topic": "topic"
}
}

...
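The envelope layout shown above (key, value, headers, metadata) can be illustrated with a small Python sketch. This is not the connector's code: `build_envelope` and the sample record values are hypothetical names chosen for illustration, and only the four top-level fields and the metadata keys come from the structure above.

```python
import json
from typing import Any, Dict, Optional


def build_envelope(
    key: Any,
    value: Any,
    headers: Optional[Dict[str, Any]],
    topic: str,
    partition: int,
    offset: int,
    timestamp: int,
) -> Dict[str, Any]:
    """Hypothetical helper: wrap one Kafka record in the envelope shape."""
    return {
        "key": key,                      # primitive or complex object
        "value": value,                  # primitive or complex object
        "headers": dict(headers or {}),  # header name -> header value
        "metadata": {
            "offset": offset,
            "partition": partition,
            "timestamp": timestamp,
            "topic": topic,
        },
    }


# Sample record values are made up for the illustration.
envelope = build_envelope(
    key="car-1",
    value={"speed": 88},
    headers={"header1": "value1"},
    topic="car_speed_events",
    partition=0,
    offset=42,
    timestamp=1700000000000,
)
envelope_json = json.dumps(envelope)
```

Keeping the key, headers and metadata next to the value is what allows a stored object to carry enough information to rebuild the original record later.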
connect.s3.kcql=INSERT INTO lensesioaws:car_speed SELECT * FROM car_speed_events STOREAS `PARQUET`
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
key.converter=org.apache.kafka.connect.storage.StringConverter
...

...
connect.s3.kcql=INSERT INTO lensesioaws:car_speed SELECT * FROM car_speed_events STOREAS `PARQUET`
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
...

...
connect.s3.kcql=INSERT INTO lensesioaws:car_speed SELECT * FROM car_speed_events STOREAS `JSON` PROPERTIES('store.envelope'=true)
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
...

...
connect.s3.kcql=INSERT INTO lensesioaws:car_speed SELECT * FROM car_speed_events STOREAS `AVRO` PROPERTIES('store.envelope'=true)
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
key.converter=org.apache.kafka.connect.storage.StringConverter
...

...
connect.s3.kcql=INSERT INTO lensesioaws:car_speed SELECT * FROM car_speed_events STOREAS `AVRO` PROPERTIES('store.envelope'=true)
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
...

message1 -> schema1
message2 -> schema1
(No flush needed – same schema)
message3 -> schema2
(Flush occurs – new schema introduced)
message4 -> schema2
(No flush needed – same schema)
message5 -> schema1
Without optimization: would trigger a flush
With optimization: no flush – schema1 is backward-compatible with schema2
message6 -> schema2
message7 -> schema2
(No flush needed – same schema, it would happen based on the flush thresholds)

...
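The message sequence above can be replayed with a short Python sketch of the flush decision. It is a simplified model, not the connector's implementation: schemas are represented as sets of field names, and `fields_subset` is a hypothetical stand-in for a real backward-compatibility check. All function and variable names are assumptions made for illustration.

```python
from typing import Callable, FrozenSet, List, Optional

Schema = FrozenSet[str]


def fields_subset(incoming: Schema, current: Schema) -> bool:
    """Hypothetical compatibility rule: every incoming field exists in the current schema."""
    return incoming <= current


def never_compatible(incoming: Schema, current: Schema) -> bool:
    """Models the behaviour without the optimization: any schema change forces a flush."""
    return False


def flush_points(
    schemas: List[Schema],
    compatible: Callable[[Schema, Schema], bool],
) -> List[int]:
    """Return the 1-based message positions at which a schema change triggers a flush."""
    current: Optional[Schema] = None
    flushed_at: List[int] = []
    for position, schema in enumerate(schemas, start=1):
        if current is None:
            current = schema  # first message: nothing to flush yet
        elif schema == current or compatible(schema, current):
            pass  # same or compatible schema: keep writing with the current one
        else:
            flushed_at.append(position)  # incompatible schema: flush, then switch
            current = schema
    return flushed_at


# Assumed example schemas: schema2 adds a field to schema1.
schema1: Schema = frozenset({"id"})
schema2: Schema = frozenset({"id", "name"})
messages = [schema1, schema1, schema2, schema2, schema1, schema2, schema2]

with_optimization = flush_points(messages, fields_subset)
without_optimization = flush_points(messages, never_compatible)
```

Under these assumptions the only schema-driven flush with the optimization happens at message3, while without it message5 and message6 would each force one as well. Flushes caused by the configured flush thresholds are outside this sketch.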
connect.s3.aws.auth.mode=Credentials
connect.s3.aws.region=eu-west-2
connect.s3.aws.access.key=$AWS_ACCESS_KEY
connect.s3.aws.secret.key=$AWS_SECRET_KEY
...

listObjectsV2
listObjectsV2Paginator
putObject
getObject
headObject
deleteObjects
deleteObject

This page contains the release notes for the Stream Reactor.
STOREAS Bytes_*** is used

connect.s3.source.partition.search.continuous

connect.hive.metastore.uris

INSERT INTO topic SELECT * FROM bucket:prefix
STOREAS `JSON`
PROPERTIES(
'post.process.action'='move',
'post.process.action.bucket'='target-bucket',
'post.process.action.prefix'='processed',
'post.process.action.watermark.process.late.arrival'='true'
)

InvalidHeaderValue: The value for one of the HTTP headers is not in the correct format.
x-ms-range-get-content-md5: true

message1 -> schema1
message2 -> schema1
(No flush needed – same schema)
message3 -> schema2
(Flush occurs – new schema introduced)
message4 -> schema2
(No flush needed – same schema)
message5 -> schema1
Without optimization: would trigger a flush
With optimization: no flush – schema1 is backward-compatible with schema2
message6 -> schema2
message7 -> schema2
(No flush needed – same schema, it would happen based on the flush thresholds)

java.util.ServiceConfigurationError:
io.confluent.kafka.schemaregistry.rules.RuleExecutor:
io.confluent.kafka.schemaregistry.encryption.FieldEncryptionExecutor not a subtype

HttpWriterManager has no writers. Perhaps no records have been put to the sink yet.

INSERT INTO ..... PROPERTIES('key.suffix'='unique-id1')...

connect.http.retry.mode=fixed
connect.http.retry.fixed.interval.ms=10000
connect.http.retries.max.retries=20

INSERT INTO `my-bucket`
SELECT * FROM `my-topic`
PROPERTIES ('post.process.action'=`DELETE`)

INSERT INTO `my-bucket:archive/`
SELECT * FROM `my-topic`
PROPERTIES (
'post.process.action'=`MOVE`,
'post.process.action.bucket'=`archive-bucket`,
'post.process.action.prefix'=`archive/`
)

INSERT INTO bucket
SELECT * FROM `*`
...

INSERT INTO $bucket[:$prefix]
SELECT *
FROM $topic
...
PROPERTIES(
'padding.length.offset'=12,
'padding.length.partition'=12
)

INSERT INTO $bucket[:$prefix]
SELECT *
FROM $topic
...
PROPERTIES (
'padding.type'=NoOp
)

INSERT INTO $bucket[:$prefix]
SELECT *
FROM $topic
...
PROPERTIES (
'padding.length.offset'=12,
'padding.length.partition'=12
)

class.name=io.lenses.streamreactor.connect.aws.s3.source.S3SourceConnector
...

class.name=io.lenses.streamreactor.connect.aws.s3.source.S3SourceConnectorDeprecated
...

class.name=io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector
...

class.name=io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnectorDeprecated
connect.s3.padding.strategy=NoOp
...

io.lenses.streamreactor.connect.jms.source.JMSSourceConnector

name=jms-source
connector.class=io.lenses.streamreactor.connect.jms.source.JMSSourceConnector
tasks.max=1
connect.jms.kcql=INSERT INTO jms SELECT * FROM jms-queue WITHTYPE QUEUE
connect.jms.initial.context.factory=org.apache.activemq.jndi.ActiveMQInitialContextFactory
connect.jms.url=tcp://activemq:61616
connect.jms.connection.factory=ConnectionFactory

INSERT INTO kafka_topic
SELECT *
FROM jms_destination
WITHTYPE [TOPIC|QUEUE]
[WITHCONVERTER=`myclass`]

-- Select from a JMS queue and write to a Kafka topic
INSERT INTO topicA SELECT * FROM jms_queue WITHTYPE QUEUE
-- Select from a JMS topic and write to a Kafka topic with an Avro converter
INSERT INTO topicA
SELECT * FROM jms_topic
WITHTYPE TOPIC
WITHCONVERTER=`io.lenses.streamreactor.connect.converters.source.AvroConverter`