Examples for GCP Source Kafka Connector.
This connector configuration is designed for ingesting data from GCP Storage into Apache Kafka.
Connector Name: gcp-storageSourceConnectorParquet (This can be customized as needed)
Connector Class: io.lenses.streamreactor.connect.gcp.storage.source.GCPStorageSourceConnector
Maximum Tasks: 1 (Number of tasks to execute in parallel)
KCQL Statement:
Inserts data from the specified cloud storage bucket into a Kafka topic.
Syntax: insert into $TOPIC_NAME select * from $BUCKET_NAME:$PREFIX_NAME STOREAS 'parquet'
$TOPIC_NAME: Name of the Kafka topic where data will be inserted.
$BUCKET_NAME: Name of the GCP Storage bucket.
$PREFIX_NAME: Prefix or directory within the bucket.
Value Converter:
AvroConverter (Assuming data is serialized in Avro format)
Schema Registry URL:
http://localhost:8089 (URL of the schema registry for Avro serialization)
Authentication Properties:
(These properties depend on the authentication mechanism used for accessing the cloud storage service. Replace placeholders with actual authentication properties for the specific cloud platform.)
This configuration serves as a template and can be customized according to the requirements and specifics of your data.
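A minimal properties sketch of the template described above, assuming the KCQL is supplied through a connect.gcpstorage.kcql property (verify the exact property name against your connector version) and that the $-placeholders are replaced with your own values:

name=gcp-storageSourceConnectorParquet
connector.class=io.lenses.streamreactor.connect.gcp.storage.source.GCPStorageSourceConnector
tasks.max=1
# Read Parquet objects from the bucket/prefix and insert them into the Kafka topic
connect.gcpstorage.kcql=insert into $TOPIC_NAME select * from $BUCKET_NAME:$PREFIX_NAME STOREAS 'parquet'
# Values are assumed to be Avro-serialized
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8089
# Authentication properties for GCP Storage go here; they depend on your authentication mechanism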
This configuration example is particularly useful when you need to restore data from GCP Storage into Apache Kafka while preserving all data, including headers, key, and value for each record. The envelope structure encapsulates the actual data payload along with metadata into files in your source bucket, providing a way to manage and process data with additional context.
Data Restoration with Envelope Structure: If you’re restoring data from GCP Storage into Kafka and want to preserve metadata, this configuration is suitable. Envelopes can include metadata like timestamps, data provenance, or other contextual information, which can be valuable for downstream processing.
Batch Processing: The configuration supports batch processing by specifying a batch size (BATCH=2000) and a limit on the number of records (LIMIT 10000). This is beneficial when dealing with large datasets, allowing for efficient processing in chunks.
Error Handling: Error handling is configured to throw exceptions (connect.gcpstorage.error.policy=THROW) in case of errors during data ingestion, ensuring that any issues are immediately surfaced for resolution.
Hierarchical Partition Extraction: The source partition extractor type is set to hierarchical (connect.gcpstorage.source.partition.extractor.type=hierarchical), which is suitable for extracting hierarchical partitioning structures from the source data location.
Continuous Partition Search: Continuous partition search is enabled (connect.partition.search.continuous=true), which helps in continuously searching for new partitions to process, ensuring that newly added data is ingested promptly.
This configuration ensures efficient and reliable data ingestion from cloud storage into Kafka while preserving the envelope structure and providing robust error handling mechanisms.
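A sketch of how these settings could be combined in a single configuration. The connect.gcpstorage.kcql property name, the PROPERTIES('store.envelope'=true) clause, the STOREAS 'AVRO' format, and the converter settings are assumptions based on this description; confirm the exact KCQL spelling for your connector version:

name=gcp-storageSourceConnectorEnvelope
connector.class=io.lenses.streamreactor.connect.gcp.storage.source.GCPStorageSourceConnector
tasks.max=4
# Envelope-formatted records, read in batches of 2000 with an overall limit of 10000 records
connect.gcpstorage.kcql=insert into $TOPIC_NAME select * from $BUCKET_NAME:$PREFIX_NAME BATCH=2000 STOREAS 'AVRO' LIMIT 10000 PROPERTIES('store.envelope'=true)
# Surface ingestion errors immediately
connect.gcpstorage.error.policy=THROW
# Extract hierarchical partitioning structures from the source location
connect.gcpstorage.source.partition.extractor.type=hierarchical
# Keep looking for newly added partitions
connect.partition.search.continuous=true
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081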
Similar to the above, this is another configuration for the envelope format.
Single Task Processing: Unlike the previous configuration, which allowed for multiple tasks (tasks.max=4), this variant is configured for single-task processing (tasks.max=1). This setup may be preferable when the workload is relatively light or when parallel processing is not necessary.
AVRO Format for Data Serialization: Data is serialized in the AVRO format (STOREAS 'AVRO'), leveraging the AvroConverter (value.converter=io.confluent.connect.avro.AvroConverter). This is suitable for environments where Avro is the preferred serialization format or where compatibility with Avro-based systems is required.
Schema Registry Configuration: The configuration includes the URL of the schema registry (value.converter.schema.registry.url=http://localhost:8081), facilitating schema management and compatibility checks for Avro-serialized data.
Hierarchical Partition Extraction: Similar to the previous configuration, hierarchical partition extraction is employed (connect.gcpstorage.source.partition.extractor.type=hierarchical), enabling extraction of partitioning structures from the source data location.
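A corresponding single-task sketch, again with the connect.gcpstorage.kcql property name and the 'store.envelope' KCQL property assumed rather than confirmed by the text above:

name=gcp-storageSourceConnectorEnvelopeAvro
connector.class=io.lenses.streamreactor.connect.gcp.storage.source.GCPStorageSourceConnector
tasks.max=1
# Envelope-formatted Avro data from the bucket/prefix into the Kafka topic
connect.gcpstorage.kcql=insert into $TOPIC_NAME select * from $BUCKET_NAME:$PREFIX_NAME STOREAS 'AVRO' PROPERTIES('store.envelope'=true)
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
connect.gcpstorage.source.partition.extractor.type=hierarchical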
Examples for AWS S3 Sink Kafka Connector time-based partitioning.
This scenario partitions data by date and time, employing record timestamp headers to enable partitioning based on these time components; an example configuration sketch follows this list of scenarios.
Data is partitioned by data date and hour, utilizing record timestamp headers for partitioning based on these time components.
The default Confluent partitioning scheme follows the structure <prefix>/<topic>/<encodedPartition>/<topic>+<kafkaPartition>+<startOffset>.<format>. This provides a default partitioning mechanism for Kafka topics.
Similar to the previous scenario, this partitions data by year, month, and day. It utilizes record timestamp headers for partitioning based on these time components.
Extending the previous scenarios, this one partitions data by year, month, day, hour, and minute, allowing for more granular time-based partitioning.
This scenario partitions data by year, month, day, and hour. It utilizes a transformation process to insert record timestamp headers, enabling partitioning based on these time components.
This scenario partitions data by date and hour, using record timestamp headers for partitioning based on these time components.
This scenario partitions data based on the created at timestamp, utilizing record timestamp headers for partitioning.
Data is partitioned based on the raw creation date, employing record timestamp headers for this partitioning scheme.
Data is partitioned based on the creation timestamp, utilizing record timestamp headers for this partitioning scheme.
This scenario partitions data by the created at date, employing record timestamp headers for partitioning.
Similar to the previous scenario, this partitions data by the created at date, utilizing record timestamp headers for partitioning.
Data is partitioned based on the creation date, employing record timestamp headers for this partitioning scheme.
This scenario partitions data by the data date, utilizing record timestamp headers for partitioning.
Data is partitioned based on the date and hour, employing record timestamp headers for this partitioning scheme.
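To illustrate the header-based approach used in these scenarios, here is a sketch of a sink configuration that partitions objects by year, month, day, and hour taken from record timestamp headers. The sink class name, the connect.s3.kcql property, the _header.* field references, and the header-inserting transformation are assumptions; adapt them to the actual connector and SMT you use:

name=aws-s3SinkConnectorTimePartitioned
connector.class=io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector
tasks.max=1
# Partition objects by time components carried in record headers
connect.s3.kcql=insert into $BUCKET_NAME:$PREFIX_NAME select * from $TOPIC_NAME PARTITIONBY _header.year, _header.month, _header.day, _header.hour STOREAS 'PARQUET'
# A transformation that copies the record timestamp into year/month/day/hour headers
# would be configured here; the class name depends on the SMT you choose.
#transforms=insertTimestampHeaders
#transforms.insertTimestampHeaders.type=<your timestamp-header SMT class>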
Examples for AWS S3 Source Kafka Connector.
This connector configuration is designed for ingesting data from AWS S3 into Apache Kafka.
Connector Name: aws-s3SourceConnectorParquet (This can be customized as needed)
Connector Class: io.lenses.streamreactor.connect.aws.s3.source.S3SourceConnector
Maximum Tasks: 1 (Number of tasks to execute in parallel)
KCQL Statement:
Inserts data from the specified cloud storage bucket into a Kafka topic.
Syntax: insert into $TOPIC_NAME select * from $BUCKET_NAME:$PREFIX_NAME STOREAS 'parquet'
$TOPIC_NAME: Name of the Kafka topic where data will be inserted.
$BUCKET_NAME: Name of the AWS S3 bucket.
$PREFIX_NAME: Prefix or directory within the bucket.
Value Converter:
AvroConverter (Assuming data is serialized in Avro format)
Schema Registry URL:
http://localhost:8089 (URL of the schema registry for Avro serialization)
Authentication Properties:
(These properties depend on the authentication mechanism used for accessing the cloud storage service. Replace placeholders with actual authentication properties for the specific cloud platform.)
This configuration serves as a template and can be customized according to the requirements and specifics of your data.
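A minimal properties sketch of the template described above, assuming the KCQL is supplied through a connect.s3.kcql property (verify the exact property name against your connector version) and that the $-placeholders are replaced with your own values:

name=aws-s3SourceConnectorParquet
connector.class=io.lenses.streamreactor.connect.aws.s3.source.S3SourceConnector
tasks.max=1
# Read Parquet objects from the bucket/prefix and insert them into the Kafka topic
connect.s3.kcql=insert into $TOPIC_NAME select * from $BUCKET_NAME:$PREFIX_NAME STOREAS 'parquet'
# Values are assumed to be Avro-serialized
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8089
# Authentication properties for AWS S3 go here; they depend on your authentication mechanism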
This configuration example is particularly useful when you need to restore data from AWS S3 into Apache Kafka while preserving all data, including headers, key, and value for each record. The envelope structure encapsulates the actual data payload along with metadata into files in your source bucket, providing a way to manage and process data with additional context.
Data Restoration with Envelope Structure: If you’re restoring data from AWS S3 into Kafka and want to preserve metadata, this configuration is suitable. Envelopes can include metadata like timestamps, data provenance, or other contextual information, which can be valuable for downstream processing.
Batch Processing: The configuration supports batch processing by specifying a batch size (BATCH=2000) and a limit on the number of records (LIMIT 10000). This is beneficial when dealing with large datasets, allowing for efficient processing in chunks.
Error Handling: Error handling is configured to throw exceptions (connect.s3.error.policy=THROW) in case of errors during data ingestion, ensuring that any issues are immediately surfaced for resolution.
Hierarchical Partition Extraction: The source partition extractor type is set to hierarchical (connect.s3.source.partition.extractor.type=hierarchical), which is suitable for extracting hierarchical partitioning structures from the source data location.
Continuous Partition Search: Continuous partition search is enabled (connect.partition.search.continuous=true), which helps in continuously searching for new partitions to process, ensuring that newly added data is ingested promptly.
This configuration ensures efficient and reliable data ingestion from cloud storage into Kafka while preserving the envelope structure and providing robust error handling mechanisms.
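A sketch of how these settings could be combined in a single configuration. The connect.s3.kcql property name, the PROPERTIES('store.envelope'=true) clause, the STOREAS 'AVRO' format, and the converter settings are assumptions based on this description; confirm the exact KCQL spelling for your connector version:

name=aws-s3SourceConnectorEnvelope
connector.class=io.lenses.streamreactor.connect.aws.s3.source.S3SourceConnector
tasks.max=4
# Envelope-formatted records, read in batches of 2000 with an overall limit of 10000 records
connect.s3.kcql=insert into $TOPIC_NAME select * from $BUCKET_NAME:$PREFIX_NAME BATCH=2000 STOREAS 'AVRO' LIMIT 10000 PROPERTIES('store.envelope'=true)
# Surface ingestion errors immediately
connect.s3.error.policy=THROW
# Extract hierarchical partitioning structures from the source location
connect.s3.source.partition.extractor.type=hierarchical
# Keep looking for newly added partitions
connect.partition.search.continuous=true
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081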
Similar to the above, this is another configuration for the envelope format.
Single Task Processing: Unlike the previous configuration, which allowed for multiple tasks (tasks.max=4), this variant is configured for single-task processing (tasks.max=1). This setup may be preferable when the workload is relatively light or when parallel processing is not necessary.
AVRO Format for Data Serialization: Data is serialized in the AVRO format (STOREAS 'AVRO'), leveraging the AvroConverter (value.converter=io.confluent.connect.avro.AvroConverter). This is suitable for environments where Avro is the preferred serialization format or where compatibility with Avro-based systems is required.
Schema Registry Configuration: The configuration includes the URL of the schema registry (value.converter.schema.registry.url=http://localhost:8081), facilitating schema management and compatibility checks for Avro-serialized data.
Hierarchical Partition Extraction: Similar to the previous configuration, hierarchical partition extraction is employed (connect.s3.source.partition.extractor.type=hierarchical), enabling extraction of partitioning structures from the source data location.
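A corresponding single-task sketch, again with the connect.s3.kcql property name and the 'store.envelope' KCQL property assumed rather than confirmed by the text above:

name=aws-s3SourceConnectorEnvelopeAvro
connector.class=io.lenses.streamreactor.connect.aws.s3.source.S3SourceConnector
tasks.max=1
# Envelope-formatted Avro data from the bucket/prefix into the Kafka topic
connect.s3.kcql=insert into $TOPIC_NAME select * from $BUCKET_NAME:$PREFIX_NAME STOREAS 'AVRO' PROPERTIES('store.envelope'=true)
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
connect.s3.source.partition.extractor.type=hierarchical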
Examples for GCP Sink Kafka Connector time-based partitioning.
This scenario partitions data by date and time, employing record timestamp headers to enable partitioning based on these time components; an example configuration sketch follows this list of scenarios.
Data is partitioned by data date and hour, utilizing record timestamp headers for partitioning based on these time components.
The default Confluent partitioning scheme follows the structure <prefix>/<topic>/<encodedPartition>/<topic>+<kafkaPartition>+<startOffset>.<format>. This provides a default partitioning mechanism for Kafka topics.
Similar to the previous scenario, this partitions data by year, month, and day. It utilizes record timestamp headers for partitioning based on these time components.
Extending the previous scenarios, this one partitions data by year, month, day, hour, and minute, allowing for more granular time-based partitioning.
This scenario partitions data by year, month, day, and hour. It utilizes a transformation process to insert record timestamp headers, enabling partitioning based on these time components.
This scenario partitions data by date and hour, using record timestamp headers for partitioning based on these time components.
This scenario partitions data based on the created at timestamp, utilizing record timestamp headers for partitioning.
Data is partitioned based on the raw creation date, employing record timestamp headers for this partitioning scheme.
Data is partitioned based on the creation timestamp, utilizing record timestamp headers for this partitioning scheme.
This scenario partitions data by the created at date, employing record timestamp headers for partitioning.
Similar to the previous scenario, this partitions data by the created at date, utilizing record timestamp headers for partitioning.
Data is partitioned based on the creation date, employing record timestamp headers for this partitioning scheme.
This scenario partitions data by the data date, utilizing record timestamp headers for partitioning.
Data is partitioned based on the date and hour, employing record timestamp headers for this partitioning scheme.
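To illustrate the header-based approach used in these scenarios, here is a sketch of a sink configuration that partitions objects by date and hour taken from record timestamp headers. The sink class name, the connect.gcpstorage.kcql property, the _header.* field references, and the header-inserting transformation are assumptions; adapt them to the actual connector and SMT you use:

name=gcp-storageSinkConnectorTimePartitioned
connector.class=io.lenses.streamreactor.connect.gcp.storage.sink.GCPStorageSinkConnector
tasks.max=1
# Partition objects by date and hour carried in record headers
connect.gcpstorage.kcql=insert into $BUCKET_NAME:$PREFIX_NAME select * from $TOPIC_NAME PARTITIONBY _header.date, _header.hour STOREAS 'PARQUET'
# A transformation that copies the record timestamp into date/hour headers
# would be configured here; the class name depends on the SMT you choose.
#transforms=insertTimestampHeaders
#transforms.insertTimestampHeaders.type=<your timestamp-header SMT class>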