A Kafka Connect sink connector for writing records from Kafka to AWS S3 Buckets.

KCQL support 

The following KCQL is supported:

INSERT INTO bucketAddress:pathPrefix 
FROM kafka-topic
[PARTITIONBY (partition[, partition] ...)]
[STOREAS storage_format]
[WITH_FLUSH_SIZE flush_size]
[WITH_FLUSH_INTERVAL flush_interval]
[WITH_FLUSH_COUNT flush_count]


-- Insert mode, select all fields from topicA and
-- write underneath prefix on testS3Bucket
INSERT INTO testS3Bucket:pathToWriteTo SELECT * FROM topicA

-- Insert mode, select all fields from topicA and
-- write underneath prefix on testS3Bucket as AVRO with
-- a maximum of 5000 records per file
INSERT INTO testS3Bucket:pathToWriteTo SELECT * FROM topicA STOREAS `AVRO` WITH_FLUSH_COUNT = 5000
-- Insert mode, select all fields from "topic-with-headers" and 
-- write underneath "headerpartitioningdemo" prefix on bucket
-- "kafka-connect-aws-s3-test" partitioning files by country 
-- code and facilityNum data in the message header 
INSERT INTO kafka-connect-aws-s3-test:headerpartitioningdemo SELECT * FROM topic-with-headers PARTITIONBY _header.facilityCountryCode, _header.facilityNum STOREAS `TEXT` WITH_FLUSH_COUNT = 100


The connector accepts messages containing Avro, text, or binary byte information from Kafka Connect and can output them to a number of different formats on Amazon S3.


An example configuration is provided:

name=S3SinkConnectorS3 # this can be anything
connect.s3.kcql=insert into $BUCKET_NAME:$PREFIX_NAME select * from $TOPIC_NAME STOREAS `json` WITH_FLUSH_COUNT = 5000 

You should replace $BUCKET_NAME, $PREFIX_NAME and $TOPIC_NAME with the names of the bucket, desired prefix and topic.

Please read below for a detailed explanation of these and other options, including the meaning of WITH_FLUSH_COUNT and its alternatives.

Auth Mode configuration 

Two authentication modes are available: Credentials and Default.


Credentials Mode

ACCESS_KEY and SECRET_KEY are credentials generated within AWS IAM and must be set and configured with permissions to write to the desired S3 bucket.
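For example, the Credentials auth mode could be configured with properties like the following (property names are taken from the options table at the end of this document; replace the placeholder values with your own keys):

connect.s3.aws.auth.mode=Credentials
connect.s3.aws.access.key=$ACCESS_KEY
connect.s3.aws.secret.key=$SECRET_KEY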


Default Mode

In this auth mode no credentials need be supplied. If no auth mode is specified, this default will be used.


The credentials will be discovered through the default chain, in this order:

  • Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
  • Java System Properties - aws.accessKeyId and aws.secretKey
  • Web Identity Token credentials from the environment or container
  • Credential profiles file at the default location (~/.aws/credentials)
  • EC2 Credentials delivered through the Amazon EC2 container service
  • Instance profile credentials delivered through the Amazon EC2 metadata service

The full details of the default chain are available in the AWS documentation.

Format configuration 

Format configuration is provided via KCQL.

The options for JSON, Avro and Parquet will look like this:

connect.s3.kcql=insert into $BUCKET_NAME:$PREFIX_NAME select * from $TOPIC_NAME STOREAS `JSON`

The options for the formats are case-insensitive, but they are presented here in the most readable form.

JSON Output Format Configuration 

Using JSON as an output format allows you to convert the complex Avro structure of the message to a simpler, schemaless JSON format on output.

Avro Output Format Configuration 

Using Avro as the output format allows you to output the Avro message.

Parquet Output Format Configuration 

Using Parquet as the output format allows you to output the Avro message to a file readable by a parquet reader, including schemas.

STOREAS `Parquet`
Text Output Format Configuration 

If the incoming Kafka message contains text only and this is to be pushed through to the S3 sink as-is, then this option may be desired.

It may be necessary to use the additional configuration options for this connector to ensure that the value is presented as a String.
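For example, assuming your records are plain strings, the standard Kafka Connect StringConverter could be used to ensure this (a sketch; adjust to your worker setup):

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter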

CSV Output Format Configuration 

This converts the fields of the Avro message to string values to be written out to a CSV file. There are two options for the CSV format: writing CSV files with column headers or without.

STOREAS `CSV_WithHeaders`
Byte(Binary) Output Format Configuration 

Bytes can be written from the message key and/or the message value depending on which option is configured.

The key-only/value-only options can be useful, for example, for stitching together multiple parts of a binary file.

The content sizes are output first as an (8-byte) long.

In order to ensure the message is passed through as bytes, it may be necessary to set the additional configuration options.
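For example, the standard Kafka Connect ByteArrayConverter could be used to pass the payload through untouched (a sketch; adjust to your worker setup):

key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter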


Please see the KCQL options available and the results of these configurations:

Option                      KCQL Configuration                  Records written to file as
Key and Value (With Sizes)  STOREAS Bytes_KeyAndValueWithSizes  Long Long Bytes Bytes
Key (With Size)             STOREAS Bytes_KeyWithSize           Long Bytes
Value (With Size)           STOREAS Bytes_ValueWithSize         Long Bytes
Key Only                    STOREAS Bytes_KeyOnly               Bytes
Value Only                  STOREAS Bytes_ValueOnly             Bytes
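As an illustrative sketch (bucket, prefix and topic names are placeholders), a KCQL string writing only the value bytes, flushing every 100 records, might look like:

connect.s3.kcql=insert into $BUCKET_NAME:$PREFIX_NAME select * from $TOPIC_NAME STOREAS `Bytes_ValueOnly` WITH_FLUSH_COUNT = 100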

Flush configuration 

Flush configuration is provided via KCQL.


WITH_FLUSH_COUNT

The flush occurs after the configured number of records has been written to the sink.

For example, if you want a file written for every record, you would set the FLUSH_COUNT to 1. If you want a file written for every 10,000 records, then you would set the FLUSH_COUNT to 10000.

connect.s3.kcql=insert into $BUCKET_NAME:$PREFIX_NAME select * from $TOPIC_NAME STOREAS `json` WITH_FLUSH_COUNT = 1

WITH_FLUSH_SIZE

The flush occurs after the configured size in bytes has been exceeded.

For example, to flush after each 10000 bytes written to a file, you would set the FLUSH_SIZE to 10000.

connect.s3.kcql=insert into $BUCKET_NAME:$PREFIX_NAME select * from $TOPIC_NAME STOREAS `json` WITH_FLUSH_SIZE = 10000

WITH_FLUSH_INTERVAL

The flush occurs after the configured interval, in seconds.

For example, to roll over to a new file every 10 minutes, you would set the FLUSH_INTERVAL to 600 (10 minutes × 60 seconds).

connect.s3.kcql=insert into $BUCKET_NAME:$PREFIX_NAME select * from $TOPIC_NAME STOREAS `json` WITH_FLUSH_INTERVAL = 600

Partitioning Options 

There are two options for grouping (partitioning) the output files.

Default Partitioning 

S3 Bucket Layout 

In your S3 bucket the sink files will be stored as

bucket/prefix/topic/partition/offset.ext

Where .ext is the appropriate file extension.

(Please note: the prefix must not be divided into subpaths.)

Custom Partitioning 

S3 Bucket Layout 

This allows you to store the sink files in your S3 bucket as

bucket/prefix/customKey1=customValue1/customKey2=customValue2/topic(partition_offset).ext
The custom keys and values can be taken from the Kafka message key, from the value record, or from the message headers (supporting only headers coercible to strings).

Again, .ext is the appropriate file extension.

(Please note: the prefix must not be divided into subpaths.)

Configuring Custom Partitioning 

The number of partitions you may configure on your sink is unbounded, but bear in mind the restrictions on AWS S3 key lengths.

Partitions from Message Values 

To pull fields from the message values, just use the name of the field from the Avro message.

The fields from the message must be primitive types (string, int, long, etc) in order to partition by them.

Add this to your KCQL string:

PARTITIONBY fieldA[,fieldB]
Partitions from Message Keys 

It is possible to partition by the entire message key, as long as the key is coercible into a primitive type:

PARTITIONBY _key

Where the Kafka message key is not a primitive but a complex Avro object, it is possible to partition by individual fields within the key.

PARTITIONBY _key.fieldA[, _key.fieldB]
Partitions from Message Headers 

Kafka message headers may be used for partitioning. In this case the header must contain a primitive type easily coercible to a String type.

PARTITIONBY _header.<header_key1>[,_header.<header_key2>]
Mixing Partition Types 

The above partition types can be mixed to configure advanced partitioning.

For example

PARTITIONBY fieldA, _key.fieldB, _header.fieldC
Partitioning by Date 

The formats can be specified using the predefined date format strings from Oracle's Java DateTimeFormatter documentation.

An example of the KCQL configuration:

PARTITIONBY _date.uuuu,_date.LL,_date.dd WITHPARTITIONER=Values

This will partition by year/month/day, for example 2020/10/25.

Configuring partition display 

The WITHPARTITIONER option controls how partitions are displayed in the file path: WITHPARTITIONER=KeysAndValues writes each partition as key=value, while WITHPARTITIONER=Values (as in the date example above) writes the value only.

Kafka payload support 

Dependent on the format required, this sink supports the following Kafka payloads:

  • AVRO (Structs, Maps, Arrays, Primitives). *
  • Text (including any other primitives and string-based formats, including JSON and XML **)
  • Bytes (including any binary formats)

* Schema required when writing to Avro and Parquet formats.

** However you will not be able to partition by Json or XML properties as Text message content is not parsed.

Output Formats 

Format   STOREAS values
json     JSON
avro     AVRO
parquet  Parquet
text     TEXT
csv      CSV, CSV_WithHeaders
bytes    Bytes_KeyAndValueWithSizes, Bytes_KeyWithSize, Bytes_ValueWithSize, Bytes_KeyOnly, Bytes_ValueOnly

See connect payloads for more information.

Error policies 

The connector supports error policies.


Preparing the target system 

The S3 Sink Connector requires AWS access keys and a bucket to be set up prior to running.

Configuring access keys 

The S3 connector requires AWS access keys to be able to perform actions in AWS.

You can configure these through AWS console or CLI tools.

Please see the AWS documentation for more details.

Creating a target bucket 

Please ensure you use sensible selections for security and access options. More information on the command is available in AWS's documentation.

aws s3api create-bucket --bucket test-kafka-connect-bucket

Start the connector 

If you are using Lenses, log in to Lenses and navigate to the connectors page, select S3 as the sink and paste the following:

    name=S3SinkConnectorS3 # this can be anything
    connect.s3.kcql=insert into $BUCKET_NAME:$PREFIX_NAME select * from $TOPIC_NAME STOREAS `json` WITH_FLUSH_COUNT = 5000 

To start the connector without using Lenses, log into the fastdatadev container:

docker exec -ti fastdata /bin/bash

and create a connector.properties file containing the properties above.

Create the connector with the connect-cli:

connect-cli create aws-s3-sink < connector.properties

Wait for the connector to start and check it's running:

connect-cli status aws-s3-sink

Inserting test data 

In the fastdata container, start the Kafka Avro console producer:

kafka-avro-console-producer \
 --broker-list localhost:9092 --topic orders \
 --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"},{"name":"created","type":"string"},{"name":"product","type":"string"},{"name":"price","type":"double"}, {"name":"qty", "type":"int"}]}'

The console is now waiting for your input; enter the following:

{"id": 1, "created": "2016-05-06 13:53:00", "product": "OP-DAX-P-20150201-95.7", "price": 94.2, "qty":100}

Check for data in S3 

Use the AWS console. Alternatively, on the command line, run:

aws s3 ls s3://mybucket/mykey --recursive

Clean up 

Bring down the stack:

docker-compose down

Local File Writing 

The connector builds complete files locally before uploading them in a single operation as part of the commit.

You can optionally supply a directory to write the files to locally. If none is supplied and BuildLocal mode is used, a directory will be created in your system's temporary directory (e.g. /tmp).
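For example, to stage files in a directory of your choosing (the path shown here is hypothetical):

connect.s3.local.tmp.directory=/var/tmp/s3-staging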


Files are currently limited to 5GB. This can be addressed in future if it is required.

Error handling 

Several properties are supplied for managing error handling.


connect.s3.error.policy

Specifies the action to be taken if an error occurs while inserting the data. There are three available options:

  • NOOP - the error is swallowed
  • THROW - the error is allowed to propagate
  • RETRY - the exception causes the Connect framework to retry the message; the number of retries is set by connect.s3.max.retries

All errors will be logged automatically, even if the code swallows them.


connect.s3.max.retries

The maximum number of times to try the write again.


connect.s3.retry.interval

The time in milliseconds between retries.
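For example, a retry policy using the default retry settings from the options table might be configured as:

connect.s3.error.policy=RETRY
connect.s3.max.retries=20
connect.s3.retry.interval=60000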


connect.s3.http.max.retries

The number of times to retry the HTTP request, in the case of a resolvable error on the server side.


connect.s3.http.retry.interval

If greater than zero, used to determine the delay after which to retry the HTTP request, in milliseconds, based on an exponential backoff algorithm.
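For example, the HTTP retry behaviour can be tuned explicitly; the values shown here are the defaults from the options table:

connect.s3.http.max.retries=5
connect.s3.http.retry.interval=50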

Seek Configuration 

The connector has been upgraded to a more efficient offset seek algorithm, which works by keeping index files within the S3 bucket.

Process Explanation 

Writing - At Commit Time 

  1. An index file is written to .indexes/connectorName/topic/partition/offset, containing only the path of the target file that will shortly be written.
  2. The target file is written.
  3. Clean up - retrieve all index files for the given partition; if there is more than one, delete all but the latest.

Reading - At Assignment 

  1. Retrieve all index files for partition and the file contents.
  2. Check for the existence of the files referenced in the index files.
  3. Delete any that point to non-existent files.
  4. The latest offset will be taken from the remaining file.
  5. If no file remains, and the optional flag connect.s3.seek.migration.enabled is set, an upgrade will be performed:
    • fall back to the previous (inefficient) algorithm, but ensure clear logging.
    • once the correct offset has been calculated, write it to the index path, so that if the sink is restarted it will not have to repeat the expensive operation to calculate the offset.


connect.s3.seek.max.files

The default is 5.

This specifies the maximum number of index files to keep per topic/partition before raising an error.

If index files keep accumulating, it means there is an error in the cleanup, so the sink cannot continue.


connect.s3.seek.migration.enabled

The default is false.

If you are upgrading a connector from a previous version of the sink, then you will need to specify this property to enable migration.

This option is intended for one-time upgrade usage and will be deprecated in future releases of the connector.

On the first run of any connector, the old, slow offset seek will be performed; however, the index files will be written, so performance will recover after the first offset seek.
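For example, a one-time upgrade from a previous version of the sink would add:

connect.s3.seek.migration.enabled=true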


Name                               Description                                  Type     Available Values      Default Value
connect.s3.aws.client              Client Library                               string   AWS, JCLOUDS          JCLOUDS
connect.s3.aws.region              Region (Required for AWS Client Library)     string   AWS Regions
connect.s3.aws.access.key          Access Key                                   string
connect.s3.aws.secret.key          Secret Key                                   string
connect.s3.aws.auth.mode           Auth Mode                                    string   Credentials, Default  Default
connect.s3.custom.endpoint         Custom Endpoint                              string
connect.s3.vhost.bucket            Enable Vhost Buckets                         boolean  true, false           false
connect.s3.error.policy            Error Policy                                 string   NOOP, THROW, RETRY    THROW
connect.s3.max.retries             Maximum Retries                              int                            20
connect.s3.retry.interval          Retry Interval                               int                            60000
connect.s3.http.max.retries        HTTP Maximum Retries                         long                           5
connect.s3.http.retry.interval     HTTP Retry Interval (Exponential Backoff)    long                           50
connect.s3.local.tmp.directory     Local Staging Area                           string
connect.s3.kcql                    KCQL String                                  string   KCQL configuration
connect.s3.seek.max.files          Max Index File Error Threshold               int                            5
connect.s3.seek.migration.enabled  One-time Connector Upgrade from Legacy Seek  boolean  true, false           false