GCP Storage Source Connector Time-Based Partitioning Examples


Partitioning by Date and Time 

These scenarios partition data by date and time, employing record timestamp headers to enable partitioning based on these time components.

transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="yyyy-MM-dd-HH"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date STORE AS X
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.year.format="'year='yyyy"
transforms.partition.month.format="'month='MM"
transforms.partition.day.format="'day='dd"
transforms.partition.hour.format="'hour='HH"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.year, _header.month, _header.day, _header.hour

Partitioning by Year, Month, and Day 

Similar to the previous scenario, this partitions data by year, month, and day. It utilizes record timestamp headers for partitioning based on these time components.

transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.year.format="'year='yyyy"
transforms.partition.month.format="'month='MM"
transforms.partition.day.format="'day='dd"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.year, _header.month, _header.day
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.year.format="'year='yyyy"
transforms.partition.month.format="'month='MM"
transforms.partition.day.format="'day='dd"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.year, _header.month, _header.day

Partitioning by Year, Month, Day, Hour, and Minute 

Extending the previous scenarios, these partition data by year, month, day, hour, and minute, allowing for more granular time-based partitioning.

transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.year.format="yyyy"
transforms.partition.month.format="MM"
transforms.partition.day.format="dd"
transforms.partition.hour.format="HH"
transforms.partition.minute.format="mm"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.year, _header.month, _header.day, _header.hour, _header.minute
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'date='yyyy-MM-dd"
transforms.partition.hour.format="'time='HHmm"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date, _header.time

Partitioning by Year, Month, Day, and Hour 

This scenario partitions data by year, month, day, and hour. It utilizes a transformation process to insert record timestamp headers, enabling partitioning based on these time components.

transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'data_date='yyyy-MM-dd"
transforms.partition.hour.format="'hour='HH"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date, _header.hour

Partitioning by Date and Hour 

This scenario partitions data by date and hour, using record timestamp headers for partitioning based on these time components.

transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'dt='yyyy-MM-dd"
transforms.partition.hour.format="'hour='HH"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date, _header.hour

Partitioning by Raw Creation Date 

Data is partitioned based on the raw creation date, employing record timestamp headers for this partitioning scheme.

#"‘raw_cre_dt’=YYYY’-‘MM’-‘dd"

transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'raw_cre_dt='yyyy-MM-dd"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date

Partitioning by Creation Timestamp 

Data is partitioned based on the creation timestamp, utilizing record timestamp headers for this partitioning scheme.

#"‘creation-ts’=YYYY-MM-dd"

transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'creation-ts='yyyy-MM-dd"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date

Partitioning by Created At Date 

This scenario partitions data by the created at date, employing record timestamp headers for partitioning.

#"‘createdAt’=YYYY’-‘MM’-‘dd"

transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'createdAt='yyyy-MM-dd"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date

Partitioning by Created At Date (Alternate Format) 

Similar to the previous scenario, this partitions data by the created at date, utilizing record timestamp headers for partitioning.

transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'createdAt='yyyyMMddHH"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'created_at='yyyy-MM-dd"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date

Partitioning by Creation Date 

Data is partitioned based on the creation date, employing record timestamp headers for this partitioning scheme.

transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'creation_ds='yyyy-MM-dd"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date

Partitioning by Data Date 

Data is partitioned by data date, utilizing record timestamp headers for partitioning based on these time components.

transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'data_date='yyyy-MM-dd"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date

Partitioning by Data Date and Hour 

Data is partitioned by data date and hour, utilizing record timestamp headers for partitioning based on these time components.

transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'date_hour='yyyy-MM-dd-HH"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'data_date='yyyy-MM-dd-HH"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date

Partitioning by Date and Hour 

Data is partitioned based on the date and hour, employing record timestamp headers for this partitioning scheme.

transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="yyyy-MM-dd-HH"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date STORE AS X

Default Confluent Partitioning 

The default Confluent partitioning scheme follows the structure <prefix>/<topic>/<encodedPartition>/<topic>+<kafkaPartition>+<startOffset>.<format>. This provides a default partitioning mechanism for Kafka topics.

<prefix>/<topic>/<encodedPartition>/<topic>+<kafkaPartition>+<startOffset>.<format>
--
Last modified: September 15, 2024