This connector configuration is designed for ingesting data from AWS S3 into Apache Kafka.
# The connector name can be anything
name=aws-s3SourceConnectorParquet
connector.class=io.lenses.streamreactor.connect.aws.s3.source.S3SourceConnector
tasks.max=1
connect.s3.kcql=insert into $TOPIC_NAME select * from $BUCKET_NAME:$PREFIX_NAME STOREAS `parquet`
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8089
connect.s3.aws.region=eu-west-2
connect.s3.aws.secret.key=SECRET_KEY
connect.s3.aws.access.key=ACCESS_KEY
connect.s3.aws.auth.mode=Credentials
The KCQL statement:

insert into $TOPIC_NAME select * from $BUCKET_NAME:$PREFIX_NAME STOREAS `parquet`

where:

- $TOPIC_NAME: the target Kafka topic the records are written to.
- $BUCKET_NAME: the source S3 bucket to read from.
- $PREFIX_NAME: the prefix (path) within the bucket under which the objects are stored.
This configuration serves as a template and can be customized according to the requirements and specifics of your data.
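One common way to deploy such a template is through the Kafka Connect REST API, which accepts the same key/value pairs as a JSON payload. The sketch below builds that payload from the properties above; the credential values and schema registry URL are placeholders, not working values:

```python
import json

# The properties template above, expressed as the JSON payload the
# Kafka Connect REST API (POST /connectors) expects.
# $TOPIC_NAME, $BUCKET_NAME, $PREFIX_NAME and the credentials are placeholders.
connector_config = {
    "name": "aws-s3SourceConnectorParquet",
    "config": {
        "connector.class": "io.lenses.streamreactor.connect.aws.s3.source.S3SourceConnector",
        "tasks.max": "1",
        "connect.s3.kcql": "insert into $TOPIC_NAME select * from $BUCKET_NAME:$PREFIX_NAME STOREAS `parquet`",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://localhost:8089",
        "connect.s3.aws.region": "eu-west-2",
        "connect.s3.aws.secret.key": "SECRET_KEY",
        "connect.s3.aws.access.key": "ACCESS_KEY",
        "connect.s3.aws.auth.mode": "Credentials",
    },
}

payload = json.dumps(connector_config, indent=2)
print(payload)
```

The resulting payload can be POSTed to your Connect worker (by default on port 8083) at the `/connectors` endpoint.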
name=aws-s3SourceEnvelope
connector.class=io.lenses.streamreactor.connect.aws.s3.source.S3SourceConnector
tasks.max=4
connect.s3.error.policy=THROW
connect.s3.partition.search.recurse.levels=0
connect.partition.search.continuous=true
connect.s3.source.partition.extractor.type=hierarchical
connect.s3.kcql=insert into food-restored select * from YOUR_BUCKET:YOUR_PREFIX BATCH=2000 STOREAS `JSON` LIMIT 10000 PROPERTIES('store.envelope'=true)
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
errors.log.enable=true
connect.s3.aws.region=eu-west-2
connect.s3.aws.secret.key=SECRET_KEY
connect.s3.aws.access.key=ACCESS_KEY
connect.s3.aws.auth.mode=Credentials
This configuration example is particularly useful when you need to restore data from AWS S3 into Apache Kafka while preserving all data for each record, including headers, key, and value. The envelope structure encapsulates the actual data payload along with its metadata in the files on your source bucket, providing a way to manage and process data with additional context.
Key settings:

- BATCH=2000: processes the source data in batches of up to 2000 records.
- LIMIT 10000: caps the number of records returned per poll.
- connect.s3.error.policy=THROW: the connector fails fast, raising an error instead of silently skipping problematic records.
- connect.s3.source.partition.extractor.type=hierarchical: recovers the original Kafka partition from the hierarchical object path written to the bucket.
- connect.partition.search.continuous=true: the connector keeps scanning the bucket for newly added partitions rather than searching only once at startup.
This configuration ensures efficient and reliable data ingestion from cloud storage into Kafka while preserving the envelope structure and providing robust error handling mechanisms.
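To illustrate what the hierarchical partition extractor recovers, the sketch below mimics parsing an object key laid out as topic/partition/offset. The path layout and file extension here are assumptions for illustration, not the connector's exact implementation:

```python
import re

# Assumed hierarchical layout: <prefix>/<topic>/<partition>/<offset>.<ext>
# This mimics, rather than reproduces, the 'hierarchical' extractor's behavior.
PATH_PATTERN = re.compile(
    r"^(?:.+/)?(?P<topic>[^/]+)/(?P<partition>\d+)/(?P<offset>\d+)\.\w+$"
)

def extract_partition(object_key: str):
    """Return the topic/partition/offset encoded in an object key, or None."""
    m = PATH_PATTERN.match(object_key)
    if m is None:
        return None
    return {
        "topic": m.group("topic"),
        "partition": int(m.group("partition")),
        "offset": int(m.group("offset")),
    }

print(extract_partition("my-prefix/car_speed_events/3/104.json"))
```

With this information the source connector can route each restored record back to its original partition.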
name=aws-s3AvroSourceEnvelope
connector.class=io.lenses.streamreactor.connect.aws.s3.source.S3SourceConnector
tasks.max=1
connect.s3.error.policy=THROW
connect.s3.partition.search.recurse.levels=0
connect.partition.search.continuous=true
connect.s3.kcql=insert into car_speed_events_replicated select * from YOUR_BUCKET:YOUR_PREFIX STOREAS `AVRO` PROPERTIES('store.envelope' = true)
errors.log.enable=true
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
connect.s3.aws.region=eu-west-2
connect.s3.aws.secret.key=SECRET_KEY
connect.s3.aws.access.key=ACCESS_KEY
connect.s3.aws.auth.mode=Credentials
Similar to the above, this is another configuration for the envelope format, this time storing the data as Avro. It differs from the previous example in a few places:

- tasks.max=4 becomes tasks.max=1, running a single task.
- The KCQL statement uses STOREAS `AVRO` instead of STOREAS `JSON`.
- value.converter=io.confluent.connect.avro.AvroConverter deserializes the Avro payloads, with value.converter.schema.registry.url=http://localhost:8081 pointing at the Schema Registry.
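As a rough illustration of the envelope idea, each stored object carries the whole record rather than just its value: the payload together with its key, headers, and positional metadata. The field names and values below are illustrative, not the connector's exact schema:

```python
import json

# An illustrative envelope for one Kafka record: the value plus its
# key, headers, and positional metadata, as described above.
envelope = {
    "key": "car-42",
    "value": {"speed_kmh": 87, "sensor": "A1"},
    "headers": {"source": "telemetry"},
    "metadata": {
        "topic": "car_speed_events",
        "partition": 0,
        "offset": 1337,
        "timestamp": 1700000000000,
    },
}

# Round-trip through JSON: nothing about the record is lost,
# which is what makes a faithful restore into Kafka possible.
restored = json.loads(json.dumps(envelope))
print(restored["metadata"]["offset"])
```

Because the envelope preserves topic, partition, and offset alongside key, value, and headers, a restore can faithfully reconstruct the original records.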