This connector configuration is designed for ingesting data from GCP Storage into Apache Kafka.
name=gcp-storageSourceConnectorParquet # this can be anything
connector.class=io.lenses.streamreactor.connect.gcp.storage.source.GCPStorageSourceConnector
tasks.max=1
connect.gcpstorage.kcql=insert into $TOPIC_NAME select * from $BUCKET_NAME:$PREFIX_NAME STOREAS `parquet`
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8089
connect.gcpstorage.gcp.auth.mode=Credentials
connect.gcpstorage.gcp.credentials=$GCP_CREDENTIALS
connect.gcpstorage.gcp.project.id=$GCP_PROJECT_ID
The KCQL statement

insert into $TOPIC_NAME select * from $BUCKET_NAME:$PREFIX_NAME STOREAS `parquet`

uses the following placeholders:

$TOPIC_NAME: the name of the Kafka topic the records are written to.
$BUCKET_NAME: the name of the GCP Storage bucket to read from.
$PREFIX_NAME: the prefix (path) within the bucket that contains the source files.
This configuration serves as a template and can be customized according to the requirements and specifics of your data.
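For example, with illustrative values substituted for the placeholders (the topic, bucket and prefix names below are hypothetical), the KCQL might read:

insert into payments-parquet select * from my-data-bucket:ingest/parquet STOREAS `parquet`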
name=gcp-storageSourceEnvelope
connector.class=io.lenses.streamreactor.connect.gcp.storage.source.GCPStorageSourceConnector
tasks.max=4
connect.gcpstorage.error.policy=THROW
connect.gcpstorage.partition.search.recurse.levels=0
connect.partition.search.continuous=true
connect.gcpstorage.source.partition.extractor.type=hierarchical
connect.gcpstorage.kcql=insert into food-restored select * from YOUR_BUCKET:YOUR_PREFIX BATCH=2000 STOREAS `JSON` LIMIT 10000 PROPERTIES('store.envelope'=true)
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
errors.log.enable=true
connect.gcpstorage.gcp.auth.mode=Credentials
connect.gcpstorage.gcp.credentials=$GCP_CREDENTIALS
connect.gcpstorage.gcp.project.id=$GCP_PROJECT_ID
This configuration example is particularly useful when you need to restore data from GCP Storage into Apache Kafka while preserving each record in full, including its key, value and headers. The envelope structure encapsulates the actual data payload along with this metadata in the files on your source bucket, providing a way to manage and process data with additional context.
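As a rough sketch (the field contents below are hypothetical, and the exact shape depends on what the sink originally wrote), an envelope-stored record in the source bucket carries the key, value, headers and metadata together:

{
  "key": "order-42",
  "value": { "product": "espresso", "quantity": 2 },
  "headers": { "origin": "pos-terminal-7" },
  "metadata": { "topic": "food-orders", "partition": 3, "offset": 1024, "timestamp": 1718000000000 }
}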
Key options in this configuration:

BATCH=2000: the connector reads and hands over records from the source files in batches of 2000.
LIMIT 10000: caps the number of records returned in a single poll.
connect.gcpstorage.error.policy=THROW: errors are thrown back to the Connect framework, failing the task rather than silently skipping data.
connect.gcpstorage.source.partition.extractor.type=hierarchical: recovers the original Kafka partition from the hierarchical object path written by the sink (see the illustrative object key below).
connect.partition.search.continuous=true: the connector keeps scanning the bucket for new partitions instead of performing a single one-off search.
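To make the hierarchical extractor concrete: it assumes the path layout produced by the corresponding sink connector, and the object key below is purely illustrative:

YOUR_PREFIX/source-topic/3/1024.json

From a key shaped like this, the connector recovers the original partition (3 in this example).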
This configuration ensures efficient and reliable data ingestion from cloud storage into Kafka while preserving the envelope structure and providing robust error handling mechanisms.
name=gcp-storageAvroSourceEnvelope
connector.class=io.lenses.streamreactor.connect.gcp.storage.source.GCPStorageSourceConnector
tasks.max=1
connect.gcpstorage.error.policy=THROW
connect.gcpstorage.partition.search.recurse.levels=0
connect.partition.search.continuous=true
connect.gcpstorage.kcql=insert into car_speed_events_replicated select * from YOUR_BUCKET:YOUR_PREFIX STOREAS `AVRO` PROPERTIES('store.envelope' = true)
errors.log.enable=true
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
connect.gcpstorage.gcp.auth.mode=Credentials
connect.gcpstorage.gcp.credentials=$GCP_CREDENTIALS
connect.gcpstorage.gcp.project.id=$GCP_PROJECT_ID
Similar to the above, this is another configuration for the envelope format, this time storing the data as Avro. It differs from the previous example in a few settings:
tasks.max=1 instead of tasks.max=4: a single task reads from the bucket.
STOREAS `AVRO`: the source files are stored as Avro rather than JSON.
value.converter=io.confluent.connect.avro.AvroConverter with value.converter.schema.registry.url=http://localhost:8081: record values are written to Kafka using the Avro converter backed by a Schema Registry.
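To submit any of these examples to a running Kafka Connect cluster, the same properties can be wrapped in the JSON payload accepted by the Connect REST API; the endpoint below assumes Connect's default REST port and is only an illustration:

POST http://localhost:8083/connectors
{
  "name": "gcp-storageAvroSourceEnvelope",
  "config": {
    "connector.class": "io.lenses.streamreactor.connect.gcp.storage.source.GCPStorageSourceConnector",
    "tasks.max": "1",
    "connect.gcpstorage.error.policy": "THROW",
    "connect.gcpstorage.partition.search.recurse.levels": "0",
    "connect.partition.search.continuous": "true",
    "connect.gcpstorage.kcql": "insert into car_speed_events_replicated select * from YOUR_BUCKET:YOUR_PREFIX STOREAS `AVRO` PROPERTIES('store.envelope' = true)",
    "errors.log.enable": "true",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "connect.gcpstorage.gcp.auth.mode": "Credentials",
    "connect.gcpstorage.gcp.credentials": "$GCP_CREDENTIALS",
    "connect.gcpstorage.gcp.project.id": "$GCP_PROJECT_ID"
  }
}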