AWS S3


This Kafka Connect sink connector facilitates the seamless transfer of records from Kafka to AWS S3 Buckets. It offers robust support for various data formats, including AVRO, Parquet, JSON, CSV, and Text, making it a versatile choice for data storage. Additionally, it ensures the reliability of data transfer with built-in support for exactly-once semantics.

Connector Properties 

NameDescriptionTypeAvailable ValuesDefault Value
connect.s3.aws.auth.modeSpecifies the AWS authentication mode for connecting to S3.string“Credentials,” “Default”“Default”
connect.s3.aws.access.keyThe AWS Access Key used for authentication.string(Empty)
connect.s3.aws.secret.keyThe AWS Secret Key used for authentication.string(Empty)
connect.s3.aws.regionThe AWS Region where the S3 bucket is located.string(Empty)
connect.s3.pool.max.connectionsSpecifies the maximum number of connections allowed in the AWS Client’s HTTP connection pool when interacting with S3.int-1 (undefined)50
connect.s3.custom.endpointAllows for the specification of a custom S3 endpoint URL if needed.string(Empty)
connect.s3.vhost.bucketEnables the use of Vhost Buckets for S3 connections. Always set to true when custom endpoints are used.booleantrue, falsefalse
connect.s3.error.policyDefines the error handling policy when errors occur during data transfer to or from S3.string“NOOP,” “THROW,” “RETRY”“THROW”
connect.s3.max.retriesSets the maximum number of retries the connector will attempt before reporting an error to the Connect Framework.int20
connect.s3.retry.intervalSpecifies the interval (in milliseconds) between retry attempts by the connector.int60000
connect.s3.http.max.retriesSets the maximum number of retries for the underlying HTTP client when interacting with S3.long5
connect.s3.http.retry.intervalSpecifies the retry interval (in milliseconds) for the underlying HTTP client. An exponential backoff strategy is employed.long50
connect.s3.local.tmp.directoryEnables the use of a local folder as a staging area for data transfer operations.string(Empty)
connect.s3.kcqlA SQL-like configuration that defines the behavior of the connector. Refer to the KCQL section below for details.string(Empty)
connect.s3.compression.codecSets the Parquet compression codec to be used when writing data to S3.string“UNCOMPRESSED,” “SNAPPY,” “GZIP,” “LZ0,” “LZ4,” “BROTLI,” “BZIP2,” “ZSTD,” “DEFLATE,” “XZ”“UNCOMPRESSED”
connect.s3.compression.levelSets the compression level when compression is enabled for data transfer to S3.int1-9(Empty)
connect.s3.seek.max.filesSpecifies the maximum threshold for the number of files the connector uses to ensure exactly-once processing of data.int5
connect.s3.indexes.nameConfigure the indexes root directory for this connector.string“.indexes”
connect.s3.exactly.once.enableBy setting to ‘false’, disable exactly-once semantics, opting instead for Kafka Connect’s native at-least-once offset managementbooleantrue, falsetrue

KCQL (Kafka Connect Query Language) 

The connector uses KCQL to map topics to S3 buckets and paths. The full KCQL syntax is:

INSERT INTO bucketAddress[:pathPrefix]
SELECT *
FROM kafka-topic
[[PARTITIONBY (partition[, partition] ...)] | NOPARTITION]
[STOREAS storage_format]
[PROPERTIES(
  'property.1'=x,
  'property.2'=x,
)]

Escape characters 

Please note that you can employ escaping within KCQL for the INSERT INTO, SELECT * FROM, and PARTITIONBY clauses when necessary. For example, an incoming Kafka message stored as Json can use fields containing .:

{
  ...
  "a.b": "value",
  ...
}

In this case you can use the following KCQL statement:

INSERT INTO `bucket-name`:`prefix` SELECT * FROM `kafka-topic` PARTITIONBY `a.b`

Properties 

The PROPERTIES clause is optional and adds a layer of configurability to the connector. It enhances versatility by permitting the application of multiple configurations (delimited by ‘,’). The following properties are supported:

