AWS S3

This page describes the usage of the Stream Reactor AWS S3 Source Connector.

This connector is also available on the AWS Marketplace.

Connector Class

io.lenses.streamreactor.connect.aws.s3.source.S3SourceConnector

Example

name=aws-s3SourceConnectorParquet
connector.class=io.lenses.streamreactor.connect.aws.s3.source.S3SourceConnector
tasks.max=1
connect.s3.kcql=insert into $TOPIC_NAME select * from $BUCKET_NAME:$PREFIX_NAME STOREAS `parquet`
connect.s3.aws.region=eu-west-2
connect.s3.aws.secret.key=SECRET_KEY
connect.s3.aws.access.key=ACCESS_KEY
connect.s3.aws.auth.mode=Credentials

KCQL Support

The connector uses KCQL, a SQL-like syntax, to configure its behaviour. The full KCQL syntax is:

INSERT INTO $kafka-topic
SELECT *
FROM bucketAddress:pathPrefix
[BATCH=batch]
[STOREAS storage_format]
[LIMIT limit]
[PROPERTIES(
  'property.1'=x,
  'property.2'=x
)]

Please note that you can employ escaping within KCQL for the INSERT INTO, SELECT * FROM, and PARTITIONBY clauses when necessary. For example, if you need to use a topic name that contains a hyphen, you can escape it as follows:

INSERT INTO `my-topic-with-hyphen`
SELECT *
FROM bucketAddress:pathPrefix

Source Bucket & Path

The S3 source location is defined within the FROM clause. The connector will read all objects from the given location considering the data partitioning and ordering options. Each data partition will be read by a single connector task.

The FROM clause format is:

FROM [bucketname]:pathprefix
// for example: my-bucket-called-pears:my-folder-called-apples

If your data in AWS was not written by the Lenses AWS sink, the connector can be set to traverse a folder hierarchy in a bucket and load objects based on their last modified timestamp. If LastModified sorting is used, ensure objects do not arrive late, or use a post-processing step to handle them:

connect.s3.source.partition.extractor.regex=none

connect.s3.source.ordering.type=LastModified

To load in alphanumeric order, set the ordering type to AlphaNumeric.
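For example, the ordering type can be switched with the same property shown above:

connect.s3.source.ordering.type=AlphaNumeric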

Target Bucket & Path

The target Kafka topic is specified via the INSERT INTO clause. The connector will write all the records to the given topic:

INSERT INTO my-apples-topic SELECT * FROM my-bucket-called-pears:my-folder-called-apples

S3 Object formats

The connector supports a range of storage formats, each with its own distinct functionality:

  • JSON: The connector will read objects containing JSON content, each line representing a distinct record.

  • Avro: The connector will read Avro-stored messages from S3 and translate them into Kafka’s native format.

  • Parquet: The connector will read Parquet-stored messages from S3 and translate them into Kafka’s native format.

  • Text: The connector will read objects containing lines of text, each line representing a distinct record.

  • CSV: The connector will read objects containing lines of text, each line representing a distinct record.

  • CSV_WithHeaders: The connector will read objects containing lines of text, each line representing a distinct record while skipping the header row.

  • Bytes: The connector will read objects containing bytes, with each object translated to a single Kafka message.

Use the STOREAS clause to configure the storage format. The following options are available:

STOREAS `JSON`
STOREAS `Avro`
STOREAS `Parquet`
STOREAS `Text`
STOREAS `CSV`
STOREAS `CSV_WithHeaders`
STOREAS `Bytes`
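For example, a complete KCQL statement reading CSV objects that include a header row could look like the following (the topic, bucket, and prefix names are placeholders):

connect.s3.kcql=insert into my-topic select * from my-bucket:my-prefix STOREAS `CSV_WithHeaders`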

Text Processing

When using Text storage, the connector provides additional configuration options to finely control how text content is processed.

Regex

In Regex mode, the connector applies a regular expression pattern, and only when a line matches the pattern is it considered a record. For example, to include only lines that start with a number, you can use the following configuration:

connect.s3.kcql=insert into $kafka-topic select * from lensesio:regex STOREAS `text` PROPERTIES('read.text.mode'='regex', 'read.text.regex'='^[1-9].*')

Start-End line

In Start-End Line mode, the connector reads text content between specified start and end lines, inclusive. This mode is useful when you need to extract records that fall within defined boundaries. For instance, to read records where the first line is ‘SSM’ and the last line is an empty line (’’), you can configure it as follows:

connect.s3.kcql=insert into $kafka-topic select * from lensesio:multi_line STOREAS `text` PROPERTIES('read.text.mode'='startEndLine', 'read.text.start.line'='SSM', 'read.text.end.line'='')

To trim the start and end lines, set the read.text.trim property to true:

connect.s3.kcql=insert into $kafka-topic select * from lensesio:multi_line STOREAS `text` PROPERTIES('read.text.mode'='startEndLine', 'read.text.start.line'='SSM', 'read.text.end.line'='', 'read.text.trim'='true')

Start-End tag

In Start-End Tag mode, the connector reads text content between specified start and end tags, inclusive. This mode is particularly useful when a single line of text in S3 corresponds to multiple output Kafka messages. For example, to read XML records enclosed between '<SSM>' and '</SSM>', configure it as follows:

connect.s3.kcql=insert into $kafka-topic select * from lensesio:xml STOREAS `text` PROPERTIES('read.text.mode'='startEndTag', 'read.text.start.tag'='<SSM>', 'read.text.end.tag'='</SSM>')

Storage output matrix

Depending on the storage format of the Kafka topic's messages, whether the data needs to be restored or replicated to a different cluster, and the specific data analysis requirements, the following matrix is a guideline on how to effectively utilize converters for both sink and source operations. This guidance aims to optimize performance and minimize unnecessary CPU and memory usage.

| S3 Storage Format | Kafka Output Format | Restore or replicate cluster | Analytics | Sink Converter | Source Converter |
|---|---|---|---|---|---|
| JSON | STRING | Same, Other | Yes, No | StringConverter | StringConverter |
| AVRO, Parquet | STRING | Same, Other | Yes | StringConverter | StringConverter |
| AVRO, Parquet | STRING | Same, Other | No | ByteArrayConverter | ByteArrayConverter |
| JSON | JSON | Same, Other | Yes | JsonConverter | StringConverter |
| JSON | JSON | Same, Other | No | StringConverter | StringConverter |
| AVRO, Parquet | JSON | Same, Other | Yes, No | JsonConverter | JsonConverter or Avro Converter (Glue, Confluent) |
| AVRO, Parquet, JSON | BYTES | Same, Other | Yes, No | ByteArrayConverter | ByteArrayConverter |
| AVRO, Parquet | AVRO | Same | Yes | Avro Converter (Glue, Confluent) | Avro Converter (Glue, Confluent) |
| AVRO, Parquet | AVRO | Same | No | ByteArrayConverter | ByteArrayConverter |
| AVRO, Parquet | AVRO | Other | Yes, No | Avro Converter (Glue, Confluent) | Avro Converter (Glue, Confluent) |
| AVRO, Parquet | Protobuf | Same | Yes | Protobuf Converter (Glue, Confluent) | Protobuf Converter (Glue, Confluent) |
| AVRO, Parquet | Protobuf | Same | No | ByteArrayConverter | ByteArrayConverter |
| AVRO, Parquet | Protobuf | Other | Yes, No | Protobuf Converter (Glue, Confluent) | Protobuf Converter (Glue, Confluent) |
| AVRO, Parquet, JSON | Other | Same, Other | Yes, No | ByteArrayConverter | ByteArrayConverter |
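As an illustration of the matrix, restoring Avro-stored objects back to Avro on the same cluster for analytics would use an Avro converter on both sides. A minimal sketch, assuming the Confluent Avro converter and a placeholder Schema Registry URL:

value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://schema-registry:8081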

Projections

Currently, the connector does not offer support for SQL projection; consequently, anything other than a SELECT * query is disregarded. The connector will faithfully write all the record fields to Kafka exactly as they are.

Ordering

The S3 sink employs zero-padding in object names to ensure precise ordering, leveraging optimizations offered by the S3 API and guaranteeing the accurate sequence of objects.

When using the S3 source alongside the S3 sink, the connector can adopt the same ordering method, ensuring data processing follows the correct chronological order. However, there are scenarios where S3 data is generated by applications that do not maintain lexical object key name order.

In such cases, to process objects in the correct sequence, the source needs to list all objects in the bucket and sort them based on their last modified timestamp. To enable this behavior, set connect.s3.source.ordering.type to LastModified. This ensures that the source correctly arranges and processes the data based on the timestamps of the objects.

If using LastModified sorting, ensure objects do not arrive late, or use a post-processing step to handle them.

Throttling

Use the BATCH clause to limit the number of object keys the source reads from S3 in a single poll. The default value, if not specified, is 1000:

BATCH = 100

To limit the number of result rows returned from the source in a single poll operation, you can use the LIMIT clause. The default value, if not specified, is 10000.

LIMIT 10000
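Both clauses can be combined in a single KCQL statement, for example (topic, bucket, and prefix names are placeholders):

connect.s3.kcql=insert into my-topic select * from my-bucket:my-prefix BATCH=100 STOREAS `JSON` LIMIT 10000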

Object Extension Filtering

The AWS S3 Source Connector allows you to filter the objects to be processed based on their extensions. This is controlled by two properties: connect.s3.source.extension.excludes and connect.s3.source.extension.includes.

Excluding Object Extensions

The connect.s3.source.extension.excludes property is a comma-separated list of object extensions to exclude from the source object search. If this property is not configured, all objects are considered. For example, to exclude .txt and .csv objects, you would set this property as follows:

connect.s3.source.extension.excludes=txt,csv

Including Object Extensions

The connect.s3.source.extension.includes property is a comma-separated list of object extensions to include in the source object search. If this property is not configured, all objects are considered. For example, to include only .json and .xml objects, you would set this property as follows:

connect.s3.source.extension.includes=json,xml

Note: If both connect.s3.source.extension.excludes and connect.s3.source.extension.includes are set, the connector first applies the exclusion filter and then the inclusion filter.
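For instance, using both filters together (the extensions below are purely illustrative), backup objects are excluded first and only JSON objects are then included:

connect.s3.source.extension.excludes=bak
connect.s3.source.extension.includes=json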

Post-Processing Options

Post-processing options offer flexibility in managing how objects are handled after they have been processed. By configuring these options, users can automate tasks such as deleting objects to save storage space or moving files to an archive for compliance and data retention purposes. These features are crucial for efficient data lifecycle management, particularly in environments where storage considerations or regulatory requirements dictate the need for systematic handling of processed data.

Use Cases for Post-Processing Options

  1. Deleting Objects After Processing

    For scenarios where freeing up storage is critical and reprocessing is not necessary, configure the connector to delete objects after they are processed. This option is particularly useful in environments with limited storage capacity or where processed data is redundantly stored elsewhere.

    Example:

    INSERT INTO `my-topic`
    SELECT * FROM `my-s3-bucket:my-prefix`
    PROPERTIES (
        'post.process.action'=`DELETE`
    )

    Result: Objects are permanently removed from the S3 bucket after processing, effectively reducing storage usage and preventing reprocessing.

  2. Moving Objects to an Archive Bucket

    To preserve processed objects for archiving or compliance reasons, set the connector to move them to a designated archive bucket. This use case applies to organizations needing data retention strategies or for regulatory adherence by keeping processed records accessible but not in active use.

    Example:

    INSERT INTO `my-topic`
    SELECT * FROM `my-s3-bucket:my-prefix`
    PROPERTIES (
        'post.process.action'=`MOVE`,
        'post.process.action.bucket'=`archive-bucket`,
        'post.process.action.prefix'=`processed/`
    )

    Result: Objects are transferred to an archive-bucket, stored with an updated path that includes the processed/ prefix, maintaining an organized archive structure.

Properties

The PROPERTIES clause is optional and adds a layer of configuration to the connector. It enhances versatility by permitting the application of multiple configurations (delimited by ‘,’). The following properties are supported:

| Name | Description | Type | Available Values | Default Value |
|---|---|---|---|---|
| read.text.mode | Controls how Text content is read | Enum | Regex, StartEndTag, StartEndLine | |
| read.text.regex | Regular Expression for Text Reading (if applicable) | String | | |
| read.text.start.tag | Start Tag for Text Reading (if applicable) | String | | |
| read.text.end.tag | End Tag for Text Reading (if applicable) | String | | |
| read.text.buffer.size | Text Buffer Size (for optimization) | Int | | |
| read.text.start.line | Start Line for Text Reading (if applicable) | String | | |
| read.text.end.line | End Line for Text Reading (if applicable) | String | | |
| read.text.trim | Trim Text During Reading | Boolean | | |
| store.envelope | Messages are stored as "Envelope" | Boolean | | |
| post.process.action | Defines the action to perform on source objects after successful processing. | Enum | DELETE, MOVE | |
| post.process.action.bucket | Specifies the target bucket for the MOVE action (required for MOVE). | String | | |
| post.process.action.prefix | Specifies a new prefix for the object's location when using the MOVE action (required for MOVE). | String | | |
| post.process.action.retain.dirs | Ensure that paths are retained after a post-process action, using a zero-byte object to represent the path. | Boolean | | false |
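For example, when reading data that was written by the S3 sink with the envelope enabled, the same setting can be applied on the source via PROPERTIES (topic, bucket, and prefix names are placeholders):

connect.s3.kcql=insert into my-topic select * from my-bucket:my-prefix STOREAS `Avro` PROPERTIES('store.envelope'='true')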

Authentication

The connector offers two distinct authentication modes:

  • Default: This mode relies on the default AWS authentication chain, simplifying the authentication process.

  • Credentials: In this mode, explicit configuration of AWS Access Key and Secret Key is required for authentication.

When selecting the "Credentials" mode, it is essential to provide the necessary access key and secret key properties. Alternatively, if you prefer not to configure these properties explicitly, the connector will fall back to the default AWS credentials retrieval chain.

Here’s an example configuration for the “Credentials” mode:

connect.s3.aws.auth.mode=Credentials
connect.s3.aws.region=eu-west-2
connect.s3.aws.access.key=$AWS_ACCESS_KEY
connect.s3.aws.secret.key=$AWS_SECRET_KEY

For enhanced security and flexibility when using the “Credentials” mode, it is highly advisable to utilize Connect Secret Providers. This approach ensures robust security practices while handling access credentials.
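For the "Default" mode, only the auth mode (and typically the region) needs to be specified, with credentials resolved from the environment; a minimal sketch:

connect.s3.aws.auth.mode=Default
connect.s3.aws.region=eu-west-2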

API Compatible systems

The connector can also be used against API compatible systems provided they implement the following:

listObjectsV2
listObjectsV2Paginator
putObject
getObject
headObject
deleteObjects
deleteObject
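For example, pointing the connector at an S3-compatible store is usually a matter of overriding the endpoint (the URL below is a placeholder for your own deployment); depending on the target system, virtual hosted-style bucket addressing may also need enabling:

connect.s3.custom.endpoint=https://s3.example.com
connect.s3.vhost.bucket=true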

Option Reference

| Name | Description | Type | Available Values | Default Value |
|---|---|---|---|---|
| connect.s3.aws.auth.mode | Specifies the AWS authentication mode for connecting to S3. | string | Credentials, Default | Default |
| connect.s3.aws.access.key | Access Key for AWS S3 Credentials | string | | |
| connect.s3.aws.secret.key | Secret Key for AWS S3 Credentials | string | | |
| connect.s3.aws.region | AWS Region for S3 Bucket | string | | |
| connect.s3.pool.max.connections | Maximum Connections in the Connection Pool | int | -1 (undefined) | 50 |
| connect.s3.custom.endpoint | Custom Endpoint URL for S3 (if applicable) | string | | |
| connect.s3.kcql | Kafka Connect Query Language (KCQL) configuration to control the connector behaviour | string | | |
| connect.s3.vhost.bucket | Enable Virtual Hosted-style Buckets for S3 | boolean | true, false | false |
| connect.s3.source.extension.excludes | A comma-separated list of object extensions to exclude from the source object search (see Object Extension Filtering). | string | | |
| connect.s3.source.extension.includes | A comma-separated list of object extensions to include in the source object search (see Object Extension Filtering). | string | | |
| connect.s3.source.partition.extractor.type | Type of Partition Extractor (Hierarchical or Regex) | string | hierarchical, regex | |
| connect.s3.source.partition.extractor.regex | Regex Pattern for Partition Extraction (if applicable) | string | | |
| connect.s3.ordering.type | Type of ordering for the S3 object keys to ensure the processing order. | string | AlphaNumeric, LastModified | AlphaNumeric |
| connect.s3.source.partition.search.continuous | If set to true, the connector will continuously search for new partitions. | boolean | true, false | true |
| connect.s3.source.partition.search.excludes | A comma-separated list of paths to exclude from the partition search. | string | | .indexes |
| connect.s3.source.partition.search.interval | The interval in milliseconds between searching for new partitions. | long | | 300000 |
| connect.s3.source.partition.search.recurse.levels | Controls how many levels deep to recurse when searching for new partitions. | int | | 0 |
| connect.s3.source.empty.results.backoff.initial.delay | Initial delay in milliseconds before retrying when no results are found. | long | | 1000 |
| connect.s3.source.empty.results.backoff.max.delay | Maximum delay in milliseconds before retrying when no results are found. | long | | 10000 |
| connect.s3.source.empty.results.backoff.multiplier | Multiplier to apply to the delay when retrying when no results are found. | double | | 2.0 |
| connect.s3.source.write.watermark.header | Write the record with Kafka headers that include details of the source object and line number. | boolean | true, false | false |
