Kafka Connect Examples


The easiest way to manage Connections is via Lenses GUI under their respective pages, however it is also possible to do it directly via API, Helm or Lenses CLI. In such case, some connection type-specific values have to be used. Here are few examples of such configuration in YAML format.

  • Find out more about managing Kafka Connect Connections via API
  • Find out more about managing Connections via **Lenses **provision
  • Find out more about installing Lenses via Helm

Simple configuration, with JMX metrics 

The URLs (workers) should always have a scheme defined (http:// or https://).

This example uses an optional AES-256 key. The key decodes values encoded with AES-256 to enable passing encrypted values to connectors. It is only needed if your cluster uses AES-256 Decryption plugin.

your-cluster-name:
  tags: ["tag1"]
  templateName: KafkaConnect
  configurationObject:
    workers:
      - http://my-kc.worker1:8083
      - http://my-kc.worker2:8083
    aes256Key: PasswordPasswordPasswordPassword
    # all metrics properties are optional
    metricsPort: 9581
    metricsType: JMX
    metricsSsl: false

Misc metrics configurations 

Find more about multiple options of configuring services’ metrics (like secured JMX, Jolokia, etc) under Services Metrics

Basic authentication 

For Basic Authentication, define username and password properties.

your-cluster-name:
  tags: ["tag1"]
  templateName: KafkaConnect
  configurationObject:
    workers:
      - http://my-kc.worker1:8083
      - http://my-kc.worker2:8083
    username: my-username
    password: my-password

TLS with custom truststore 

A custom truststore is needed when the Kafka Connect workers are served over TLS (encryption-in-transit) and their certificates are not signed by a trusted CA.

your-cluster-name:
  tags: ["tag1"]
  templateName: KafkaConnect
  configurationObject:
    workers:
      - https://my-kc.worker1:8083
      - https://my-kc.worker2:8083
    sslTruststore:
      fileRef:
        filePath: /path/to/my/truststore.jks
    sslTruststorePassword: myPassword

TLS with client authentication 

A custom truststore might be necessary too (see above).

your-cluster-name:
  tags: ["tag1"]
  templateName: KafkaConnect
  configurationObject:
    workers:
      - https://my-kc.worker1:8083
      - https://my-kc.worker2:8083
    sslKeystore:
      fileRef:
        filePath: /path/to/my/keystore.jks
    sslKeyPassword: keyPassword
    sslKeystorePassword: keystorePassword

TLS with Basic Authentication 

As above - a custom truststore is needed when the Kafka Connect workers are served over TLS (encryption-in-transit) and their certificates are not signed by a trusted CA.

your-cluster-name:
  tags: ["tag1"]
  templateName: KafkaConnect
  configurationObject:
    workers:
      - https://my-kc.worker1:8083
      - https://my-kc.worker2:8083
    sslTruststore:
      fileRef:
        filePath: /path/to/my/truststore.jks
    sslTruststorePassword: myPassword
    username: my-username
    password: my-password

Custom Connector 

If you have a custom connector that you want to enable it for the topology view, or a connector not supported out of the box, you will need to set the lenses.connectors.info configuration entry.

Here is how you can configure connectors in order to appear in the topology graph:

lenses {

  ...

  connectors.info = [
      {
           class.name = "The connector full classpath"
           name = "The name which will be presented in the UI"
           instance = "Details about the instance. Contains the connector configuration field which holds the information. If  a database is involved it would be  the DB connection details, if it is a file it would be the file path, etc"
           sink = true
           extractor.class = "The full classpath for the implementation knowing how to extract the Kafka topics involved. This is only required for a Source"
           icon = "file.png"
           description = "A description for the connector"
           author = "The connector author"
      }
      ...
  ]
}

Source connectors 

Source connectors do not have a standard way to identify target topics in Kafka. The extractor.class option, as seen above, allows Lenses to identify which Kafka topics the connector writes to. When using this extractor it is also expected to provide a property configuration which specifies the field within the connector runtime configuration containing the topics to publish to. Here is an example for a file stream source connector:

class.name = "org.apache.kafka.connect.file.FileStreamSource"
name = "File"
instance = "file"
sink = false
property = "topic"
extractor.class = "io.lenses.config.kafka.connect.SimpleTopicsExtractor"

See option reference.

--
Last modified: September 26, 2024