NameDescriptionTypeAvailable ValuesDefault Value
padding.typeSpecifies the type of padding to be applied.LeftPad, RightPad, NoOpLeftPad, RightPad, NoOpLeftPad
padding.charDefines the character used for padding.Char‘0’
padding.length.partitionSets the padding length for the partition.Int0
padding.length.offsetSets the padding length for the offset.Int12
partition.include.keysSpecifies whether partition keys are included.Booleanfalse
Default (Custom Partitioning): true
store.envelopeIndicates whether to store the entire Kafka messageBoolean
store.envelope.keyIndicates whether to store the envelope’s key.Boolean
store.envelope.headersIndicates whether to store the envelope’s headers.Boolean
store.envelope.valueIndicates whether to store the envelope’s value.Boolean
store.envelope.metadataIndicates whether to store the envelope’s metadata.Boolean
flush.sizeSpecifies the size (in bytes) for the flush operation.Long500000000 (500MB)
flush.countSpecifies the number of records for the flush operation.Int50000
flush.intervalSpecifies the interval (in seconds) for the flush operation.Long3600 (1 hour)

The sink connector enhances performance by employing file padding in the output files. This strategy proves particularly advantageous when utilizing the Source connector to restore data. File padding ensures that files maintain a lexicographical order, streamlining the process for the Source connector.

Target Bucket and Path 

The target bucket and path are specified in the INSERT INTO clause. The path is optional and if not specified, the connector will write to the root of the bucket and append the topic name to the path.

Here are a few examples:

INSERT INTO testbucket:pathToWriteTo SELECT * FROM topicA;
INSERT INTO testbucket SELECT * FROM topicA;
INSERT INTO testbucket:path/To/Write/To SELECT * FROM topicA PARTITIONBY fieldA;

SQL projection 

Currently, the connector does not offer support for SQL projection; consequently, anything other than a SELECT * query is disregarded. The connector will faithfully write all fields from Kafka exactly as they are.

Source Topic 

The source topic is defined within the FROM clause. To avoid runtime errors, it’s crucial to configure either the topics or topics.regex property in the connector and ensure proper mapping to the KCQL statements.

A notable exception is supported when the FROM clause is set to *. In this scenario, the connector will automatically map all topics to the KCQL statement and apply the same behavior to each of them.

Object Key 

