# Backup & Restore

The following external storage is supported for backup and restore:

* AWS S3

Backup and restore use the standard connectors, with the Message Envelope enabled so that the correct metadata is persisted.

{% embed url="<https://vimeo.com/874891392>" %}
Backing up and restoring data with AWS S3
{% endembed %}

## Message Envelope <a href="#heading" id="heading"></a>

A Kafka message includes keys, values, headers, and metadata (topic, partition, offset, timestamp).

The connector wraps these messages in an "**envelope**", streamlining backup and restoration without relying on complex Kafka Connect transformations.

Here's how the envelope is structured:

```sql
{
  "key": <the message Key, which can be a primitive or a complex object>,
  "value": <the message Value, which can be a primitive or a complex object>,
  "headers": {
    "header1": "value1",
    "header2": "value2"
  },
  "metadata": {
    "offset": 821122,
    "partition": 3,
    "timestamp": 1695645345,
    "topic": "source_topic"
  }
}
```

In this format, all parts of the Kafka message are retained. This is useful for backup, restoration, and running analytical queries.

The Source connector uses this format to rebuild the original Kafka message and send it to the specified topic.

```sql
INSERT INTO lensesioaws 
SELECT * FROM payments 
STOREAS AVRO 
PROPERTIES (
 'store.envelope'=true
);
```
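To make the round trip concrete, here is a minimal Python sketch of wrapping a message into the envelope shown above and reading it back. It is an illustration of the data shape only, not the connector's implementation; the helper names `wrap` and `unwrap` and the target topic `payments_restored` are hypothetical.

```python
from typing import Any, Dict


def wrap(record: Dict[str, Any]) -> Dict[str, Any]:
    """Build an envelope (hypothetical helper) from a flat record description."""
    return {
        "key": record["key"],
        "value": record["value"],
        "headers": dict(record.get("headers", {})),
        "metadata": {
            "offset": record["offset"],
            "partition": record["partition"],
            "timestamp": record["timestamp"],
            "topic": record["topic"],
        },
    }


def unwrap(envelope: Dict[str, Any], target_topic: str) -> Dict[str, Any]:
    """Recover the message parts from an envelope for a chosen target topic."""
    return {
        "topic": target_topic,
        "key": envelope["key"],
        "value": envelope["value"],
        "headers": envelope["headers"],
    }


record = {
    "key": "payment-42",
    "value": {"amount": 10.5, "currency": "EUR"},
    "headers": {"header1": "value1", "header2": "value2"},
    "offset": 821122,
    "partition": 3,
    "timestamp": 1695645345,
    "topic": "source_topic",
}

envelope = wrap(record)
restored = unwrap(envelope, "payments_restored")
```

Because the key, value, and headers sit next to the metadata in one object, nothing has to be reconstructed from side channels at restore time, and the original topic, partition, and offset remain available in `envelope["metadata"]`.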

## Storage Formats <a href="#heading" id="heading"></a>

S3 can store any kind of object. The connector supports the major formats:

* AVRO
* Parquet
* JSON
* CSV (including headers)
* Text
* BYTES

This format is decoupled from the format in Kafka. The translation from Kafka to Connect happens via the `key.converter` and `value.converter` connector properties.

## Object partitioning in S3 <a href="#heading" id="heading"></a>

Partitioning is crucial for organizing data and improving query speeds. In S3, this is achieved using the Object Key. By default, the connector mirrors the structure of the Kafka topic it reads from. For instance, a three-partition topic would use this configuration:

{% code fullWidth="false" %}

```sql
connect.s3.kcql=INSERT INTO lensesioaws SELECT * FROM payments STOREAS AVRO PROPERTIES ('store.envelope'=true)
```

{% endcode %}

This would result in:

```sql
s3://lensesioaws/payments/0/000000001234.avro
s3://lensesioaws/payments/1/00000000555.avro
s3://lensesioaws/payments/2/0000000014566.avro
```
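The default layout above follows a `topic/partition/offset.extension` pattern. A minimal Python sketch of that naming scheme follows; the function name `default_object_key` and the zero-padding width of 12 are assumptions made for illustration, not values taken from the connector.

```python
def default_object_key(
    topic: str,
    partition: int,
    offset: int,
    extension: str = "avro",
    pad: int = 12,  # assumed padding width, for illustration only
) -> str:
    """Compose an object key shaped like topic/partition/offset.extension."""
    return f"{topic}/{partition}/{offset:0{pad}d}.{extension}"


# One object key per partition of a three-partition topic.
keys = [
    default_object_key("payments", partition, offset)
    for partition, offset in [(0, 1234), (1, 555), (2, 14566)]
]
```

Zero-padding the offset keeps the lexicographic order of object keys aligned with the numeric order of offsets, which is convenient when listing a prefix.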

The connector allows for customised partitioning, which has its perks:

* Better performance in subsequent data queries due to organized partitions.
* Easy data management through time intervals, like year or month.
* Keeping sensitive data in distinct partitions for tighter access controls.

To adjust partitioning, use the `PARTITIONBY` clause in the KCQL configuration. This can use the Kafka message's key, value, or headers for partitioning.

For instance, for a "sales" Kafka topic with transaction messages, the KCQL can partition data by transaction year, product type, and customer region.

```sql
INSERT INTO my-s3-bucket
SELECT *
FROM sales 
PARTITIONBY _key.year, _value.product_category, _headers.region
STOREAS AVRO
```

The Kafka Connect S3 Sink Connector will create custom object keys in your S3 bucket that incorporate the transaction year, product category, and customer region, resulting in a coarser partitioning strategy. For instance, an object key might look like this:

```sql
s3://my-s3-bucket/2023/Electronics/EMEA/000000001.avro
```

To achieve more structured object key naming, similar to Athena Hive-like key names where field names are part of the object key, modify the KCQL syntax as follows:

```sql
INSERT INTO my-s3-bucket
SELECT *
FROM sales
PARTITIONBY _key.year, _value.product_category, _headers.region
STOREAS AVRO
PROPERTIES('partition.include.keys' = true)
```

This will result in object keys like:

```sql
s3://my-s3-bucket/year=2023/product_category=Electronics/region=EMEA/000000001.avro
```
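The difference between the two layouts can be sketched in a few lines of Python. This is an illustration of the key shapes only, not connector code; `partitioned_object_key` and its `include_keys` flag are hypothetical names that stand in for the effect of `partition.include.keys`.

```python
from typing import List, Tuple


def partitioned_object_key(
    partitions: List[Tuple[str, object]],
    file_name: str,
    include_keys: bool = False,
) -> str:
    """Join partition values into an object key, optionally as name=value pairs."""
    parts = [
        f"{name}={value}" if include_keys else str(value)
        for name, value in partitions
    ]
    return "/".join(parts + [file_name])


# Partition values as they would be extracted from key, value, and headers.
partitions = [
    ("year", 2023),
    ("product_category", "Electronics"),
    ("region", "EMEA"),
]

plain = partitioned_object_key(partitions, "000000001.avro")
hive_style = partitioned_object_key(partitions, "000000001.avro", include_keys=True)
```

The `name=value` form is what Hive-style query engines expect when they discover partitions from object key prefixes.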

Organizing data into time-based intervals within custom object keys can be highly beneficial. To achieve this, the connector supports a complementary Kafka Connect Single Message Transform (SMT) plugin designed to streamline the process. You can find the plugin and its documentation in the [kafka-connect-smt repository](https://github.com/lensesio/kafka-connect-smt).

Consider an example where you need the object key to include the wallclock time (the time when the message was processed) and create an hourly window based on a field called `timestamp`. Here's the connector configuration to achieve this:

{% code fullWidth="false" %}

```sql
connect.s3.kcql=insert into lensesio:demo select * from demo PARTITIONBY _value.metadata_id, _value.customer_id, _header.ts, _header.wallclock STOREAS `JSON` WITH_FLUSH_SIZE=1000000 WITH_FLUSH_INTERVAL=30 WITH_FLUSH_COUNT=5000
topics=demo
name=demo
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
transforms=insertFormattedTs,insertWallclock
transforms.insertFormattedTs.type=io.lenses.connect.smt.header.TimestampConverter
transforms.insertFormattedTs.header.name=ts
transforms.insertFormattedTs.field=timestamp
transforms.insertFormattedTs.target.type=string
transforms.insertFormattedTs.format.to.pattern=yyyy-MM-dd-HH
transforms.insertWallclock.type=io.lenses.connect.smt.header.InsertWallclock
transforms.insertWallclock.header.name=wallclock
transforms.insertWallclock.value.type=format
transforms.insertWallclock.format=yyyy-MM-dd-HH
```

{% endcode %}

In this configuration:

* The TimestampConverter SMT is used to convert the timestamp field in the Kafka message's value into a string value based on the specified format pattern (yyyy-MM-dd-HH). This allows us to format the timestamp to represent an hourly interval.
* The InsertWallclock SMT incorporates the current wallclock time in the specified format (yyyy-MM-dd-HH).
* The PARTITIONBY clause uses the `ts` header (the formatted timestamp field) and the `wallclock` header, alongside the `metadata_id` and `customer_id` value fields, to craft the object key, providing precise control over data partitioning.
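
To see why the `yyyy-MM-dd-HH` pattern yields an hourly window, here is a minimal Python sketch of the same formatting. It assumes the `timestamp` field holds epoch milliseconds and that formatting happens in UTC; both are assumptions for illustration, and `hourly_bucket` is a hypothetical helper rather than part of the SMT.

```python
from datetime import datetime, timezone


def hourly_bucket(epoch_millis: int) -> str:
    """Format an epoch-millisecond timestamp as yyyy-MM-dd-HH in UTC."""
    moment = datetime.fromtimestamp(epoch_millis / 1000, tz=timezone.utc)
    # %Y-%m-%d-%H is the strftime equivalent of the yyyy-MM-dd-HH pattern.
    return moment.strftime("%Y-%m-%d-%H")


# Two timestamps within the same hour share a bucket; the next hour does not.
same_hour_a = hourly_bucket(1695645345000)
same_hour_b = hourly_bucket(1695646799000)
next_hour = hourly_bucket(1695646800000)
```

Every message whose timestamp falls within the same hour produces the same string, so those messages land under the same object key prefix.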

