Backup & Restore

This page describes how to perform a backup and restore of data in your Kafka cluster.

The following external storage options are supported for backing up to and restoring from:

  • AWS S3

Backup and restore are achieved using the standard connectors, with the Message Envelope enabled to ensure the correct metadata is persisted.

Message Envelope

A Kafka message includes a key, a value, headers, and metadata (topic, partition, offset, timestamp).

The connector wraps these messages in an "envelope", streamlining backup and restoration without relying on complex Kafka Connect transformations.

Here's how the envelope is structured:

    {
      "key": <the message Key, which can be a primitive or a complex object>,
      "value": <the message Value, which can be a primitive or a complex object>,
      "headers": {
        "header1": "value1",
        "header2": "value2"
      },
      "metadata": {
        "offset": 821122,
        "partition": 3,
        "timestamp": 1695645345,
        "topic": "source_topic"
      }
    }

In this format, all parts of the Kafka message are retained. This is beneficial for backup, restoration, and running analytical queries.

The Source connector uses this format to rebuild the original Kafka message and send it to the specified topic.

The envelope is enabled via the store.envelope property in the KCQL statement:

    INSERT INTO lensesioaws
    SELECT * FROM payments
    STOREAS AVRO
    PROPERTIES (
      'store.envelope'=true
    );
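
On the restore side, a source connector reads the envelope objects back from S3 and republishes them to a Kafka topic. A minimal sketch, assuming the source connector exposes a connect.s3.source.kcql property; the bucket/prefix and target topic names are illustrative, not taken from this page:

    # Hypothetical restore sketch: the property name connect.s3.source.kcql, the
    # bucket/prefix lensesioaws:payments, and the target topic payments_restored
    # are assumptions to verify against your connector version.
    connect.s3.source.kcql=INSERT INTO payments_restored SELECT * FROM lensesioaws:payments STOREAS AVRO PROPERTIES ('store.envelope'=true)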

Storage Formats

Anything can be stored in S3, and the connector supports the major formats:

  • AVRO

  • Parquet

  • JSON

  • CSV (including headers)

  • Text

  • BYTES

The storage format is decoupled from the format used in Kafka. The translation from Kafka to Connect happens via the key.converter and value.converter connector properties.
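
The Kafka-side format and the S3 storage format can therefore be chosen independently. A minimal sketch, assuming string keys and schemaless JSON values in the Kafka topic; the converter settings shown are standard Kafka Connect converters, and the bucket and topic names are illustrative:

    # Kafka-side format: controlled by the converters
    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable=false
    # S3-side format: controlled by STOREAS in the KCQL
    connect.s3.kcql=INSERT INTO lensesioaws SELECT * FROM payments STOREAS JSON PROPERTIES ('store.envelope'=true)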

Object partitioning in S3

Partitioning is crucial for organizing data and improving query speeds. In S3, this is achieved using the Object Key. By default, the connector mirrors the partition structure of the Kafka topic it reads from. For instance, a three-partition topic would use this configuration:

    connect.s3.kcql=INSERT INTO lensesioaws SELECT * FROM payments STOREAS AVRO PROPERTIES ('store.envelope'=true)

This would result in:

    s3://lensesioaws/payments/0/000000001234.avro
    s3://lensesioaws/payments/1/00000000555.avro
    s3://lensesioaws/payments/2/0000000014566.avro

The connector allows for customised partitioning, which has its perks:

  • Better performance in subsequent data queries due to organized partitions.

  • Easy data management through time intervals, like year or month.

  • Keeping sensitive data in distinct partitions for tighter access controls.

To adjust partitioning, use the PARTITIONBY clause in the KCQL configuration. This can use the Kafka message's key, value, or headers for partitioning.

For instance, for a "sales" Kafka topic with transaction messages, the KCQL can partition data by transaction year, product category, and customer region:

    INSERT INTO my-s3-bucket
    SELECT *
    FROM sales
    PARTITIONBY _key.year, _value.product_category, _headers.region
    STOREAS AVRO

The Kafka Connect S3 Sink Connector will then create custom object keys in your S3 bucket that incorporate the transaction year, product category, and customer region. For instance, an object key might look like this:

    s3://my-s3-bucket/2023/Electronics/EMEA/000000001.avro

To achieve more structured object key naming, similar to Athena Hive-like key names where field names are part of the object key, modify the KCQL syntax as follows:

    INSERT INTO my-s3-bucket
    SELECT *
    FROM sales
    PARTITIONBY _key.year, _value.product_category, _headers.region
    STOREAS AVRO
    PROPERTIES('partition.include.keys' = true)

This will result in object keys like:

    s3://my-s3-bucket/year=2023/product_category=Electronics/region=EMEA/000000001.avro

Organizing data into time-based intervals within custom object keys can be highly beneficial. To achieve time-based intervals with custom object key naming, the connector supports a complementary Kafka Connect Single Message Transformer (SMT) plugin designed to streamline this process. You can find the transformer plugin and documentation here.

Consider an example where you need the object key to include the wallclock time (the time when the message was processed) and create an hourly window based on a field called `timestamp`. Here's the connector configuration to achieve this:

    connect.s3.kcql=insert into lensesio:demo select * from demo PARTITIONBY _value.metadata_id, _value.customer_id, _header.ts, _header.wallclock STOREAS `JSON` WITH_FLUSH_SIZE=1000000 WITH_FLUSH_INTERVAL=30 WITH_FLUSH_COUNT=5000
    topics=demo
    name=demo
    value.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter=org.apache.kafka.connect.storage.StringConverter
    transforms=insertFormattedTs,insertWallclock
    transforms.insertFormattedTs.type=io.lenses.connect.smt.header.TimestampConverter
    transforms.insertFormattedTs.header.name=ts
    transforms.insertFormattedTs.field=timestamp
    transforms.insertFormattedTs.target.type=string
    transforms.insertFormattedTs.format.to.pattern=yyyy-MM-dd-HH
    transforms.insertWallclock.type=io.lenses.connect.smt.header.InsertWallclock
    transforms.insertWallclock.header.name=wallclock
    transforms.insertWallclock.value.type=format
    transforms.insertWallclock.format=yyyy-MM-dd-HH

In this configuration:

  • The TimestampConverter SMT converts the timestamp field in the Kafka message's value into a string based on the specified format pattern (yyyy-MM-dd-HH) and stores it in a header named ts. This formats the timestamp to represent an hourly interval.

  • The InsertWallclock SMT inserts the current wallclock time, in the specified format (yyyy-MM-dd-HH), as a header named wallclock.

  • The PARTITIONBY clause leverages the timestamp field and the wallclock header to craft the object key, providing precise control over data partitioning.
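
With this configuration, and assuming purely illustrative values (a metadata_id of 42, a customer_id of 1001, and a `timestamp` field falling in the 09:00 hour of 2024-01-15, processed in that same hour), the object keys would follow the same pattern as the earlier examples, along the lines of:

    s3://lensesio/demo/42/1001/2024-01-15-09/2024-01-15-09/000000001.json

The exact key layout depends on the connector version, and the flush properties (WITH_FLUSH_SIZE, WITH_FLUSH_INTERVAL, WITH_FLUSH_COUNT) control when a new object is written.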
