# Creating & managing a connector

## Creating your Kafka Connector Configuration

To deploy a connector into the Kafka Connect Cluster, follow the steps below:

1. Place the connector JARs in the **plugin.path** of your Kafka Connect Cluster.
2. Each connector has mandatory configurations that must be set and validated; other configurations are optional. Always read the connector documentation first.
3. Deploy the connector using the Kafka Connect REST API, or let Lenses manage it for you.
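Step 3 can be sketched against the Kafka Connect REST API (the `http://localhost:8083` worker endpoint is an assumption; adjust it to your cluster):

```python
import json
import urllib.request

# Connector configuration as key/value pairs (same keys as the
# properties file shown below).
config = {
    "connector.class": "io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector",
    "tasks.max": "1",
    "topics": "orders",
}

# PUT /connectors/{name}/config creates the connector if it does not
# exist, or updates it if it does (an idempotent deployment).
name = "sink-s3-orders-stream-reactor"
request = urllib.request.Request(
    url=f"http://localhost:8083/connectors/{name}/config",
    data=json.dumps(config).encode("utf-8"),
    headers={"Content-Type": "application/json"},
    method="PUT",
)
# urllib.request.urlopen(request)  # uncomment against a live cluster
```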

Sample connector configuration: the AWS S3 Sink Connector from Stream Reactor:

{% code fullWidth="false" %}

```sh
# connector configuration
connector.class=io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector
tasks.max=1
name=sink-s3-orders-stream-reactor
topics=orders

# converter configuration
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

# specific connector configuration for S3
connect.s3.aws.auth.mode=Credentials
connect.s3.aws.access.key=****
connect.s3.aws.secret.key=****
connect.s3.aws.region=****
connect.s3.kcql=insert into `owshq-topics:stream` select * from orders STOREAS `json` WITH_FLUSH_COUNT = 200 WITH_FLUSH_SIZE = 200 WITH_FLUSH_INTERVAL = 1000
```

{% endcode %}

<figure><img src="https://1499290846-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FETXWnZknWA8VhF6SwqCx%2Fuploads%2FkE8zgwcZupHm2kVXhgfa%2Fimage.png?alt=media&#x26;token=ca000feb-5707-4f21-a78e-b9889dd491df" alt=""><figcaption><p>Simple flow of our connector reading from Kafka and writing to S3</p></figcaption></figure>

Let's drill down into this connector configuration and see what happens when we deploy it:

```sh
# connector configuration
connector.class=io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector
tasks.max=1
name=sink-s3-orders-stream-reactor
topics=orders
```

1. **`connector.class`** is the connector plugin we will use.
2. **`tasks.max`** is how many tasks the Kafka Connect Cluster will run for this connector. In this example we run 1 task; the topic has 9 partitions, so the connector's consumer group will have a single instance consuming all 9 partitions. To scale, simply increase the number of tasks.
3. **`name`** is the connector's name; it must be unique within the Kafka Connect Cluster.
4. **`topics`** is the topic that will be consumed; all its data will be written to AWS S3, as our example describes.
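The relationship between tasks and partitions can be sketched as follows (round-robin assignment here is an illustration; the actual assignment is decided by the consumer group's partition assignor):

```python
def assign_partitions(num_partitions: int, max_tasks: int) -> dict[int, list[int]]:
    """Spread topic partitions across connector tasks round-robin."""
    assignment = {task: [] for task in range(max_tasks)}
    for partition in range(num_partitions):
        assignment[partition % max_tasks].append(partition)
    return assignment

# With tasks.max=1, a single task consumes all 9 partitions.
print(assign_partitions(9, 1))  # {0: [0, 1, 2, 3, 4, 5, 6, 7, 8]}
# Scaling out to tasks.max=3 splits the same partitions across 3 tasks.
print(assign_partitions(9, 3))  # {0: [0, 3, 6], 1: [1, 4, 7], 2: [2, 5, 8]}
```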

```sh
# converter configuration
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
```

1. **`value.converter`** is the format used to serialize or deserialize the record value. In this case, our value will be in `json` format.
2. **`key.converter`** is the format used to serialize or deserialize the record key. In this case, our key will be in `string` format.
3. **`key.converter.schemas.enable`** tells Kafka Connect whether to include the key schema in the message. In our example it is `false`: we don't want to include the key schema.
4. **`value.converter.schemas.enable`** tells Kafka Connect whether to include the value schema in the message. In our example it is `false`: we don't want to include the value schema.
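What the converters do can be illustrated in plain Python (a rough analogy, not the Connect converter API): the key converter writes the key as a raw string, the value converter serializes the value as JSON, and with `schemas.enable=false` no schema envelope is added:

```python
import json

# A record as the connector sees it: string key, structured value.
key = "order-1001"
value = {"order_id": 1001, "amount": 59.90}

# StringConverter: the key is written as raw UTF-8 bytes.
key_bytes = key.encode("utf-8")

# JsonConverter with schemas.enable=false: just the JSON payload,
# without the {"schema": ..., "payload": ...} envelope.
value_bytes = json.dumps(value).encode("utf-8")

# With schemas.enable=true the same value would be wrapped like this:
value_with_schema = {
    "schema": {"type": "struct", "fields": []},  # illustrative, not a real schema
    "payload": value,
}
```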

```sh
# specific connector configuration for S3
connect.s3.aws.auth.mode=Credentials
connect.s3.aws.access.key=****
connect.s3.aws.secret.key=****
connect.s3.aws.region=****
connect.s3.kcql=insert into `owshq-topics:stream` select * from orders STOREAS `json` WITH_FLUSH_COUNT = 200 WITH_FLUSH_SIZE = 200 WITH_FLUSH_INTERVAL = 1000
```

1. **`connect.s3.aws.auth.mode`** is the authentication mode used to connect to the AWS S3 bucket.
2. **`connect.s3.aws.access.key`** is the access key used to authenticate against the AWS S3 bucket.
3. **`connect.s3.aws.secret.key`** is the secret key used to authenticate against the AWS S3 bucket.
4. **`connect.s3.aws.region`** is the region where the AWS S3 bucket is deployed.
5. **`connect.s3.kcql`** uses KCQL (Kafka Connect Query Language) to configure the bucket name, folder, storage format, and how often new files are added to the bucket.
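The KCQL statement packs several settings into one string; a hypothetical parse with a regular expression shows the pieces (this is an illustration, not the connector's actual parser):

```python
import re

kcql = ("insert into `owshq-topics:stream` select * from orders "
        "STOREAS `json` WITH_FLUSH_COUNT = 200 WITH_FLUSH_SIZE = 200 "
        "WITH_FLUSH_INTERVAL = 1000")

match = re.search(
    r"insert into `(?P<bucket>[^:`]+):(?P<prefix>[^`]+)` "
    r"select \* from (?P<topic>\S+) STOREAS `(?P<fmt>[^`]+)`",
    kcql,
)
print(match.group("bucket"))  # owshq-topics -> target S3 bucket
print(match.group("prefix"))  # stream       -> folder (object key prefix)
print(match.group("topic"))   # orders       -> source Kafka topic
print(match.group("fmt"))     # json         -> storage format
```

The `WITH_FLUSH_*` clauses control when a new file is committed to the bucket, by record count, by size, and by time interval.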

For the complete configuration reference of the AWS S3 Sink connector, click [here](https://docs.lenses.io/latest/connectors/kafka-connectors/sinks/aws-s3).

## Managing your Connector

After you deploy your connector into your Kafka Connect Cluster, the cluster manages it for you.
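Besides a UI, the Kafka Connect REST API exposes connector health via `GET /connectors/{name}/status`. A sketch (the endpoint host is an assumption, and the response below is a hand-written example of the documented shape):

```python
import json
import urllib.request

name = "sink-s3-orders-stream-reactor"
url = f"http://localhost:8083/connectors/{name}/status"

# Against a live cluster:
#   with urllib.request.urlopen(url) as resp:
#       status = json.load(resp)
# Hand-written example response of the documented shape:
status = {
    "name": name,
    "connector": {"state": "RUNNING", "worker_id": "10.0.0.5:8083"},
    "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "10.0.0.5:8083"}],
}

# Collect any failed tasks so they can be restarted or investigated.
failed = [t["id"] for t in status["tasks"] if t["state"] == "FAILED"]
print(f"connector={status['connector']['state']} failed_tasks={failed}")
```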

To better show how Kafka Connect manages your connectors, we will use Lenses UI.

The image below is the Lenses Connectors list:

<figure><img src="https://1499290846-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FETXWnZknWA8VhF6SwqCx%2Fuploads%2FKcFYXvi5s9SjMZpZIg9N%2Fimage.png?alt=media&#x26;token=ae808d02-49f8-4a96-bbdb-9c8add13cc73" alt=""><figcaption></figcaption></figure>

In the following image, we will delve into the Kafka Connector details.

<figure><img src="https://1499290846-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FETXWnZknWA8VhF6SwqCx%2Fuploads%2F5uYKiMS8jpzUphjifo5z%2Fimage.png?alt=media&#x26;token=fdf96ccd-6844-4652-9944-3adff2bd073e" alt=""><figcaption></figcaption></figure>

* `Consumer Group`: with Lenses we can see which consumer group is reading and consuming the topic.
* `Connector tasks`: we can see which tasks are running, their status, and how many records are flowing in and out of that topic.
