A Kafka Connect Source connector for writing records from Kafka to AWS S3 Buckets.

KCQL support 

The following KCQL is supported:

INSERT INTO kafka-topic 
FROM bucketAddress:pathPrefix
[PARTITIONBY (partition[, partition] ...)]
[STOREAS storage_format]
[LIMIT limit]


-- Insert mode, select all fields from the json files
-- within the bucket and path given on testS3Bucket
-- using the default number of records
INSERT INTO topicA SELECT * FROM testS3Bucket:pathToWriteTo

-- Insert mode, select all fields from the avro files
-- within the bucket and path given on testS3Bucket
-- retrieving 5000 records on each poll from Kafka 
-- Connect
INSERT INTO topicA SELECT * FROM testS3Bucket:pathToWriteTo STOREAS `AVRO` LIMIT = 5000

-- Insert mode, select the content from the text 
-- files under the bucket (kafka-connect-aws-s3-test)
-- and path (headerpartitioningdemo).  The files as 
-- stored on S3 are partitioned by country 
-- code and facilityNum data
INSERT INTO topicA SELECT * FROM kafka-connect-aws-s3-test:headerpartitioningdemo PARTITIONBY _header.facilityCountryCode, _header.facilityNum STOREAS `TEXT` WITH_FLUSH_COUNT = 100


The connector reads files in AVRO, Parquet, CSV, text, json, or binary/byte information from an S3 bucket into Kafka connect.


An example configuration is provided:

name=S3SourceConnectorParquet # this can be anything
connect.s3.kcql=insert into $TOPIC_NAME select * from $BUCKET_NAME:$PREFIX_NAME STOREAS `parquet`

You should replace $BUCKET_NAME, $PREFIX_NAME and $TOPIC_NAME with the names of the bucket, desired prefix and topic.

Please read below for a detailed explanation of these and other options, including the meaning of WITH_FLUSH_COUNT and its alternatives.

Auth Mode configuration 

2 Authentication modes are available:


ACCESS_KEY and SECRET_KEY are credentials generated within AWS IAM and must be set and configured with permissions to write to the desired S3 bucket.


In this auth mode no credentials need be supplied. If no auth mode is specified, then this default will be used.


The credentials will be discovered through the default chain, in this order:

  • Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
  • Java System Properties - aws.accessKeyId and aws.secretKey
  • Web Identity Token credentials from the environment or container
  • Credential profiles file at the default location (~/.aws/credentials)
  • EC2 Credentials delivered through the Amazon EC2 container service
  • Instance profile credentials delivered through the Amazon EC2 metadata service

The full details of the default chain are available on S3 Documentation

Format configuration 

Format configuration is provided by kcql.

The options for json, avro and parquet will look like the below:

connect.s3.kcql=insert into $TOPIC_NAME select * from $BUCKET_NAME:$PREFIX_NAME STOREAS `JSON`

The options for the formats are case-insensitive, but they are presented here in the most readable form.

JSON Input Format Configuration 

Using JSON as an input format allows you to read in files containing JSON content (delimited by new lines), line by line.


Please note: The JSON is not parsed by the S3 Source connector. There is no difference in handling between Json and Text by the S3 Source connector.

Avro Input Format Configuration 

Using Avro as the input format allows you to read the Avro-stored messages on S3 back into Kafka’s native format.


It may also be necessary to configure the message converter:

Parquet Input Format Configuration 

Using Parquet as the input format allows you to read parquet files stored on S3, importing the Avro schemas and values.

STOREAS `Parquet`

It may also be necessary to configure the message converter:

Text Input Format Configuration 

If the source files on S3 consist of files containing lines of text, then using the text input format may be desired.


It may be required to use the additional configuration options for this connector to ensure that the value is presented as a String.

CSV Input Format Configuration 

This reads CSV files written to S3 into a string to be written back to the Kafka source. There are 2 options for CSV format, if WithHeaders is included then the first row is skipped.

STOREAS `CSV_WithHeaders`

Please note there is little distinction between the handling of CSV and handling of TEXT (with the exception that the header row can be skipped). The CSV is not parsed within the connector.

Byte(Binary) Input Format Configuration 

Bytes can be read back in from S3 and back into message keys/values, depending on how the data was written to the source.

