AWS S3

A Kafka Connect sink connector for writing records from Kafka to AWS S3 Buckets.

KCQL support 

The following KCQL is supported:

INSERT INTO bucketAddress:pathPrefix 
SELECT * 
FROM kafka-topic
[PARTITIONBY (partition[, partition] ...)]
[STOREAS storage_format]
[WITHPARTITIONER partitioner]
[WITH_FLUSH_SIZE flush_size]
[WITH_FLUSH_INTERVAL flush_interval]
[WITH_FLUSH_COUNT flush_count]

Examples:

-- Insert mode, select all fields from topicA and
-- write underneath prefix on testS3Bucket
INSERT INTO testS3Bucket:pathToWriteTo SELECT * FROM topicA

-- Insert mode, select all fields from topicA and
-- write underneath prefix on testS3Bucket as AVRO with
-- a maximum of 5000 records per file
INSERT INTO testS3Bucket:pathToWriteTo SELECT * FROM topicA STOREAS `AVRO` WITH_FLUSH_COUNT = 5000

-- Insert mode, select all fields from "topic-with-headers" and 
-- write underneath "headerpartitioningdemo" prefix on bucket
-- "kafka-connect-aws-s3-test" partitioning files by country 
-- code and facilityNum data in the message header 
INSERT INTO kafka-connect-aws-s3-test:headerpartitioningdemo SELECT * FROM topic-with-headers PARTITIONBY _header.facilityCountryCode, _header.facilityNum STOREAS `TEXT` WITH_FLUSH_COUNT = 100
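When connectors are generated from templates, statements like the ones above can be assembled programmatically. The following is a minimal Python sketch: `build_kcql` and its parameter names are hypothetical helpers, not part of the connector, and it only concatenates the clauses shown in the examples above.

```python
from typing import Optional, Sequence


def build_kcql(bucket: str, prefix: str, topic: str,
               partition_by: Sequence[str] = (),
               store_as: Optional[str] = None,
               flush_count: Optional[int] = None) -> str:
    """Assemble a KCQL statement from the clauses used in the examples."""
    parts = [f"INSERT INTO {bucket}:{prefix} SELECT * FROM {topic}"]
    if partition_by:
        parts.append("PARTITIONBY " + ", ".join(partition_by))
    if store_as:
        # The examples wrap the format name in backticks.
        parts.append(f"STOREAS `{store_as}`")
    if flush_count is not None:
        parts.append(f"WITH_FLUSH_COUNT = {flush_count}")
    return " ".join(parts)


kcql = build_kcql("testS3Bucket", "pathToWriteTo", "topicA",
                  store_as="AVRO", flush_count=5000)
print(kcql)
```

The printed statement is the same as the second example above.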

Concepts 

The connector accepts messages containing AVRO, text, or binary byte information from Kafka Connect and can write them to Amazon S3 in a number of different formats.

Configuration 

An example configuration is provided:

# The connector name can be anything
name=S3SinkConnectorS3
connector.class=io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector
topics=$TOPIC_NAME
tasks.max=1
connect.s3.kcql=insert into $BUCKET_NAME:$PREFIX_NAME select * from $TOPIC_NAME STOREAS `json` WITH_FLUSH_COUNT = 5000 
aws.region=eu-west-1
aws.access.key=ACCESS_KEY
aws.secret.key=SECRET_KEY
aws.auth.mode=Credentials

You should replace $BUCKET_NAME, $PREFIX_NAME and $TOPIC_NAME with the names of the bucket, desired prefix and topic.

ACCESS_KEY and SECRET_KEY are credentials generated within AWS IAM and must be set and configured with permissions to write to the desired S3 bucket.

Please read below for a detailed explanation of these and other options, including the meaning of WITH_FLUSH_COUNT and its alternatives.
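Because the placeholders use the `$NAME` form, the substitution can be scripted. The sketch below uses Python's standard `string.Template`; the bucket, prefix and topic values are hypothetical, and the credential lines are left as the literal placeholders from the example.

```python
from string import Template

# Same properties as the example configuration above.
CONFIG_TEMPLATE = Template("""\
name=S3SinkConnectorS3
connector.class=io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector
topics=$TOPIC_NAME
tasks.max=1
connect.s3.kcql=insert into $BUCKET_NAME:$PREFIX_NAME select * from $TOPIC_NAME STOREAS `json` WITH_FLUSH_COUNT = 5000
aws.region=eu-west-1
aws.access.key=ACCESS_KEY
aws.secret.key=SECRET_KEY
aws.auth.mode=Credentials
""")

# substitute() raises KeyError if any placeholder is left unset.
rendered = CONFIG_TEMPLATE.substitute(
    BUCKET_NAME="my-bucket",      # hypothetical bucket name
    PREFIX_NAME="orders-sink",    # hypothetical prefix
    TOPIC_NAME="orders",          # hypothetical topic
)
print(rendered)
```

Using `substitute` rather than `safe_substitute` makes a forgotten placeholder fail loudly instead of ending up in the connector configuration.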

Format configuration 

Format configuration is provided by KCQL, using the STOREAS clause.

The options for JSON, Avro and Parquet look like the following:

connect.s3.kcql=insert into $BUCKET_NAME:$PREFIX_NAME select * from $TOPIC_NAME STOREAS `JSON`

The options for the formats are case-insensitive, but they are presented here in the most readable form.

JSON Output Format Configuration 

Using JSON as an output format allows you to convert the complex AVRO structure of the message to a simpler, schemaless JSON format on output.

STOREAS `JSON`

Avro Output Format Configuration

Using Avro as the output format allows you to output the Avro message.

STOREAS `Avro`

Parquet Output Format Configuration

Using Parquet as the output format allows you to output the Avro message to a file readable by a parquet reader, including schemas.

STOREAS `Parquet`

Text Output Format Configuration

If the incoming Kafka message contains only text that should be written to S3 as is, use this option.

STOREAS `Text`

It may be necessary to set the following additional configuration options for this connector to ensure that the value is presented as a String.

value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter=org.apache.kafka.connect.storage.StringConverter

CSV Output Format Configuration

This converts the fields of the Avro message to string values to be written out to a CSV file. There are two options for the CSV format: writing files with column headers or without them.

STOREAS `CSV_WithHeaders`
STOREAS `CSV`

Byte (Binary) Output Format Configuration

Bytes can be written from the message key and/or the message value depending on which option is configured.

The key-only and value-only options can be useful, for example, for stitching together multiple parts of a binary file.

For the options that include sizes, the content sizes are written first, each as an 8-byte long.

To ensure the message is passed through as bytes, it may be necessary to set the following additional configuration options:

value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter

Please see the KCQL options available and the results of these configurations:

Option                     | KCQL Configuration                 | Records written to file as
---------------------------|------------------------------------|---------------------------
Key and Value (With Sizes) | STOREAS Bytes_KeyAndValueWithSizes | Long Long Bytes Bytes
Key (With Size)            | STOREAS Bytes_KeyWithSize          | Long Bytes
Value (With Size)          | STOREAS Bytes_ValueWithSize        | Long Bytes
Key Only                   | STOREAS Bytes_KeyOnly              | Bytes
Value Only                 | STOREAS Bytes_ValueOnly            | Bytes
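To make the "Long Long Bytes Bytes" layout concrete, here is a small Python round-trip sketch for the key-and-value-with-sizes option. It assumes the two sizes are big-endian signed 64-bit integers, in the order key size then value size; the byte order is not stated in this document, so verify it against real output before relying on it. The function names are hypothetical.

```python
import io
import struct
from typing import BinaryIO, Iterator, Tuple

# ASSUMPTION: big-endian signed 64-bit sizes (key size, then value size).
SIZE_HEADER = struct.Struct(">qq")


def write_record(out: BinaryIO, key: bytes, value: bytes) -> None:
    """Write one record as: Long Long Bytes Bytes."""
    out.write(SIZE_HEADER.pack(len(key), len(value)))
    out.write(key)
    out.write(value)


def read_records(data: bytes) -> Iterator[Tuple[bytes, bytes]]:
    """Yield (key, value) pairs from a buffer in the layout above."""
    stream = io.BytesIO(data)
    while True:
        header = stream.read(SIZE_HEADER.size)
        if not header:
            return  # clean end of data
        if len(header) < SIZE_HEADER.size:
            raise ValueError("truncated size header")
        key_len, value_len = SIZE_HEADER.unpack(header)
        key = stream.read(key_len)
        value = stream.read(value_len)
        if len(key) != key_len or len(value) != value_len:
            raise ValueError("truncated record body")
        yield key, value


buf = io.BytesIO()
write_record(buf, b"k1", b"hello")
write_record(buf, b"k2", b"")
records = list(read_records(buf.getvalue()))
print(records)
```

The key-only and value-only options carry no sizes, so a file written that way is just the concatenated bytes and cannot be split back into records without outside knowledge.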

Flush configuration 

Flush configuration is provided by KCQL.

FLUSH_COUNT 

The flush occurs after the configured number of records has been written to the sink.

For example, if you want a file written for every record, you would set the FLUSH_COUNT to 1. If you want a file written for every 10,000 records, you would set the FLUSH_COUNT to 10000.

connect.s3.kcql=insert into $BUCKET_NAME:$PREFIX_NAME select * from $TOPIC_NAME STOREAS `json` WITH_FLUSH_COUNT = 1

FLUSH_SIZE

The flush occurs after the configured size in bytes is exceeded.

For example, to flush after each 10000 bytes written to a file, you would set the FLUSH_SIZE to 10000.

connect.s3.kcql=insert into $BUCKET_NAME:$PREFIX_NAME select * from $TOPIC_NAME STOREAS `json` WITH_FLUSH_SIZE = 10000

FLUSH_INTERVAL

The flush occurs after the configured interval.

For example, to roll over to a new file every 10 minutes, you would set the FLUSH_INTERVAL to 600 (10 minutes * 60 seconds).

connect.s3.kcql=insert into $BUCKET_NAME:$PREFIX_NAME select * from $TOPIC_NAME STOREAS `json` WITH_FLUSH_INTERVAL = 600
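The three flush settings can be pictured as thresholds checked after each record. The Python sketch below is only a model of the behaviour described above, not the connector's implementation. It assumes that any one configured threshold triggers a flush, which this document does not state explicitly; `FlushPolicy` and `simulate` are hypothetical names.

```python
from dataclasses import dataclass
from typing import Iterable, Optional, Tuple


@dataclass
class FlushPolicy:
    flush_count: Optional[int] = None     # WITH_FLUSH_COUNT, in records
    flush_size: Optional[int] = None      # WITH_FLUSH_SIZE, in bytes
    flush_interval: Optional[int] = None  # WITH_FLUSH_INTERVAL, in seconds

    def should_flush(self, records: int, size_bytes: int,
                     elapsed_seconds: float) -> bool:
        # ASSUMPTION: any single configured threshold triggers a flush.
        if self.flush_count is not None and records >= self.flush_count:
            return True
        if self.flush_size is not None and size_bytes > self.flush_size:
            return True  # "after the configured size in bytes is exceeded"
        if (self.flush_interval is not None
                and elapsed_seconds >= self.flush_interval):
            return True
        return False


def simulate(policy: FlushPolicy, record_sizes: Iterable[int]) -> Tuple[int, int]:
    """Return (files flushed, records still pending) for a stream of sizes.

    Elapsed time is held at zero here, so only count and size can trigger.
    """
    files = records = size = 0
    for record_size in record_sizes:
        records += 1
        size += record_size
        if policy.should_flush(records, size, elapsed_seconds=0.0):
            files += 1
            records = size = 0
    return files, records


print(simulate(FlushPolicy(flush_count=10000), [1] * 25000))
print(simulate(FlushPolicy(flush_size=10000), [4000] * 6))
print(FlushPolicy(flush_interval=600).should_flush(1, 10, elapsed_seconds=600))
```

In this model, 25,000 records with a count of 10000 produce two complete files and leave 5,000 records waiting for the next flush.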

Partitioning Options 

There are 2 options for grouping (partitioning) the output files.

Default Partitioning 

S3 Bucket Layout 

In your S3 bucket the sink files will be stored as

bucket/prefix/topic/partition/offset.ext

Where .ext is the appropriate file extension.

(Please note: the prefix must not be divided into subpaths.)
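As an illustration of the default layout, the following Python sketch builds the object key (the part after the bucket name) from its components. `default_object_key` is a hypothetical helper, and the sketch ignores any padding or formatting the connector may apply to the partition and offset.

```python
def default_object_key(prefix: str, topic: str, partition: int,
                       offset: int, ext: str) -> str:
    """Build prefix/topic/partition/offset.ext for the default layout."""
    if "/" in prefix:
        # Mirrors the note above: the prefix must not contain subpaths.
        raise ValueError("the prefix must not be divided into subpaths")
    return f"{prefix}/{topic}/{partition}/{offset}.{ext}"


key = default_object_key("pathToWriteTo", "topicA", 0, 4999, "json")
print(key)
```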

Custom Partitioning 

S3 Bucket Layout 

This allows you to store the sink files in your S3 bucket as

bucket/prefix/customKey1=customValue/topic(partition_offset).ext

or

bucket/prefix/customValue/topic(partition_offset).ext

The custom keys and values can be taken from the Kafka message key, from the value record, or from the message headers (only headers that can be coerced to a string are supported).

Again, .ext is the appropriate file extension.

(Please note: the prefix must not be divided into subpaths.)
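The two custom layouts differ only in whether each path segment carries the partition key name. The Python sketch below shows both forms; `custom_object_key` and `include_keys` are hypothetical names, and placing each partition field in its own path segment is an assumption drawn from the layouts shown above.

```python
from typing import Sequence, Tuple


def custom_object_key(prefix: str, partitions: Sequence[Tuple[str, str]],
                      topic: str, partition: int, offset: int, ext: str,
                      include_keys: bool = True) -> str:
    """Build prefix/<segments>/topic(partition_offset).ext."""
    if "/" in prefix:
        raise ValueError("the prefix must not be divided into subpaths")
    # ASSUMPTION: one path segment per partition field, in the order given.
    segments = [f"{name}={value}" if include_keys else value
                for name, value in partitions]
    file_name = f"{topic}({partition}_{offset}).{ext}"
    return "/".join([prefix, *segments, file_name])


fields = [("facilityCountryCode", "GB"), ("facilityNum", "12")]
with_keys = custom_object_key("headerpartitioningdemo", fields,
                              "topic-with-headers", 0, 99, "json")
values_only = custom_object_key("headerpartitioningdemo", fields,
                                "topic-with-headers", 0, 99, "json",
                                include_keys=False)
print(with_keys)
print(values_only)
```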

Configuring Custom Partitioning 

The number of partitions you may configure on your sink is unbounded, but bear in mind the restrictions on AWS S3 key lengths.

Partitions from Message Values 

To pull fields from the message values, just use the name of the field from the Avro message.

The fields from the message must be primitive types (string, int, long, etc) in order to partition by them.

Add this to your KCQL string:

PARTITIONBY fieldA[,fieldB]

Partitions from Message Keys

It is possible to partition by the entire message key, as long as the key is coercible into a primitive type:

PARTITIONBY _key

Where the Kafka message key is not a primitive but a complex Avro object, it is possible to partition by individual fields within the key.

PARTITIONBY _key.fieldA[, _key.fieldB]

Partitions from Message Headers

Kafka message headers may be used for partitioning. In this case the header must contain a primitive type easily coercible to a String type.

PARTITIONBY _header.<header_key1>[,_header.<header_key2>]

Mixing Partition Types

The above partition types can be mixed to configure advanced partitioning.

For example

PARTITIONBY fieldA, _key.fieldB, _header.fieldC

Configuring partition display

WITHPARTITIONER=KeysAndValues
WITHPARTITIONER=Values
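The partition sources described above (value fields, `_key`, `_key.<field>` and `_header.<name>`) can be modelled as a simple lookup. This Python sketch is illustrative only: `resolve_partition_value` is a hypothetical function, records are represented as plain dictionaries, and the set of types treated as primitive is an assumption.

```python
from typing import Any, Mapping

# ASSUMPTION: these Python types stand in for the "primitive types" above.
PRIMITIVES = (str, int, float, bool)


def resolve_partition_value(expr: str, key: Any, value: Mapping[str, Any],
                            headers: Mapping[str, Any]) -> str:
    """Resolve one PARTITIONBY expression to the string used in the path."""
    if expr == "_key":
        resolved = key                               # the entire message key
    elif expr.startswith("_key."):
        resolved = key[expr[len("_key."):]]          # a field inside the key
    elif expr.startswith("_header."):
        resolved = headers[expr[len("_header."):]]   # a message header
    else:
        resolved = value[expr]                       # a field of the value
    if not isinstance(resolved, PRIMITIVES):
        raise TypeError(f"cannot partition by non-primitive value for {expr!r}")
    return str(resolved)


key = {"fieldB": 7}
value = {"fieldA": "a1", "price": 94.2}
headers = {"fieldC": "GB"}
values = [resolve_partition_value(e, key, value, headers)
          for e in ["fieldA", "_key.fieldB", "_header.fieldC"]]
print(values)
```

Partitioning by `_key` alone fails in this sketch when the key is a structure, matching the rule that the whole key must be coercible to a primitive.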

Kafka payload support 

Depending on the format required, this sink supports the following Kafka payloads:

  • AVRO (Structs, Maps, Arrays, Primitives). *
  • Text (including any other primitives and string based formats including Json, XML **)
  • Bytes (including any binary formats)

* Schema required when writing to Avro and Parquet formats.

** However you will not be able to partition by Json or XML properties as Text message content is not parsed.

Output Formats 

format  | variations
--------|-----------
json    |
avro    |
parquet |
text    |
csv     | CSV, CSV_WithHeaders
bytes   | Bytes_KeyAndValueWithSizes, Bytes_KeyWithSize, Bytes_ValueWithSize, Bytes_KeyOnly, Bytes_ValueOnly

See connect payloads for more information.

Error policies

The connector supports Error policies.

Quickstart 

Preparing the target system 

The S3 Sink Connector requires AWS access keys and a bucket to be set up prior to running.

Configuring access keys 

The S3 connector requires AWS access keys to be able to perform actions in AWS.

You can configure these through the AWS console or CLI tools.

Please see the AWS documentation for more details.

Creating a target bucket 

Please ensure you use sensible selections for security and access options. More information on the command is available in the AWS documentation.

aws s3api create-bucket --bucket test-kafka-connect-bucket --region eu-west-1 --create-bucket-configuration LocationConstraint=eu-west-1

Start the connector 

If you are using Lenses, log in to Lenses, navigate to the connectors page, select S3 as the sink and paste the following:

    # The connector name can be anything
    name=S3SinkConnectorS3
    connector.class=io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector
    topics=$TOPIC_NAME
    tasks.max=1
    connect.s3.kcql=insert into $BUCKET_NAME:$PREFIX_NAME select * from $TOPIC_NAME STOREAS `json` WITH_FLUSH_COUNT = 5000 
    aws.region=eu-west-1
    aws.access.key=ACCESS_KEY
    aws.secret.key=SECRET_KEY
    aws.auth.mode=Credentials

To start the connector without using Lenses, log into the fastdata container:


docker exec -ti fastdata /bin/bash

and create a connector.properties file containing the properties above.

Create the connector with the connect-cli:

connect-cli create aws-s3-sink < connector.properties

Wait for the connector to start and check that it is running:

connect-cli status aws-s3-sink

Inserting test data 

In the fastdata container, start the Kafka producer shell:


kafka-avro-console-producer \
 --broker-list localhost:9092 --topic orders \
 --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"},{"name":"created","type":"string"},{"name":"product","type":"string"},{"name":"price","type":"double"}, {"name":"qty", "type":"int"}]}'

The console is now waiting for your input. Enter the following:


{"id": 1, "created": "2016-05-06 13:53:00", "product": "OP-DAX-P-20150201-95.7", "price": 94.2, "qty":100}

Check for data in S3 

Use the AWS console. Alternatively, on the command line run:

aws s3 ls s3://mybucket/mykey --recursive

Clean up 

Bring down the stack:

docker-compose down

Options 

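Name                | Description          | Type    | Available Values           | Default Value
--------------------|----------------------|---------|----------------------------|--------------
aws.region          | Region               | string  |                            |
aws.access.key      | Access Key           | string  |                            |
aws.secret.key      | Secret Key           | string  |                            |
aws.auth.mode       | Auth Mode            | string  | Credentials, EC2, ECS, Env |
aws.custom.endpoint | Custom Endpoint      | string  |                            |
aws.vhost.bucket    | Enable Vhost Buckets | boolean | true, false                | false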