# GCP Storage

{% hint style="success" %}
You can specify multiple KCQL statements separated by **`;`** to have a connector sink multiple topics. The connector properties **topics** or **topics.regex** are required to be set to a value that matches the KCQL statements.
{% endhint %}

## Connector Class

```
io.lenses.streamreactor.connect.gcp.storage.sink.GCPStorageSinkConnector
```

## Example

{% hint style="success" %}
For more examples see the [tutorials](/latest/connectors/tutorials.md).
{% endhint %}

{% code fullWidth="true" %}

```properties
connector.class=io.lenses.streamreactor.connect.gcp.storage.sink.GCPStorageSinkConnector
connect.gcpstorage.kcql=insert into lensesio:demo select * from demo PARTITIONBY _value.metadata_id, _value.customer_id, _header.ts, _header.wallclock STOREAS `JSON` PROPERTIES('flush.interval'=600, 'flush.size'=1000000, 'flush.count'=5000)
topics=demo
name=demo
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
transforms=insertFormattedTs,insertWallclock
transforms.insertFormattedTs.type=io.lenses.connect.smt.header.TimestampConverter
transforms.insertFormattedTs.header.name=ts
transforms.insertFormattedTs.field=timestamp
transforms.insertFormattedTs.target.type=string
transforms.insertFormattedTs.format.to.pattern=yyyy-MM-dd-HH
transforms.insertWallclock.type=io.lenses.connect.smt.header.InsertWallclock
transforms.insertWallclock.header.name=wallclock
transforms.insertWallclock.value.type=format
transforms.insertWallclock.format=yyyy-MM-dd-HH
```

{% endcode %}

## KCQL Support <a href="#kcql-kafka-connect-query-language" id="kcql-kafka-connect-query-language"></a>

The connector uses KCQL to map topics to GCP Storage buckets and paths. The full KCQL syntax is:

```sql
INSERT INTO bucketAddress[:pathPrefix]
SELECT *
FROM kafka-topic
[[PARTITIONBY (partition[, partition] ...)] | NOPARTITION]
[STOREAS storage_format]
[PROPERTIES(
  'property.1'=x,
  'property.2'=x
)]
```

Please note that you can employ escaping within KCQL for the **INSERT INTO**, **SELECT \* FROM**, and **PARTITIONBY** clauses when necessary. For example, an incoming Kafka message stored as JSON can contain field names that include `.`:

```json
{
  ...
  "a.b": "value",
  ...
}
```

In this case you can use the following KCQL statement:

```sql
INSERT INTO `container-name`:`prefix` SELECT * FROM `kafka-topic` PARTITIONBY `a.b`
```

### Target Bucket and Path <a href="#target-bucket-and-path" id="target-bucket-and-path"></a>

The target bucket and path are specified in the **INSERT INTO** clause. The path is optional and if not specified, the connector will write to the root of the bucket and append the topic name to the path.

Here are a few examples:

```sql
INSERT INTO testcontainer:pathToWriteTo SELECT * FROM topicA;
INSERT INTO testcontainer SELECT * FROM topicA;
INSERT INTO testcontainer:path/To/Write/To SELECT * FROM topicA PARTITIONBY fieldA;
```

### SQL Projection <a href="#sql-projection" id="sql-projection"></a>

Currently, the connector does not offer support for SQL projection; consequently, anything other than a SELECT \* query is disregarded. The connector will faithfully write all fields from Kafka exactly as they are.

## Source Topic <a href="#source-topic" id="source-topic"></a>

To avoid runtime errors, make sure the *topics* or *topics.regex* setting matches your KCQL statements. If the connector receives data for a topic without matching KCQL, it will throw an error. When using a regex to select topics, follow this KCQL pattern:

```
topics.regex = ^sensor_data_\d+$
connect.gcpstorage.kcql=INSERT INTO $target SELECT * FROM `*` ...
```

In this case the topic name will be appended to the $target destination.

### KCQL Properties <a href="#properties" id="properties"></a>

The **PROPERTIES** clause is optional and adds a layer of configurability to the connector. Multiple properties can be applied, delimited by commas. The following properties are supported:

| Name                           | Description                                                                                                             | Type                    | Available Values        | Default Value                                                        |
| ------------------------------ | ----------------------------------------------------------------------------------------------------------------------- | ----------------------- | ----------------------- | -------------------------------------------------------------------- |
| padding.type                   | Specifies the type of padding to be applied.                                                                            | LeftPad, RightPad, NoOp | LeftPad, RightPad, NoOp | LeftPad                                                              |
| padding.char                   | Defines the character used for padding.                                                                                 | Char                    |                         | '0'                                                                  |
| padding.length.partition       | Sets the padding length for the partition.                                                                              | Int                     |                         | 0                                                                    |
| padding.length.offset          | Sets the padding length for the offset.                                                                                 | Int                     |                         | 12                                                                   |
| partition.include.keys         | Specifies whether partition keys are included.                                                                          | Boolean                 |                         | <p>false<br><strong>Default (Custom Partitioning):</strong> true</p> |
| store.envelope                 | Indicates whether to store the entire Kafka message.                                                                    | Boolean                 |                         |                                                                      |
| store.envelope.fields.key      | Indicates whether to store the envelope’s key.                                                                          | Boolean                 |                         |                                                                      |
| store.envelope.fields.headers  | Indicates whether to store the envelope’s headers.                                                                      | Boolean                 |                         |                                                                      |
| store.envelope.fields.value    | Indicates whether to store the envelope’s value.                                                                        | Boolean                 |                         |                                                                      |
| store.envelope.fields.metadata | Indicates whether to store the envelope’s metadata.                                                                     | Boolean                 |                         |                                                                      |
| flush.size                     | Specifies the size (in bytes) for the flush operation.                                                                  | Long                    |                         | 500000000 (500MB)                                                    |
| flush.count                    | Specifies the number of records for the flush operation.                                                                | Int                     |                         | 50000                                                                |
| flush.interval                 | Specifies the interval (in seconds) for the flush operation.                                                            | Long                    |                         | 3600 (1 hour)                                                        |
| key.suffix                     | When specified it appends the given value to the resulting object key before the "extension" (avro, json, etc) is added | String                  |                         | \<empty>                                                             |

The sink connector optimizes performance by padding the output object names, a practice that proves beneficial when using the GCP Storage Source connector to restore data. This object name padding ensures that objects are ordered lexicographically, allowing the GCP Storage Source connector to skip the need for reading, sorting, and processing all objects, thereby enhancing efficiency.
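A quick way to see why the padding matters is to sort object names with and without it. This is an illustrative sketch, not connector code; the topic name and paths are hypothetical:

```python
# Why zero-padded offsets keep object names in offset order when
# sorted lexicographically (as GCS object listings are).
offsets = [2, 10, 100]

# Without padding, "10..." and "100..." sort before "2...".
unpadded = sorted(f"demo/0/{o}.json" for o in offsets)

# With the default LeftPad to 12 characters, lexicographic order
# matches numeric offset order.
padded = sorted(f"demo/0/{o:012d}.json" for o in offsets)

print(unpadded)  # ['demo/0/10.json', 'demo/0/100.json', 'demo/0/2.json']
print(padded)    # ['demo/0/000000000002.json', 'demo/0/000000000010.json', 'demo/0/000000000100.json']
```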

## Partitioning & Object Keys <a href="#object-key" id="object-key"></a>

The object key serves as the filename used to store data in GCP Storage. There are two options for configuring the object key:

* **Default**: The object key is automatically generated by the connector and follows the Kafka topic-partition structure. The format is `$container/[$prefix]/$topic/$partition/offset.extension`. The extension is determined by the chosen storage format.
* **Custom**: The object key is driven by the `PARTITIONBY` clause. The format is either `$container/[$prefix]/$topic/customKey1=customValue1/customKey2=customValue2/topic(partition_offset).extension` (Athena-style naming mimicking Hive-like data partitioning) or `$container/[$prefix]/customValue/topic(partition_offset).extension`. The extension is determined by the selected storage format.

{% hint style="warning" %}
The connector automatically adds the topic name to the partition. There is no need to add it to the partition clause. If you want to explicitly add the topic or partition you can do so by using `_topic` and `_partition`.

The partition clause works on header, key and values fields of the Kafka message.
{% endhint %}

Custom keys and values can be extracted from the Kafka message key, message value, or message headers, as long as the headers are of types that can be converted to strings. There is no fixed limit to the number of elements that can form the object key, but you should be aware of GCP Storage key length restrictions.

To extract fields from the message values, simply use the field names in the **`PARTITIONBY`** clause. For example:

```sql
PARTITIONBY fieldA, fieldB
```

However, note that the message fields must be of primitive types (e.g., string, int, long) to be used for partitioning.

You can also use the entire message key as long as it can be coerced into a primitive type:

```sql
PARTITIONBY _key
```

In cases where the Kafka message Key is not a primitive but a complex object, you can use individual fields within the message Key to create the GCP Storage object key name:

```sql
PARTITIONBY _key.fieldA, _key.fieldB
```

Kafka message headers can also be used in the GCP Storage object key definition, provided the header values are of primitive types easily convertible to strings:

```sql
PARTITIONBY _header.<header_key1>[, _header.<header_key2>]
```

Customizing the object key can leverage various components of the Kafka message. For example:

```sql
PARTITIONBY fieldA, _key.fieldB, _header.fieldC
```

This flexibility allows you to tailor the object key to your specific needs, extracting meaningful information from Kafka messages to structure GCP Storage object keys effectively.

To enable Athena-like partitioning, use the following syntax:

```sql
INSERT INTO $container[:$prefix]
SELECT * FROM $topic
PARTITIONBY fieldA, _key.fieldB, _header.fieldC
STOREAS `AVRO`
PROPERTIES (
    'partition.include.keys'=true
)
```
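Assuming hypothetical values `fieldA=sensor`, a key field `fieldB=eu`, and a header `fieldC=2024-01-01`, together with the default padding settings, the configuration above would produce object keys shaped roughly like:

```
$container/[$prefix]/$topic/fieldA=sensor/fieldB=eu/fieldC=2024-01-01/$topic(1_000000000123).avro
```

The `key=value` segments appear because `partition.include.keys` is set to true; without it, only the values would form the path.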

## Rolling Windows <a href="#rolling-window" id="rolling-window"></a>

Storing data in GCP Storage and partitioning it by time is a common practice in data management. For instance, you may want to organize your GCP Storage data in hourly intervals. This partitioning can be seamlessly achieved using the **`PARTITIONBY`** clause in combination with specifying the relevant time field. However, it’s worth noting that the time field typically doesn’t adjust automatically.

To address this, we offer a Kafka Connect Single Message Transformer (SMT) designed to streamline this process. You can find the transformer plugin and documentation [here](/latest/connectors/single-message-transforms.md).

Let’s consider an example where you need the object key to include the wallclock time (the time when the message was processed) and create an hourly window based on a field called `timestamp`. Here’s the connector configuration to achieve this:

{% code fullWidth="true" %}

```properties
connector.class=io.lenses.streamreactor.connect.gcp.storage.sink.GCPStorageSinkConnector
connect.gcpstorage.kcql=insert into lensesio:demo select * from demo PARTITIONBY _value.metadata_id, _value.customer_id, _header.ts, _header.wallclock STOREAS `JSON` PROPERTIES('flush.interval'=30, 'flush.size'=1000000, 'flush.count'=5000)
topics=demo
name=demo
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
transforms=insertFormattedTs,insertWallclock
transforms.insertFormattedTs.type=io.lenses.connect.smt.header.TimestampConverter
transforms.insertFormattedTs.header.name=ts
transforms.insertFormattedTs.field=timestamp
transforms.insertFormattedTs.target.type=string
transforms.insertFormattedTs.format.to.pattern=yyyy-MM-dd-HH
transforms.insertWallclock.type=io.lenses.connect.smt.header.InsertWallclock
transforms.insertWallclock.header.name=wallclock
transforms.insertWallclock.value.type=format
transforms.insertWallclock.format=yyyy-MM-dd-HH
```

{% endcode %}

In this example, the incoming Kafka message’s Value content includes a field called `timestamp`, represented as a long value indicating the epoch time in milliseconds. The TimestampConverter SMT converts this into a string value according to the format specified in the `format.to.pattern` property. Additionally, the insertWallclock SMT incorporates the current wallclock time in the format you specify in the `format` property.

The **`PARTITIONBY`** clause then leverages both the timestamp field and the wallclock header to craft the object key, providing you with precise control over data partitioning.

## Data Storage Format <a href="#data-storage-format" id="data-storage-format"></a>

While the **`STOREAS`** clause is optional, it plays a pivotal role in determining the storage format within GCP Storage. It’s crucial to understand that this format is entirely independent of the data format stored in Kafka. The connector maintains its neutrality towards the storage format at the topic level and relies on the `key.converter` and `value.converter` settings to interpret the data.

Supported storage formats encompass:

* AVRO
* Parquet
* JSON
* CSV (including headers)
* Text
* BYTES

Opting for BYTES ensures that each record is stored in its own separate object. This feature proves particularly valuable for scenarios involving the storage of images or other binary data in GCP Storage. For cases where you prefer to consolidate multiple records into a single binary object, AVRO or Parquet are the recommended choices.

By default, the connector exclusively stores the Kafka message value. However, you can expand storage to encompass the entire message, including the key, headers, and metadata, by configuring the **`store.envelope`** property as true. This property operates as a boolean switch, with the default value being false. When the envelope is enabled, the data structure follows this format:

{% hint style="warning" %}
Not supported with a custom partition strategy.
{% endhint %}

```json
{
  "key": <the message Key, which can be a primitive or a complex object>,
  "value": <the message Value, which can be a primitive or a complex object>,
  "headers": {
    "header1": "value1",
    "header2": "value2"
  },
  "metadata": {
    "offset": 0,
    "partition": 0,
    "timestamp": 0,
    "topic": "topic"
  }
}
```

Utilizing the envelope is particularly advantageous in scenarios such as backup and restore or replication, where comprehensive storage of the entire message in GCP Storage is desired.

### Examples <a href="#examples" id="examples"></a>

Storing the message Value Avro data as Parquet in GCP Storage:

{% code fullWidth="true" %}

```properties
...
connect.gcpstorage.kcql=INSERT INTO lensesiogcpstorage:car_speed SELECT * FROM car_speed_events STOREAS `PARQUET` 
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
key.converter=org.apache.kafka.connect.storage.StringConverter
...
```

{% endcode %}

The converter also facilitates seamless JSON to AVRO/Parquet conversion, eliminating the need for an additional processing step before the data is stored in GCP Storage.

{% code fullWidth="true" %}

```properties
...
connect.gcpstorage.kcql=INSERT INTO lensesiogcpstorage:car_speed SELECT * FROM car_speed_events STOREAS `PARQUET` 
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
...
```

{% endcode %}

Enabling the full message stored as JSON in GCP Storage:

{% code fullWidth="true" %}

```properties
...
connect.gcpstorage.kcql=INSERT INTO lensesiogcpstorage:car_speed SELECT * FROM car_speed_events STOREAS `JSON` PROPERTIES('store.envelope'=true)
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
...
```

{% endcode %}

Enabling the full message stored as AVRO in GCP Storage:

{% code fullWidth="true" %}

```properties
...
connect.gcpstorage.kcql=INSERT INTO lensesiogcpstorage:car_speed SELECT * FROM car_speed_events STOREAS `AVRO` PROPERTIES('store.envelope'=true)
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
key.converter=org.apache.kafka.connect.storage.StringConverter
...
```

{% endcode %}

If the restore (see the GCP Storage Source documentation) happens on the same cluster, then the most performant way is to use the ByteArrayConverter for both Key and Value and store as AVRO or Parquet:

{% code fullWidth="true" %}

```properties
...
connect.gcpstorage.kcql=INSERT INTO lensesiogcpstorage:car_speed SELECT * FROM car_speed_events STOREAS `AVRO` PROPERTIES('store.envelope'=true)
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
...
```

{% endcode %}

## Flush Options <a href="#flush-options" id="flush-options"></a>

The connector offers three distinct flush options for data management:

* Flush by Count - triggers an object flush after a specified number of records have been written to it.
* Flush by Size - initiates an object flush once a predetermined size (in bytes) has been attained.
* Flush by Interval - enforces an object flush after a defined time interval (in seconds).

It’s worth noting that the interval flush is a continuous process that acts as a fail-safe mechanism, ensuring that objects are periodically flushed, even if the other flush options are not configured or haven’t reached their thresholds.

Consider a scenario where the flush size is set to 10MB, and only 9.8MB of data has been written to the object, with no new Kafka messages arriving for an extended period of 6 hours. To prevent undue delays, the interval flush guarantees that the object is flushed after the specified time interval has elapsed. This ensures the timely management of data even in situations where other flush conditions are not met.

The flush options are configured using the **flush.count**, **flush.size**, and **flush.interval** KCQL properties (see the [#properties](#properties "mention") section). The settings are optional and, if not specified, the defaults are:

* flush.count = 50\_000
* flush.size = 500000000 (500MB)
* flush.interval = 3600 (1 hour)
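The three thresholds combine with OR semantics: whichever is reached first triggers the flush. A minimal sketch of that decision, using the defaults above (illustrative only, not the connector's implementation):

```python
def should_flush(record_count, bytes_written, seconds_since_flush,
                 flush_count=50_000, flush_size=500_000_000, flush_interval=3600):
    """Return True when any configured flush threshold has been reached."""
    return (record_count >= flush_count
            or bytes_written >= flush_size
            or seconds_since_flush >= flush_interval)

# The 9.8MB scenario: size and count thresholds are not met,
# but after 6 idle hours the interval threshold fires.
print(should_flush(120, 9_800_000, 6 * 3600))  # True
```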

{% hint style="success" %}
A connector instance can simultaneously operate on multiple topic partitions. When one partition triggers a flush, it will initiate a flush operation for all of them, even if the other partitions are not yet ready to flush.
{% endhint %}

When `connect.gcpstorage.latest.schema.optimization.enabled` is set to true, it reduces unnecessary data flushes when writing to Avro or Parquet formats. Specifically, it leverages schema compatibility to avoid flushing data when messages with older but *backward-compatible* schemas are encountered. Consider the following sequence of messages and their associated schemas:

```
message1 -> schema1
message2 -> schema1  
  (No flush needed – same schema)

message3 -> schema2  
  (Flush occurs – new schema introduced)

message4 -> schema2  
  (No flush needed – same schema)

message5 -> schema1  
  Without optimization: would trigger a flush  
  With optimization: no flush – schema1 is backward-compatible with schema2

message6 -> schema2  
message7 -> schema2  
  (No flush needed – same schema, it would happen based on the flush thresholds)
```
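The behaviour in the sequence above can be sketched as follows. This is an illustrative model only: schemas are reduced to integer versions and "backward-compatible" is approximated as "not newer than the latest schema seen", which is far simpler than real Avro compatibility checks:

```python
def schema_flush_points(schema_versions):
    """Return 1-based message indices where a schema change forces a flush,
    assuming the latest-schema optimization is enabled."""
    latest = None
    flush_points = []
    for i, version in enumerate(schema_versions, start=1):
        if latest is not None and version > latest:
            flush_points.append(i)  # genuinely new schema -> flush
        latest = version if latest is None else max(latest, version)
    return flush_points

# message5 (schema1) does not flush: schema1 is compatible with schema2.
print(schema_flush_points([1, 1, 2, 2, 1, 2, 2]))  # [3]
```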

### Flushing By Interval

The next flush time is calculated based on the time the previous flush completed (the last modified time of the object written to GCP Storage). Therefore, by design, the sink connector’s behaviour will have a slight drift based on the time it takes to flush records and whether records are present or not. If Kafka Connect makes no calls to put records, the logic for flushing won't be executed. This ensures a more consistent number of records per object.

![sink commit.png](/files/ivy3blnGxjimO3QrDgew)

## Compression <a href="#avro-and-parquet-compression" id="avro-and-parquet-compression"></a>

AVRO and Parquet offer the capability to compress files as they are written. The GCP Storage Sink connector provides advanced users with the flexibility to configure compression options.

Here are the available options for the `connect.gcpstorage.compression.codec`, along with indications of their support by Avro, Parquet and JSON writers:

| Compression  | Avro Support | Avro (requires Level) | Parquet Support | JSON |
| ------------ | ------------ | --------------------- | --------------- | ---- |
| UNCOMPRESSED | ✅            |                       | ✅               | ✅    |
| SNAPPY       | ✅            |                       | ✅               |      |
| GZIP         |              |                       | ✅               | ✅    |
| LZO          |              |                       | ✅               |      |
| LZ4          |              |                       | ✅               |      |
| BROTLI       |              |                       | ✅               |      |
| BZIP2        | ✅            |                       |                 |      |
| ZSTD         | ✅            | ⚙️                    | ✅               |      |
| DEFLATE      | ✅            | ⚙️                    |                 |      |
| XZ           | ✅            | ⚙️                    |                 |      |

Please note that not all compression libraries are bundled with the GCP Storage connector. Therefore, you may need to manually add certain libraries to the classpath to ensure they function correctly.

## Authentication <a href="#auth-mode" id="auth-mode"></a>

The connector offers three distinct authentication modes:

* Default: This mode relies on the default GCP authentication chain, simplifying the authentication process.
* File: This mode uses a local (to the connect worker) path for a file containing GCP authentication credentials.
* Credentials: In this mode, explicit configuration of a GCP Credentials string is required for authentication.

The simplest mode to configure is "Default", as it requires no additional properties and is used, as its name suggests, by default.

When selecting the "Credentials" mode, it is essential to provide the necessary credentials. Alternatively, if you prefer not to configure these properties explicitly, the connector will follow the credentials retrieval order as described [here](https://cloud.google.com/docs/authentication/application-default-credentials).

Here's an example configuration for the "Credentials" mode:

```properties
...
connect.gcpstorage.gcp.auth.mode=Credentials
connect.gcpstorage.gcp.credentials=$GCP_CREDENTIALS
connect.gcpstorage.gcp.project.id=$GCP_PROJECT_ID
...
```

And here is an example configuration using the "File" mode:

```properties
...
connect.gcpstorage.gcp.auth.mode=File
connect.gcpstorage.gcp.file=/home/secure-stuff/gcp-write-credential.txt
...
```

Remember that when using "File" mode, the file must exist on every worker node in your Kafka Connect cluster and be readable by the Kafka Connect process.

For enhanced security and flexibility when using the “Credentials” mode, it is highly advisable to utilize Connect Secret Providers. You can find detailed information on how to use the Connect Secret Providers [here](https://docs.lenses.io/5.4/connectors/connect-secrets/). This approach ensures robust security practices while handling access credentials.

## Error policies <a href="#error-polices" id="error-polices"></a>

The connector supports [Error policies](/latest/connectors/tutorials/using-error-policies.md).

## Retry behavior

The connector applies retries at **two independent layers**. They are complementary, not duplicates: each one targets a different category of failure, and both are active at the same time.

#### Layer 1 — HTTP / GCS SDK retries

Every individual call the connector makes to GCP Storage (object upload, copy, delete, list, get) is routed through the official Google Cloud Storage Java SDK, which transparently retries transient failures using exponential backoff. These retries are **invisible to Kafka Connect**: they happen entirely inside a single `put()` invocation, and the connector only sees the failure if every HTTP attempt has been exhausted.

Typical failures absorbed at this layer:

* TCP / TLS handshake errors, connection resets, DNS hiccups
* HTTP 5xx responses from GCS
* HTTP 429 throttling
* Short-lived endpoint blips

| Name                                               | Description                                                                                                            | Type   | Default |
| -------------------------------------------------- | ---------------------------------------------------------------------------------------------------------------------- | ------ | ------- |
| `connect.gcpstorage.http.max.retries`              | Maximum number of attempts per individual HTTP request.                                                                | int    | 36      |
| `connect.gcpstorage.http.retry.interval`           | Initial backoff delay (in milliseconds) before the first HTTP retry. Used as the base for the exponential backoff.     | long   | 500     |
| `connect.gcpstorage.http.retry.timeout.multiplier` | Multiplier applied between consecutive retries. The resulting delay is capped at 5x the initial `http.retry.interval`. | double | 3.0     |

{% hint style="info" %}
The defaults give roughly 36 attempts with delays ramping from 500 ms up to a 2.5 s cap, which is enough headroom to ride out the vast majority of transient GCS issues without any of them ever surfacing to Kafka Connect.
{% endhint %}
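The ramp described in the hint can be approximated as follows (a sketch of exponential backoff using the connector's defaults; the SDK's real implementation may add jitter and differ in detail):

```python
def retry_delays_ms(max_attempts=36, initial_ms=500, multiplier=3.0):
    """Approximate delay before each HTTP retry: exponential growth,
    capped at 5x the initial interval per the connector's documented cap."""
    cap_ms = 5 * initial_ms
    return [min(initial_ms * multiplier ** n, cap_ms)
            for n in range(max_attempts - 1)]  # the first attempt has no delay

print(retry_delays_ms()[:4])  # [500.0, 1500.0, 2500.0, 2500.0]
```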

#### Layer 2 — Connector / Kafka Connect retries

When **all** HTTP retries above have been exhausted, or when an error happens **outside** an HTTP call (serialisation, schema, file-system, etc.), control returns to the connector's error policy. If the policy is `RETRY`, the connector throws a `RetriableException`, which causes Kafka Connect to **redeliver the same batch of records** to `put()` after a delay. This is repeated until the batch eventually succeeds or the configured retry budget is exhausted.

Properties:

| Name                                | Description                                                                                      | Type   | Default |
| ----------------------------------- | ------------------------------------------------------------------------------------------------ | ------ | ------- |
| `connect.gcpstorage.error.policy`   | `THROW` (fail immediately), `NOOP` (swallow and continue), or `RETRY` (re-deliver the batch).    | string | `THROW` |
| `connect.gcpstorage.max.retries`    | Maximum number of batch redeliveries before the task fails. Only used when `error.policy=RETRY`. | int    | 20      |
| `connect.gcpstorage.retry.interval` | Delay (in milliseconds) between batch redeliveries. Only used when `error.policy=RETRY`.         | int    | 60000   |

{% hint style="warning" %}
If `connect.gcpstorage.error.policy` is left at its default `THROW`, the `max.retries` and `retry.interval` settings are **not** used — any error escaping the HTTP layer will fail the task immediately.
{% endhint %}

#### Which layer handles what

| Failure category                                                                                                          | Handled by                             | Properties to tune                                                                                                                  |
| ------------------------------------------------------------------------------------------------------------------------- | -------------------------------------- | ----------------------------------------------------------------------------------------------------------------------------------- |
| Transient cloud noise: 5xx, 429, network resets, DNS / TLS blips, short-lived throttling                                  | GCS SDK retries (silent)               | `connect.gcpstorage.http.max.retries`, `connect.gcpstorage.http.retry.interval`, `connect.gcpstorage.http.retry.timeout.multiplier` |
| Sustained GCS unavailability, IAM / auth failures, schema or format errors, or anything that escapes the SDK retry budget | Connector-level retry policy (`RETRY`) | `connect.gcpstorage.error.policy=RETRY` + `connect.gcpstorage.max.retries` + `connect.gcpstorage.retry.interval`                    |

#### Choosing values

* **Tune the `http.*` settings to absorb cloud noise.** The defaults are sensible for most workloads. Increase `http.max.retries` or `http.retry.interval` if you operate over a noisy network or against a heavily-throttled bucket.
* **Use `error.policy=RETRY` as a backstop** for longer outages. The total ride-through window is approximately `max.retries x retry.interval`. With the defaults (20 x 60 s) the task survives roughly 20 minutes of continuous failure before giving up.
* **Combine with Kafka Connect's framework-level error handling** (`errors.tolerance=all`, `errors.deadletterqueue.topic.name`, etc.) for **per-record** poison pills (converter / SMT failures). The framework's `errors.tolerance` is **not** a substitute for `error.policy=RETRY`: it handles record-level errors, not batch-level GCS infrastructure failures. The two settings address different failure modes and are intended to be used together.

#### Example

A robust production configuration that combines both layers and adds Kafka Connect's poison-pill protection:

```properties
# Layer 1 - leave at defaults, or relax for noisy networks
# connect.gcpstorage.http.max.retries=36
# connect.gcpstorage.http.retry.interval=500
# connect.gcpstorage.http.retry.timeout.multiplier=3.0

# Layer 2 - ride through up to ~30 minutes of GCS unavailability
connect.gcpstorage.error.policy=RETRY
connect.gcpstorage.max.retries=30
connect.gcpstorage.retry.interval=60000

# Kafka Connect framework - per-record DLQ for converter / SMT errors
errors.tolerance=all
errors.log.enable=true
errors.log.include.messages=true
errors.deadletterqueue.topic.name=my-connector-dlq
errors.deadletterqueue.context.headers.enable=true
errors.deadletterqueue.topic.replication.factor=3
```

## Offset commit semantics

A frequent question is **when** the GCP Storage sink connector advances Kafka consumer offsets, and what role the various "temporary" locations play in that process.

The short answer is:

{% hint style="success" %}
Offsets are **only** advanced after the data has been **durably written to its final object key in GCS** and the connector's index entry has been updated. Neither the local staging file nor the transient `.temp-upload/...` GCS object causes offsets to advance.
{% endhint %}

#### End-to-end flow for one batch

<figure><img src="/files/VweGOr0ez31KS0eDQCKI" alt=""><figcaption></figcaption></figure>

1. **Local staging.** Incoming records are serialised and appended to a file on the **Connect worker's local disk**, inside the directory pointed to by `connect.gcpstorage.local.tmp.directory` (or an OS temp directory if not set). Nothing is written to GCS and no Kafka offsets advance. If the task crashes here, the records are simply re-consumed from Kafka on restart.
2. **Flush.** When a flush threshold is reached (`flush.count`, `flush.size`, `flush.interval`, schema change, etc.), the connector uploads the local staging file to GCS. The pipeline depends on `connect.gcpstorage.exactly.once.enable`:
   * **Exactly-once (default, `exactly.once.enable=true`).** A 3-step pipeline that goes via a transient object, fenced by GCS object generations:
     1. **Upload** the local staging file to a transient object at `gs://<bucket>/.temp-upload/<topic>/<partition>/<uuid>/<finalKey>`.
     2. **Copy** that transient object to the final destination key, using a generation / eTag precondition so a concurrent writer (e.g. during a rebalance) cannot overwrite or duplicate the final object.
     3. **Delete** the transient object under `.temp-upload/`.

     The `.indexes/` entry is updated after each step so that a restarted task can pick up the pipeline mid-flight.
   * **At-least-once (`exactly.once.enable=false`).** The local staging file is uploaded **directly to the final destination key**. There is no `.temp-upload/` indirection, no eTag / generation fencing, and no `.indexes/` checkpoint. The connector falls back to Kafka Connect's native at-least-once offset management.
3. **Committed offset advances.** Only once the final object is in place at its proper destination key — and, with exactly-once enabled, only once the corresponding `.indexes/` entry has been updated — does the writer's committed offset advance.
4. **`preCommit`.** The next time Kafka Connect calls `preCommit`, the connector returns the latest safely committed offset for each partition. If any records are still buffered locally and not yet flushed, the connector returns the offset of the **first still-buffered record** — Kafka Connect will not advance past anything that is not durably in GCS.
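The settings that govern this flow can be gathered into a single fragment. A minimal sketch — the bucket, topic, and directory names below are placeholders:

```properties
# Placeholder names and paths - adjust for your deployment.
# Step 1: local staging area on the Connect worker
connect.gcpstorage.local.tmp.directory=/var/tmp/gcs-sink-staging
# Step 2: choose the upload pipeline (true = fenced .temp-upload pipeline)
connect.gcpstorage.exactly.once.enable=true
# Flush thresholds are set per topic in the KCQL PROPERTIES clause
connect.gcpstorage.kcql=INSERT INTO my-bucket:my-prefix SELECT * FROM my-topic STOREAS `JSON` PROPERTIES('flush.count'=5000, 'flush.size'=1000000, 'flush.interval'=600)
```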

#### The two "temporary" locations

| Location                                                 | Where it physically lives                                                       | Purpose                                                                                                                                                                                                | Affects committed offset?    |
| -------------------------------------------------------- | ------------------------------------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ | ---------------------------- |
| Local staging file                                       | Disk on the Connect worker (`connect.gcpstorage.local.tmp.directory` or OS tmp) | Buffers Kafka records into the chosen file format (AVRO / Parquet / JSON / CSV / Text / BYTES) before flush. Bounded by `flush.size` / `flush.count` / `flush.interval`.                               | No                           |
| `.temp-upload/<topic>/<partition>/<uuid>/...` GCS object | The **same GCS bucket** as the destination, under the `.temp-upload/` prefix    | Atomic, fenced staging step used only when exactly-once is enabled. The local file is uploaded here, then copied to the final key with an eTag precondition, then deleted. Exists only for one commit. | No                           |
| Final destination key                                    | The configured GCS bucket / path                                                | The actual data that downstream consumers read.                                                                                                                                                        | **Yes** (after index update) |

{% hint style="info" %}
Nothing is held only in memory. The local staging file is real on-disk storage on the worker, and `.temp-upload/...` is a real GCS object. This is what allows the connector to recover cleanly from crashes, rebalances, and worker restarts.
{% endhint %}

#### Restart behaviour

The connector is designed so that no Kafka offset ever moves ahead of data that has been durably written to its final GCS key. The exact failure modes vary by mode:

**Exactly-once (default)**

* **Crash during local staging** — nothing in GCS, offsets unchanged. Records are re-consumed from Kafka. No duplicates, no data loss.
* **Crash mid-pipeline** (between upload, copy and delete in `.temp-upload/`) — the `.indexes/` entry records exactly which step was reached. On restart the connector resumes the pipeline from that point. The eTag / generation precondition on the copy step prevents two writers from racing the final key during a rebalance.
* **Crash after the final write but before the index entry advances** — the final object is in GCS but the offset has not advanced. On restart the records are re-uploaded; the eTag fence keeps the existing final object unchanged.

**At-least-once (`exactly.once.enable=false`)**

* **Crash during local staging** behaves identically to the exactly-once case (records re-consumed, no data loss).
* **Crash during or after the upload to the final key** may produce duplicate or partially-overwritten objects on restart, because there is no fencing and no `.indexes/` checkpoint. Use this mode only when downstream consumers can tolerate duplicates.
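Disabling the exactly-once pipeline is a single switch. A sketch for pipelines where downstream deduplication makes duplicates acceptable:

```properties
# Skip the .temp-upload/ staging and .indexes/ checkpointing;
# uploads go straight to the final key with at-least-once semantics
connect.gcpstorage.exactly.once.enable=false
```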

#### Operational notes

* Tuning the flush thresholds (`flush.count` / `flush.size` / `flush.interval`) controls how often offsets advance, and therefore how much data is replayed after a worker crash. Smaller flush windows = smaller replays on restart, at the cost of more, smaller objects in GCS.
* The `.temp-upload/` prefix is internal connector machinery. It is safe to ignore in lifecycle policies, but **do not exclude it from the connector's IAM permissions** — the connector needs to create, copy and delete objects under that prefix.

## Indexes Prefix

The connector writes index objects to the bucket to record the latest committed offsets for the Kafka topics and partitions it processes. This allows the connector to quickly resume from the correct position when restarting and provides flexibility in naming the index objects.

By default, the root prefix for these index objects is `.indexes` for all connectors. However, each connector creates and stores its index objects within its own nested prefix inside this `.indexes` prefix.

You can configure the root prefix for these index objects using the property `connect.gcpstorage.indexes.name`. This property specifies the path from the root of the GCS bucket. Note that even if you configure this property, the connector still creates a nested prefix within the specified prefix.
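For example, to relocate the index objects under a custom root prefix (the prefix name here is illustrative):

```properties
# Index objects will be written under custom-indexes/<connector_name>/
connect.gcpstorage.indexes.name=custom-indexes
```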

### Examples

| Index Name (`connect.gcpstorage.indexes.name`) | Resulting Indexes Prefix Structure             | Description                                                                                                 |
| ---------------------------------------------- | ---------------------------------------------- | ----------------------------------------------------------------------------------------------------------- |
| `.indexes` (default)                           | `.indexes/<connector_name>/`                   | The default setup, where each connector uses its own nested prefix within `.indexes`.                       |
| `custom-indexes`                               | `custom-indexes/<connector_name>/`             | Custom prefix `custom-indexes`, with a nested prefix for each connector.                                    |
| `indexes/gcs-connector-logs`                   | `indexes/gcs-connector-logs/<connector_name>/` | Uses a custom nested prefix `gcs-connector-logs` within `indexes`, with a nested prefix for each connector. |
| `logs/indexes`                                 | `logs/indexes/<connector_name>/`               | Indexes are stored under `logs/indexes`, with a nested prefix for each connector.                           |

## Option Reference <a href="#connector-properties" id="connector-properties"></a>

| Name                                                  | Description                                                                                                                                                                                                                                                                                                                                                                                                                   | Type    | Available Values                                                                           | Default Value  |
| ----------------------------------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------- | ------------------------------------------------------------------------------------------ | -------------- |
| connect.gcpstorage.gcp.auth.mode                      | Specifies the authentication mode for connecting to GCP.                                                                                                                                                                                                                                                                                                                                                                      | string  | "Credentials", "File" or "Default"                                                         | "Default"      |
| connect.gcpstorage.gcp.credentials                    | For "auth.mode" credentials: GCP Authentication credentials string.                                                                                                                                                                                                                                                                                                                                                           | string  |                                                                                            | (Empty)        |
| connect.gcpstorage.gcp.file                           | For "auth.mode" file: Local file path for file containing GCP authentication credentials.                                                                                                                                                                                                                                                                                                                                     | string  |                                                                                            | (Empty)        |
| connect.gcpstorage.gcp.project.id                     | GCP Project ID.                                                                                                                                                                                                                                                                                                                                                                                                               | string  |                                                                                            | (Empty)        |
| connect.gcpstorage.gcp.quota.project.id               | GCP Quota Project ID.                                                                                                                                                                                                                                                                                                                                                                                                         | string  |                                                                                            | (Empty)        |
| connect.gcpstorage.endpoint                           | Endpoint for GCP Storage. Leave empty to use the default Google endpoint; set this only when targeting an emulator or a private endpoint.                                                                                                                                                                                                                                                                                     | string  |                                                                                            | (Empty)        |
| connect.gcpstorage.error.policy                       | Defines the error handling policy when errors occur during data transfer to or from GCP Storage. `THROW` propagates the error and fails the task immediately. `NOOP` swallows the error and continues. `RETRY` causes Kafka Connect to redeliver the same batch of records to the connector; the number of redeliveries is bounded by `connect.gcpstorage.max.retries` and spaced by `connect.gcpstorage.retry.interval`.     | string  | "NOOP", "THROW", "RETRY"                                                                   | "THROW"        |
| connect.gcpstorage.max.retries                        | Maximum number of times Kafka Connect will redeliver a failed batch to the connector before the task fails. **Only takes effect when `connect.gcpstorage.error.policy=RETRY`.**                                                                                                                                                                                                                                               | int     |                                                                                            | 20             |
| connect.gcpstorage.retry.interval                     | Delay (in milliseconds) between batch redeliveries. **Only takes effect when `connect.gcpstorage.error.policy=RETRY`.**                                                                                                                                                                                                                                                                                                       | int     |                                                                                            | 60000          |
| connect.gcpstorage.http.max.retries                   | Maximum number of times the GCS SDK will internally retry a single HTTP request that fails with a transient/retriable error (e.g. 5xx, throttling, connection reset). These retries are transparent to the connector and to Kafka Connect.                                                                                                                                                                                    | long    |                                                                                            | 36             |
| connect.gcpstorage.http.retry.interval                | Initial delay (in milliseconds) used by the GCS SDK before retrying a failed HTTP request. Subsequent retries grow exponentially using `connect.gcpstorage.http.retry.timeout.multiplier`.                                                                                                                                                                                                                                    | long    |                                                                                            | 500            |
| connect.gcpstorage.http.retry.timeout.multiplier      | Exponential backoff multiplier applied between consecutive HTTP retries by the GCS SDK. The delay between attempts grows by this factor each time, capped at 5x the initial `connect.gcpstorage.http.retry.interval`.                                                                                                                                                                                                         | double  |                                                                                            | 3.0            |
| connect.gcpstorage.http.socket.timeout                | HTTP socket (read) timeout in milliseconds for GCS service calls. When unset, the GCS SDK default applies.                                                                                                                                                                                                                                                                                                                    | int     |                                                                                            | (Empty)        |
| connect.gcpstorage.http.connection.timeout            | HTTP connection-establishment timeout in milliseconds for GCS service calls. When unset, the GCS SDK default applies.                                                                                                                                                                                                                                                                                                         | int     |                                                                                            | (Empty)        |
| connect.gcpstorage.local.tmp.directory                | Local directory on the Connect worker used as a staging area where Kafka records are buffered into the chosen file format before being uploaded to GCS. When left empty (the default), the connector creates a temporary directory under the OS-default location (e.g. `/tmp/<task-id>-<uuid>`). The directory must be on storage that has enough free space to hold one in-flight object per partition assigned to the task. | string  |                                                                                            | (Empty)        |
| connect.gcpstorage.kcql                               | A SQL-like configuration that defines the behavior of the connector.                                                                                                                                                                                                                                                                                                                                                          | string  |                                                                                            | (Empty)        |
| connect.gcpstorage.compression.codec                  | Compression codec applied when writing data to GCP Storage. Applies to Avro, Parquet and JSON writers; not all codecs are supported by every format (see the Compression section above for the support matrix).                                                                                                                                                                                                               | string  | "UNCOMPRESSED", "SNAPPY", "GZIP", "LZO", "LZ4", "BROTLI", "BZIP2", "ZSTD", "DEFLATE", "XZ" | "UNCOMPRESSED" |
| connect.gcpstorage.compression.level                  | Compression level applied when the codec supports one. Only used by Avro `ZSTD`, `DEFLATE` and `XZ` codecs (marked with the gear icon in the Compression support matrix above); silently ignored for other codec / format combinations.                                                                                                                                                                                       | int     | 1-9                                                                                        | (Empty)        |
| connect.gcpstorage.seek.max.files                     | Maximum number of objects the connector inspects when seeking the resume position on task start (per topic-partition). Affects start-up latency on buckets containing very large numbers of historical objects; does not affect ongoing write behaviour.                                                                                                                                                                      | int     |                                                                                            | 5              |
| connect.gcpstorage.indexes.name                       | Root prefix (relative to the bucket root) under which the connector writes its index objects. The connector always creates a per-connector nested prefix beneath this root, named after the connector's `name` setting (see the Indexes Prefix section above for examples).                                                                                                                                                   | string  |                                                                                            | ".indexes"     |
| connect.gcpstorage.exactly.once.enable                | When `true` (default), uses the connector's exactly-once write pipeline (`.temp-upload/` staging + atomic copy with generation fencing). When `false`, the connector falls back to direct uploads with at-least-once semantics, relying on Kafka Connect's native offset management.                                                                                                                                          | boolean | true, false                                                                                | true           |
| connect.gcpstorage.schema.change.detector             | Configure how the file will roll over upon receiving a record with a schema different from the accumulated ones. This property configures schema change detection with `default` (object equality), `version` (version field comparison), or `compatibility` (Avro compatibility checking).                                                                                                                                   | string  | `default`, `version`, `compatibility`                                                      | `default`      |
| connect.gcpstorage.skip.null.values                   | Skip records with null values (a.k.a. tombstone records).                                                                                                                                                                                                                                                                                                                                                                     | boolean | true, false                                                                                | false          |
| connect.gcpstorage.latest.schema.optimization.enabled | When set to true, reduces unnecessary data flushes when writing to Avro or Parquet formats. Specifically, it leverages schema compatibility to avoid flushing data when messages with older but backward-compatible schemas are encountered.                                                                                                                                                                                  | boolean | true, false                                                                                | false          |
| connect.gcpstorage.avoid.resumable.upload             | When `true`, the connector uses single-shot multipart uploads instead of GCS resumable uploads. Useful for very small objects or when a customer's environment has issues with resumable upload sessions. The default `false` retains resumable uploads, which are recommended for large objects.                                                                                                                             | boolean | true, false                                                                                | false          |
| connect.gcpstorage.log.metrics                        | When `true`, the connector emits per-task throughput and latency metrics to the connector log. Useful for debugging and performance tuning; off by default to keep logs concise.                                                                                                                                                                                                                                              | boolean | true, false                                                                                | false          |
| connect.gcpstorage.ordering.type                      | Ordering scheme used when listing objects to discover the resume offset on task startup. `AlphaNumeric` orders objects by lexicographic name (the order produced by the connector's zero-padded offset naming).                                                                                                                                                                                                               | string  | "AlphaNumeric"                                                                             | "AlphaNumeric" |

