# GCP Sink Time Based Partitioning

## Partitioning by Date and Time  <a href="#partitioning-by-date-and-time" id="partitioning-by-date-and-time"></a>

This scenario partitions data by date and time, employing record timestamp headers to enable partitioning based on these time components.

## Partitioning by Data Date and Hour  <a href="#partitioning-by-data-date-and-hour" id="partitioning-by-data-date-and-hour"></a>

Data is partitioned by data date and hour, utilizing record timestamp headers for partitioning based on these time components.

## Default Confluent Partitioning  <a href="#default-confluent-partitioning" id="default-confluent-partitioning"></a>

The default Confluent partitioning scheme follows the structure `<prefix>/<topic>/<encodedPartition>/<topic>+<kafkaPartition>+<startOffset>.<format>`. This provides a default partitioning mechanism for Kafka topics.

```
<prefix>/<topic>/<encodedPartition>/<topic>+<kafkaPartition>+<startOffset>.<format>
```

{% code fullWidth="false" %}

```properties
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="yyyy-MM-dd-HH"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date STORE AS X
```

{% endcode %}

{% code fullWidth="false" %}

```properties
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.year.format="'year='yyyy"
transforms.partition.month.format="'month='MM"
transforms.partition.day.format="'day='dd"
transforms.partition.hour.format="'hour='HH"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.year, _header.month, _header.day, _header.hour
```

{% endcode %}

## Partitioning by Year, Month, and Day  <a href="#partitioning-by-year-month-and-day" id="partitioning-by-year-month-and-day"></a>

Similar to the previous scenario, this partitions data by year, month, and day. It utilizes record timestamp headers for partitioning based on these time components.

{% code fullWidth="false" %}

```properties
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.year.format="'year='yyyy"
transforms.partition.month.format="'month='MM"
transforms.partition.day.format="'day='dd"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.year, _header.month, _header.day
```

{% endcode %}

{% code fullWidth="false" %}

```properties
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.year.format="'year='yyyy"
transforms.partition.month.format="'month='MM"
transforms.partition.day.format="'day='dd"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.year, _header.month, _header.day
```

{% endcode %}

## Partitioning by Year, Month, Day, Hour, and Minute  <a href="#partitioning-by-year-month-day-hour-and-minute" id="partitioning-by-year-month-day-hour-and-minute"></a>

Extending the previous scenarios, this one partitions data by year, month, day, hour, and minute, allowing for more granular time-based partitioning.

{% code fullWidth="false" %}

```properties
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.year.format="yyyy"
transforms.partition.month.format="MM"
transforms.partition.day.format="dd"
transforms.partition.hour.format="HH"
transforms.partition.minute.format="mm"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.year, _header.month, _header.day, _header.hour, _header.minute
```

{% endcode %}

{% code fullWidth="false" %}

```properties
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'date='yyyy-MM-dd"
transforms.partition.hour.format="'time='HHmm"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date, _header.time
```

{% endcode %}

## Partitioning by Year, Month, Day, and Hour  <a href="#partitioning-by-year-month-day-and-hour" id="partitioning-by-year-month-day-and-hour"></a>

This scenario partitions data by year, month, day, and hour. It utilizes a transformation process to insert record timestamp headers, enabling partitioning based on these time components.

{% code fullWidth="false" %}

```properties
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'data_date='yyyy-MM-dd"
transforms.partition.hour.format="'hour='HH"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date, _header.hour
```

{% endcode %}

## Partitioning by Date and Hour  <a href="#partitioning-by-date-and-hour" id="partitioning-by-date-and-hour"></a>

This scenario partitions data by date and hour, using record timestamp headers for partitioning based on these time components.

{% code fullWidth="false" %}

```properties
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'dt='yyyy-MM-dd"
transforms.partition.hour.format="'hour='HH"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date, _header.hour
```

{% endcode %}

***

## Partitioning by Created At Timestamp  <a href="#partitioning-by-created-at-timestamp" id="partitioning-by-created-at-timestamp"></a>

This scenario partitions data based on the created at timestamp, utilizing record timestamp headers for partitioning.

## Partitioning by Raw Creation Date  <a href="#partitioning-by-raw-creation-date" id="partitioning-by-raw-creation-date"></a>

Data is partitioned based on the raw creation date, employing record timestamp headers for this partitioning scheme.

{% code fullWidth="false" %}

```properties
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'raw_cre_dt='yyyy-MM-dd"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date
```

{% endcode %}

## Partitioning by Creation Timestamp  <a href="#partitioning-by-creation-timestamp" id="partitioning-by-creation-timestamp"></a>

Data is partitioned based on the creation timestamp, utilizing record timestamp headers for this partitioning scheme.

{% code fullWidth="false" %}

```properties
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'creation-ts='yyyy-MM-dd"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date
```

{% endcode %}

## Partitioning by Created At Date  <a href="#partitioning-by-created-at-date" id="partitioning-by-created-at-date"></a>

This scenario partitions data by the created at date, employing record timestamp headers for partitioning.

{% code fullWidth="false" %}

```properties
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'createdAt='yyyy-MM-dd"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date
```

{% endcode %}

## Partitioning by Created At Date (Alternate Format)  <a href="#partitioning-by-created-at-date-alternate-format" id="partitioning-by-created-at-date-alternate-format"></a>

Similar to the previous scenario, this partitions data by the created at date, utilizing record timestamp headers for partitioning.

{% code fullWidth="false" %}

```properties
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'createdAt='yyyyMMddHH"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date
```

{% endcode %}

{% code fullWidth="false" %}

```properties
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'created_at='yyyy-MM-dd"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date
```

{% endcode %}

## Partitioning by Creation Date  <a href="#partitioning-by-creation-date" id="partitioning-by-creation-date"></a>

Data is partitioned based on the creation date, employing record timestamp headers for this partitioning scheme.

{% code fullWidth="false" %}

```properties
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'creation_ds='yyyy-MM-dd"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date
```

{% endcode %}

{% code fullWidth="false" %}

```properties
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'data_date='yyyy-MM-dd"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date
```

{% endcode %}

{% code fullWidth="false" %}

```properties
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'date_hour='yyyy-MM-dd-HH"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date
```

{% endcode %}

## Partitioning by Data Date  <a href="#partitioning-by-data-date" id="partitioning-by-data-date"></a>

This scenario partitions data by the data date, utilizing record timestamp headers for partitioning.

{% code fullWidth="false" %}

```properties
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'data_date='yyyy-MM-dd-HH"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date
```

{% endcode %}

***

## Partitioning by Date and Hour  <a href="#partitioning-by-date-and-hour-1" id="partitioning-by-date-and-hour-1"></a>

Data is partitioned based on the date and hour, employing record timestamp headers for this partitioning scheme.

{% code fullWidth="false" %}

```properties
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="yyyy-MM-dd-HH"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date STORE AS X
```

{% endcode %}