This can be used for reading back in a messages containing binary data that were written out using the s3 source, or alternatively reading binary files to be loaded onto a Kafka queue.

Please see the KCQL options available and the results of these configurations:

OptionKCQL ConfigurationRecords read from file as
Key and Value (With Sizes)STOREAS Bytes_KeyAndValueWithSizesLong Long Bytes Bytes
Key (With Size)STOREAS Bytes_KeyWithSizeLong Bytes
Value (With Size)STOREAS Bytes_ValueWithSizeLong Bytes
Key OnlySTOREAS Bytes_KeyOnlyBytes
Value OnlySTOREAS Bytes_ValueOnlyBytes

Using the “With Sizes” options the Source assumes that the files will contain one or two (depending on configuration) 8-byte chunks of data at the start of the file instructing how many bytes to read for the content.

In order to ensure the message is passed through as bytes it may be necessary to set the additional configuration options


Partitioning Options 

When using the S3 Source it is required to specify partitioning options so that the connector understands the layout of the file system.

While the file naming format is required for the connector to read the files in the partition names themselves are not used within the connector.

This will mirror the options defined in the sink.

Please see the S3 Sink documentation for more information on the partitioning scheme.

Configuring read partitions 

The options you define will be in the following formats:

PARTITIONBY fieldA[,fieldB]
PARTITIONBY _key.fieldA[, _key.fieldB]
PARTITIONBY _header.<header_key1>[,_header.<header_key2>]
PARTITIONBY fieldA, _key.fieldB, _headers.fieldC
Configuring read partition display 

Limit Options 

In order to limit the number of result rows returned from the source in a single poll operation, you can use the LIMIT clause.

LIMIT 1000

Input Formats 

csvCSV, CSV_WithHeaders
byesBytes_KeyAndValueWithSizes, Bytes_KeyWithSize, Bytes_ValueWithSize, Bytes_KeyOnly, Bytes_ValueOnly


Preparing the source system 

The S3 Source Connector requires AWS access keys and a bucket to be set up prior to running.

Configuring access keys 

The S3 connector requires AWS access keys to be able to perform actions in AWS.

You can configure these through AWS console or CLI tools.

Please see AWS documentation for more details

Preparing a target kafka queue 

If you are using lenses you can achieve creation of a Kafka queue simply by using the kafka topic create features of lenses.

Otherwise, change to the directory you have kafka installed

cd ~/Software/kafka_2.12-2.5.0/bin
./kafka-topics.sh --create --bootstrap-server=localhost:9092 --topic=test_topic --partitions=3 --replication-factor=1

Creating a source bucket 

Please ensure you use sensible selections for security and access options. More information on the command at AWS&rsquo;s documentation .

aws  create-bucket --bucket test-kafka-connect-bucket

Populate the source files in S3 

echo '{"id": 1, "created": "2016-05-06 13:53:00", "product": "OP-DAX-P-20150201-95.7", "price": 94.2, "qty":100}' > 1.json
aws s3 cp 1.json s3://test-kafka-connect-bucket/jsonTest/test_topic/1.json

Start the connector 

If you are using Lenses, login into Lenses and navigate to the connectors page , select S3 as the source and paste the following:

name=S3SourceConnectorS3 # this can be anything
connect.s3.kcql=insert into test_topic select * from test-kafka-connect-bucket:jsonTest STOREAS `json` LIMIT 5000 

To start the connector without using Lenses, log into the fastdatadev container:

docker exec -ti fastdata /bin/bash

and create a connector.properties file containing the properties above.

Create the connector, with the connect-cli :

connect-cli create aws-s3-source < connector.properties

Wait a for the connector to start and check its running:

connect-cli status aws-s3-source

Check for data in kafka 

If you are using Lenses, login into Lenses and navigate to the explore page , and select the new topic you added.

View the topic and verify it has data in.

Otherwise, you can use the kafka-console-consumer script that comes with Kafka.

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic jsonTest --from-beginning

Clean up 

Bring down the stack:

docker-compose down


NameDescriptionTypeAvailable ValuesDefault Value
aws.access.keyAccess Keystring
aws.secret.keySecret Keystring
aws.auth.modeAuth ModestringCredentials, DefaultDefault
aws.custom.endpointCustom Endpointstring
aws.vhost.bucketEnable Vhost Bucketsbooleantrue, falsefalse