# GCP Storage

## Connector Class

```
io.lenses.streamreactor.connect.gcp.storage.source.GCPStorageSourceConnector
```

## Example

{% hint style="success" %}
For more examples see the [tutorials](https://docs.lenses.io/latest/connectors/tutorials).
{% endhint %}

{% code fullWidth="true" %}

```properties
# The connector name can be anything
name=gcp-storageSourceConnectorParquet
connector.class=io.lenses.streamreactor.connect.gcp.storage.source.GCPStorageSourceConnector
tasks.max=1
connect.gcpstorage.kcql=insert into $TOPIC_NAME select * from $BUCKET_NAME:$PREFIX_NAME STOREAS `parquet`
connect.gcpstorage.gcp.auth.mode=Credentials
connect.gcpstorage.gcp.credentials=$GCP_CREDENTIALS
connect.gcpstorage.gcp.project.id=$GCP_PROJECT_ID
```

{% endcode %}

## KCQL Support

{% hint style="success" %}
You can specify multiple KCQL statements separated by `;` to have the connector read from multiple locations into multiple topics.

However, you cannot route the same source to different topics; use a separate connector instance for that.
{% endhint %}
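
For example, a single connector instance can read JSON objects from two prefixes into two separate topics (the bucket, prefix, and topic names below are illustrative):

```properties
connect.gcpstorage.kcql=insert into topic-a select * from my-bucket:prefix-a STOREAS `JSON`; insert into topic-b select * from my-bucket:prefix-b STOREAS `JSON`
```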

The connector uses a SQL-like syntax (KCQL) to configure its behaviour. The full KCQL syntax is:

```sql
INSERT INTO $kafka-topic
SELECT *
FROM bucketAddress:pathPrefix
[BATCH=batch]
[STOREAS storage_format]
[LIMIT limit]
[PROPERTIES(
  'property.1'=x,
  'property.2'=x
)]
```

Please note that you can employ escaping within KCQL for the INSERT INTO and SELECT \* FROM clauses when necessary. For example, if you need to use a topic name that contains a hyphen, you can escape it with backticks as follows:

```sql
INSERT INTO `my-topic-with-hyphen`
SELECT *
FROM bucketAddress:pathPrefix
```

## Source Bucket & Path

The GCP Storage source location is defined within the FROM clause. The connector will read all objects from the given location considering the data partitioning and ordering options. Each data partition will be read by a single connector task.

The FROM clause format is:

```sql
FROM [bucketname]:pathprefix
-- example: my-bucket-called-pears:my-folder-called-apples
```

{% hint style="info" %}
If your data in GCS was not written by the Lenses GCS sink, set the connector to traverse a folder hierarchy in a bucket and load objects based on their last modified timestamp. If `LastModified` sorting is used, ensure objects do not arrive late, or use a post-processing step to handle them.

**`connect.gcpstorage.source.partition.extractor.regex=none`**

**`connect.gcpstorage.source.ordering.type=LastModified`**

To load in alphanumeric order, set the ordering type to `AlphaNumeric`.
{% endhint %}

## Target Topic

The target Kafka topic is specified via the INSERT INTO clause. The connector will write all the records to the given topic:

{% code fullWidth="false" %}

```sql
INSERT INTO my-apples-topic SELECT * FROM my-bucket-called-pears:my-folder-called-apples
```

{% endcode %}

## GCP Storage object formats

The connector supports a range of storage formats, each with its own distinct functionality:

* **JSON**: The connector will read objects containing JSON content, each line representing a distinct record.
* **Avro**: The connector will read Avro-stored messages from GCP Storage and translate them into Kafka’s native format.
* **Parquet**: The connector will read Parquet-stored messages from GCP Storage and translate them into Kafka’s native format.
* **Text**: The connector will read objects containing lines of text, each line representing a distinct record.
* **CSV**: The connector will read objects containing comma-separated values, each line representing a distinct record.
* **CSV\_WithHeaders**: The connector will read objects containing comma-separated values, each line representing a distinct record, skipping the header row.
* **Bytes**: The connector will read objects containing raw bytes; each object is translated into a single Kafka message.

Use the `STOREAS` clause to configure the storage format. The following options are available:

```properties
STOREAS `JSON`
STOREAS `Avro`
STOREAS `Parquet`
STOREAS `Text`
STOREAS `CSV`
STOREAS `CSV_WithHeaders`
STOREAS `Bytes`
```
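
For example, a complete KCQL statement reading CSV objects that include a header row might look like this (the bucket, prefix, and topic names are illustrative):

```properties
connect.gcpstorage.kcql=insert into my-csv-topic select * from my-bucket:csv-data STOREAS `CSV_WithHeaders`
```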

### Text Processing <a href="#text-extended" id="text-extended"></a>

When using Text storage, the connector provides additional configuration options to finely control how text content is processed.

#### **Regex**

In Regex mode, the connector applies a regular expression pattern, and only when a line matches the pattern is it considered a record. For example, to include only lines that start with a number, you can use the following configuration:

{% code fullWidth="true" %}

```properties
connect.gcpstorage.kcql=insert into $kafka-topic select * from lensesio:regex STOREAS `text` PROPERTIES('read.text.mode'='regex', 'read.text.regex'='^[1-9].*')
```

{% endcode %}

#### **Start-End line**

In Start-End Line mode, the connector reads text content between specified start and end lines, inclusive. This mode is useful when you need to extract records that fall within defined boundaries. For instance, to read records where the first line is `SSM` and the last line is an empty line (`''`), you can configure it as follows:

{% code fullWidth="true" %}

```properties
connect.gcpstorage.kcql=insert into $kafka-topic select * from lensesio:multi_line STOREAS `text` PROPERTIES('read.text.mode'='startEndLine', 'read.text.start.line'='SSM', 'read.text.end.line'='')
```

{% endcode %}

To trim the start and end lines, set the `read.text.trim` property to `true`:

{% code fullWidth="true" %}

```properties
connect.gcpstorage.kcql=insert into $kafka-topic select * from lensesio:multi_line STOREAS `text` PROPERTIES('read.text.mode'='startEndLine', 'read.text.start.line'='SSM', 'read.text.end.line'='', 'read.text.trim'='true')
```

{% endcode %}

#### **Start-End tag**

In Start-End Tag mode, the connector reads text content between specified start and end tags, inclusive. This mode is particularly useful when a single object in GCP Storage holds multiple records, each of which should become a separate Kafka message. For example, to read XML records enclosed between `<SSM>` and `</SSM>` tags, configure it as follows:

{% code fullWidth="true" %}

```properties
connect.gcpstorage.kcql=insert into $kafka-topic select * from lensesio:xml STOREAS `text` PROPERTIES('read.text.mode'='startEndTag', 'read.text.start.tag'='<SSM>', 'read.text.end.tag'='</SSM>')
```

{% endcode %}

### Storage output matrix <a href="#storage-to-output-matrix" id="storage-to-output-matrix"></a>

Depending on the storage format of the Kafka topics’ messages, whether replication to a different cluster is required, and the specific data analysis requirements, the following guidance describes how to effectively employ converters for both sink and source operations. It aims to optimize performance and minimize unnecessary CPU and memory usage.

| GCS Storage Format | Kafka Output Format | Restore or replicate cluster | Analytics | Sink Converter                       | Source Converter                                  |
| ------------------ | ------------------- | ---------------------------- | --------- | ------------------------------------ | ------------------------------------------------- |
| JSON               | STRING              | Same,Other                   | Yes, No   | StringConverter                      | StringConverter                                   |
| AVRO,Parquet       | STRING              | Same,Other                   | Yes       | StringConverter                      | StringConverter                                   |
| AVRO,Parquet       | STRING              | Same,Other                   | No        | ByteArrayConverter                   | ByteArrayConverter                                |
| JSON               | JSON                | Same,Other                   | Yes       | JsonConverter                        | StringConverter                                   |
| JSON               | JSON                | Same,Other                   | No        | StringConverter                      | StringConverter                                   |
| AVRO,Parquet       | JSON                | Same,Other                   | Yes,No    | JsonConverter                        | JsonConverter or Avro Converter( Glue, Confluent) |
| AVRO,Parquet, JSON | BYTES               | Same,Other                   | Yes,No    | ByteArrayConverter                   | ByteArrayConverter                                |
| AVRO,Parquet       | AVRO                | Same                         | Yes       | Avro Converter( Glue, Confluent)     | Avro Converter( Glue, Confluent)                  |
| AVRO,Parquet       | AVRO                | Same                         | No        | ByteArrayConverter                   | ByteArrayConverter                                |
| AVRO,Parquet       | AVRO                | Other                        | Yes,No    | Avro Converter( Glue, Confluent)     | Avro Converter( Glue, Confluent)                  |
| AVRO,Parquet       | Protobuf            | Same                         | Yes       | Protobuf Converter( Glue, Confluent) | Protobuf Converter( Glue, Confluent)              |
| AVRO,Parquet       | Protobuf            | Same                         | No        | ByteArrayConverter                   | ByteArrayConverter                                |
| AVRO,Parquet       | Protobuf            | Other                        | Yes,No    | Protobuf Converter( Glue, Confluent) | Protobuf Converter( Glue, Confluent)              |
| AVRO,Parquet, JSON | Other               | Same, Other                  | Yes,No    | ByteArrayConverter                   | ByteArrayConverter                                |

## Projections

Currently, the connector does not offer support for SQL projection; consequently, anything other than a SELECT \* query is disregarded. The connector will faithfully write all the record fields to Kafka exactly as they are.

## Ordering

The sink employs zero-padding in object names to ensure precise ordering, leveraging optimizations offered by the GCS API and guaranteeing the accurate sequence of objects.

When using the GCS source alongside the GCS sink, the connector can adopt the same ordering method, ensuring data processing follows the correct chronological order. However, there are scenarios where GCS data is generated by applications that do not maintain lexical object name order.

In such cases, to process objects in the correct sequence, the source needs to list all objects in the bucket and sort them based on their last modified timestamp. To enable this behavior, set the `connect.gcpstorage.source.ordering.type` to `LastModified`. This ensures that the source correctly arranges and processes the data based on the timestamps of the objects.

If using `LastModified` sorting, ensure objects do not arrive late, or use a post-processing step to handle them.
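
A minimal sketch of this setting:

```properties
# Process objects by last-modified timestamp instead of lexical key order
connect.gcpstorage.source.ordering.type=LastModified
```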

## Throttling

To limit the number of object names the source reads from GCS in a single poll, use the `BATCH` clause. The default value, if not specified, is 1000:

```sql
BATCH = 100
```

To limit the number of result rows returned from the source in a single poll operation, you can use the LIMIT clause. The default value, if not specified, is 10000.

```sql
LIMIT 10000
```
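
Both clauses can be combined in a single KCQL statement; for example (the bucket, prefix, and topic names are illustrative):

```sql
INSERT INTO my-topic
SELECT *
FROM my-bucket:my-prefix
BATCH = 100
LIMIT 5000
```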

## Object Extension Filtering

The GCP Storage Source Connector allows you to filter the objects to be processed based on their extensions. This is controlled by two properties: `connect.gcpstorage.source.extension.excludes` and `connect.gcpstorage.source.extension.includes`.

### Excluding Object Extensions

The `connect.gcpstorage.source.extension.excludes` property is a comma-separated list of object extensions to exclude from the source object search. If this property is not configured, all objects are considered. For example, to exclude `.txt` and `.csv` objects, you would set this property as follows:

```properties
connect.gcpstorage.source.extension.excludes=txt,csv
```

### Including Object Extensions

The `connect.gcpstorage.source.extension.includes` property is a comma-separated list of object extensions to include in the source object search. If this property is not configured, all objects are considered. For example, to include only `.json` and `.xml` objects, you would set this property as follows:

```properties
connect.gcpstorage.source.extension.includes=json,xml
```

Note: If both `connect.gcpstorage.source.extension.excludes` and `connect.gcpstorage.source.extension.includes` are set, the connector first applies the exclusion filter and then the inclusion filter.
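
For example, to first drop temporary objects and then keep only JSON among the remainder (the extensions here are illustrative):

```properties
# Exclusion is applied first, then inclusion
connect.gcpstorage.source.extension.excludes=tmp,bak
connect.gcpstorage.source.extension.includes=json
```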

### Post-Processing Options

Post-processing options offer flexibility in managing how objects are handled after they have been processed. By configuring these options, users can automate tasks such as deleting objects to save storage space or moving objects to an archive for compliance and data retention purposes. These features are crucial for efficient data lifecycle management, particularly in environments where storage considerations or regulatory requirements dictate the need for systematic handling of processed data.

#### Use Cases for Post-Processing Options

1. **Deleting objects After Processing**

   For scenarios where freeing up storage is critical and reprocessing is not necessary, configure the connector to delete objects after they are processed. This option is particularly useful in environments with limited storage capacity or where processed data is redundantly stored elsewhere.

   Example:

   ```sql
   INSERT INTO `my-topic`
   SELECT * FROM `my-gcp-storage-bucket:my-prefix`
   PROPERTIES (
       'post.process.action'=`DELETE`
   )
   ```

   Result: objects are permanently removed from the GCP Storage bucket after processing, effectively reducing storage usage and preventing reprocessing.
2. **Moving objects to an Archive Bucket**

   To preserve processed objects for archiving or compliance reasons, set the connector to move them to a designated archive bucket. This use case applies to organizations needing data retention strategies or for regulatory adherence by keeping processed records accessible but not in active use.

   Example:

   ```sql
   INSERT INTO `my-topic`
   SELECT * FROM `my-gcp-storage-bucket:my-prefix`
   PROPERTIES (
       'post.process.action'=`MOVE`,
       'post.process.action.bucket'=`archive-bucket`,
       'post.process.action.prefix'=`processed/`
   )
   ```

   Result: objects are transferred to the `archive-bucket` bucket, stored with an updated path that includes the `processed/` prefix, maintaining an organized archive structure.

## Properties

The `PROPERTIES` clause is optional and adds a layer of configuration to the connector. It enhances versatility by permitting multiple configurations to be applied, delimited by commas. The following properties are supported:

| Name                                               | Description                                                                                                                                                                                                                                                                                                                                                                                                                                       | Type    | Available Values                 |
| -------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------- | -------------------------------- |
| read.text.mode                                     | Controls how Text content is read                                                                                                                                                                                                                                                                                                                                                                                                                 | Enum    | Regex, StartEndTag, StartEndLine |
| read.text.regex                                    | Regular Expression for Text Reading (if applicable)                                                                                                                                                                                                                                                                                                                                                                                               | String  |                                  |
| read.text.start.tag                                | Start Tag for Text Reading (if applicable)                                                                                                                                                                                                                                                                                                                                                                                                        | String  |                                  |
| read.text.end.tag                                  | End Tag for Text Reading (if applicable)                                                                                                                                                                                                                                                                                                                                                                                                          | String  |                                  |
| read.text.buffer.size                              | Text Buffer Size (for optimization)                                                                                                                                                                                                                                                                                                                                                                                                               | Int     |                                  |
| read.text.start.line                               | Start Line for Text Reading (if applicable)                                                                                                                                                                                                                                                                                                                                                                                                       | String  |                                  |
| read.text.end.line                                 | End Line for Text Reading (if applicable)                                                                                                                                                                                                                                                                                                                                                                                                         | String  |                                  |
| read.text.trim                                     | Trim Text During Reading                                                                                                                                                                                                                                                                                                                                                                                                                          | Boolean |                                  |
| store.envelope                                     | Messages are stored as “Envelope”                                                                                                                                                                                                                                                                                                                                                                                                                 | Boolean |                                  |
| post.process.action                                | Defines the action to perform on source objects after successful processing.                                                                                                                                                                                                                                                                                                                                                                      | Enum    | DELETE or MOVE                   |
| post.process.action.bucket                         | Specifies the target bucket for the `MOVE` action (required for `MOVE`).                                                                                                                                                                                                                                                                                                                                                                          | String  |                                  |
| post.process.action.prefix                         | Specifies a new prefix for the object’s location when using the `MOVE` action (required for `MOVE`).                                                                                                                                                                                                                                                                                                                                              | String  |                                  |
| post.process.action.watermark.process.late.arrival | Enable this feature to periodically scan the specified GCS bucket and path for objects with timestamps older than the current watermark. If such objects are detected, the connector performs a "touch" operation to update their last modified timestamp, allowing them to be processed. Use this option only when there is a possibility of "late arrival" of objects, which occurs when the sequence of object creation by producers is not guaranteed. | Boolean | false                            |

## Authentication

The connector offers three distinct authentication modes:

* Default: This mode relies on the default GCP authentication chain, simplifying the authentication process.
* File: This mode uses a local (to the connect worker) path for a file containing GCP authentication credentials.
* Credentials: In this mode, explicit configuration of a GCP Credentials string is required for authentication.

The simplest mode to configure is “Default”, as it requires no additional configuration:

```properties
connect.gcpstorage.gcp.auth.mode=Default
```

When selecting the “Credentials” mode, you must provide the credentials explicitly. If you prefer not to configure these properties, use the “Default” mode instead, in which the connector follows the application-default credentials retrieval order described [here](https://cloud.google.com/docs/authentication/application-default-credentials).

Here’s an example configuration for the “Credentials” mode:

```properties
connect.gcpstorage.gcp.auth.mode=Credentials
connect.gcpstorage.gcp.credentials=$GCP_CREDENTIALS
connect.gcpstorage.gcp.project.id=$GCP_PROJECT_ID
```

And here is an example configuration using the “File” mode:

```properties
connect.gcpstorage.gcp.auth.mode=File
connect.gcpstorage.gcp.file=/home/secure-stuff/gcp-read-credential.txt
```

Remember that when using “File” mode, the file must exist on every worker node in your Kafka Connect cluster and be readable by the Kafka Connect process.

For enhanced security and flexibility when using the “Credentials” mode, it is highly advisable to utilize Connect Secret Providers. This approach ensures robust security practices while handling access credentials.
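
As a sketch of this approach using Kafka Connect’s built-in `FileConfigProvider` (the provider alias, secrets file path, and key names below are assumptions, not defaults):

```properties
# Requires the worker to be configured with:
#   config.providers=file
#   config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider
connect.gcpstorage.gcp.auth.mode=Credentials
connect.gcpstorage.gcp.credentials=${file:/etc/secrets/gcp.properties:gcp.credentials}
connect.gcpstorage.gcp.project.id=${file:/etc/secrets/gcp.properties:gcp.project.id}
```

This keeps the credentials string out of the connector configuration returned by the Connect REST API.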

## Backup and Restore <a href="#backup-and-restore" id="backup-and-restore"></a>

When used in tandem with the GCP Storage Sink Connector, the GCP Storage Source Connector becomes a powerful tool for restoring Kafka topics from GCP Storage. To enable this behavior, you should set **store.envelope** to true. This configuration ensures that the source expects the following data structure in GCP Storage:

```json
{
  "key": <the message Key, which can be a primitive or a complex object>,
  "value": <the message Value, which can be a primitive or a complex object>,
  "headers": {
    "header1": "value1",
    "header2": "value2"
  },
  "metadata": {
    "offset": 0,
    "partition": 0,
    "timestamp": 0,
    "topic": "topic"
  }
}
```

When the messages are sent to Kafka, the GCP Storage Source Connector ensures that it correctly maps the key, value, headers, and metadata fields (including timestamp and partition) to their corresponding Kafka message fields. Please note that the envelope functionality can only be used with data stored in GCP Storage as Avro, JSON, or Parquet formats.
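
For instance, restoring a topic from envelope data stored as Avro might be configured as follows (the topic, bucket, and prefix names are illustrative):

```properties
connect.gcpstorage.kcql=insert into restored-topic select * from backup-bucket:backup-prefix STOREAS `Avro` PROPERTIES('store.envelope'='true')
```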

### Partition Extraction <a href="#partition-extraction" id="partition-extraction"></a>

When the envelope feature is not in use, and data restoration is required, the responsibility falls on the connector to establish the original topic partition value. To ensure that the source correctly conveys the original partitions back to Kafka Connect during reads from the source, a partition extractor can be configured to extract this information from the GCP Storage object key.

To configure the partition extractor, you can utilize the `connect.gcpstorage.source.partition.extractor.type` property, which supports two options:

* **hierarchical**: This option aligns with the default object-key format used by the sink: `topic/partition/offset.extension` (for example, `my-topic/2/1000.json`).
* **regex**: When selected, you can provide a custom regular expression to extract the partition information. Additionally, when using the regex option, you must also set the `connect.gcpstorage.source.partition.extractor.regex` property. Note that exactly one capturing group is expected. For an example pattern, refer to the one used for hierarchical extraction:

```bash
(?i)^(?:.*)\/([0-9]*)\/(?:[0-9]*)[.](?:Json|Avro|Parquet|Text|Csv|Bytes)$
```
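
Putting this together, a configuration using the regex extractor with the hierarchical pattern above would look like:

```properties
connect.gcpstorage.source.partition.extractor.type=regex
connect.gcpstorage.source.partition.extractor.regex=(?i)^(?:.*)\/([0-9]*)\/(?:[0-9]*)[.](?:Json|Avro|Parquet|Text|Csv|Bytes)$
```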

## Option Reference

| Name                                                          | Description                                                                                                                                           | Type    | Available Values                                                             | Default Value      |
| ------------------------------------------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------- | ------- | ---------------------------------------------------------------------------- | ------------------ |
| connect.gcpstorage.gcp.auth.mode                              | Specifies the authentication mode for connecting to GCP.                                                                                              | string  | "Credentials", "File" or "Default"                                           | "Default"          |
| connect.gcpstorage.gcp.credentials                            | For "auth.mode" credentials: GCP Authentication credentials string.                                                                                   | string  |                                                                              | (Empty)            |
| connect.gcpstorage.gcp.file                                   | For "auth.mode" file: Local file path for file containing GCP authentication credentials.                                                             | string  |                                                                              | (Empty)            |
| connect.gcpstorage.gcp.project.id                             | GCP Project ID.                                                                                                                                       | string  |                                                                              | (Empty)            |
| connect.gcpstorage.gcp.quota.project.id                       | GCP Quota Project ID.                                                                                                                                 | string  |                                                                              | (Empty)            |
| connect.gcpstorage.endpoint                                   | Endpoint for GCP Storage.                                                                                                                             | string  |                                                                              |                    |
| connect.gcpstorage.error.policy                               | Defines the error handling policy when errors occur during data transfer to or from GCP Storage.                                                      | string  | "NOOP," "THROW," "RETRY"                                                     | "THROW"            |
| connect.gcpstorage.max.retries                                | Sets the maximum number of retries the connector will attempt before reporting an error to the Connect Framework.                                     | int     |                                                                              | 20                 |
| connect.gcpstorage.retry.interval                             | Specifies the interval (in milliseconds) between retry attempts by the connector.                                                                     | int     |                                                                              | 60000              |
| connect.gcpstorage.http.max.retries                           | Sets the maximum number of retries for the underlying HTTP client when interacting with GCP Storage.                                                  | long    |                                                                              | 5                  |
| connect.gcpstorage.http.retry.interval                        | Specifies the retry interval (in milliseconds) for the underlying HTTP client. An exponential backoff strategy is employed.                           | long    |                                                                              | 50                 |
| connect.gcpstorage.kcql                                       | Kafka Connect Query Language (KCQL) configuration to control the connector behaviour.                                                                 | string  | [KCQL configuration](#kcql-support)                                          |                    |
| connect.gcpstorage.source.extension.excludes                  | A comma-separated list of object extensions to exclude from the source object search.                                                                 | string  | [Object extension filtering](#object-extension-filtering)                    |                    |
| connect.gcpstorage.source.extension.includes                  | A comma-separated list of object extensions to include in the source object search.                                                                   | string  | [Object extension filtering](#object-extension-filtering)                    |                    |
| connect.gcpstorage.source.partition.extractor.type            | Type of Partition Extractor (Hierarchical or Regex)                                                                                                   | string  | hierarchical, regex                                                          |                    |
| connect.gcpstorage.source.partition.extractor.regex           | Regex Pattern for Partition Extraction (if applicable)                                                                                                | string  |                                                                              |                    |
| connect.gcpstorage.source.partition.search.continuous         | If set to true the connector will continuously search for new partitions.                                                                             | boolean | true, false                                                                  | true               |
| connect.gcpstorage.source.partition.search.interval           | The interval in milliseconds between searching for new partitions.                                                                                    | long    |                                                                              | 300000             |
| connect.gcpstorage.source.partition.search.excludes           | A comma-separated list of paths to exclude from the partition search.                                                                                 | string  |                                                                              | ".indexes"         |
| connect.gcpstorage.source.partition.search.recurse.levels     | Controls how many levels deep to recurse when searching for new partitions                                                                            | int     |                                                                              | 0                  |
| connect.gcpstorage.ordering.type                              | Type of ordering for the GCS object keys to ensure the processing order.                                                                              | string  | AlphaNumeric, LastModified                                                   | AlphaNumeric       |
| connect.gcpstorage.source.empty.results.backoff.initial.delay | Initial delay (in milliseconds) before retrying when no results are found.                                                                            | long    |                                                                              | 1000               |
| connect.gcpstorage.source.empty.results.backoff.max.delay     | Maximum delay (in milliseconds) before retrying when no results are found.                                                                            | long    |                                                                              | 10000              |
| connect.gcpstorage.source.empty.results.backoff.multiplier    | Multiplier applied to the delay on each successive retry when no results are found.                                                                   | double  |                                                                              | 2.0                |
| connect.gcpstorage.source.write.watermark.header              | If enabled, adds Kafka headers to each record containing details of the source object and the line number within the file.                            | boolean | true, false                                                                  | false              |
| connect.gcpstorage.source.late.arrival.interval               | <p>Specify the delay interval, in seconds, for processing objects or files older than the connector watermark.<br>Available since version 11.3.0.</p> | int     |                                                                              | 300 (5 minutes)    |
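
As an illustration, the ordering and empty-results backoff options above might be combined as follows. This is a sketch, not a definitive configuration: the connector name, topic, bucket, and prefix are placeholders, and the credentials properties are abbreviated.

```properties
name=gcp-storage-source-example # placeholder name
connector.class=io.lenses.streamreactor.connect.gcp.storage.source.GCPStorageSourceConnector
tasks.max=1
connect.gcpstorage.kcql=insert into my-topic select * from my-bucket:my-prefix STOREAS `parquet`
connect.gcpstorage.gcp.auth.mode=Credentials
# Process objects in last-modified order rather than the default AlphaNumeric
connect.gcpstorage.ordering.type=LastModified
# Continuously search for new partitions every 5 minutes (the defaults)
connect.gcpstorage.source.partition.search.continuous=true
connect.gcpstorage.source.partition.search.interval=300000
# When no new objects are found, back off 1s -> 2s -> 4s ... capped at 10s
connect.gcpstorage.source.empty.results.backoff.initial.delay=1000
connect.gcpstorage.source.empty.results.backoff.max.delay=10000
connect.gcpstorage.source.empty.results.backoff.multiplier=2.0
```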
