# GCP Sink Time Based Partitioning

## Partitioning by Date and Time  <a href="#partitioning-by-date-and-time" id="partitioning-by-date-and-time"></a>

This scenario partitions data by date and time, employing record timestamp headers to enable partitioning based on these time components.
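A configuration for this scheme might look like the following sketch, modelled on the later examples on this page; the `date=` and `time=` labels and the exact format strings are illustrative:

{% code fullWidth="false" %}

```properties
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'date='yyyy-MM-dd"
transforms.partition.hour.format="'time='HHmm"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date, _header.hour
```

{% endcode %}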

## Partitioning by Data Date and Hour  <a href="#partitioning-by-data-date-and-hour" id="partitioning-by-data-date-and-hour"></a>

Data is partitioned by data date and hour, utilizing record timestamp headers for partitioning based on these time components.
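A possible configuration, following the `data_date` labelling used elsewhere on this page (the label and format strings are illustrative):

{% code fullWidth="false" %}

```properties
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'data_date='yyyy-MM-dd"
transforms.partition.hour.format="'hour='HH"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date, _header.hour
```

{% endcode %}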

## Default Confluent Partitioning  <a href="#default-confluent-partitioning" id="default-confluent-partitioning"></a>

The default Confluent partitioning scheme writes objects using the following path structure:

```
<prefix>/<topic>/<encodedPartition>/<topic>+<kafkaPartition>+<startOffset>.<format>
```

To partition by time instead, an SMT inserts record timestamp headers that the KCQL `PARTITIONBY` clause can reference. The first example below partitions by a combined date-hour header; the second by separate year, month, day, and hour headers.

{% code fullWidth="false" %}

```properties
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="yyyy-MM-dd-HH"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date STORE AS X
```

{% endcode %}

{% code fullWidth="false" %}

```properties
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.year.format="'year='yyyy"
transforms.partition.month.format="'month='MM"
transforms.partition.day.format="'day='dd"
transforms.partition.hour.format="'hour='HH"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.year, _header.month, _header.day, _header.hour
```

{% endcode %}

## Partitioning by Year, Month, and Day  <a href="#partitioning-by-year-month-and-day" id="partitioning-by-year-month-and-day"></a>

This scenario partitions data by year, month, and day. It utilizes record timestamp headers for partitioning based on these time components.

{% code fullWidth="false" %}

```properties
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.year.format="'year='yyyy"
transforms.partition.month.format="'month='MM"
transforms.partition.day.format="'day='dd"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.year, _header.month, _header.day
```

{% endcode %}


## Partitioning by Year, Month, Day, Hour, and Minute  <a href="#partitioning-by-year-month-day-hour-and-minute" id="partitioning-by-year-month-day-hour-and-minute"></a>

Extending the previous scenarios, this one partitions data by year, month, day, hour, and minute, allowing for more granular time-based partitioning.

{% code fullWidth="false" %}

```properties
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.year.format="yyyy"
transforms.partition.month.format="MM"
transforms.partition.day.format="dd"
transforms.partition.hour.format="HH"
transforms.partition.minute.format="mm"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.year, _header.month, _header.day, _header.hour, _header.minute
```

{% endcode %}

{% code fullWidth="false" %}

```properties
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'date='yyyy-MM-dd"
transforms.partition.hour.format="'time='HHmm"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date, _header.hour
```

{% endcode %}

## Partitioning by Year, Month, Day, and Hour  <a href="#partitioning-by-year-month-day-and-hour" id="partitioning-by-year-month-day-and-hour"></a>

This scenario partitions data by year, month, day, and hour, using a single `data_date` header for the date components alongside a separate `hour` header. An SMT inserts the record timestamp headers that drive the partitioning.

{% code fullWidth="false" %}

```properties
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'data_date='yyyy-MM-dd"
transforms.partition.hour.format="'hour='HH"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date, _header.hour
```

{% endcode %}

## Partitioning by Date and Hour  <a href="#partitioning-by-date-and-hour" id="partitioning-by-date-and-hour"></a>

This scenario partitions data by date and hour, using record timestamp headers for partitioning based on these time components.

{% code fullWidth="false" %}

```properties
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'dt='yyyy-MM-dd"
transforms.partition.hour.format="'hour='HH"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date, _header.hour
```

{% endcode %}

***

## Partitioning by Created At Timestamp  <a href="#partitioning-by-created-at-timestamp" id="partitioning-by-created-at-timestamp"></a>

This scenario partitions data based on the created at timestamp, utilizing record timestamp headers for partitioning.
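A configuration for this scheme might look like the following sketch; the `created_ts` label and the format string are illustrative:

{% code fullWidth="false" %}

```properties
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'created_ts='yyyy-MM-dd-HH"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date
```

{% endcode %}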

## Partitioning by Raw Creation Date  <a href="#partitioning-by-raw-creation-date" id="partitioning-by-raw-creation-date"></a>

Data is partitioned based on the raw creation date, employing record timestamp headers for this partitioning scheme.

{% code fullWidth="false" %}

```properties
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'raw_cre_dt='yyyy-MM-dd"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date
```

{% endcode %}

## Partitioning by Creation Timestamp  <a href="#partitioning-by-creation-timestamp" id="partitioning-by-creation-timestamp"></a>

Data is partitioned based on the creation timestamp, utilizing record timestamp headers for this partitioning scheme.

{% code fullWidth="false" %}

```properties
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'creation-ts='yyyy-MM-dd"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date
```

{% endcode %}

## Partitioning by Created At Date  <a href="#partitioning-by-created-at-date" id="partitioning-by-created-at-date"></a>

This scenario partitions data by the created at date, employing record timestamp headers for partitioning.

{% code fullWidth="false" %}

```properties
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'createdAt='yyyy-MM-dd"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date
```

{% endcode %}

## Partitioning by Created At Date (Alternate Format)  <a href="#partitioning-by-created-at-date-alternate-format" id="partitioning-by-created-at-date-alternate-format"></a>

Similar to the previous scenario, this partitions data by the created-at date, but the first variant uses a compact `yyyyMMddHH` format that also encodes the hour; the second uses a `created_at` label with a dashed, date-only format.

{% code fullWidth="false" %}

```properties
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'createdAt='yyyyMMddHH"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date
```

{% endcode %}

{% code fullWidth="false" %}

```properties
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'created_at='yyyy-MM-dd"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date
```

{% endcode %}

## Partitioning by Creation Date  <a href="#partitioning-by-creation-date" id="partitioning-by-creation-date"></a>

Data is partitioned based on the creation date, employing record timestamp headers. The variants below differ only in the partition label used (`creation_ds`, `data_date`, `date_hour`); the last also includes the hour in its format.

{% code fullWidth="false" %}

```properties
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'creation_ds='yyyy-MM-dd"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date
```

{% endcode %}

{% code fullWidth="false" %}

```properties
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'data_date='yyyy-MM-dd"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date
```

{% endcode %}

{% code fullWidth="false" %}

```properties
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'date_hour='yyyy-MM-dd-HH"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date
```

{% endcode %}

## Partitioning by Data Date  <a href="#partitioning-by-data-date" id="partitioning-by-data-date"></a>

This scenario partitions data by the data date, utilizing record timestamp headers for partitioning; note that the format below also includes the hour for finer granularity.

{% code fullWidth="false" %}

```properties
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'data_date='yyyy-MM-dd-HH"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date
```

{% endcode %}

***

## Partitioning by Date and Hour  <a href="#partitioning-by-date-and-hour-1" id="partitioning-by-date-and-hour-1"></a>

Data is partitioned based on the date and hour, employing record timestamp headers for this partitioning scheme.

{% code fullWidth="false" %}

```properties
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="yyyy-MM-dd-HH"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date STORE AS X
```

{% endcode %}

