Creating & managing a connector

This page describes how to create and manage a basic connector instance in your Kafka Connect Cluster.

Creating your Kafka Connector Configuration

To deploy a connector into the Kafka Connect Cluster, you must follow the steps below:

  1. The connector JARs must be available on your Kafka Connect Cluster's plugin.path.

  2. Each connector has mandatory configurations that must be set and validated; other configurations are optional. Always read the connector documentation first.

  3. You can deploy the connector using the Kafka Connect REST API, or let Lenses manage it for you (a deployment sketch follows the sample configuration below).

Sample connector configuration, the AWS S3 Sink Connector from Stream Reactor:

# connector configuration
connector.class=io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector
tasks.max=1
name=sink-s3-orders-stream-reactor
topics=orders

# converter configuration
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

# specific connector configuration for S3
connect.s3.aws.auth.mode=Credentials
connect.s3.aws.access.key=****
connect.s3.aws.secret.key=****
connect.s3.aws.region=****
connect.s3.kcql=INSERT INTO `owshq-topics:stream` SELECT * FROM orders STOREAS `JSON` WITH_FLUSH_COUNT = 200 WITH_FLUSH_SIZE = 200 WITH_FLUSH_INTERVAL = 1000
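
With this configuration in hand, here is a minimal sketch of deploying it through the Kafka Connect REST API (step 3 above). It is only a sketch: it assumes the Python requests library and a Connect worker listening on localhost:8083, the default REST port; adjust the URL for your cluster.

import requests

# Assumption: a Connect worker listening on the default REST port.
CONNECT_URL = "http://localhost:8083"

config = {
    "connector.class": "io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector",
    "tasks.max": "1",
    "topics": "orders",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",
    "connect.s3.aws.auth.mode": "Credentials",
    "connect.s3.aws.access.key": "****",
    "connect.s3.aws.secret.key": "****",
    "connect.s3.aws.region": "****",
    "connect.s3.kcql": (
        "INSERT INTO `owshq-topics:stream` SELECT * FROM orders "
        "STOREAS `JSON` WITH_FLUSH_COUNT = 200 WITH_FLUSH_SIZE = 200 "
        "WITH_FLUSH_INTERVAL = 1000"
    ),
}

# POST /connectors creates the connector; the name must be unique
# within the Kafka Connect Cluster.
resp = requests.post(
    f"{CONNECT_URL}/connectors",
    json={"name": "sink-s3-orders-stream-reactor", "config": config},
)
resp.raise_for_status()
print(resp.json())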

Let's drill down into this connector configuration and see what happens when we deploy it:

# connector configuration
connector.class=io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector
tasks.max=1
name=sink-s3-orders-stream-reactor
topics=orders
  1. connector.class is the connector plugin class we will use.

  2. tasks.max is how many tasks will be executed in the Kafka Connect Cluster. In this example we will have 1 task; the topic has 9 partitions, so the connector's consumer group will have a single instance, and that one task will consume all 9 partitions. To scale, simply increase the number of tasks (see the sketch after this list).

  3. name is the connector's name and must be unique within each Kafka Connect Cluster.

  4. topics is the topic that will be consumed; all of its data will be written to AWS S3, as our example describes.
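
To make the scaling point from item 2 concrete, here is a minimal sketch, assuming the same Connect REST endpoint as in the deployment sketch, that raises tasks.max on the running connector. PUT /connectors/{name}/config replaces the whole configuration, so we fetch the current config first and change only the task count.

import requests

CONNECT_URL = "http://localhost:8083"  # assumption: your Connect worker
name = "sink-s3-orders-stream-reactor"

# Fetch the current configuration, then raise tasks.max from 1 to 3 so
# each task consumes three of the topic's nine partitions.
config = requests.get(f"{CONNECT_URL}/connectors/{name}/config").json()
config["tasks.max"] = "3"

# PUT /connectors/{name}/config updates the connector in place.
resp = requests.put(f"{CONNECT_URL}/connectors/{name}/config", json=config)
resp.raise_for_status()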

# converter configuration
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
  1. value.converter is the converter used to serialize and deserialize the record value. In this case, our value will be in JSON format.

  2. key.converter is the converter used to serialize and deserialize the record key. In this case, our key will be in string format.

  3. key.converter.schemas.enable tells Kafka Connect whether to include the key schema in each message. In our example it is false: we don't want to include the key schema.

  4. value.converter.schemas.enable tells Kafka Connect whether to include the value schema in each message. In our example it is false: we don't want to include the value schema (see the sketch after this list).
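
To illustrate what the schemas.enable flags change, the sketch below prints the two value shapes the JsonConverter produces. The order fields are hypothetical; only the envelope shape matters here.

import json

# value.converter.schemas.enable=false: the raw JSON payload only.
without_schema = {"order_id": 1, "amount": 42.0}

# value.converter.schemas.enable=true: the JsonConverter wraps the same
# payload in a schema envelope describing each field.
with_schema = {
    "schema": {
        "type": "struct",
        "fields": [
            {"field": "order_id", "type": "int32", "optional": False},
            {"field": "amount", "type": "double", "optional": False},
        ],
        "optional": False,
    },
    "payload": {"order_id": 1, "amount": 42.0},
}

print(json.dumps(without_schema))
print(json.dumps(with_schema, indent=2))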

# specific connector configuration for S3
connect.s3.aws.auth.mode=Credentials
connect.s3.aws.access.key=****
connect.s3.aws.secret.key=****
connect.s3.aws.region=****
connect.s3.kcql=INSERT INTO `owshq-topics:stream` SELECT * FROM orders STOREAS `JSON` WITH_FLUSH_COUNT = 200 WITH_FLUSH_SIZE = 200 WITH_FLUSH_INTERVAL = 1000
  1. connect.s3.aws.auth.mode is the type of authentication used to connect to the AWS S3 bucket.

  2. connect.s3.aws.access.key is the access key used to authenticate with AWS S3.

  3. connect.s3.aws.secret.key is the secret key used to authenticate with AWS S3.

  4. connect.s3.aws.region is the region where the AWS S3 bucket is deployed.

  5. connect.s3.kcql uses KCQL (Kafka Connect Query Language) to configure the target bucket and folder, the storage format, and how many records, how many bytes, or how much time may accumulate before a new file is flushed to the bucket. A validation sketch follows this list.
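
Since step 2 at the top of this page asks for the configuration to be validated, here is a minimal sketch using Connect's validation endpoint, PUT /connector-plugins/{class}/config/validate. The endpoint URL is an assumption, and the config map is abbreviated to the properties shown earlier.

import requests

CONNECT_URL = "http://localhost:8083"  # assumption: your Connect worker
connector_class = "io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector"

config = {
    "connector.class": connector_class,
    "tasks.max": "1",
    "topics": "orders",
    # ... plus the converter and connect.s3.* properties shown above
}

# Connect checks the config against the plugin's declared ConfigDef and
# reports how many properties failed validation.
resp = requests.put(
    f"{CONNECT_URL}/connector-plugins/{connector_class}/config/validate",
    json=config,
)
print("validation errors:", resp.json()["error_count"])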

For the full configuration reference of the AWS S3 Sink connector, see the connector documentation.

Managing your Connector

After you deploy your Kafka Connector, the Kafka Connect Cluster manages it for you.

To better show how Kafka Connect manages your connectors, we will use the Lenses UI.

The image below is the Lenses Connectors list:

In the following image, we will delve into the Kafka Connector details.

  • Consumer Group: with Lenses, we can see which consumer group is reading and consuming the topic.

  • Connector tasks: we can see which tasks are running, their status, and how many records flow in and out of the connector (a status-check sketch follows this list).
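
Outside the Lenses UI, the same task information is available from the Connect REST API via GET /connectors/{name}/status; a minimal sketch, assuming the endpoint used in the earlier sketches:

import requests

CONNECT_URL = "http://localhost:8083"  # assumption: your Connect worker
name = "sink-s3-orders-stream-reactor"

status = requests.get(f"{CONNECT_URL}/connectors/{name}/status").json()

print("connector state:", status["connector"]["state"])  # e.g. RUNNING
for task in status["tasks"]:
    print(f"task {task['id']}: {task['state']} on {task['worker_id']}")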
