This Kafka Connect source connector ingests records from AWS S3 buckets into your Kafka cluster. Paired with the S3 Sink Connector, it lets you stream data from S3 to Kafka or run backup and restore operations.
This connector supports exactly-once semantics, to the extent that they are supported by Apache Kafka itself. Follow the documentation here to enable exactly-once semantics for your Kafka Connect cluster.
To enhance data integrity, we recommend enabling the transactional producer or, at the very least, the idempotent producer.
The connector's behaviour is configured with a SQL-like syntax. The full KCQL syntax is:
INSERT INTO $kafka-topic SELECT * FROM bucketAddress:pathPrefix [BATCH=batch] [STOREAS storage_format] [LIMIT limit] [PROPERTIES( 'property.1'=x, 'property.2'=x, )]
Please note that you can employ escaping within KCQL for the INSERT INTO, SELECT * FROM, and PARTITIONBY clauses when necessary. For example, if you need to use a topic name that contains a hyphen, you can escape it as follows:
INSERT INTO `my-topic-with-hyphen` SELECT * FROM bucketAddress:pathPrefix
The target Kafka topic is specified via the INSERT INTO clause. The connector will write all the records to the given topic:
INSERT INTO `my-topic` SELECT * FROM testbucket:pathToReadFrom;
Currently, the connector does not offer support for SQL projection; consequently, anything other than a SELECT * query is disregarded. The connector will faithfully write all the record fields to Kafka exactly as they are.
The S3 source location is defined within the FROM clause. The connector will read all files from the given location considering the data partitioning and ordering options. Each data partition will be read by a single connector task.
The PROPERTIES clause is optional and accepts multiple settings (delimited by ‘,’) that further configure the connector. The following properties are supported:
To limit the number of file names the source reads from S3 in a single poll, use the BATCH clause. The default value, if not specified, is 1000:
BATCH = 100
In order to limit the number of result rows returned from the source in a single poll operation, you can use the LIMIT clause. The default value, if not specified, is 10000.
LIMIT 10000
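The clauses described so far can be combined into a single statement. The following is a minimal sketch in Python, assuming a hypothetical helper named build_kcql (it is not part of the connector) that follows the clause order of the full syntax shown earlier and always escapes the topic name with backticks:

```python
def build_kcql(topic, bucket, prefix, batch=None, store_as=None,
               limit=None, properties=None):
    """Assemble a KCQL statement string (hypothetical helper, for illustration)."""
    # The topic is always backtick-escaped so that hyphenated names are safe.
    parts = [f"INSERT INTO `{topic}` SELECT * FROM {bucket}:{prefix}"]
    if batch is not None:
        parts.append(f"BATCH = {batch}")
    if store_as is not None:
        parts.append(f"STOREAS `{store_as}`")
    if limit is not None:
        parts.append(f"LIMIT {limit}")
    if properties:
        rendered = ", ".join(f"'{name}'='{value}'" for name, value in properties.items())
        parts.append(f"PROPERTIES({rendered})")
    return " ".join(parts)


statement = build_kcql("my-topic", "testbucket", "pathToReadFrom",
                       batch=100, limit=10000)
print(statement)
```

The resulting string would be used as the value of the connect.s3.kcql property.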
When used in tandem with the S3 Sink Connector, the S3 Source Connector becomes a powerful tool for restoring Kafka topics from S3. To enable this behavior, you should set store.envelope to true. This configuration ensures that the source expects the following data structure in S3:
{
  "key": <the message Key, which can be a primitive or a complex object>,
  "value": <the message Value, which can be a primitive or a complex object>,
  "headers": {
    "header1": "value1",
    "header2": "value2"
  },
  "metadata": {
    "offset": 0,
    "partition": 0,
    "timestamp": 0,
    "topic": "topic"
  }
}
When the messages are sent to Kafka, the S3 Source Connector ensures that it correctly maps the key, value, headers, and metadata fields (including timestamp and partition) to their corresponding Kafka message fields. Please note that the envelope functionality can only be used with data stored in S3 as Avro, JSON, or Parquet formats.
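To make the mapping concrete, here is a rough sketch in Python of how an envelope could be unpacked into the fields of a Kafka record. It only illustrates the structure described above and is not the connector's implementation; the function name envelope_to_record, the sample values, and the target topic name are assumptions made for this example.

```python
def envelope_to_record(envelope, target_topic):
    """Map an envelope dict to Kafka record fields (illustrative only)."""
    metadata = envelope.get("metadata", {})
    return {
        # The target topic comes from the INSERT INTO clause, not the envelope.
        "topic": target_topic,
        "partition": metadata.get("partition"),
        "timestamp": metadata.get("timestamp"),
        "key": envelope.get("key"),
        "value": envelope.get("value"),
        "headers": dict(envelope.get("headers", {})),
    }


# Sample envelope with made-up key and value.
sample_envelope = {
    "key": "user-1",
    "value": {"name": "Ada"},
    "headers": {"header1": "value1", "header2": "value2"},
    "metadata": {"offset": 0, "partition": 0, "timestamp": 0, "topic": "topic"},
}

record = envelope_to_record(sample_envelope, "restored-topic")
print(record)
```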
When the envelope feature is not in use, and data restoration is required, the responsibility falls on the connector to establish the original topic partition value. To ensure that the source correctly conveys the original partitions back to Kafka Connect during reads from the source, a partition extractor can be configured to extract this information from the S3 object key.
To configure the partition extractor, you can utilize the connect.s3.source.partition.extractor.type property, which supports two options:
connect.s3.source.partition.extractor.regex
(?i)^(?:.*)\/([0-9]*)\/(?:[0-9]*)[.](?:Json|Avro|Parquet|Text|Csv|Bytes)$
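To see what the pattern above captures, you can try it against sample object keys. The sketch below uses Python's re module; the function name extract_partition and the sample keys are invented for illustration, and the connector itself may apply the pattern differently.

```python
import re

# The pattern listed above, with its single capture group for the partition.
PARTITION_PATTERN = re.compile(
    r"(?i)^(?:.*)\/([0-9]*)\/(?:[0-9]*)[.](?:Json|Avro|Parquet|Text|Csv|Bytes)$"
)


def extract_partition(object_key):
    """Return the partition number found in an object key, or None."""
    match = PARTITION_PATTERN.match(object_key)
    if match is None or match.group(1) == "":
        return None
    return int(match.group(1))


print(extract_partition("mytopic/3/000000000042.json"))
print(extract_partition("mytopic/notes.txt"))
```

The first key yields partition 3, because the capture group picks up the second-to-last path segment; the second key does not match the pattern at all.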
The connector supports a range of storage formats, each with its own distinct functionality:
Use the STOREAS clause to configure the storage format. The following options are available:
STOREAS `JSON`
STOREAS `Avro`
STOREAS `Parquet`
STOREAS `Text`
STOREAS `CSV`
STOREAS `CSV_WithHeaders`
STOREAS `Bytes`
When using Text storage, the connector provides additional configuration options to finely control how text content is processed.
In Regex mode, the connector applies a regular expression pattern, and a line is treated as a record only if it matches the pattern. For example, to include only lines that start with a digit from 1 to 9, you can use the following configuration:
connect.s3.kcql=insert into $kafka-topic select * from lensesio:regex STOREAS `text` PROPERTIES('read.text.mode'='regex', 'read.text.regex'='^[1-9].*')
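The effect of this filter can be approximated in a few lines of Python. This is only a sketch of the behaviour described above, assuming that a line "matches" when the pattern matches from the start of the line; the function name regex_mode_records and the sample text are made up.

```python
import re


def regex_mode_records(text, pattern):
    """Keep only the lines that match the pattern (illustrative sketch)."""
    compiled = re.compile(pattern)
    return [line for line in text.splitlines() if compiled.match(line)]


sample_text = "1 first\nheader\n0 zero\n42 answer"
records = regex_mode_records(sample_text, "^[1-9].*")
print(records)
```

Note that the line starting with 0 is dropped, because the character class in the pattern is [1-9].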
In Start-End Line mode, the connector reads text content between specified start and end lines, inclusive. This mode is useful when you need to extract records that fall within defined boundaries. For instance, to read records where the first line is ‘SSM’ and the last line is an empty line (‘’), you can configure it as follows:
connect.s3.kcql=insert into $kafka-topic select * from lensesio:multi_line STOREAS `text` PROPERTIES('read.text.mode'='startEndLine', 'read.text.start.line'='SSM', 'read.text.end.line'='')
To trim the start and end lines, set the read.text.trim property to true:
connect.s3.kcql=insert into $kafka-topic select * from lensesio:multi_line STOREAS `text` PROPERTIES('read.text.mode'='startEndLine', 'read.text.start.line'='SSM', 'read.text.end.line'='', 'read.text.trim'='true')
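As a rough illustration of Start-End Line mode, the Python sketch below groups lines into records that begin at the start line and finish at the end line, both inclusive. It is an approximation under stated assumptions, not the connector's code: the function name start_end_line_records is hypothetical, lines outside a start/end pair are assumed to be skipped, and the read.text.trim option is not modelled.

```python
def start_end_line_records(text, start_line, end_line):
    """Group lines between start_line and end_line (inclusive) into records."""
    records = []
    current = None  # Lines of the record being built, or None when outside one.
    for line in text.splitlines():
        if current is None:
            if line == start_line:
                current = [line]
        else:
            current.append(line)
            if line == end_line:
                records.append("\n".join(current))
                current = None
    return records


sample_text = "SSM\na\nb\n\nnoise\nSSM\nc\n\n"
records = start_end_line_records(sample_text, "SSM", "")
print(records)
```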
In Start-End Tag mode, the connector reads text content between specified start and end tags, inclusive. This mode is particularly useful when a single line of text in S3 corresponds to multiple output Kafka messages. For example, to read XML records enclosed between ‘<SSM>’ and ‘</SSM>’, configure it as follows:
connect.s3.kcql=insert into $kafka-topic select * from lensesio:xml STOREAS `text` PROPERTIES('read.text.mode'='startEndTag', 'read.text.start.tag'='<SSM>', 'read.text.end.tag'='</SSM>')
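The following Python sketch shows how one line of text can yield several records in Start-End Tag mode. It is a simplified model of the behaviour described above; the function name start_end_tag_records and the sample input are assumptions, and the connector may handle unterminated or nested tags differently.

```python
def start_end_tag_records(text, start_tag, end_tag):
    """Return every substring from start_tag to end_tag, tags included."""
    records = []
    position = 0
    while True:
        start = text.find(start_tag, position)
        if start == -1:
            break
        end = text.find(end_tag, start + len(start_tag))
        if end == -1:
            break  # An unterminated record is ignored in this sketch.
        end += len(end_tag)
        records.append(text[start:end])
        position = end
    return records


single_line = "<SSM>first</SSM><SSM>second</SSM>"
records = start_end_tag_records(single_line, "<SSM>", "</SSM>")
print(records)
```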
How you should use converters for sink and source operations depends on the storage format of the Kafka topics’ messages, whether the data needs to be replicated to a different cluster, and your data analysis requirements. The guideline aims to optimize performance and minimize unnecessary CPU and memory usage.
Adapt the key.converter and value.converter properties according to the table above.
The connector offers two distinct authentication modes:
When selecting the “Credentials” mode, it is essential to provide the necessary access key and secret key properties. Alternatively, if you prefer not to configure these properties explicitly, the connector will follow the credentials retrieval order as described here.
Here’s an example configuration for the “Credentials” mode:
...
connect.s3.aws.auth.mode=Credentials
connect.s3.aws.region=eu-west-2
connect.s3.aws.access.key=$AWS_ACCESS_KEY
connect.s3.aws.secret.key=$AWS_SECRET_KEY
...
For enhanced security and flexibility when using the “Credentials” mode, it is highly advisable to utilize Connect Secret Providers. You can find detailed information on how to use the Connect Secret Providers here. This approach ensures robust security practices while handling access credentials.
The S3 sink zero-pads file names so that their lexical order matches the sequence in which the files were written, taking advantage of optimizations offered by the S3 API.
When using the S3 source alongside the S3 sink, the connector can adopt the same ordering method, ensuring data processing follows the correct chronological order. However, there are scenarios where S3 data is generated by applications that do not maintain lexical file name order.
In such cases, to process files in the correct sequence, the source needs to list all files in the bucket and sort them based on their last modified timestamp. To enable this behavior, set the connect.s3.source.ordering.type to LastModified. This ensures that the source correctly arranges and processes the data based on the timestamps of the files.
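The difference between the two ordering strategies can be illustrated with a small Python sketch. The object keys, timestamps, and four-digit padding width below are invented for the example; only the idea of sorting by name versus sorting by last modified timestamp comes from the text above.

```python
from datetime import datetime, timezone

# Hypothetical listing of (object key, last modified timestamp) pairs,
# written by an application that does not zero-pad its file names.
listing = [
    ("data/part-10.json", datetime(2024, 1, 1, 0, 10, tzinfo=timezone.utc)),
    ("data/part-2.json", datetime(2024, 1, 1, 0, 2, tzinfo=timezone.utc)),
    ("data/part-1.json", datetime(2024, 1, 1, 0, 1, tzinfo=timezone.utc)),
]

# Lexical ordering places part-10 before part-2.
lexical = [key for key, _ in sorted(listing)]

# Ordering by last modified timestamp restores the chronological sequence.
by_last_modified = [key for key, _ in sorted(listing, key=lambda item: item[1])]

# With zero-padded names, lexical order already matches numeric order.
padded = sorted(f"data/part-{number:04d}.json" for number in (10, 2, 1))

print(lexical)
print(by_last_modified)
print(padded)
```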