The object key serves as the filename used to store data in S3. There are two options for configuring the object key:

  • Default: The object key is automatically generated by the connector and follows the Kafka topic-partition structure. The format is $bucket/[$prefix]/$topic/$partition/offset.extension. The extension is determined by the chosen storage format.
  • Custom: The object key is driven by the PARTITIONBY clause. The format is either $bucket/[$prefix]/$topic/customKey1=customValue1/customKey2=customValue2/topic(partition_offset).extension (AWS Athena naming style mimicking Hive-like data partitioning) or `$bucket/[$prefix]/customValue/topic(partition_offset).ext. The extension is determined by the selected storage format.

Custom keys and values can be extracted from the Kafka message key, message value, or message headers, as long as the headers are of types that can be converted to strings. There is no fixed limit to the number of elements that can form the object key, but you should be aware of AWS S3 key length restrictions.

To extract fields from the message values, simply use the field names in the PARTITIONBY clause. For example:

PARTITIONBY fieldA, fieldB

However, note that the message fields must be of primitive types (e.g., string, int, long) to be used for partitioning.

You can also use the entire message key as long as it can be coerced into a primitive type:

PARTITIONBY _key

In cases where the Kafka message Key is not a primitive but a complex object, you can use individual fields within the message Key to create the S3 object key name:

PARTITIONBY _key.fieldA, _key.fieldB

Kafka message headers can also be used in the S3 object key definition, provided the header values are of primitive types easily convertible to strings:

PARTITIONBY _header.<header_key1>[, _header.<header_key2>]

Customizing the object key can leverage various components of the Kafka message. For example:

PARTITIONBY fieldA, _key.fieldB, _headers.fieldC

This flexibility allows you to tailor the object key to your specific needs, extracting meaningful information from Kafka messages to structure S3 object keys effectively.

To enable Athena-like partitioning, use the following syntax:

INSERT INTO $bucket[:$prefix]
SELECT * FROM $topic
PARTITIONBY fieldA, _key.fieldB, _headers.fieldC
STOREAS `AVRO`
PROPERTIES (
    'partition.include.keys'=true,
)

Rolling window 

Storing data in Amazon S3 and partitioning it by time is a common practice in data management. For instance, you may want to organize your S3 data in hourly intervals. This partitioning can be seamlessly achieved using the PARTITIONBY clause in combination with specifying the relevant time field. However, it’s worth noting that the time field typically doesn’t adjust automatically.

To address this, we offer a Kafka Connect Single Message Transformer (SMT) designed to streamline this process. You can find the transformer plugin and documentation here.

Let’s consider an example where you need the object key to include the wallclock time (the time when the message was processed) and create an hourly window based on a field called timestamp. Here’s the connector configuration to achieve this:

connector.class=io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector
connect.s3.kcql=insert into lensesio:demo select * from demo PARTITIONBY _value.metadata_id, _value.customer_id, _header.ts, _header.wallclock STOREAS `JSON` PROPERTIES ('flush.size'=1000000, 'flush.interval'=30, 'flush.count'=5000)
topics=demo
name=demo
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
transforms=insertFormattedTs,insertWallclock
transforms.insertFormattedTs.type=io.lenses.connect.smt.header.TimestampConverter
transforms.insertFormattedTs.header.name=ts
transforms.insertFormattedTs.field=timestamp
transforms.insertFormattedTs.target.type=string
transforms.insertFormattedTs.format.to.pattern=yyyy-MM-dd-HH
transforms.insertWallclock.type=io.lenses.connect.smt.header.InsertWallclock
transforms.insertWallclock.header.name=wallclock
transforms.insertWallclock.value.type=format
transforms.insertWallclock.format=yyyy-MM-dd-HH

In this example, the incoming Kafka message’s Value content includes a field called timestamp, represented as a long value indicating the epoch time in milliseconds. The TimestampConverter SMT will expertly convert this into a string value according to the format specified in the format.to.pattern property. Additionally, the insertWallclock SMT will incorporate the current wallclock time in the format you specify in the format property.

The PARTITIONBY clause then leverages both the timestamp field and the wallclock header to craft the object key, providing you with precise control over data partitioning.

Data Storage Format 

While the STOREAS clause is optional, it plays a pivotal role in determining the storage format within AWS S3. It’s crucial to understand that this format is entirely independent of the data format stored in Kafka. The connector maintains its neutrality towards the storage format at the topic level and relies on the key.converter and value.converter settings to interpret the data.

Supported storage formats encompass:

  • AVRO
  • Parquet
  • JSON
  • CSV (including headers)
  • Text
  • BYTES

Opting for BYTES ensures that each record is stored in its own separate file. This feature proves particularly valuable for scenarios involving the storage of images or other binary data in S3. For cases where you prefer to consolidate multiple records into a single binary file, AVRO or Parquet are the recommended choices.

By default, the connector exclusively stores the Kafka message value. However, you can expand storage to encompass the entire message, including the key, headers, and metadata, by configuring the store.envelope property as true. This property operates as a boolean switch, with the default value being false. When the envelope is enabled, the data structure follows this format:

{
  "key": <the message Key, which can be a primitive or a complex object>,
  "value": <the message Key, which can be a primitive or a complex object>,
  "headers": {
    "header1": "value1",
    "header2": "value2"
  },
  "metadata": {
    "offset": 0,
    "partition": 0,
    "timestamp": 0,
    "topic": "topic"
  }
}

Utilizing the envelope is particularly advantageous in scenarios such as backup and restore or replication, where comprehensive storage of the entire message in S3 is desired.

Examples 

Storing the message Value Avro data as Parquet in S3:

  ...
  connect.s3.kcql=INSERT INTO lensesioaws:car_speed SELECT * FROM car_speed_events STOREAS `PARQUET` 
  value.converter=io.confluent.connect.avro.AvroConverter
  value.converter.schema.registry.url=http://localhost:8081
  key.converter=org.apache.kafka.connect.storage.StringConverter
  ...

The converter also facilitates seamless JSON to AVRO/Parquet conversion, eliminating the need for an additional processing step before the data is stored in S3.

  ...
  connect.s3.kcql=INSERT INTO lensesioaws:car_speed SELECT * FROM car_speed_events STOREAS `PARQUET` 
  value.converter=org.apache.kafka.connect.json.JsonConverter
  key.converter=org.apache.kafka.connect.storage.StringConverter
  ...

Enabling the full message stored as JSON in S3:

  ...
  connect.s3.kcql=INSERT INTO lensesioaws:car_speed SELECT * FROM car_speed_events STOREAS `JSON` PROPERTIES('store.envelope'=true)
  value.converter=org.apache.kafka.connect.json.JsonConverter
  key.converter=org.apache.kafka.connect.storage.StringConverter
  ...

Enabling the full message stored as AVRO in S3:

  ...
  connect.s3.kcql=INSERT INTO lensesioaws:car_speed SELECT * FROM car_speed_events STOREAS `AVRO` PROPERTIES('store.envelope'=true)
  value.converter=io.confluent.connect.avro.AvroConverter
  value.converter.schema.registry.url=http://localhost:8081
  key.converter=org.apache.kafka.connect.storage.StringConverter
  ...

If the restore (see the S3 Source documentation) happens on the same cluster, then the most performant way is to use the ByteConverter for both Key and Value and store as AVRO or Parquet:

  ...
  connect.s3.kcql=INSERT INTO lensesioaws:car_speed SELECT * FROM car_speed_events STOREAS `AVRO` PROPERTIES('store.envelope'=true)
  value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
  key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
  ...

Flush Options 

The connector offers three distinct flush options for data management:

  • Flush by Count - triggers a file flush after a specified number of records have been written to it.
  • Flush by Size - initiates a file flush once a predetermined size (in bytes) has been attained.
  • Flush by Interval - enforces a file flush after a defined time interval (in seconds).

It’s worth noting that the interval flush is a continuous process that acts as a fail-safe mechanism, ensuring that files are periodically flushed, even if the other flush options are not configured or haven’t reached their thresholds.

Consider a scenario where the flush size is set to 10MB, and only 9.8MB of data has been written to the file, with no new Kafka messages arriving for an extended period of 6 hours. To prevent undue delays, the interval flush guarantees that the file is flushed after the specified time interval has elapsed. This ensures the timely management of data even in situations where other flush conditions are not met.

The flush options are configured using the ‘flush.count’, ‘flush.size’, and ‘flush.interval’ properties. The settings are optional and if not specified the defaults are:

  • flush.count = 50_000
  • flush.size = 500000000 (500MB)
  • flush.interval = 3_600 (1 hour)

Flushing By Interval 

The next flush time is calculated based on the time the previous flush completed (the last modified time of the file written to S3). Therefore, by design, the sink connector’s behaviour will have a slight drift based on the time it takes to flush records and whether records are present or not. If Kafka Connect makes no calls to put records, the logic for flushing won’t be executed. This ensures a more consistent number of records per file.

sink commit.png

AVRO and Parquet Compression 

AVRO and Parquet offer the capability to compress files as they are written. The S3 Sink connector provides advanced users with the flexibility to configure compression options. Here are the available options for the connect.s3.compression.codec, along with indications of their support by Avro and Parquet writers:

CompressionAvro SupportAvro (requires Level)Parquet Support
UNCOMPRESSED
SNAPPY
GZIP
LZ0
LZ4
BROTLI
BZIP2
ZSTD⚙️
DEFLATE⚙️
XZ⚙️

Please note that not all compression libraries are bundled with the S3 connector. Therefore, you may need to manually add certain libraries to the classpath to ensure they function correctly.

Auth Mode 

The connector offers two distinct authentication modes:

  • Default: This mode relies on the default AWS authentication chain, simplifying the authentication process.
  • Credentials: In this mode, explicit configuration of AWS Access Key and Secret Key is required for authentication.

When selecting the “Credentials” mode, it is essential to provide the necessary access key and secret key properties. Alternatively, if you prefer not to configure these properties explicitly, the connector will follow the credentials retrieval order as described here.

Here’s an example configuration for the “Credentials” mode:

...
connect.s3.aws.auth.mode=Credentials
connect.s3.aws.region=eu-west-2
connect.s3.aws.access.key=$AWS_ACCESS_KEY
connect.s3.aws.secret.key=$AWS_SECRET_KEY
...

For enhanced security and flexibility when using the “Credentials” mode, it is highly advisable to utilize Connect Secret Providers. You can find detailed information on how to use the Connect Secret Providers here. This approach ensures robust security practices while handling access credentials.

Error polices 

Please refer to the information provided in the Error polices section for detailed guidance.

Storage to output matrix 

Depending on the storage format of Kafka topics’ messages, the need for replication to a different cluster, and the specific data analysis requirements, there exists a guideline on how to effectively utilize converters for both sink and source operations. This guidance aims to optimize performance and minimize unnecessary CPU and memory usage.

S3 Storage FormatKafka Output FormatRestore or replicate clusterAnalyticsSink ConverterSource Converter
JSONSTRINGSame,OtherYes, NoStringConverterStringConverter
AVRO,ParquetSTRINGSame,OtherYesStringConverterStringConverter
AVRO,ParquetSTRINGSame,OtherNoByteArrayConverterByteArrayConverter
JSONJSONSame,OtherYesJsonConverterStringConverter
JSONJSONSame,OtherNoStringConverterStringConverter
AVRO,ParquetJSONSame,OtherYes,NoJsonConverterJsonConverter or Avro Converter( Glue, Confluent)
AVRO,Parquet, JSONBYTESSame,OtherYes,NoByteArrayConverterByteArrayConverter
AVRO,ParquetAVROSameYesAvro Converter( Glue, Confluent)Avro Converter( Glue, Confluent)
AVRO,ParquetAVROSameNoByteArrayConverterByteArrayConverter
AVRO,ParquetAVROOtherYes,NoAvro Converter( Glue, Confluent)Avro Converter( Glue, Confluent)
AVRO,ParquetProtobufSameYesProtobuf Converter( Glue, Confluent)Protobuf Converter( Glue, Confluent)
AVRO,ParquetProtobufSameNoByteArrayConverterByteArrayConverter
AVRO,ParquetProtobufOtherYes,NoProtobuf Converter( Glue, Confluent)Protobuf Converter( Glue, Confluent)
AVRO,Parquet, JSONOtherSame, OtherYes,NoByteArrayConverterByteArrayConverter

Adapt the key.converter and value.converter properties accordingly to the table above.

Indexes Directory 

The connector uses the concept of index files that it writes to in order to store information about the latest offsets for Kafka topics and partitions as they are being processed. This allows the connector to quickly resume from the correct position when restarting and provides flexibility in naming the index files.

By default, the root directory for these index files is named .indexes for all connectors. However, each connector will create and store its index files within its own subdirectory inside this .indexes directory.

You can configure the root directory for these index files using the property connect.s3.indexes.name. This property specifies the path from the root of the S3 bucket. Note that even if you configure this property, the connector will still create a subdirectory within the specified root directory.

Examples 

Index Name (connect.s3.indexes.name)Resulting Indexes Directory StructureDescription
.indexes (default).indexes/<connector_name>/The default setup, where each connector uses its own subdirectory within .indexes.
custom-indexescustom-indexes/<connector_name>/Custom root directory custom-indexes, with a subdirectory for each connector.
indexes/s3-connector-logsindexes/s3-connector-logs/<connector_name>/Uses a custom subdirectory s3-connector-logs within indexes, with a subdirectory for each connector.
logs/indexeslogs/indexes/<connector_name>/Indexes are stored under logs/indexes, with a subdirectory for each connector.
--
Last modified: September 15, 2024