Understanding Kafka Connect
This page provides an overview of Kafka Connect.
Kafka Connect uses the Kafka producer and consumer APIs to load data into Apache Kafka and to ship data from Kafka to other storage systems. A Connector is an instance of a connector plugin, defined by a configuration.
Kafka Connect is a plugin framework. It exposes APIs for developers to implement standard recipes (known as Connector Plugins) for exchanging data between Kafka and other data systems. It provides the runtime environment to execute the plugins in the form of Connect Clusters made up of Workers.
The fundamental building blocks of Connect Clusters are the Workers. Workers manage the plumbing required to interact with Kafka, which Connect uses to coordinate the workload. They also handle the management of connector plugins and the creation and supervision of their instances, known as connector instances.
Connect Clusters can be configured either as standalone (consisting of one Worker) or distributed (consisting of many Workers, ideally on different machines). The benefits of distributed mode are fault tolerance and load balancing of the workload across machines.
A Connector Plugin is a concrete implementation of the Plugin APIs exposed by Connect for a specific third-party system, for example, S3. If a Connector Plugin is extracting data from a system and writing to Kafka it’s called a Source Plugin. If it ships data from Kafka to another system it’s called a Sink Plugin.
The modular architecture of Connect, with Workers managing connector plugins and connector instances, abstracts away the complexities of data integration, enabling developers to concentrate on the core functionality of their applications.
The Community has been developing Connectors to connect to all sorts of systems for years. Here at Lenses, we are the biggest contributor to open-source connectors via our Stream Reactor project.
The Connector Plugins are code, deployed as jar files, and added to the classpath of each Worker in the Connect cluster.
Why does it need to be on each Worker? The reason lies in the distributed nature of Connect and its workload distribution mechanism. The Connect framework evenly distributes tasks among multiple Workers using the Kafka consumer group protocol. This ensures load balancing and fault tolerance within the Connect cluster. Each Worker is responsible for executing a subset of the assigned tasks.
To enable the execution of assigned workloads, each Worker needs to have access to the required plugins locally. By having the plugins available on each Worker's classpath, the Connect framework can dynamically load and utilize them whenever necessary. This approach eliminates the need for plugins to be distributed separately or retrieved from a centralized location during runtime. But it does mean that when you install your Connect Cluster, you need to ensure each plugin is installed on every Worker.
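As an illustration, a minimal distributed Worker configuration might look like the following sketch. It is expressed here as a Java map; in practice these settings live in the properties file passed to the Worker on startup. The broker address, topic names and plugin directory are illustrative, and plugin.path is where each Worker looks for the installed Connector Plugin jars.

```java
import java.util.Map;

public class WorkerConfigSketch {
    // A minimal sketch of distributed Worker settings. In practice these live in a
    // properties file (e.g. connect-distributed.properties) passed to the Worker.
    // Broker address, topic names and plugin directory below are illustrative.
    public static final Map<String, String> WORKER_CONFIG = Map.of(
            "bootstrap.servers", "localhost:9092",          // Kafka used for coordination and data
            "group.id", "connect-cluster-1",                // Workers sharing a group.id form one cluster
            "key.converter", "org.apache.kafka.connect.json.JsonConverter",
            "value.converter", "org.apache.kafka.connect.json.JsonConverter",
            "config.storage.topic", "connect-configs",      // connector/task configs are stored here
            "offset.storage.topic", "connect-offsets",
            "status.storage.topic", "connect-status",
            "plugin.path", "/opt/connect/plugins"           // every Worker needs the plugin jars here
    );
}
```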
Each Connector Plugin has to implement two interfaces:
Connector (Source or Sink)
Task (Source or Sink)
The Connector interface is responsible for defining and configuring a Connector instance, for example validating the configuration and then splitting it into task configurations for the Connect framework to distribute amongst the Workers.
The actual work is done by the Task class. Connect sends each task configuration, defined by the Connector class, to the Workers via Kafka. Each assigned Worker picks up its task (it is listening to the configuration topic) and creates an instance of the task.
Connector instances are created on the Worker that receives the creation request; task instances may run on the same Worker or on other Workers. Connect distributes tasks to Workers via Kafka: the Workers have internal consumer groups listening to the system topics Connect uses. If you look into the default configuration topic, connect-configs, you will see how each Connector config is split into a config per task.
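To make the Connector/Task split concrete, here is a minimal, hypothetical Source plugin sketch. The class names and the task.id key are invented for illustration; the interfaces and methods (SourceConnector, SourceTask, taskConfigs, poll) are part of the Connect plugin API, and the framework calls taskConfigs with the maximum number of tasks the connector may use.

```java
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical Source plugin: the Connector splits the work, the Task does the work.
public class ExampleSourceConnector extends SourceConnector {
    private Map<String, String> config;

    @Override public void start(Map<String, String> props) { this.config = props; }

    @Override public Class<? extends Task> taskClass() { return ExampleSourceTask.class; }

    // Called by the framework: split the connector config into one config per task.
    @Override public List<Map<String, String>> taskConfigs(int maxTasks) {
        List<Map<String, String>> taskConfigs = new ArrayList<>();
        for (int i = 0; i < maxTasks; i++) {
            Map<String, String> taskConfig = new HashMap<>(config);
            taskConfig.put("task.id", Integer.toString(i)); // illustrative partitioning key
            taskConfigs.add(taskConfig);
        }
        return taskConfigs;
    }

    @Override public void stop() { }
    @Override public ConfigDef config() { return new ConfigDef(); }
    @Override public String version() { return "0.1.0"; }
}

class ExampleSourceTask extends SourceTask {
    private String taskId;

    @Override public void start(Map<String, String> props) { this.taskId = props.get("task.id"); }

    // The actual work: produce records for Connect to write to Kafka.
    @Override public List<SourceRecord> poll() {
        Map<String, ?> sourcePartition = Map.of("task", taskId);
        Map<String, ?> sourceOffset = Map.of("position", 0L);
        return List.of(new SourceRecord(
                sourcePartition, sourceOffset, "example-topic",
                Schema.STRING_SCHEMA, "hello from task " + taskId));
    }

    @Override public void stop() { }
    @Override public String version() { return "0.1.0"; }
}
```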
A Kafka message is made up of:
Headers
Key
Value
Headers are a map, while the key and value are stored as byte arrays in Kafka. Typically these byte arrays represent data stored as JSON or AVRO; however, they could be anything.
To decouple from the format of the data inside Kafka topics, Connect uses an internal format called Struct. Structs have fields, and fields have types defined in a Schema.
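For illustration, this is roughly what building a Struct with a Schema looks like using the Connect data API (the schema name and field names here are invented):

```java
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

public class StructExample {
    public static void main(String[] args) {
        // A Schema describes the fields and their types...
        Schema userSchema = SchemaBuilder.struct().name("com.example.User")
                .field("name", Schema.STRING_SCHEMA)
                .field("age", Schema.OPTIONAL_INT32_SCHEMA)
                .build();

        // ...and a Struct is a value that conforms to that Schema.
        Struct user = new Struct(userSchema)
                .put("name", "Alice")
                .put("age", 42);

        System.out.println(user);
    }
}
```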
Converters translate the byte arrays, which could be AVRO or another format, into Structs for Sink connectors, and Structs back into byte arrays for Source connectors.
For example, suppose you use Avro as the data format in your cluster. The Avro converter allows you to build connectors that interact only with Connect Structs; if you later decide to move to Protobuf, you do not have to reimplement your connector. This is where the converter comes in: it converts Structs to Avro for Source connectors, and Avro back to Structs for Sink connectors, which then write to the external system, e.g. Cassandra. You can swap in a different converter at any time, and as a developer you only ever deal with Structs.
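A rough sketch of what a converter does, using the JsonConverter that ships with Apache Kafka (the topic name and schema are illustrative): it turns a Struct into bytes on the Source side and bytes back into a schema-and-value pair on the Sink side.

```java
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.json.JsonConverter;

import java.util.Map;

public class ConverterRoundTrip {
    public static void main(String[] args) {
        JsonConverter converter = new JsonConverter();
        converter.configure(Map.of("schemas.enable", "true"), /* isKey = */ false);

        Schema schema = SchemaBuilder.struct().name("com.example.User")
                .field("name", Schema.STRING_SCHEMA)
                .build();
        Struct value = new Struct(schema).put("name", "Alice");

        // Source side: Struct -> bytes written to Kafka.
        byte[] bytes = converter.fromConnectData("example-topic", schema, value);

        // Sink side: bytes read from Kafka -> Struct handed to the Sink Task.
        SchemaAndValue restored = converter.toConnectData("example-topic", bytes);
        System.out.println(restored.value());
    }
}
```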
The following converters are available with Apache Kafka:
org.apache.kafka.connect.converters.DoubleConverter: Serializes to and deserializes from DOUBLE values. When converting from bytes to Connect format, the converter returns an optional FLOAT64 schema.
org.apache.kafka.connect.converters.FloatConverter: Serializes to and deserializes from FLOAT values. When converting from bytes to Connect format, the converter returns an optional FLOAT32 schema.
org.apache.kafka.connect.converters.IntegerConverter: Serializes to and deserializes from INTEGER values. When converting from bytes to Connect format, the converter returns an optional INT32 schema.
org.apache.kafka.connect.converters.LongConverter: Serializes to and deserializes from LONG values. When converting from bytes to Connect format, the converter returns an optional INT64 schema.
org.apache.kafka.connect.converters.ShortConverter: Serializes to and deserializes from SHORT values. When converting from bytes to Connect format, the converter returns an optional INT16 schema.
Confluent provides the following converters (Schema Registry required):
AvroConverter: io.confluent.connect.avro.AvroConverter
ProtobufConverter: io.confluent.connect.protobuf.ProtobufConverter
JsonSchemaConverter: io.confluent.connect.json.JsonSchemaConverter
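Converters are selected through worker or connector configuration. As a rough sketch, assuming Avro with a Schema Registry (the registry URL is illustrative), the relevant settings look like this, shown here as a Java map rather than the properties file they would normally live in:

```java
import java.util.Map;

public class AvroConverterConfigSketch {
    // Illustrative converter settings; in practice they go into the worker
    // properties file, or into a connector config to override the worker defaults.
    public static final Map<String, String> CONVERTER_CONFIG = Map.of(
            "key.converter", "io.confluent.connect.avro.AvroConverter",
            "key.converter.schema.registry.url", "http://schema-registry:8081",
            "value.converter", "io.confluent.connect.avro.AvroConverter",
            "value.converter.schema.registry.url", "http://schema-registry:8081");
}
```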
Kafka Connect supports an internal data type of Secret. If a connector uses this type as part of its Config definition, the value will be masked in any logging; however, it will still be exposed in API calls to the Kafka Connect Workers.
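In the Connect API this corresponds to ConfigDef.Type.PASSWORD. A minimal sketch of a connector config definition that declares a secret value (the config key name is illustrative):

```java
import org.apache.kafka.common.config.ConfigDef;

public class ExampleConnectorConfig {
    // Declaring the type as PASSWORD means the value is masked in logs
    // and wrapped in a Password object rather than exposed as a plain String.
    public static final ConfigDef CONFIG_DEF = new ConfigDef()
            .define("connect.example.password",          // illustrative key name
                    ConfigDef.Type.PASSWORD,
                    ConfigDef.Importance.HIGH,
                    "Password used to authenticate against the target system");
}
```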
To address this exposure, Kafka Connect supports secret provider plugins. These allow a connector configuration to contain indirect references to an external secret store, which are resolved when a Connector instance is initialized.
Lenses.io provides Secret Providers for Azure, AWS, Hashicorp Vault and Environment variables here.
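As a sketch of how indirect references look, using the FileConfigProvider that ships with Apache Kafka (paths and keys are illustrative; the Lenses providers follow the same ${provider:path:key} pattern): the Worker is configured with the provider, and the connector config references the secret instead of embedding it.

```java
import java.util.Map;

public class SecretProviderSketch {
    // Worker-side settings (normally in the worker properties file): register a provider.
    // FileConfigProvider ships with Apache Kafka; other providers are registered the same way.
    public static final Map<String, String> WORKER_PROVIDER_CONFIG = Map.of(
            "config.providers", "file",
            "config.providers.file.class",
            "org.apache.kafka.common.config.provider.FileConfigProvider");

    // Connector-side config: reference the secret indirectly instead of embedding it.
    // The ${file:...} placeholder is resolved when the Connector instance is initialized.
    public static final Map<String, String> CONNECTOR_CONFIG = Map.of(
            "connect.example.password",
            "${file:/etc/connect/secrets.properties:example.password}");
}
```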
Single Message Transforms (SMTs) are plugins that enable users to manipulate records, one at a time. For Source Connectors, they manipulate records after the Task has handed them back to the Connect Framework and before they are written to Kafka. For Sink Connectors, they manipulate records before they are passed from the Connect Framework to the Sink Task.
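As an illustration, a minimal custom SMT might look like the following sketch. The class and header names are invented; the Transformation interface and the record headers API are part of the Connect API.

```java
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;

import java.util.Map;

// Hypothetical SMT that stamps every record with a static header.
public class AddSourceHeader<R extends ConnectRecord<R>> implements Transformation<R> {

    @Override
    public void configure(Map<String, ?> configs) {
        // No configuration needed for this sketch.
    }

    @Override
    public R apply(R record) {
        // Records are manipulated one at a time; here we just add a header.
        record.headers().addString("x-origin", "example-pipeline");
        return record;
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef();
    }

    @Override
    public void close() {
        // Nothing to clean up.
    }
}
```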
Apache Kafka comes with a number of available SMTs, for which the documentation can be found here.
Lenses.io also provides a number of SMTs which can be found here.