Backup & Restore

This page describes how to perform a backup and restore of data in your Kafka cluster.

The following external storage targets are supported for backing up to and restoring from:

  • AWS S3

Backup and restore are achieved using the standard Sink and Source connectors, with the Message Envelope enabled to ensure the correct metadata is persisted.

Backing up and restoring data with AWS S3

Message Envelope

A Kafka message includes keys, values, headers, and metadata (topic, partition, offset, timestamp).

The connector wraps these messages in an "envelope", streamlining backup and restoration without relying on complex Kafka Connect transformations.

Here's how the envelope is structured:

{
  "key": <the message Key, which can be a primitive or a complex object>,
  "value": <the message Key, which can be a primitive or a complex object>,
  "headers": {
    "header1": "value1",
    "header2": "value2"
  },
  "metadata": {
    "offset": 821122,
    "partition": 3,
    "timestamp": 1695645345,
    "topic": "source_topic"
  }
}

In this format, all parts of the Kafka message are retained. This is beneficial for backup, restoration, and running analytical queries.

When restoring, the Source connector uses this format to rebuild the original Kafka message and send it to the specified topic. The envelope is enabled via the store.envelope KCQL property, for example:

INSERT INTO lensesioaws
SELECT * FROM payments
STOREAS AVRO
PROPERTIES (
  'store.envelope'=true
);
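
For context, a complete Sink connector configuration for backing up the payments topic might look like the sketch below. The connector class name and bucket are illustrative, and the AWS authentication and region properties are omitted; set those according to the documentation for your connector version.

# Illustrative Sink connector configuration for backing up the payments topic.
# AWS authentication and region properties are omitted here.
name=payments-backup
connector.class=io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector
topics=payments
connect.s3.kcql=INSERT INTO lensesioaws SELECT * FROM payments STOREAS AVRO PROPERTIES('store.envelope'=true)

For the restore direction, the Source connector reads the envelope objects back from the bucket and produces the original records to a Kafka topic. A KCQL statement for this might look roughly as follows; the exact FROM syntax and property key depend on the Source connector version, so treat this as a sketch:

INSERT INTO payments_restored
SELECT * FROM lensesioaws:payments
STOREAS AVRO
PROPERTIES('store.envelope'=true)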

Storage Formats

Anything can be stored in S3, and the connector does its best to support the major formats:

  • AVRO

  • Parquet

  • JSON

  • CSV (including headers)

  • Text

  • BYTES

This storage format is decoupled from the format of the data in Kafka. The translation from Kafka to Connect's internal format happens via the key.converter and value.converter connector properties.
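
As an illustration of this decoupling, the sketch below assumes records are serialized in Kafka as Avro (the Schema Registry URL is a placeholder for your environment) while the objects written to S3 are JSON:

# Converters describe how the records are serialized in Kafka
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://schema-registry:8081

# STOREAS controls the storage format in S3, independently of the converters
connect.s3.kcql=INSERT INTO lensesioaws SELECT * FROM payments STOREAS JSON PROPERTIES('store.envelope'=true)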

Object partitioning in S3

Partitioning is crucial for organizing data and improving query speeds. In S3, this is achieved using the Object Key. By default, the connector mirrors the partition structure of the Kafka topic it reads from. For instance, with a three-partition topic and the following configuration:

connect.s3.kcql=INSERT INTO lensesioaws SELECT * FROM payments STOREAS AVRO PROPERTIES ('store.envelope'=true)

This would result in:

s3://lensesioaws/payments/0/000000001234.avro
s3://lensesioaws/payments/1/00000000555.avro
s3://lensesioaws/payments/2/0000000014566.avro

The connector also allows customised partitioning, which offers several benefits:

  • Better performance in subsequent data queries due to organized partitions.

  • Easy data management through time intervals, like year or month.

  • Keeping sensitive data in distinct partitions for tighter access controls.

To adjust partitioning, use the PARTITIONBY clause in the KCQL configuration. This can use the Kafka message's key, value, or headers for partitioning.

For instance, for a "sales" Kafka topic with transaction messages, the KCQL can partition data by transaction year, product category, and customer region.

INSERT INTO my-s3-bucket
SELECT *
FROM sales 
PARTITIONBY _key.year, _value.product_category, _headers.region
STOREAS AVRO

The Kafka Connect S3 Sink Connector will create custom object keys in your S3 bucket that incorporate the transaction year, product category, and customer region, resulting in a partitioning strategy driven by the message contents rather than by the Kafka partition. For instance, an object key might look like this:

s3://my-s3-bucket/2023/Electronics/EMEA/000000001.avro

To achieve more structured object key naming, similar to the Hive-style key names used by Athena, where field names are part of the object key, modify the KCQL syntax as follows:

INSERT INTO my-s3-bucket
SELECT *
FROM sales
PARTITIONBY _key.year, _value.product_category, _headers.region
STOREAS AVRO
PROPERTIES('partition.include.keys' = true)

This will result in object keys like:

s3://my-s3-bucket/year=2023/product_category=Electronics/region=EMEA/000000001.avro

Organizing data into time-based intervals within custom object keys can be highly beneficial. To achieve time-based intervals with custom object key naming, the connector supports a complementary Kafka Connect Single Message Transformer (SMT) plugin designed to streamline this process; refer to the transformer plugin documentation for installation details.

Consider an example where you need the object key to include the wallclock time (the time when the message was processed) and create an hourly window based on a field called `timestamp`. Here's the connector configuration to achieve this:

connect.s3.kcql=insert into lensesio:demo select * from demo PARTITIONBY _value.metadata_id, _value.customer_id, _header.ts, _header.wallclock STOREAS `JSON` WITH_FLUSH_SIZE=1000000 WITH_FLUSH_INTERVAL=30 WITH_FLUSH_COUNT=5000
topics=demo
name=demo
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
transforms=insertFormattedTs,insertWallclock
transforms.insertFormattedTs.type=io.lenses.connect.smt.header.TimestampConverter
transforms.insertFormattedTs.header.name=ts
transforms.insertFormattedTs.field=timestamp
transforms.insertFormattedTs.target.type=string
transforms.insertFormattedTs.format.to.pattern=yyyy-MM-dd-HH
transforms.insertWallclock.type=io.lenses.connect.smt.header.InsertWallclock
transforms.insertWallclock.header.name=wallclock
transforms.insertWallclock.value.type=format
transforms.insertWallclock.format=yyyy-MM-dd-HH

In this configuration:

  • The TimestampConverter SMT is used to convert the timestamp field in the Kafka message's value into a string value based on the specified format pattern (yyyy-MM-dd-HH). This allows us to format the timestamp to represent an hourly interval.

  • The InsertWallclock SMT incorporates the current wallclock time in the specified format (yyyy-MM-dd-HH).

  • The PARTITIONBY clause leverages the timestamp field and the wallclock header to craft the object key, providing precise control over data partitioning.
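
With this configuration, and purely illustrative values for the metadata_id and customer_id fields, the resulting object keys would take roughly the following shape, with one path segment per PARTITIONBY field:

s3://lensesio/demo/1001/42/2023-09-25-14/2023-09-25-15/000000000123.json

Here 2023-09-25-14 is the formatted value of the timestamp field (written to the ts header) and 2023-09-25-15 is the wallclock header, so objects are grouped both by the hour the record refers to and by the hour it was processed.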
