This page contains the release notes for the Stream Reactor.
Improvements to the HTTP Sink
Queue Limiting: We've set a limit on the queue size per topic to reduce the chances of an Out-of-Memory (OOM) issue. Previously the queue was unbounded and in a scenario where the http calls are slow and the sink gets more records than it clears, it would eventually lead to OOM.
Offering Timeout: The offering to the queue now includes a timeout. If there are records to be offered, but the timeout is exceeded, a retriable exception is thrown. Depending on the connector's retry settings, the operation will be attempted again. This helps avoid situations where the sink gets stuck processing a slow or unresponsive batch.
Duplicate Record Handling: To prevent the same records from being added to the queue multiple times, we've introduced a Map[TopicPartition, Offset]
to track the last processed offset for each topic-partition. This ensures that the sink does not attempt to process the same records repeatedly.
Batch Failure Handling: The changes also address a situation where an HTTP call fails due to a specific input, but the batch is not removed from the queue. This could have led to the batch being retried indefinitely, which is now prevented.
HTTP sink ignores null records. Avoids a NPE if custom SMTs send null records to the pipeline
Improvements to the HTTP Sink reporter
Datalake sinks allow an optimisation configuration to avoid rolling the file on schema change. To be used only when the schema changes are backwards compatible
Fixes for the HTTP sink batch.
HTTP sink improvements
HTTP sink improvements
Dependency version upgrades.
Fixed timestamp
of the messages and some of the fields in the payload to correctly use UTC millis.
Changed contentType
in the message to be nullable (Optional String).
Possible exception fix
Store success/failure response codes in http for the reporter.
Fixes possible exception in Reporter code.
Fixes and overall stability improvements.
Remove overriding transport options on GCS client.
Bugfix: HTTP Kafka Reporter ClientID is not unique.
Improvements for handing HTTP sink null values & fix for NPE on GCS.
Removal of shading for Avro and Confluent jars.
Addition of HTTP reporter functionally to route HTTP success and errors to a configurable topic.
Breaking Configuration Change
Configuration for the HTTP Sink changes from Json to standard Kafka Connect properties. Please study the documentation if upgrading to 8.0.0.
Removed support for JSON configuration format. Instead please now configure your HTTP Sink connector using kafka connect properties. HTTP Sink Configuration.
Introduced SSL Configuration. SSL Configuration.
Introduced OAuth Support. OAuth Configuration.
Support for dynamic index names in KCQL. more info
Configurable tombstone behaviour using KCQL property. behavior.on.null.values
more info
SSL Support using standard Kafka Connect properties. more info
Adjust defaultUploadSyncPeriod
to make connector more performant by default.
Fix for issue causing sink to fail with NullPointerException due to invalid offset.
Dependency version upgrades
Fixes a gap in the avro/parquet storage where enums where converted from Connect enums to string.
Adds support for explicit "no partition" specification to kcql, to enable topics to be written in the bucket and prefix without partitioning the data.
Syntax Example: INSERT INTO foo SELECT * FROM bar NOPARTITION
Dependency version upgrades
This release introduces a new configuration option for three Kafka Connect Sink Connectors—S3, Data Lake, and GCP Storage—allowing users to disable exactly-once semantics. By default, exactly once is enabled, but with this update, users can choose to disable it, opting instead for Kafka Connect’s native at-least-once offset management.
S3 Sink Connector: connect.s3.exactly.once.enable
Data Lake Sink Connector: connect.datalake.exactly.once.enable
GCP Storage Sink Connector: connect.gcpstorage.exactly.once.enable
Default Value: true
Indexing is enabled by default to maintain exactly-once semantics. This involves creating an .indexes directory at the root of your storage bucket, with subdirectories dedicated to tracking offsets, ensuring that records are not duplicated.
Users can now disable indexing by setting the relevant property to false
. When disabled, the connector will utilise Kafka Connect’s built-in offset management, which provides at-least-once semantics instead of exactly-once.
Dependency version upgrades
Introduced new properties to allow users to filter the files to be processed based on their extensions.
For AWS S3 Source Connector:
connect.s3.source.extension.excludes
: A comma-separated list of file extensions to exclude from the source file search. If this property is not configured, all files are considered.
connect.s3.source.extension.includes
: A comma-separated list of file extensions to include in the source file search. If this property is not configured, all files are considered.
For GCP Storage Source Connector:
connect.gcpstorage.source.extension.excludes
: A comma-separated list of file extensions to exclude from the source file search. If this property is not configured, all files are considered.
connect.gcpstorage.source.extension.includes
: A comma-separated list of file extensions to include in the source file search. If this property is not configured, all files are considered.
These properties provide more control over the files that the AWS S3 Source Connector and GCP Storage Source Connector process, improving efficiency and flexibility.
Increases the default HTTP Retry timeout from 250ms total timeout to 3 minutes as default. The default consumer group max.poll.timeout
is 5 minutes, so it’s within the boundaries to avoid a group rebalance.
Adding retry delay multiplier as a configurable parameter (with default value) to Google Cloud Storage Connector. Main changes revolve around RetryConfig class and its translation to gax HTTP client config.
Making indexes directory configurable for both source and sink.
Use the below properties to customise the indexes root directory:
connect.datalake.indexes.name
connect.gcpstorage.indexes.name
connect.s3.indexes.name
See connector documentation for more information.
Use the below properties to exclude custom directories from being considered by the source.
connect.datalake.source.partition.search.excludes
connect.gcpstorage.source.partition.search.excludes
connect.s3.source.partition.search.excludes
Azure Service Bus Sink Connector.
Exception / Either Tweaks
Bump java dependencies (assertJ and azure-core)
Add logging on flushing
Fix: Java class java.util.Date support in cloud sink maps
Full Changelog: https://github.com/lensesio/stream-reactor/compare/7.3.2...7.4.0
Support added for writing Date, Time and Timestamp data types to AVRO.
Invalid Protocol Configuration Fix
Azure Service Bus Source Connector Find out more.
GCP Pub/Sub Source Connector Find out more.
To back the topics up the KCQL statement is
When *
is used the envelope setting is ignored.
This change allows for the *
to be taken into account as a default if the given message topic is not found.
Bug fix to ensure that, if specified as part of the template, the Content-Type header is correctly populated.
Update of dependencies.
Upgrading from any version prior to 7.0.0, please see the release and upgrade notes for 7.0.0.
Automated Skip for Archived Objects:
The S3 source now seamlessly bypasses archived objects, including those stored in Glacier and Deep Archive. This enhancement improves efficiency by automatically excluding archived data from processing, avoiding the connector crashing otherwise
Enhanced Key Storage in Envelope Mode:
Changes have been implemented to the stored key when using envelope mode. These modifications lay the groundwork for future functionality, enabling seamless replay of Kafka data stored in data lakes (S3, GCS, Azure Data Lake) from any specified point in time.
Full Changelog: https://github.com/lensesio/stream-reactor/compare/7.1.0...7.2.0
We’ve rolled out enhancements to tackle a common challenge faced by users of the S3 source functionality. Previously, when an external producer abruptly terminated a file without marking the end message, data loss occurred.
To address this, we’ve introduced a new feature: a property entry for KCQL to signal the handling of unterminated messages. Meet the latest addition, read.text.last.end.line.missing. When set to true, this property ensures that in-flight data is still recognized as a message even when EOF is reached but the end line marker is missing.
Upgrading from any version prior to 7.0.0, please see the release and upgrade notes for 7.0.0.
This release brings substantial enhancements to the data-lakes sink connectors, elevating their functionality and flexibility. The focal point of these changes is the adoption of the new KCQL syntax, designed to improve usability and resolve limitations inherent in the previous syntax.
New KCQL Syntax: The data-lakes sink connectors now embrace the new KCQL syntax, offering users enhanced capabilities while addressing previous syntax constraints.
Data Lakes Sink Partition Name: This update ensures accurate preservation of partition names by avoiding the scraping of characters like \
and /
. Consequently, SMTs can provide partition names as expected, leading to reduced configuration overhead and increased conciseness.
Several keywords have been replaced with entries in the PROPERTIES section for improved clarity and consistency:
WITHPARTITIONER: Replaced by PROPERTIES ('partition.include.keys'=true/false)
. When WITHPARTITIONER KeysAndValue
is set to true
, the partition keys are included in the partition path. Otherwise, only the partition values are included.
WITH_FLUSH_SIZE: Replaced by PROPERTIES ('flush.size'=$VALUE)
.
WITH_FLUSH_COUNT: Replaced by PROPERTIES ('flush.count'=$VALUE)
.
WITH_FLUSH_INTERVAL: Replaced by PROPERTIES ('flush.interval'=$VALUE)
.
The adoption of the new KCQL syntax enhances the flexibility of the data-lakes sink connectors, empowering users to tailor configurations more precisely to their requirements. By transitioning keywords to entries in the PROPERTIES section, potential misconfigurations stemming from keyword order discrepancies are mitigated, ensuring configurations are applied as intended.
Please note that the upgrades to the data-lakes sink connectors are not backward compatible with existing configurations. Users are required to update their configurations to align with the new KCQL syntax and PROPERTIES entries. This upgrade is necessary for any instances of the sink connector (S3, Azure, GCP) set up before version 7.0.0.
To upgrade to the new version, users must follow these steps:
Stop the sink connectors.
Update the connector to version 7.0.0.
Edit the sink connectors’ KCQL setting to translate to the new syntax and PROPERTIES entries.
Restart the sink connectors.
This update specifically affects datalake sinks employing the JSON storage format. It serves as a remedy for users who have resorted to a less-than-ideal workaround: employing a Single Message Transform (SMT) to return a Plain Old Java Object (POJO) to the sink. In such cases, instead of utilizing the Connect JsonConverter to seamlessly translate the payload to JSON, reliance is placed solely on Jackson.
However, it’s crucial to note that this adjustment is not indicative of a broader direction for future expansions. This is because relying on such SMT practices does not ensure an agnostic solution for storage formats (such as Avro, Parquet, or JSON).
HTTP Sink Connector (Beta)
Azure Event Hubs Source Connector.
Update of dependencies, CVEs addressed.
Please note that the Elasticsearch 6 connector will be deprecated in the next major release.
GCP Storage Source Connector (Beta) Find out more.
Important: The AWS Source Partition Search properties have changed for consistency of configuration. The properties that have changed for 6.2.0 are:
connect.s3.partition.search.interval
changes to connect.s3.source.partition.search.interval
.
connect.s3.partition.search.continuous
changes to connect.s3.source.partition.search.continuous
.
connect.s3.partition.search.recurse.levels
changes to connect.s3.source.partition.search.recurse.levels
.
If you use any of these properties, when you upgrade to the new version then your source will halt and the log will display an error message prompting you to adjust these properties. Be sure to update these properties in your configuration to enable the new version to run.
Dependency upgrade of Hadoop libraries version to mitigate against CVE-2022-25168.
Jakarta Dependency Migration: Switch to Jakarta EE dependencies in line with industry standards to ensure evolution under the Eclipse Foundation.
There has been some small tidy up of dependencies, restructuring and removal of unused code, and a number of connectors have a slimmer file size without losing any functionality.
The configuration directory has been removed as these examples are not kept up-to-date. Please see the connector documentation instead.
Dependency upgrades.
In this release, all connectors have been updated to address an issue related to conflicting Antlr jars that may arise in specific environments.
Byte Array Support: Resolved an issue where storing the Key/Value as an array of bytes caused compatibility problems due to the connector returning java.nio.ByteBuffer while the Connect framework’s ByteArrayConverter only works with byte[]. This update ensures seamless conversion to byte[] if the key/value is a ByteBuffer.
Fix for NullPointerException: Addressed an issue where the JMS sink connector encountered a NullPointerException when processing a message with a null JMSReplyTo header value.
Fix for DataException: Resolved an issue where the JMS source connector encountered a DataException when processing a message with a JMSReplyTo header set to a queue.
GZIP Support for JSON Writing: Added support for GZIP compression when writing JSON data to AWS S3, GCP Storage, and Azure Datalake sinks.
Improve suppport for handling GCP naming conventions
Removed check preventing nested paths being used in the sink.
Avoid cast exception in GCP Storage connector when using Credentials mode.
Azure Datalake Sink Connector (Beta)
GCP Storage Sink Connector (Beta)
Kudu
Hazelcast
Hbase
Hive
Pulsar
Standardising package names. Connector class names and converters will need to be renamed in configuration.
Some clean up of unused dependencies.
Introducing cloud-common
module to share code between cloud connectors.
Cloud sinks (AWS S3, Azure Data Lake and GCP Storage) now support BigDecimal and handle nullable keys.
Consumer Group Offsets S3 Sink Connector
Enhancement: BigDecimal Support
Bug fix: Redis does not initialise the ErrorHandler
Test Fixes and E2E Test Clean-up: Improved testing with bug fixes and end-to-end test clean-up.
Code Optimization: Removed unused code and converted Java code and tests to Scala for enhanced stability and maintainability.
Ascii Art Loading Fix: Resolved issues related to ASCII art loading.
Build System Updates: Implemented build system updates and improvements.
Stream Reactor Integration: Integrated Kafka-connect-query-language inside of Stream Reactor for enhanced compatibility.
STOREAS Consistency: Ensured consistent handling of backticks with STOREAS.
The source and sink has been the focus of this release.
Full message backup. The S3 sink and source now supports full message backup. This is enabled by adding in the KCQL PROPERTIES('store.envelope'=true)
Removed Bytes_*** storage format. For those users leveraging them there is a migration information below. Storing raw Kafka message the storage format should be AVRO/PARQUET/JSON(less ideal).
Introduced support for BYTES storing single message as raw binary. Typically, storing images or videos are the use case for this. This is enabled by adding in the KCQL STOREAS BYTES
Introduced support for PROPERTIES
to drive new settings required to drive the connectors’ behaviour. The KCQL looks like this: INSERT INTO ... SELECT ... FROM ... PROPERTIES(property=key, ...)
Enhanced PARTITIONBY Support: expanded support for PARTITIONBY fields, now accommodating fields containing dots. For instance, you can use PARTITIONBY a, `field1.field2`
for enhanced partitioning control.
Advanced Padding Strategy: a more advanced padding strategy configuration. By default, padding is now enforced, significantly improving compatibility with S3 Source.
Improved Error Messaging: Enhancements have been made to error messaging, providing clearer guidance, especially in scenarios with misaligned topic configurations (#978).
Commit Logging Refactoring: Refactored and simplified the CommitPolicy for more efficient commit logging (#964).
Comprehensive Testing: Added additional unit testing around configuration settings, removed redundancy from property names, and enhanced KCQL properties parsing to support Map structures.
Consolidated Naming Strategies: Merged naming strategies to reduce code complexity and ensure consistency. This effort ensures that both hierarchical and custom partition modes share similar code paths, addressing issues related to padding and the inclusion of keys and values within the partition name.
Optimized S3 API Calls: Switched from using deleteObjects to deleteObject for S3 API client calls (#957), enhancing performance and efficiency.
JClouds Removal: The update removes the use of JClouds, streamlining the codebase.
Legacy Offset Seek Removal: The release eliminates legacy offset seek operations, simplifying the code and enhancing overall efficiency
Expanded Text Reader Support: new text readers to enhance data processing flexibility, including:
Regex-Driven Text Reader: Allows parsing based on regular expressions.
Multi-Line Text Reader: Handles multi-line data.
Start-End Tag Text Reader: Processes data enclosed by start and end tags, suitable for XML content.
Improved Parallelization: enhancements enable parallelization based on the number of connector tasks and available data partitions, optimizing data handling.
Data Consistency: Resolved data loss and duplication issues when the connector is restarted, ensuring reliable data transfer.
Dynamic Partition Discovery: No more need to restart the connector when new partitions are added; runtime partition discovery streamlines operations.
Efficient Storage Handling: The connector now ignores the .indexes directory, allowing data storage in an S3 bucket without a prefix.
Increased Default Records per Poll: the default limit on the number of records returned per poll was changed from 1024 to 10000, improving data retrieval efficiency and throughput.
Ordered File Processing: Added the ability to process files in date order. This feature is especially useful when S3 files lack lexicographical sorting, and S3 API optimisation cannot be leveraged. Please note that it involves reading and sorting files in memory.
Parquet INT96 Compatibility: The connector now allows Parquet INT96 to be read as a fixed array, preventing runtime failures.
The Kudu and Hive connectors are now deprecated and will be removed in a future release.
Fixed a memory issue with the InfluxDB writer.
Upgraded to Influxdb2 client (note: doesn’t yet support Influxdb2 connections).
For installations that have been using the preview version of the S3 connector and are upgrading to the release, there are a few important considerations:
Previously, default padding was enabled for both “offset” and “partition” values starting in June.
However, in version 5.0, the decision to apply default padding to the “offset” value only, leaving the " partition" value without padding. This change was made to enhance compatibility with querying in Athena.
If you have been using a build from the master branch since June, your connectors might have been configured with a different default padding setting.
To maintain consistency and ensure your existing connector configuration remains valid, you will need to use KCQL configuration properties to customize the padding fields accordingly.
Starting with version 5.0.0, the following configuration keys have been replaced.
In version 4.1, padding options were available but were not enabled by default. At that time, the default padding length, if not specified, was set to 8 characters.
However, starting from version 5.0, padding is now enabled by default, and the default padding length has been increased to 12 characters.
Enabling padding has a notable advantage: it ensures that the files written are fully compatible with the Lenses Stream Reactor S3 Source, enhancing interoperability and data integration.
Sinks created with 4.2.0 and 4.2.1 should retain the padding behaviour, and, therefore should disable padding:
If padding was enabled in 4.1, then the padding length should be specified in the KCQL statement:
STOREAS Bytes_***
is usedThe Bytes_*** storage format has been removed. If you are using this storage format, you will need to install the 5.0.0-deprecated connector and upgrade the connector instances by changing the class name:
Source Before:
Source After:
Sink Before:
Sink After:
The deprecated connector won’t be developed any further and will be removed in a future release. If you want to talk to us about a migration plan, please get in touch with us at sales@lenses.io.
To migrate to the new configuration, please follow the following steps:
stop all running instances of the S3 connector
upgrade the connector to 5.0.0
update the configuration to use the new properties
resume the stopped connectors
All
Ensure connector version is retained by connectors
Lenses branding ASCII art updates
AWS S3 Sink Connector
Improves the error in case the input on BytesFormatWriter is not binary
Support for ByteBuffer which may be presented by Connect as bytes
Note
From Version 4.10, AWS S3 Connectors will use the AWS Client by default. You can revert to the jClouds version by setting the connect.s3.aws.client
property.
All
Scala upgrade to 2.13.10
Dependency upgrades
Upgrade to Kafka 3.3.0
SimpleJsonConverter - Fixes mismatching schema error.
AWS S3 Sink Connector
Add connection pool config
Add Short type support
Support null values
Enabling Compression Codecs for Avro and Parquet
Switch to AWS client by default
Add option to add a padding when writing files, so that files can be restored in order by the source
Enable wildcard syntax to support multiple topics without additional configuration.
AWS S3 Source Connector
Add connection pool config
Retain partitions from filename or regex
Switch to AWS client by default
MQTT Source Connector
Allow toggling the skipping of MQTT Duplicates
MQTT Sink Connector
Functionality to ensure unique MQTT Client ID is used for MQTT sink
Elastic6 & Elastic7 Sink Connectors
Fixing issue with missing null values
All
Scala 2.13 Upgrade
Gradle to SBT Migration
Producing multiple artifacts supporting both Kafka 2.8 and Kafka 3.1.
Upgrade to newer dependencies to reduce CVE count
Switch e2e tests from Java to Scala.
AWS S3 Sink Connector
Optimal seek algorithm
Parquet data size flushing fixes.
Adding date partitioning capability
Adding switch to use official AWS library
Add AWS STS dependency to ensure correct operation when assuming roles with a web identity token.
Provide better debugging in case of exceptions.
FTP Source Connector
Fixes to slice mode support.
Hazelcast Sink Connector
Upgrade to HazelCast 4.2.4. The configuration model has changed and now uses clusters instead of username and password configuration.
Hive Sink Connector
Update of parquet functionality to ensure operation with Parquet 1.12.2.
Support for Hive 3.1.3.
JMS Connector
Enable protobuf support.
Pulsar
Upgrade to Pulsar 2.10 and associated refactor to support new client API.
All
Replace Log4j with Logback to overcome CVE-2021-44228
Bringing code from legacy dependencies inside of project
Cassandra Sink Connector
Ensuring the table name is logged on encountering an InvalidQueryException
HBase Sink Connector
Alleviate possible race condition
All
Move to KCQL 2.8.9
Change sys.errors to ConnectExceptions
Additional testing with TestContainers
Licence scan report and status
AWS S3 Sink Connector
S3 Source Offset Fix
Fix JSON & Text newline detection when running in certain Docker images
Byte handling fixes
Partitioning of nested data
Error handling and retry logic
Handle preCommit with null currentOffsets
Remove bucket validation on startup
Enabled simpler management of default flush values.
Local write mode - build locally, then ship
Deprecating old properties, however rewriting them to the new properties to ensure backwards compatibility.
Adding the capability to specify properties in yaml configuration
Rework exception handling. Refactoring errors to use Either[X,Y] return types where possible instead of throwing exceptions.
Ensuring task can be stopped gracefully if it has not been started yet
ContextReader testing and refactor
Adding a simple state model to the S3Writer to ensure that states and transitions are kept consistent. This can be improved in time.
AWS S3 Source Connector
Change order of match to avoid scala.MatchError
S3 Source rewritten to be more efficient and use the natural ordering of S3 keys
Region is necessary when using the AWS client
Cassandra Sink & Source Connectors
Add connection and read client timeout
FTP Connector
Support for Secure File Transfer Protocol
Hive Sink Connector
Array Support
Kerberos debug flag added
Influx DB Sink
Bump influxdb-java from version 2.9 to 2.29
Added array handling support
MongoDB Sink Connector
Nested Fields Support
Redis Sink Connector
Fix Redis Pubsub Writer
Add support for json and json with schema
Move to connect-common 2.0.5 that adds complex type support to KCQL
AWS S3 Sink Connector
Prevent null pointer exception in converters when maps are presented will null values
Offset reader optimisation to reduce S3 load
Ensuring that commit only occurs after the preconfigured time interval when using WITH_FLUSH_INTERVAL
AWS S3 Source Connector (New Connector)
Cassandra Source Connector
Add Bucket Timeseries Mode
Reduction of logging noise
Proper handling of uninitialized connections on task stop()
Elasticsearch Sink Connector
Update default port
Hive Sink
Improve Orc format handling
Fixing issues with partitioning by non-string keys
Hive Source
Ensuring newly written files can be read by the hive connector by introduction of a refresh frequency configuration option.
Redis Sink
Correct Redis writer initialisation
AWS S3 Sink Connector
Elasticsearch 7 Support
Hive Source
Rename option connect.hive.hive.metastore
to connect.hive.metastore
Rename option connect.hive.hive.metastore.uris
to connect.hive.metastore.uris
Fix Elastic start up NPE
Fix to correct batch size extraction from KCQL on Pulsar
Move to Scala 2.12
Move to Kafka 2.4.1 and Confluent 5.4
Deprecated:
Druid Sink (not scala 2.12 compatible)
Elastic Sink (not scala 2.12 compatible)
Elastic5 Sink(not scala 2.12 compatible)
Redis
Add support for Redis Streams
Cassandra
Add support for setting the LoadBalancer policy on the Cassandra Sink
ReThinkDB
Use SSL connection on Rethink initialize tables is ssl set
FTP Source
Respect connect.ftp.max.poll.records when reading slices
MQTT Source
Allow lookup of avro schema files with wildcard subscriptions
Field | Old Property | New Property |
---|---|---|
AWS Secret Key
aws.secret.key
connect.s3.aws.secret.key
Access Key
aws.access.key
connect.s3.aws.access.key
Auth Mode
aws.auth.mode
connect.s3.aws.auth.mode
Custom Endpoint
aws.custom.endpoint
connect.s3.custom.endpoint
VHost Bucket
aws.vhost.bucket
connect.s3.vhost.bucket