
AWS S3

A Kafka Connect source connector for reading records from AWS S3 buckets and writing them to Kafka.

KCQL support 

The following KCQL is supported:

INSERT INTO kafka-topic 
SELECT * 
FROM bucketAddress:pathPrefix
[PARTITIONBY (partition[, partition] ...)]
[STOREAS storage_format]
[WITHPARTITIONER partitioner]
[LIMIT limit]

Examples:

-- Insert mode, select all fields from the json files
-- within the bucket and path given on testS3Bucket
-- using the default number of records
INSERT INTO topicA SELECT * FROM testS3Bucket:pathToWriteTo

-- Insert mode, select all fields from the avro files
-- within the bucket and path given on testS3Bucket
-- retrieving up to 5000 records on each Kafka
-- Connect poll
INSERT INTO topicA SELECT * FROM testS3Bucket:pathToWriteTo STOREAS `AVRO` LIMIT 5000

-- Insert mode, select the content from the text
-- files under the bucket (kafka-connect-aws-s3-test)
-- and path (headerpartitioningdemo). The files as
-- stored on S3 are partitioned by the country code
-- and facilityNum header values
INSERT INTO topicA SELECT * FROM kafka-connect-aws-s3-test:headerpartitioningdemo PARTITIONBY _header.facilityCountryCode, _header.facilityNum STOREAS `TEXT` WITH_FLUSH_COUNT = 100

Concepts 

The connector reads files stored in an S3 bucket as Avro, Parquet, CSV, text, JSON, or binary/byte content and loads the records into Kafka via Kafka Connect.

Configuration 

An example configuration is provided:

name=S3SourceConnectorParquet # this can be anything
connector.class=io.lenses.streamreactor.connect.aws.s3.source.S3SourceConnector
tasks.max=1
connect.s3.kcql=insert into $TOPIC_NAME select * from $BUCKET_NAME:$PREFIX_NAME STOREAS `parquet`
aws.secret.key=SECRET_KEY
aws.access.key=ACCESS_KEY
aws.auth.mode=Credentials
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8089

You should replace $BUCKET_NAME, $PREFIX_NAME and $TOPIC_NAME with the names of the bucket, desired prefix and topic.
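
For illustration, with hypothetical names substituted in (the topic, bucket and prefix below are placeholders, not part of the original example), the KCQL line might read:

connect.s3.kcql=insert into my_topic select * from my-bucket:my/prefix STOREAS `parquet`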

Please read below for a detailed explanation of these and other options, including the meaning of WITH_FLUSH_COUNT and its alternatives.

Auth Mode configuration 

Two authentication modes are available:

Credentials 

ACCESS_KEY and SECRET_KEY are credentials generated within AWS IAM. They must be set and configured with permissions to read from the desired S3 bucket.

aws.auth.mode=Credentials
aws.access.key=ACCESS_KEY
aws.secret.key=SECRET_KEY
Default 

In this auth mode no credentials need to be supplied. If no auth mode is specified, this default will be used.

aws.auth.mode=Default

The credentials will be discovered through the default chain, in this order:

  • Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
  • Java System Properties - aws.accessKeyId and aws.secretKey
  • Web Identity Token credentials from the environment or container
  • Credential profiles file at the default location (~/.aws/credentials)
  • Credentials delivered through the Amazon EC2 container service (ECS)
  • Instance profile credentials delivered through the Amazon EC2 metadata service

The full details of the default chain are available in the AWS SDK documentation.
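
For example, the first step of the chain can be satisfied by exporting the standard AWS environment variables before starting the Kafka Connect worker (the values shown are placeholders):

export AWS_ACCESS_KEY_ID=ACCESS_KEY
export AWS_SECRET_ACCESS_KEY=SECRET_KEY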

Format configuration 

Format configuration is provided by kcql.

The options for JSON, Avro and Parquet look like the following:

connect.s3.kcql=insert into $TOPIC_NAME select * from $BUCKET_NAME:$PREFIX_NAME STOREAS `JSON`

The options for the formats are case-insensitive, but they are presented here in the most readable form.

JSON Input Format Configuration 

Using JSON as an input format allows you to read in files containing JSON content (delimited by new lines), line by line.

STOREAS `JSON`

Please note: the JSON is not parsed by the S3 source connector; there is no difference in how JSON and Text are handled. It may therefore be necessary to set the value converter so that the content is presented as a string:

value.converter=org.apache.kafka.connect.storage.StringConverter
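
For illustration, a newline-delimited JSON file read by this format could look like the following (the records are hypothetical, one JSON document per line):

{"id": 1, "created": "2016-05-06 13:53:00", "product": "OP-DAX-P-20150201-95.7", "price": 94.2, "qty": 100}
{"id": 2, "created": "2016-05-06 13:54:00", "product": "OP-DAX-C-20150201-100", "price": 99.5, "qty": 50}
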
Avro Input Format Configuration 

Using Avro as the input format allows you to read the Avro-stored messages on S3 back into Kafka’s native format.

STOREAS `Avro`

It may also be necessary to configure the message converter:

value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8089 
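
As a sketch, a complete KCQL entry using the Avro format might look like the following (the topic, bucket and prefix names are placeholders):

connect.s3.kcql=insert into avro_topic select * from my-bucket:avro/prefix STOREAS `Avro`
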
Parquet Input Format Configuration 

Using Parquet as the input format allows you to read parquet files stored on S3, importing the Avro schemas and values.

STOREAS `Parquet`

It may also be necessary to configure the message converter:

value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8089 
Text Input Format Configuration 

If the source files on S3 contain lines of plain text, then the text input format may be desired.

STOREAS `Text`

It may be required to use the additional configuration options for this connector to ensure that the value is presented as a String.

value.converter=org.apache.kafka.connect.storage.StringConverter
CSV Input Format Configuration 

This reads CSV files stored on S3 as strings to be written to the target Kafka topic. There are two options for the CSV format; if WithHeaders is included, the first row is skipped.

STOREAS `CSV_WithHeaders`
STOREAS `CSV`

Please note there is little distinction between the handling of CSV and handling of TEXT (with the exception that the header row can be skipped). The CSV is not parsed within the connector.
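
For instance, given a hypothetical file such as the one below (the column names and rows are illustrative only), `CSV_WithHeaders` would skip the first line, whereas `CSV` would emit all three lines as records:

facilityCountryCode,facilityNum,count
DE,12,100
FR,7,250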

Byte(Binary) Input Format Configuration 

Bytes can be read back in from S3 into message keys and/or values, depending on how the data was written.

This can be used for reading back messages containing binary data that were written out using the S3 sink, or alternatively for reading binary files to be loaded onto a Kafka topic.

Please see the KCQL options available and the results of these configurations:

Option | KCQL Configuration | Records read from file as
Key and Value (With Sizes) | STOREAS `Bytes_KeyAndValueWithSizes` | Long Long Bytes Bytes
Key (With Size) | STOREAS `Bytes_KeyWithSize` | Long Bytes
Value (With Size) | STOREAS `Bytes_ValueWithSize` | Long Bytes
Key Only | STOREAS `Bytes_KeyOnly` | Bytes
Value Only | STOREAS `Bytes_ValueOnly` | Bytes

When using the “With Sizes” options, the source assumes that each file starts with one or two (depending on the configuration) 8-byte values indicating how many bytes to read for the content.

To ensure the message is passed through as bytes, it may be necessary to set the additional configuration options:

value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
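
Putting this together, a sketch of a full set of properties for reading keys and values with size prefixes could be (the topic, bucket and prefix names are placeholders):

connect.s3.kcql=insert into bytes_topic select * from my-bucket:bytes/prefix STOREAS `Bytes_KeyAndValueWithSizes`
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter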

Partitioning Options 

When using the S3 source it is required to specify partitioning options so that the connector understands the layout of the files within the bucket.

While the file naming format is required for the connector to read the files in, the partition names themselves are not used within the connector.

This will mirror the options defined in the sink.

Please see the S3 Sink documentation for more information on the partitioning scheme.

Configuring read partitions 

The options you define will be in the following formats:

PARTITIONBY fieldA[,fieldB]
PARTITIONBY _key
PARTITIONBY _key.fieldA[, _key.fieldB]
PARTITIONBY _header.<header_key1>[,_header.<header_key2>]
PARTITIONBY fieldA, _key.fieldB, _header.fieldC
Configuring read partition display 
WITHPARTITIONER=KeysAndValues
WITHPARTITIONER=Values 
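
Putting the two together, and reusing the header-partitioned example from earlier (same bucket and prefix as in that example), a full KCQL entry might read:

INSERT INTO topicA SELECT * FROM kafka-connect-aws-s3-test:headerpartitioningdemo PARTITIONBY _header.facilityCountryCode, _header.facilityNum STOREAS `TEXT` WITHPARTITIONER=KeysAndValues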

Limit Options 

In order to limit the number of result rows returned from the source in a single poll operation, you can use the LIMIT clause.

LIMIT 1000
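
In context, the clause goes at the end of the KCQL statement, for example (the topic, bucket and prefix names are placeholders):

INSERT INTO topicA SELECT * FROM my-bucket:my/prefix STOREAS `JSON` LIMIT 1000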

Input Formats 

Format | Variations
json |
avro |
parquet |
text |
csv | CSV, CSV_WithHeaders
bytes | Bytes_KeyAndValueWithSizes, Bytes_KeyWithSize, Bytes_ValueWithSize, Bytes_KeyOnly, Bytes_ValueOnly

Quickstart 

Preparing the source system 

The S3 Source Connector requires AWS access keys and a bucket to be set up prior to running.

Configuring access keys 

The S3 connector requires AWS access keys to be able to perform actions in AWS.

You can configure these through the AWS console or CLI tools.

Please see the AWS documentation for more details.
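
As an example, an access key pair for an existing IAM user can be generated with the AWS CLI (the user name below is a placeholder, not part of the original setup):

aws iam create-access-key --user-name my-connect-user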

Preparing a target Kafka topic 

If you are using Lenses, you can create the Kafka topic simply by using the topic management features of Lenses.

Otherwise, change to the directory where you have Kafka installed:

cd ~/Software/kafka_2.12-2.5.0/bin
./kafka-topics.sh --create --bootstrap-server=localhost:9092 --topic=test_topic --partitions=3 --replication-factor=1

Creating a source bucket 

Please ensure you use sensible selections for the security and access options. More information on the command is available in AWS's documentation.

aws s3api create-bucket --bucket test-kafka-connect-bucket

Populate the source files in S3 

echo '{"id": 1, "created": "2016-05-06 13:53:00", "product": "OP-DAX-P-20150201-95.7", "price": 94.2, "qty":100}' > 1.json
aws s3 cp 1.json s3://test-kafka-connect-bucket/jsonTest/test_topic/1.json

Start the connector 

If you are using Lenses, log in to Lenses and navigate to the connectors page, select S3 as the source and paste the following:

name=S3SourceConnectorS3 # this can be anything
connector.class=io.lenses.streamreactor.connect.aws.s3.source.S3SourceConnector
topics=$TOPIC_NAME
tasks.max=1
connect.s3.kcql=insert into test_topic select * from test-kafka-connect-bucket:jsonTest STOREAS `json` LIMIT 5000 
aws.access.key=ACCESS_KEY
aws.secret.key=SECRET_KEY
aws.auth.mode=Credentials

To start the connector without using Lenses, log into the fastdata container:

docker exec -ti fastdata /bin/bash

and create a connector.properties file containing the properties above.

Create the connector with the connect-cli:

connect-cli create aws-s3-source < connector.properties

Wait for the connector to start and check that it is running:

connect-cli status aws-s3-source

Check for data in kafka 

If you are using Lenses, log in to Lenses, navigate to the explore page, and select the new topic you added.

View the topic and verify it contains data.

Otherwise, you can use the kafka-console-consumer script that comes with Kafka.

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_topic --from-beginning

Clean up 

Bring down the stack:

docker-compose down

Options 

Name | Description | Type | Available Values | Default Value
aws.access.key | Access Key | string | |
aws.secret.key | Secret Key | string | |
aws.auth.mode | Auth Mode | string | Credentials, Default | Default
aws.custom.endpoint | Custom Endpoint | string | |
aws.vhost.bucket | Enable Vhost Buckets | boolean | true, false | false
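
As a sketch, the last two options could be combined when pointing the connector at an S3-compatible endpoint (the URL below is a placeholder):

aws.custom.endpoint=http://127.0.0.1:9000
aws.vhost.bucket=true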