GCP Storage
This page describes the usage of the Stream Reactor GCP Storage Sink Connector.
2024 © Lenses.io Ltd. Apache, Apache Kafka, Kafka and associated open source project names are trademarks of the Apache Software Foundation.
You can specify multiple KCQL statements separated by ; to have a connector sink multiple topics. The connector properties topics or topics.regex are required to be set to a value that matches the KCQL statements.
For more examples see the .
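The requirement that topics matches the KCQL statements can be checked before deploying. The following is a minimal sketch, not part of the connector: the helper names are hypothetical, the bucket and topic names are made up, the bucket:path form in the sample statements is an assumption, and splitting on ; ignores any escaping.

```python
import re


def kcql_source_topics(kcql: str) -> list[str]:
    """Return the FROM target of each ';'-separated KCQL statement."""
    topics = []
    for statement in kcql.split(";"):
        statement = statement.strip()
        if not statement:
            continue  # tolerate a trailing ';'
        match = re.search(r"\bFROM\s+(\S+)", statement, flags=re.IGNORECASE)
        if match is None:
            raise ValueError(f"no FROM clause in: {statement!r}")
        topics.append(match.group(1))
    return topics


def unmatched_topics(kcql: str, topics_property: str) -> set[str]:
    """Topics named in KCQL but missing from the comma-separated topics value."""
    configured = {t.strip() for t in topics_property.split(",") if t.strip()}
    # A FROM clause of '*' auto-maps topics, so it is not checked here.
    return {t for t in kcql_source_topics(kcql) if t != "*" and t not in configured}


# Hypothetical statements for illustration only.
kcql = (
    "INSERT INTO my-bucket:path-a SELECT * FROM topicA;"
    "INSERT INTO my-bucket:path-b SELECT * FROM topicB"
)
print(kcql_source_topics(kcql))
print(unmatched_topics(kcql, "topicA"))
```

A non-empty result from unmatched_topics points at a KCQL statement whose source topic the connector would never receive.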
The connector uses KCQL to map topics to GCP Storage buckets and paths. The full KCQL syntax is:
Please note that you can employ escaping within KCQL for the INSERT INTO, SELECT * FROM, and PARTITIONBY clauses when necessary. For example, an incoming Kafka message stored as JSON can use field names containing a dot (.).
In this case you can use the following KCQL statement:
The target bucket and path are specified in the INSERT INTO clause. The path is optional and if not specified, the connector will write to the root of the bucket and append the topic name to the path.
Here are a few examples:
Currently, the connector does not offer support for SQL projection; consequently, anything other than a SELECT * query is disregarded. The connector will faithfully write all fields from Kafka exactly as they are.
The source topic is defined within the FROM clause. To avoid runtime errors, it’s crucial to configure either the topics or topics.regex property in the connector and ensure proper mapping to the KCQL statements.
Set the FROM clause to *. This will auto map the topic as a partition.
The PROPERTIES clause is optional and adds a layer of configurability to the connector. It enhances versatility by permitting the application of multiple configurations (delimited by ‘,’). The following properties are supported:
The sink connector optimizes performance by padding the output files, a practice that proves beneficial when using the GCP Storage Source connector to restore data. This file padding ensures that files are ordered lexicographically, allowing the GCP Storage Source connector to skip the need for reading, sorting, and processing all files, thereby enhancing efficiency.
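To see why padding matters, compare how offsets sort as plain strings with how they sort once left-padded. This is an illustrative sketch only; left_pad is a hypothetical helper, and the width of 12 and the pad character 0 mirror the documented defaults of padding.length.offset and padding.char.

```python
def left_pad(value: int, length: int, char: str = "0") -> str:
    """Left-pad a number to a fixed width, in the spirit of padding.type=LeftPad."""
    return str(value).rjust(length, char)


offsets = [9, 10, 100, 2]

# Without padding, string order is not numeric order.
unpadded = sorted(str(o) for o in offsets)

# With a fixed width, string order and numeric order agree.
padded = sorted(left_pad(o, 12) for o in offsets)

print(unpadded)
print(padded)
```

Because the padded names already sort in offset order, a reader listing the files lexicographically sees them in the order they were written.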
The object key serves as the filename used to store data in GCP Storage. There are two options for configuring the object key:
Default: The object key is automatically generated by the connector and follows the Kafka topic-partition structure. The format is $container/[$prefix]/$topic/$partition/offset.extension. The extension is determined by the chosen storage format.
Custom: The object key is driven by the PARTITIONBY clause. The format is either $container/[$prefix]/$topic/customKey1=customValue1/customKey2=customValue2/topic(partition_offset).extension (the Athena naming style, which mimics Hive-like data partitioning) or $container/[$prefix]/customValue/topic(partition_offset).ext. The extension is determined by the selected storage format.
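The two layouts can be sketched as string builders. This is only an illustration of the formats quoted above, not the connector's code: the function names and sample values are hypothetical, and padding of the partition and offset is left out for brevity.

```python
from typing import Optional


def default_object_key(container: str, prefix: Optional[str], topic: str,
                       partition: int, offset: int, extension: str) -> str:
    """$container/[$prefix]/$topic/$partition/offset.extension"""
    parts = [container]
    if prefix:
        parts.append(prefix)  # the prefix is optional
    parts += [topic, str(partition), f"{offset}.{extension}"]
    return "/".join(parts)


def custom_object_key(container: str, prefix: Optional[str], topic: str,
                      partition: int, offset: int, extension: str,
                      partition_values: dict) -> str:
    """$container/[$prefix]/$topic/key1=value1/.../topic(partition_offset).extension"""
    parts = [container]
    if prefix:
        parts.append(prefix)
    parts.append(topic)
    # One key=value path segment per PARTITIONBY entry, in the order given.
    parts += [f"{key}={value}" for key, value in partition_values.items()]
    parts.append(f"{topic}({partition}_{offset}).{extension}")
    return "/".join(parts)


print(default_object_key("my-bucket", "backups", "orders", 3, 1500, "avro"))
print(custom_object_key("my-bucket", "backups", "orders", 3, 1500, "json",
                        {"country": "UK", "year": 2024}))
```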
The Connector automatically adds the topic name to the partition. There is no need to add it to the partition clause. If you want to explicitly add the topic or partition you can do so by using __topic and __partition.
The partition clause works on the header, key and value fields of the Kafka message.
Custom keys and values can be extracted from the Kafka message key, message value, or message headers, as long as the headers are of types that can be converted to strings. There is no fixed limit to the number of elements that can form the object key, but you should be aware of GCP Storage key length restrictions.
To extract fields from the message values, simply use the field names in the PARTITIONBY clause. For example:
However, note that the message fields must be of primitive types (e.g., string, int, long) to be used for partitioning.
You can also use the entire message key as long as it can be coerced into a primitive type:
In cases where the Kafka message Key is not a primitive but a complex object, you can use individual fields within the message Key to create the GCP Storage object key name:
Kafka message headers can also be used in the GCP Storage object key definition, provided the header values are of primitive types easily convertible to strings:
Customizing the object key can leverage various components of the Kafka message. For example:
This flexibility allows you to tailor the object key to your specific needs, extracting meaningful information from Kafka messages to structure GCP Storage object keys effectively.
To enable Athena-like partitioning, use the following syntax:
Storing data in GCP Storage and partitioning it by time is a common practice in data management. For instance, you may want to organize your GCP Storage data in hourly intervals. This partitioning can be seamlessly achieved using the PARTITIONBY clause in combination with specifying the relevant time field. However, it’s worth noting that the time field typically doesn’t adjust automatically.
Let’s consider an example where you need the object key to include the wallclock time (the time when the message was processed) and create an hourly window based on a field called timestamp. Here’s the connector configuration to achieve this:
In this example, the incoming Kafka message’s Value content includes a field called timestamp, represented as a long value indicating the epoch time in milliseconds. The TimestampConverter SMT converts this into a string value according to the format specified in the format.to.pattern property. Additionally, the insertWallclock SMT adds the current wallclock time in the format you specify in the format property.
The PARTITIONBY clause then leverages both the timestamp field and the wallclock header to craft the object key, providing you with precise control over data partitioning.
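The conversion from an epoch-milliseconds value to an hourly window can be sketched as follows. This only illustrates the idea and is not the SMT itself: hourly_window is a hypothetical helper, and the year-month-day-hour layout and the use of UTC are assumptions standing in for whatever pattern you configure.

```python
from datetime import datetime, timezone


def hourly_window(epoch_millis: int) -> str:
    """Format an epoch-milliseconds timestamp as a UTC hour bucket."""
    moment = datetime.fromtimestamp(epoch_millis // 1000, tz=timezone.utc)
    return moment.strftime("%Y-%m-%d-%H")


first = 1_700_000_000_000           # 2023-11-14 22:13:20 UTC
same_hour = first + 10 * 60 * 1000  # ten minutes later
next_hour = first + 50 * 60 * 1000  # fifty minutes later

print(hourly_window(first))
print(hourly_window(same_hour))
print(hourly_window(next_hour))
```

Records whose timestamps fall in the same hour produce the same string, so they land under the same partition path.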
While the STOREAS clause is optional, it plays a pivotal role in determining the storage format within GCP Storage. It’s crucial to understand that this format is entirely independent of the data format stored in Kafka. The connector maintains its neutrality towards the storage format at the topic level and relies on the key.converter and value.converter settings to interpret the data.
Supported storage formats encompass:
AVRO
Parquet
JSON
CSV (including headers)
Text
BYTES
Opting for BYTES ensures that each record is stored in its own separate file. This feature proves particularly valuable for scenarios involving the storage of images or other binary data in GCP Storage. For cases where you prefer to consolidate multiple records into a single binary file, AVRO or Parquet are the recommended choices.
By default, the connector exclusively stores the Kafka message value. However, you can expand storage to encompass the entire message, including the key, headers, and metadata, by configuring the store.envelope property as true. This property operates as a boolean switch, with the default value being false. When the envelope is enabled, the data structure follows this format:
Utilizing the envelope is particularly advantageous in scenarios such as backup and restore or replication, where comprehensive storage of the entire message in GCP Storage is desired.
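As a rough mental model, the envelope wraps four sections, each of which has its own store.envelope.fields.* switch. The sketch below is an assumption-laden illustration rather than the connector's actual output: build_envelope is a hypothetical helper, the section names follow the property names, and the contents of metadata shown here (topic, partition, offset, timestamp) are guesses made for the example.

```python
ENVELOPE_SECTIONS = ("key", "value", "headers", "metadata")


def build_envelope(record: dict, enabled: dict) -> dict:
    """Keep only the envelope sections whose switch is turned on."""
    return {name: record[name] for name in ENVELOPE_SECTIONS if enabled.get(name, False)}


# Hypothetical record used only for illustration.
record = {
    "key": "order-42",
    "value": {"amount": 12.5, "currency": "GBP"},
    "headers": {"source": "checkout"},
    "metadata": {"topic": "orders", "partition": 3, "offset": 1500,
                 "timestamp": 1_700_000_000_000},
}

everything = build_envelope(record, {name: True for name in ENVELOPE_SECTIONS})
without_headers = build_envelope(record, {"key": True, "value": True, "metadata": True})

print(list(everything))
print(list(without_headers))
```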
Storing the message Value Avro data as Parquet in GCP Storage:
The converter also facilitates seamless JSON to AVRO/Parquet conversion, eliminating the need for an additional processing step before the data is stored in GCP Storage.
Enabling the full message stored as JSON in GCP Storage:
Enabling the full message stored as AVRO in GCP Storage:
If the restore (see the GCP Storage Source documentation) happens on the same cluster, then the most performant way is to use the ByteConverter for both Key and Value and store as AVRO or Parquet:
The connector offers three distinct flush options for data management:
Flush by Count - triggers a file flush after a specified number of records have been written to it.
Flush by Size - initiates a file flush once a predetermined size (in bytes) has been attained.
Flush by Interval - enforces a file flush after a defined time interval (in seconds).
It’s worth noting that the interval flush is a continuous process that acts as a fail-safe mechanism, ensuring that files are periodically flushed, even if the other flush options are not configured or haven’t reached their thresholds.
Consider a scenario where the flush size is set to 10MB, and only 9.8MB of data has been written to the file, with no new Kafka messages arriving for an extended period of 6 hours. To prevent undue delays, the interval flush guarantees that the file is flushed after the specified time interval has elapsed. This ensures the timely management of data even in situations where other flush conditions are not met.
flush.count = 50_000
flush.size = 500000000 (500MB)
flush.interval = 3600 (1 hour)
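The way the three thresholds combine can be sketched as a single predicate: a flush happens as soon as any one of them is reached. This is a simplified model, not the connector's implementation; FlushPolicy is a hypothetical class whose defaults copy the values listed above, and it ignores that the check only runs when Kafka Connect hands records to the connector.

```python
from dataclasses import dataclass


@dataclass
class FlushPolicy:
    count: int = 50_000             # flush.count
    size_bytes: int = 500_000_000   # flush.size
    interval_seconds: int = 3_600   # flush.interval

    def should_flush(self, records_written: int, bytes_written: int,
                     seconds_since_last_flush: float) -> bool:
        """True when any one of the three thresholds has been reached."""
        return (
            records_written >= self.count
            or bytes_written >= self.size_bytes
            or seconds_since_last_flush >= self.interval_seconds
        )


# The scenario from the text: a 10MB size limit with only 9.8MB written.
policy = FlushPolicy(size_bytes=10_000_000)
print(policy.should_flush(1_000, 9_800_000, 600))       # ten minutes in
print(policy.should_flush(1_000, 9_800_000, 6 * 3600))  # six hours in
```

In the second call neither the count nor the size threshold is met, so it is the interval alone that triggers the flush.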
A connector instance can simultaneously operate on multiple topic partitions. When one partition triggers a flush, it will initiate a flush operation for all of them, even if the other partitions are not yet ready to flush.
The next flush time is calculated based on the time the previous flush completed (the last modified time of the file written to GCP Storage). Therefore, by design, the sink connector’s behaviour will have a slight drift based on the time it takes to flush records and whether records are present or not. If Kafka Connect makes no calls to put records, the logic for flushing won't be executed. This ensures a more consistent number of records per file.
AVRO and Parquet offer the capability to compress files as they are written. The GCP Storage Sink connector provides advanced users with the flexibility to configure compression options. Here are the available options for connect.gcpstorage.compression.codec, along with indications of their support by Avro and Parquet writers:
Please note that not all compression libraries are bundled with the GCP Storage connector. Therefore, you may need to manually add certain libraries to the classpath to ensure they function correctly.
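A configuration can be sanity-checked against the codec support table before deployment. The sketch below is a hypothetical helper, not part of the connector. The codec sets are transcribed from the compression table on this page; where the extracted table is ambiguous about which writer a single tick refers to, the assignment follows the codecs the Avro and Parquet formats themselves define, so treat it as an assumption. The 1-9 level range comes from connect.gcpstorage.compression.level.

```python
from typing import Optional

# Codec names are spelled exactly as they appear in the connector's option list.
AVRO_CODECS = {"UNCOMPRESSED", "SNAPPY", "BZIP2", "ZSTD", "DEFLATE", "XZ"}
AVRO_CODECS_NEEDING_LEVEL = {"ZSTD", "DEFLATE", "XZ"}
PARQUET_CODECS = {"UNCOMPRESSED", "SNAPPY", "GZIP", "LZ0", "LZ4", "BROTLI", "ZSTD"}


def codec_problems(storage_format: str, codec: str,
                   level: Optional[int] = None) -> list[str]:
    """Return human-readable problems; an empty list means the combination looks fine."""
    fmt, codec = storage_format.upper(), codec.upper()
    problems = []
    if fmt == "AVRO":
        if codec not in AVRO_CODECS:
            problems.append(f"{codec} is not marked as supported for AVRO")
        elif codec in AVRO_CODECS_NEEDING_LEVEL and level is None:
            problems.append(f"{codec} with AVRO needs a compression level")
    elif fmt == "PARQUET":
        if codec not in PARQUET_CODECS:
            problems.append(f"{codec} is not marked as supported for PARQUET")
    else:
        problems.append(f"compression is only described for AVRO and Parquet, not {fmt}")
    if level is not None and not 1 <= level <= 9:
        problems.append("compression level must be between 1 and 9")
    return problems


print(codec_problems("AVRO", "ZSTD"))
print(codec_problems("PARQUET", "GZIP"))
```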
The connector offers three distinct authentication modes, selected with connect.gcpstorage.gcp.auth.mode:
Default: This mode relies on the default GCP authentication chain, simplifying the authentication process.
Connection String: This mode enables simpler configuration by relying on the connection string to authenticate with GCP.
Credentials: In this mode, explicit configuration of GCP Access Key and Secret Key is required for authentication.
When selecting the “Credentials” mode, it is essential to provide the necessary access key and secret key properties. Alternatively, if you prefer not to configure these properties explicitly, the connector will follow the credentials retrieval order as described here.
Here’s an example configuration for the “Credentials” mode:
And here is an example configuration using the “Connection String” mode:
The connector uses the concept of index files that it writes to in order to store information about the latest offsets for Kafka topics and partitions as they are being processed. This allows the connector to quickly resume from the correct position when restarting and provides flexibility in naming the index files.
By default, the root directory for these index files is named .indexes for all connectors. However, each connector will create and store its index files within its own subdirectory inside this .indexes directory.
You can configure the root directory for these index files using the property connect.gcpstorage.indexes.name. This property specifies the path from the root of the GCS bucket. Note that even if you configure this property, the connector will still create a subdirectory within the specified root directory.
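The relationship between the configured root and the per-connector subdirectory can be pictured with a small path helper. This is a sketch under a stated assumption: the text only says that each connector gets its own subdirectory, so naming that subdirectory after the connector is a guess made for illustration, and index_directory is a hypothetical function.

```python
from pathlib import PurePosixPath


def index_directory(connector_name: str, indexes_name: str = ".indexes") -> str:
    """Join the indexes root (relative to the bucket root) with a per-connector subdirectory."""
    # Assumption: the subdirectory is named after the connector.
    return str(PurePosixPath(indexes_name) / connector_name)


print(index_directory("gcs-sink"))                  # default root
print(index_directory("gcs-sink", "logs/indexes"))  # custom root
```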
Because the time field used for partitioning typically doesn’t adjust automatically, we offer a Kafka Connect Single Message Transformer (SMT) designed to streamline this process. The transformer plugin and its documentation are available separately.
The flush options are configured using the flush.count, flush.size, and flush.interval KCQL properties (see the KCQL properties table below). The settings are optional; if not specified, the defaults are 50000 records, 500000000 bytes (500MB), and 3600 seconds (1 hour) respectively.
For enhanced security and flexibility when using either the “Credentials” or “Connection String” modes, it is highly advisable to use Connect Secret Providers. This approach ensures robust security practices while handling access credentials. Detailed information is available in the Connect Secret Providers documentation.
The connector supports error policies, selected with connect.gcpstorage.error.policy ("NOOP", "THROW" or "RETRY").

Name | Description | Type | Available Values | Default Value |
---|---|---|---|---|
padding.type | Specifies the type of padding to be applied. | LeftPad, RightPad, NoOp | LeftPad, RightPad, NoOp | LeftPad |
padding.char | Defines the character used for padding. | Char | | ‘0’ |
padding.length.partition | Sets the padding length for the partition. | Int | | 0 |
padding.length.offset | Sets the padding length for the offset. | Int | | 12 |
partition.include.keys | Specifies whether partition keys are included. | Boolean | | false (default with custom partitioning: true) |
store.envelope | Indicates whether to store the entire Kafka message. | Boolean | | false |
store.envelope.fields.key | Indicates whether to store the envelope’s key. | Boolean | | |
store.envelope.fields.headers | Indicates whether to store the envelope’s headers. | Boolean | | |
store.envelope.fields.value | Indicates whether to store the envelope’s value. | Boolean | | |
store.envelope.fields.metadata | Indicates whether to store the envelope’s metadata. | Boolean | | |
flush.size | Specifies the size (in bytes) for the flush operation. | Long | | 500000000 (500MB) |
flush.count | Specifies the number of records for the flush operation. | Int | | 50000 |
flush.interval | Specifies the interval (in seconds) for the flush operation. | Long | | 3600 (1 hour) |

Compression | Avro Support | Avro (requires Level) | Parquet Support |
---|---|---|---|
UNCOMPRESSED | ✅ | | ✅ |
SNAPPY | ✅ | | ✅ |
GZIP | | | ✅ |
LZ0 | | | ✅ |
LZ4 | | | ✅ |
BROTLI | | | ✅ |
BZIP2 | ✅ | | |
ZSTD | ✅ | ⚙️ | ✅ |
DEFLATE | ✅ | ⚙️ | |
XZ | ✅ | ⚙️ | |

Index Name (connect.gcpstorage.indexes.name) | Resulting Indexes Directory Structure | Description |
---|---|---|
.indexes (default) | | The default setup, where each connector uses its own subdirectory within .indexes. |
 | | Custom root directory |
 | | Uses a custom subdirectory |
 | | Indexes are stored under |

Name | Description | Type | Available Values | Default Value |
---|---|---|---|---|
connect.gcpstorage.gcp.auth.mode | Specifies the authentication mode for connecting to GCP. | string | "Credentials", "File" or "Default" | "Default" |
connect.gcpstorage.gcp.credentials | For "auth.mode" credentials: GCP Authentication credentials string. | string | | (Empty) |
connect.gcpstorage.gcp.file | For "auth.mode" file: Local file path for file containing GCP authentication credentials. | string | | (Empty) |
connect.gcpstorage.gcp.project.id | GCP Project ID. | string | | (Empty) |
connect.gcpstorage.gcp.quota.project.id | GCP Quota Project ID. | string | | (Empty) |
connect.gcpstorage.endpoint | Endpoint for GCP Storage. | string | | (Empty) |
connect.gcpstorage.error.policy | Defines the error handling policy when errors occur during data transfer to or from GCP Storage. | string | "NOOP", "THROW", "RETRY" | "THROW" |
connect.gcpstorage.max.retries | Sets the maximum number of retries the connector will attempt before reporting an error. | int | | 20 |
connect.gcpstorage.retry.interval | Specifies the interval (in milliseconds) between retry attempts by the connector. | int | | 60000 |
connect.gcpstorage.http.max.retries | Sets the maximum number of retries for the underlying HTTP client when interacting with GCP. | long | | 5 |
connect.gcpstorage.http.retry.interval | Specifies the retry interval (in milliseconds) for the underlying HTTP client. | long | | 50 |
connect.gcpstorage.http.retry.timeout.multiplier | Specifies the change in delay before the next retry or poll. | double | | 3.0 |
connect.gcpstorage.local.tmp.directory | Enables the use of a local folder as a staging area for data transfer operations. | string | | (Empty) |
connect.gcpstorage.kcql | A SQL-like configuration that defines the behavior of the connector. | string | | (Empty) |
connect.gcpstorage.compression.codec | Sets the Parquet compression codec to be used when writing data to GCP Storage. | string | "UNCOMPRESSED", "SNAPPY", "GZIP", "LZ0", "LZ4", "BROTLI", "BZIP2", "ZSTD", "DEFLATE", "XZ" | "UNCOMPRESSED" |
connect.gcpstorage.compression.level | Sets the compression level when compression is enabled for data transfer to GCP Storage. | int | 1-9 | (Empty) |
connect.gcpstorage.seek.max.files | Specifies the maximum threshold for the number of files the connector uses. | int | | 5 |
connect.gcpstorage.indexes.name | Configure the indexes root directory for this connector. | string | | ".indexes" |
connect.gcpstorage.exactly.once.enable | Set to 'false' to disable exactly-once semantics and use Kafka Connect’s native at-least-once offset management instead. | boolean | true, false | true |
connect.gcpstorage.schema.change.rollover | When set to | boolean | true, false | true |