Examples for GCP Sink Kafka Connector time based partitioning.
Partitioning by Date and Time
This scenario partitions data by date and time, employing record timestamp headers to enable partitioning based on these time components.
Partitioning by Data Date and Hour
Data is partitioned by data date and hour, utilizing record timestamp headers for partitioning based on these time components.
Default Confluent Partitioning
The default Confluent partitioning scheme follows the structure <prefix>/<topic>/<encodedPartition>/<topic>+<kafkaPartition>+<startOffset>.<format>. This provides a default partitioning mechanism for Kafka topics.
transforms=partitiontransforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaderstransforms.partition.date.format="yyyy-MM-dd-HH"connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date STORE AS X
transforms=partitiontransforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaderstransforms.partition.year.format="'year='yyyy"transforms.partition.month.format="'month='MM"transforms.partition.day.format="'day='dd"transforms.partition.hour.format="'hour='HH"connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.year, _header.month, _header.day, _header.hour
Partitioning by Year, Month, and Day
Similar to the previous scenario, this partitions data by year, month, and day. It utilizes record timestamp headers for partitioning based on these time components.
transforms=partitiontransforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaderstransforms.partition.year.format="'year='yyyy"transforms.partition.month.format="'month='MM"transforms.partition.day.format="'day='dd"connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.year, _header.month, _header.day
transforms=partitiontransforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaderstransforms.partition.year.format="'year='yyyy"transforms.partition.month.format="'month='MM"transforms.partition.day.format="'day='dd"connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.year, _header.month, _header.day
Partitioning by Year, Month, Day, Hour, and Minute
Extending the previous scenarios, this one partitions data by year, month, day, hour, and minute, allowing for more granular time-based partitioning.
transforms=partitiontransforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaderstransforms.partition.year.format="yyyy"transforms.partition.month.format="MM"transforms.partition.day.format="dd"transforms.partition.hour.format="HH"transforms.partition.minute.format="mm"connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.year, _header.month, _header.day, _header.hour, _header.minute
transforms=partitiontransforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaderstransforms.partition.date.format="'date='yyyy-MM-dd"transforms.partition.hour.format="'time='HHmm"connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date, _header.time
Partitioning by Year, Month, Day, and Hour
This scenario partitions data by year, month, day, and hour. It utilizes a transformation process to insert record timestamp headers, enabling partitioning based on these time components.
transforms=partitiontransforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaderstransforms.partition.date.format="'data_date='yyyy-MM-dd"transforms.partition.hour.format="'hour='HH"connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date, _header.hour
Partitioning by Date and Hour
This scenario partitions data by date and hour, using record timestamp headers for partitioning based on these time components.
transforms=partitiontransforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaderstransforms.partition.date.format="'dt='yyyy-MM-dd"transforms.partition.hour.format="'hour='HH"connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date, _header.hour
Partitioning by Created At Timestamp
This scenario partitions data based on the created at timestamp, utilizing record timestamp headers for partitioning.
Partitioning by Raw Creation Date
Data is partitioned based on the raw creation date, employing record timestamp headers for this partitioning scheme.
transforms=partitiontransforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaderstransforms.partition.date.format="'raw_cre_dt='yyyy-MM-dd"connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date
Partitioning by Creation Timestamp
Data is partitioned based on the creation timestamp, utilizing record timestamp headers for this partitioning scheme.
transforms=partitiontransforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaderstransforms.partition.date.format="'creation-ts='yyyy-MM-dd"connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date
Partitioning by Created At Date
This scenario partitions data by the created at date, employing record timestamp headers for partitioning.
transforms=partitiontransforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaderstransforms.partition.date.format="'createdAt='yyyy-MM-dd"connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date
Partitioning by Created At Date (Alternate Format)
Similar to the previous scenario, this partitions data by the created at date, utilizing record timestamp headers for partitioning.
transforms=partitiontransforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaderstransforms.partition.date.format="'createdAt='yyyyMMddHH"connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date
transforms=partitiontransforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaderstransforms.partition.date.format="'created_at='yyyy-MM-dd"connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date
Partitioning by Creation Date
Data is partitioned based on the creation date, employing record timestamp headers for this partitioning scheme.
transforms=partitiontransforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaderstransforms.partition.date.format="'creation_ds='yyyy-MM-dd"connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date
transforms=partitiontransforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaderstransforms.partition.date.format="'data_date='yyyy-MM-dd"connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date
transforms=partitiontransforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaderstransforms.partition.date.format="'date_hour='yyyy-MM-dd-HH"connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date
Partitioning by Data Date
This scenario partitions data by the data date, utilizing record timestamp headers for partitioning.
transforms=partitiontransforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaderstransforms.partition.date.format="'data_date='yyyy-MM-dd-HH"connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date
Partitioning by Date and Hour
Data is partitioned based on the date and hour, employing record timestamp headers for this partitioning scheme.
transforms=partitiontransforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaderstransforms.partition.date.format="yyyy-MM-dd-HH"connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date STORE AS X