Examples for AWS S3 Source Kafka Connector.
This connector configuration is designed for ingesting data from AWS S3 into Apache Kafka.
Connector Name: aws-s3SourceConnectorParquet (This can be customized as needed)
Connector Class: io.lenses.streamreactor.connect.aws.s3.source.S3SourceConnector
Maximum Tasks: 1 (Number of tasks to execute in parallel)
KCQL Statement: Inserts data from the specified cloud storage bucket into a Kafka topic.
Syntax: insert into $TOPIC_NAME select * from $BUCKET_NAME:$PREFIX_NAME STOREAS 'parquet'
$TOPIC_NAME: Name of the Kafka topic where data will be inserted.
$BUCKET_NAME: Name of the AWS S3 bucket.
$PREFIX_NAME: Prefix or directory within the bucket.
Value Converter: AvroConverter (assuming data is serialized in Avro format)
Schema Registry URL: http://localhost:8089 (URL of the schema registry for Avro serialization)
Authentication Properties: These depend on the authentication mechanism used for accessing the cloud storage service. Replace the placeholders with the actual authentication properties for the specific cloud platform.
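Putting these settings together, a minimal sketch of the connector properties could look like the following; the topic, bucket, prefix, and credential values are placeholders, and the AWS credential properties shown assume the Credentials auth mode (adjust to your authentication mechanism):

```properties
name=aws-s3SourceConnectorParquet
connector.class=io.lenses.streamreactor.connect.aws.s3.source.S3SourceConnector
tasks.max=1
# Read Parquet files under my-bucket/my-prefix and write them to my-topic
connect.s3.kcql=insert into my-topic select * from my-bucket:my-prefix STOREAS 'parquet'
# Values are assumed to be Avro and registered in a schema registry
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8089
# Authentication - placeholders for the Credentials auth mode
connect.s3.aws.auth.mode=Credentials
connect.s3.aws.access.key=<your-access-key>
connect.s3.aws.secret.key=<your-secret-key>
connect.s3.aws.region=<your-region>
```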
This configuration serves as a template and can be customized according to the requirements and specifics of your data.
This configuration example is particularly useful when you need to restore data from AWS S3 into Apache Kafka while maintaining all data, including the headers, key, and value of each record. The envelope structure encapsulates the actual data payload along with metadata into files on your source bucket, providing a way to manage and process data with additional context.
Data Restoration with Envelope Structure: If you’re restoring data from AWS S3 into Kafka and want to preserve metadata, this configuration is suitable. Envelopes can include metadata like timestamps, data provenance, or other contextual information, which can be valuable for downstream processing.
Batch Processing: The configuration supports batch processing by specifying a batch size (BATCH=2000) and a limit on the number of records (LIMIT 10000). This is beneficial when dealing with large datasets, allowing for efficient processing in chunks.
Error Handling: Error handling is configured to throw exceptions (connect.s3.error.policy=THROW) in case of errors during data ingestion, ensuring that any issues are immediately surfaced for resolution.
Hierarchical Partition Extraction: The source partition extractor type is set to hierarchical (connect.s3.source.partition.extractor.type=hierarchical), which is suitable for extracting hierarchical partitioning structures from the source data location.
Continuous Partition Search: Continuous partition search is enabled (connect.partition.search.continuous=true), which helps in continuously searching for new partitions to process, ensuring that newly added data is ingested promptly.
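As a sketch, these features map onto the connector properties and KCQL roughly as follows; the topic, bucket, and prefix names are placeholders, the stored format is assumed to be AVRO, and the exact KCQL clause ordering and the store.envelope property should be checked against the connector reference:

```properties
name=aws-s3SourceConnectorEnvelope
connector.class=io.lenses.streamreactor.connect.aws.s3.source.S3SourceConnector
tasks.max=4
# Restore full Kafka records (key, value, headers, metadata) from the envelope,
# reading in batches of 2000 records with a limit of 10000 records
connect.s3.kcql=insert into my-topic select * from my-bucket:my-prefix BATCH=2000 STOREAS 'AVRO' LIMIT 10000 PROPERTIES('store.envelope'=true)
# Surface ingestion errors immediately
connect.s3.error.policy=THROW
# Recover topic/partition/offset information from the hierarchical object-key layout
connect.s3.source.partition.extractor.type=hierarchical
# Keep searching for newly added partitions
connect.partition.search.continuous=true
```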
This configuration ensures efficient and reliable data ingestion from cloud storage into Kafka while preserving the envelope structure and providing robust error handling mechanisms.
Similar to the above, this is another configuration that uses the envelope format.
Single Task Processing: Unlike the previous configuration, which allowed for multiple tasks (tasks.max=4), this variant is configured for single-task processing (tasks.max=1). This setup may be preferable when the workload is relatively light or when processing tasks in parallel is not necessary.
AVRO Format for Data Serialization: Data is serialized in the AVRO format (STOREAS 'AVRO'), leveraging the AvroConverter (value.converter=io.confluent.connect.avro.AvroConverter). This is suitable for environments where Avro is the preferred serialization format or where compatibility with Avro-based systems is required.
Schema Registry Configuration: The configuration includes the URL of the schema registry (value.converter.schema.registry.url=http://localhost:8081), facilitating schema management and compatibility checks for Avro-serialized data.
Hierarchical Partition Extraction: Similar to the previous configuration, hierarchical partition extraction is employed (connect.s3.source.partition.extractor.type=hierarchical), enabling extraction of partitioning structures from the source data location.
Examples for GCP Source Kafka Connector.
This connector configuration is designed for ingesting data from GCP Storage into Apache Kafka.
Connector Name: gcp-storageSourceConnectorParquet (This can be customized as needed)
Connector Class: io.lenses.streamreactor.connect.gcp.storage.source.GCPStorageSourceConnector
Maximum Tasks: 1 (Number of tasks to execute in parallel)
KCQL Statement: Inserts data from the specified cloud storage bucket into a Kafka topic.
Syntax: insert into $TOPIC_NAME select * from $BUCKET_NAME:$PREFIX_NAME STOREAS 'parquet'
$TOPIC_NAME: Name of the Kafka topic where data will be inserted.
$BUCKET_NAME: Name of the GCP Storage bucket.
$PREFIX_NAME: Prefix or directory within the bucket.
Value Converter: AvroConverter (assuming data is serialized in Avro format)
Schema Registry URL: http://localhost:8089 (URL of the schema registry for Avro serialization)
Authentication Properties: These depend on the authentication mechanism used for accessing the cloud storage service. Replace the placeholders with the actual authentication properties for the specific cloud platform.
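Putting these settings together, a minimal sketch of the connector properties could look like the following; the topic, bucket, and prefix values are placeholders, and the authentication properties are omitted because they depend on your auth mode (see the connector reference):

```properties
name=gcp-storageSourceConnectorParquet
connector.class=io.lenses.streamreactor.connect.gcp.storage.source.GCPStorageSourceConnector
tasks.max=1
# Read Parquet files under my-bucket/my-prefix and write them to my-topic
connect.gcpstorage.kcql=insert into my-topic select * from my-bucket:my-prefix STOREAS 'parquet'
# Values are assumed to be Avro and registered in a schema registry
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8089
# GCP authentication properties go here - they depend on the auth mode you use
```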
This configuration serves as a template and can be customized according to the requirements and specifics of your data.
This configuration example is particularly useful when you need to restore data from GCP Storage into Apache Kafka while maintaining all data, including the headers, key, and value of each record. The envelope structure encapsulates the actual data payload along with metadata into files on your source bucket, providing a way to manage and process data with additional context.
Data Restoration with Envelope Structure: If you’re restoring data from GCP Storage into Kafka and want to preserve metadata, this configuration is suitable. Envelopes can include metadata like timestamps, data provenance, or other contextual information, which can be valuable for downstream processing.
Batch Processing: The configuration supports batch processing by specifying a batch size (BATCH=2000) and a limit on the number of records (LIMIT 10000). This is beneficial when dealing with large datasets, allowing for efficient processing in chunks.
Error Handling: Error handling is configured to throw exceptions (connect.gcpstorage.error.policy=THROW) in case of errors during data ingestion, ensuring that any issues are immediately surfaced for resolution.
Hierarchical Partition Extraction: The source partition extractor type is set to hierarchical (connect.gcpstorage.source.partition.extractor.type=hierarchical), which is suitable for extracting hierarchical partitioning structures from the source data location.
Continuous Partition Search: Continuous partition search is enabled (connect.partition.search.continuous=true), which helps in continuously searching for new partitions to process, ensuring that newly added data is ingested promptly.
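The envelope-related options mirror the S3 example, using the gcpstorage prefix; a brief sketch (topic, bucket, and prefix are placeholders, and the store.envelope KCQL property should be checked against the connector reference):

```properties
connect.gcpstorage.kcql=insert into my-topic select * from my-bucket:my-prefix BATCH=2000 STOREAS 'AVRO' LIMIT 10000 PROPERTIES('store.envelope'=true)
connect.gcpstorage.error.policy=THROW
connect.gcpstorage.source.partition.extractor.type=hierarchical
connect.partition.search.continuous=true
```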
This configuration ensures efficient and reliable data ingestion from cloud storage into Kafka while preserving the envelope structure and providing robust error handling mechanisms.
Similar to the above, this is another configuration that uses the envelope format.
Single Task Processing: Unlike the previous configuration, which allowed for multiple tasks (tasks.max=4), this variant is configured for single-task processing (tasks.max=1). This setup may be preferable when the workload is relatively light or when processing tasks in parallel is not necessary.
AVRO Format for Data Serialization: Data is serialized in the AVRO format (STOREAS 'AVRO'), leveraging the AvroConverter (value.converter=io.confluent.connect.avro.AvroConverter). This is suitable for environments where Avro is the preferred serialization format or where compatibility with Avro-based systems is required.
Schema Registry Configuration: The configuration includes the URL of the schema registry (value.converter.schema.registry.url=http://localhost:8081), facilitating schema management and compatibility checks for Avro-serialized data.
Hierarchical Partition Extraction: Similar to the previous configuration, hierarchical partition extraction is employed (connect.gcpstorage.source.partition.extractor.type=hierarchical), enabling extraction of partitioning structures from the source data location.
This page contains tutorials for common Kafka Connect use cases.
Examples for AWS S3 Sink Kafka Connector time-based partitioning.
This scenario partitions data by date and time, employing record timestamp headers to enable partitioning based on these time components.
Data is partitioned by data date and hour, utilizing record timestamp headers for partitioning based on these time components.
The default Confluent partitioning scheme follows the structure <prefix>/<topic>/<encodedPartition>/<topic>+<kafkaPartition>+<startOffset>.<format>. This provides a default partitioning mechanism for Kafka topics.
Similar to the previous scenario, this partitions data by year, month, and day. It utilizes record timestamp headers for partitioning based on these time components.
Extending the previous scenarios, this one partitions data by year, month, day, hour, and minute, allowing for more granular time-based partitioning.
This scenario partitions data by year, month, day, and hour. It utilizes a transformation process to insert record timestamp headers, enabling partitioning based on these time components.
This scenario partitions data by date and hour, using record timestamp headers for partitioning based on these time components.
This scenario partitions data based on the created at timestamp, utilizing record timestamp headers for partitioning.
Data is partitioned based on the raw creation date, employing record timestamp headers for this partitioning scheme.
Data is partitioned based on the creation timestamp, utilizing record timestamp headers for this partitioning scheme.
This scenario partitions data by the created at date, employing record timestamp headers for partitioning.
Similar to the previous scenario, this partitions data by the created at date, utilizing record timestamp headers for partitioning.
Data is partitioned based on the creation date, employing record timestamp headers for this partitioning scheme.
This scenario partitions data by the data date, utilizing record timestamp headers for partitioning.
Data is partitioned based on the date and hour, employing record timestamp headers for this partitioning scheme.
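As an illustration of the pattern these scenarios share, a sink KCQL statement that partitions object keys by time components taken from record headers might look like the following; the bucket, prefix, topic, and header names are placeholders, and the _header. prefix assumes the time components are carried as headers:

```properties
connect.s3.kcql=INSERT INTO my-bucket:my-prefix SELECT * FROM my-topic PARTITIONBY _header.year, _header.month, _header.day, _header.hour STOREAS 'parquet'
```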
Examples for GCP Sink Kafka Connector time-based partitioning.
This scenario partitions data by date and time, employing record timestamp headers to enable partitioning based on these time components.
Data is partitioned by data date and hour, utilizing record timestamp headers for partitioning based on these time components.
The default Confluent partitioning scheme follows the structure <prefix>/<topic>/<encodedPartition>/<topic>+<kafkaPartition>+<startOffset>.<format>. This provides a default partitioning mechanism for Kafka topics.
Similar to the previous scenario, this partitions data by year, month, and day. It utilizes record timestamp headers for partitioning based on these time components.
Extending the previous scenarios, this one partitions data by year, month, day, hour, and minute, allowing for more granular time-based partitioning.
This scenario partitions data by year, month, day, and hour. It utilizes a transformation process to insert record timestamp headers, enabling partitioning based on these time components.
This scenario partitions data by date and hour, using record timestamp headers for partitioning based on these time components.
This scenario partitions data based on the created at timestamp, utilizing record timestamp headers for partitioning.
Data is partitioned based on the raw creation date, employing record timestamp headers for this partitioning scheme.
Data is partitioned based on the creation timestamp, utilizing record timestamp headers for this partitioning scheme.
This scenario partitions data by the created at date, employing record timestamp headers for partitioning.
Similar to the previous scenario, this partitions data by the created at date, utilizing record timestamp headers for partitioning.
Data is partitioned based on the creation date, employing record timestamp headers for this partitioning scheme.
This scenario partitions data by the data date, utilizing record timestamp headers for partitioning.
Data is partitioned based on the date and hour, employing record timestamp headers for this partitioning scheme.
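The same pattern applies to GCP Storage, using the gcpstorage property prefix; a sketch (bucket, prefix, topic, and header names are placeholders):

```properties
connect.gcpstorage.kcql=INSERT INTO my-bucket:my-prefix SELECT * FROM my-topic PARTITIONBY _header.year, _header.month, _header.day, _header.hour STOREAS 'parquet'
```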
Coming soon!
Coming soon!
Coming soon!
This page describes how to perform a backup and restore of data in your Kafka cluster.
The following external storage systems are supported for backing up to and restoring from:
AWS S3
Backup and restore are achieved using the standard connectors but enabling the Message Envelope to ensure the correct metadata is persisted.
A Kafka message includes keys, values, headers, and metadata (topic, partition, offset, timestamp).
The connector wraps these messages in an "envelope", streamlining backup and restoration without relying on complex Kafka Connect transformations.
Here's how the envelope is structured:
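A representative sketch of an envelope for a single record; the field values are placeholders, and the exact field names should be checked against the connector documentation:

```json
{
  "key": { "id": "customer-123" },
  "value": { "amount": 42.5, "currency": "USD" },
  "headers": { "trace-id": "abc-123" },
  "metadata": {
    "topic": "sales",
    "partition": 0,
    "offset": 1234,
    "timestamp": 1700000000000
  }
}
```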
In this format, all parts of the Kafka message are retained. This is beneficial for backup, restoration, and running analytical queries.
The Source connector uses this format to rebuild the original Kafka message and send it to the specified topic.
Anything can be stored in S3, and the connector does its best to support the major formats:
AVRO
Parquet
JSON
CSV (including headers)
Text
BYTES
This format is decoupled from the format in Kafka. The translation from Kafka to Connect happens via the key.converter and value.converter connector properties.
Partitioning is crucial for organizing data and improving query speeds. In S3, this is achieved using the Object Key. By default, the connector mirrors the structure of the Kafka topic it reads from. For instance, a three-partition topic would use this configuration:
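A sketch of such a configuration, with placeholder bucket, prefix, and topic names:

```properties
connect.s3.kcql=INSERT INTO my-bucket:my-prefix SELECT * FROM my-topic STOREAS 'AVRO'
```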
This would result in:
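Indicatively, the object keys would mirror the topic's three partitions, along the lines of:

```
my-prefix/my-topic/0/...
my-prefix/my-topic/1/...
my-prefix/my-topic/2/...
```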
The connector allows for customised partitioning, which has its perks:
Better performance in subsequent data queries due to organized partitions.
Easy data management through time intervals, like year or month.
Keeping sensitive data in distinct partitions for tighter access controls.
To adjust partitioning, use the PARTITIONBY clause in the KCQL configuration. This can use the Kafka message's key, value, or headers for partitioning.
For instance, for a "sales" Kafka topic with transaction messages, the KCQL can partition data by transaction year, product type, and customer region.
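A sketch of such a statement; the field names (customerId, transactionYear, productCategory, customerRegion) and the bucket and prefix are illustrative:

```properties
connect.s3.kcql=INSERT INTO my-bucket:sales-data SELECT * FROM sales PARTITIONBY _key.customerId, _value.transactionYear, _value.productCategory, _value.customerRegion STOREAS 'AVRO'
```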
The Kafka Connect S3 Sink Connector will create custom object keys in your S3 bucket that incorporate the customer ID, transaction year, product category, and customer region, resulting in a coarser partitioning strategy. For instance, an object key might look like this:
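With the illustrative fields above, a resulting object key could look roughly like this (the trailing file name is elided):

```
sales-data/customer-123/2023/Electronics/EMEA/...
```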
To achieve more structured object key naming, similar to Athena Hive-like key names where field names are part of the object key, modify the KCQL syntax as follows:
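A sketch of the modified statement, assuming the connector exposes a KCQL property for including field names in the partition path (shown here as partition.include.keys; verify the exact property name against the connector reference):

```properties
connect.s3.kcql=INSERT INTO my-bucket:sales-data SELECT * FROM sales PARTITIONBY _key.customerId, _value.transactionYear, _value.productCategory, _value.customerRegion STOREAS 'AVRO' PROPERTIES('partition.include.keys'=true)
```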
This will result in object keys like:
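With the same illustrative values, the keys would then take a Hive-style form along the lines of:

```
sales-data/customerId=customer-123/transactionYear=2023/productCategory=Electronics/customerRegion=EMEA/...
```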
Organizing data into time-based intervals within custom object keys can be highly beneficial. To achieve time-based intervals with a custom object key naming, the connector supports a complementary Kafka Connect Single Message Transformer (SMT) plugin designed to streamline this process. You can find the transformer plugin and documentation here.
Consider an example where you need the object key to include the wallclock time (the time when the message was processed) and create an hourly window based on a field called `timestamp`. Here's the connector configuration to achieve this:
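A sketch of such a configuration fragment; the TimestampConverter shown is the built-in Kafka Connect SMT, while the InsertWallclock class and property names are indicative of the Lenses SMT plugin and should be verified against its documentation:

```properties
transforms=convertTimestamp,insertWallclock
# Format the value's `timestamp` field as an hourly string (built-in Kafka Connect SMT)
transforms.convertTimestamp.type=org.apache.kafka.connect.transforms.TimestampConverter$Value
transforms.convertTimestamp.field=timestamp
transforms.convertTimestamp.target.type=string
transforms.convertTimestamp.format=yyyy-MM-dd-HH
# Add the current wallclock time as a header (class and property names are indicative)
transforms.insertWallclock.type=io.lenses.connect.smt.header.InsertWallclock
transforms.insertWallclock.header.name=wallclock
transforms.insertWallclock.format=yyyy-MM-dd-HH
# Partition object keys by the formatted timestamp field and the wallclock header
connect.s3.kcql=INSERT INTO my-bucket:my-prefix SELECT * FROM my-topic PARTITIONBY _value.timestamp, _header.wallclock STOREAS 'AVRO'
```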
In this configuration:
The TimestampConverter SMT is used to convert the timestamp field in the Kafka message's value into a string value based on the specified format pattern (yyyy-MM-dd-HH). This allows us to format the timestamp to represent an hourly interval.
The InsertWallclock SMT incorporates the current wallclock time in the specified format (yyyy-MM-dd-HH).
The PARTITIONBY clause leverages the timestamp field and the wallclock header to craft the object key, providing precise control over data partitioning.
This page describes how to use converters with source systems sending JSON and Avro.
Source converters depend on the source system you are reading data from. The Connect SourceTask class requires you to supply a List of SourceRecords. Those records can have a schema but how the schema is translated from the source system to a Connect Struct depends on the connector.
We provide four converters out of the box, but you can plug in your own. The WITHCONVERTER keyword supports this option. These can be used when source systems send records as JSON or AVRO, for example, MQTT or JMS.
Not all Connectors support the source converters. Check the option reference for your connector.
Before records are passed back to connect, they go through the converter if specified.
The payload is an Avro message. In this case, you need to provide a path for the Avro schema file to be able to decode it.
The incoming payload is JSON, the resulting Kafka message value will be of type string and the contents will be the incoming JSON.
The payload is a JSON message. This converter will parse the JSON and create an Avro record for it which will be sent to Kafka.
An experimental converter for translating JSON messages to Avro. The Avro schema is fully compatible as new fields are added as the JSON payload evolves.
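As an illustration, a source KCQL statement attaches a converter via WITHCONVERTER; the example below is sketched for an MQTT source, and the converter class name is indicative of the Stream Reactor converter package (check your connector's option reference for the exact class):

```properties
connect.mqtt.kcql=INSERT INTO my-topic SELECT * FROM /sensors/+ WITHCONVERTER=`io.lenses.streamreactor.connect.converters.source.JsonSimpleConverter`
```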
This page provides examples of HTTP Sink templating.
In this case the converters are irrelevant as we are not using the message content to populate our message template.
The HTTP request body contains the value of the message, which is retained as a string value via the StringConverter.
Specific fields from the JSON message are substituted into the HTTP request body alongside some static content.
The entirety of the message value is substituted into a placeholder in the message body. The message is treated as a string via the StringConverter.
Fields from the AVRO message are substituted into the message body in the following example:
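A sketch of such a template; the connect.http.request.content property name and the {{...}} placeholder syntax are assumptions to check against the HTTP Sink documentation, and the field names are placeholders:

```properties
connect.http.request.content={"customer": "{{value.customer.name}}", "total": "{{value.order.total}}", "source": "{{topic}}"}
```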
This page describes configuring sink converters for Kafka Connect.
You can configure the converters either at the Connect worker level or at the Connector instance level.
If you follow best practices while producing events, each message should carry its schema information. The best option is to send AVRO.
This requires the SchemaRegistry.
Sometimes the producer would find it easier to just send a message with Schema.String and a JSON string. In this case, your connector configuration should be set to value.converter=org.apache.kafka.connect.json.JsonConverter. This doesn't require the SchemaRegistry.
Many existing systems publish JSON over Kafka and bringing them in line with best practices is quite a challenge, hence we added the support. To enable this support you must change the converters in the connector configuration.
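For example, the two setups described above differ only in the converter properties set on the connector instance (the schema registry URL is a placeholder):

```properties
# Avro values with schemas managed by a Schema Registry
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

# Alternatively: schemaless JSON values, no Schema Registry required
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
```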
This page describes managing a basic connector instance in your Connect cluster.
To deploy a connector into the Kafka Connect Cluster, you must follow the steps below:
You need to have the connector JARs available on the plugin.path of your Kafka Connect Cluster.
Each connector has mandatory configurations that must be deployed and validated; other configurations are optional. Always read the connector documentation first.
You can deploy the connector using the Kafka Connect API, or let Lenses manage it for you.
Sample connector: the AWS S3 Sink Connector from Stream Reactor:
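A sketch of such a configuration; the connector name, topic, bucket, folder, and credential values are placeholders, and the flush behaviour is left to the connector defaults (see the connector reference for the KCQL options that control it):

```properties
name=aws-s3-sink-connector
connector.class=io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector
tasks.max=1
topics=my-topic
# Value is JSON, key is a plain string; schemas are not embedded in the messages
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
# AWS S3 authentication
connect.s3.aws.auth.mode=Credentials
connect.s3.aws.access.key=<your-access-key>
connect.s3.aws.secret.key=<your-secret-key>
connect.s3.aws.region=<your-region>
# Bucket, folder and storage format
connect.s3.kcql=INSERT INTO my-bucket:my-folder SELECT * FROM my-topic STOREAS 'JSON'
```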
Let's drill down into this connector configuration and what will happen when it is deployed:
connector.class is the plugin we will use.
tasks.max is how many tasks will be executed in the Kafka Connect Cluster. In this example, we will have 1 task; the topic has 9 partitions, so the consumer group of this connector will have 1 instance, and that one task will consume all 9 partitions. To scale, simply increase the number of tasks.
name is the name of the connector; it must be unique within each Kafka Connect Cluster.
topics is the topic that will be consumed; all of its data will be written to AWS S3, as our example describes.
value.converter is the format type used to deserialize or serialize the data in the value. In this case, our value will be in json format.
key.converter is the format type used to deserialize or serialize the data in the key. In this case, our key will be in string format.
key.converter.schemas.enable is where we tell Kafka Connect whether we want to include the key schema in the message. In our example it is false: we don't want to include the key schema.
value.converter.schemas.enable is where we tell Kafka Connect whether we want to include the value schema in the message. In our example it is false: we don't want to include the value schema.
connect.s3.aws.auth.mode is the type of authentication we will use to connect to the AWS S3 bucket.
connect.s3.aws.access.key is the access key used to authenticate against the AWS S3 bucket.
connect.s3.aws.secret.key is the secret key used to authenticate against the AWS S3 bucket.
connect.s3.aws.region is the region where the AWS S3 bucket is deployed.
connect.s3.kcql is the Kafka Connect Query Language configuration: the bucket name, folder, storage format, and the frequency of adding new files to the bucket.
After deploying your Kafka Connector into your Kafka Connect Cluster, it will be managed by the Kafka Connect Cluster.
To better show how Kafka Connect manages your connectors, we will use the Lenses UI.
The image below is the Lenses Connectors list:
In the following image, we will delve into the Kafka Connector details.
Consumer Group: when we use Lenses, we can see which consumer group is reading and consuming that topic.
Connector tasks: we can see which tasks are open, their status, and how many records are going in and out of that topic.
This page describes how to use Error policies in Stream Reactor sink connectors.
In addition to the dead letter queues provided by Kafka Connect, the Stream Reactor sink connectors support Error Policies to handle failure scenarios.
The sinks have three error policies that determine how failed writes to the target database are handled. These error policies allow you to control the behaviour of the sink if it encounters an error when writing records to the target system. Since Kafka retains the records, subject to the configured retention policy of the topic, the sink can ignore the error, fail the connector or attempt redelivery.
The policies are:
Throw: Any error in writing to the target system will be propagated up and processing is stopped. This is the default behaviour.
Noop: Any error in writing to the target database is ignored and processing continues. Keep in mind this can lead to missed errors if you don't have adequate monitoring. Data is not lost as it's still in Kafka, subject to Kafka's retention policy. The sink currently does not distinguish between integrity constraint violations and other exceptions thrown by any drivers or target systems.
Retry: Any error in writing to the target system causes the RetryIterable exception to be thrown. This causes the Kafka Connect framework to pause and replay the message. Offsets are not committed. For example, if the table is offline it will cause a write failure, and the message can be replayed. With the Retry policy, the issue can be fixed without stopping the sink.
For the full configuration options of the AWS S3 Sink connector, see its documentation.
| Name | Description | Default Value |
|---|---|---|
| [connector-prefix].error.policy | Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP, the error is swallowed; THROW, the error is allowed to propagate; and RETRY, the Kafka message is redelivered up to a maximum number of times specified by the [connector-prefix].max.retries option. | THROW |
| [connector-prefix].max.retries | The maximum number of times a message is retried. Only valid when [connector-prefix].error.policy is set to RETRY. | 10 |
| [connector-prefix].retry.interval | The interval, in milliseconds, between retries if the sink is using [connector-prefix].error.policy set to RETRY. | 60000 |
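For example, to have the S3 sink retry failed writes every 60 seconds, up to 10 times, before failing the task (using the connect.s3 prefix from the earlier examples):

```properties
connect.s3.error.policy=RETRY
connect.s3.max.retries=10
connect.s3.retry.interval=60000
```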
Coming soon!