# Apache Kafka

A Kafka connection is required for the agent to start. You can connect to Kafka via:

1. Plaintext (no credentials an unencrypted)
2. SSL (no credentials an encrypted)
3. SASL Plaintext and SASL SSL

{% hint style="success" %}
Only one Kafka connection is allowed.

The name must be kafka.

See [JSON schema](https://docs.lenses.io/latest/deployment/configuration/overview#json-schema-support) for support.

Environment variables are supported; escape the dollar sign

```yaml
sslKeystorePassword:
  # In case environment variable has single or double quotes escape it with 
  ## "\${ENV_VAR_NAME}
  value: "${ENV_VAR_NAME}"
```

{% endhint %}

## Plaintext <a href="#protocol-plaintext-with-jmx-metrics" id="protocol-plaintext-with-jmx-metrics"></a>

With PLAINTEXT, there's no encryption and no authentication when connecting to Kafka.

The only required fields are:

* **kafkaBootstrapServers** - a list of bootstrap servers (brokers).\
  It is recommended to add as many brokers (if available) as convenient to this list for fault tolerance.
* **protocol** - depending on the protocol, other fields might be necessary (see examples for other protocols)

In following example JMX metrics for Kafka Brokers are configured too, assuming that all brokers expose their JMX metrics using the same port (9581), without SSL and authentication.

{% code title="provisioning.yaml" %}

```yaml
kafka:
- name: kafka
  version: 1
  tags: [my-tag]
  configuration:
    kafkaBootstrapServers:
      value:
        - PLAINTEXT://your.kafka.broker.0:9092
        - PLAINTEXT://your.kafka.broker.1:9092
    protocol: 
      value: PLAINTEXT
    # all metrics properties are optional
    metricsPort: 
      value: 9581
    metricsType: 
      value: JMX
    metricsSsl: 
      value: false
```

{% endcode %}

## SSL <a href="#protocol-plaintext-with-jmx-metrics" id="protocol-plaintext-with-jmx-metrics"></a>

With SSL the connection to Kafka is encrypted. You can also uses SSL and certificates to authenticate users against Kafka.

A truststore (with password) might need to be set explicitly if the global truststore of the Agent does not include the Certificate Authority (CA) of the brokers.

If **TLS** is used for authentication to the brokers in addition to encryption-in-transit, a key store (with passwords) is required.

```yaml
kafka:
- name: kafka
  version: 1
  tags: [my-tag]
  configuration:
    kafkaBootstrapServers:
      value:
        - SSL://your.kafka.broker.0:9092
        - SSL://your.kafka.broker.1:9092
    protocol: 
      value: SSL
    sslTruststore:
      file: kafka-truststore.jks
    sslTruststorePassword: 
      value: truststorePassword
    sslKeystore:
      file: kafka-keystore.jks
    sslKeyPassword: 
      value: keyPassword
    sslKeystorePassword: 
      value: keystorePassword
```

## SASL Plaintext vs SASL SSL <a href="#sasl_plaintext-vs-sasl_ssl" id="sasl_plaintext-vs-sasl_ssl"></a>

There are 2 SASL-based protocols to access Kafka Brokers: `SASL_SSL` and `SASL_PLAINTEXT`. They both require **SASL mechanism** and **JAAS Configuration** values. What is different is:

1. The transport layer is encyrpted (SSL)
2. The SASL mechanism for authentication (PLAIN, AWS\_MSK\_IAM, GSSAPI).

In addition to this, there might be a keytab file required, depending on the SASL mechanism (for example when using GSSAPI mechanism, most often used for Kerberos).

To use Kerberos authentication, a **Kerberos \_Connection**\_ should be created beforehand.

When encryption-in-transit is used (with **SASL\_SSL**), a trust store might need to be set explicitly if the global trust store of Lenses does not include the CA of the brokers.

## SASL SSL <a href="#protocol-sasl_ssl-sasl-mechanism-plain" id="protocol-sasl_ssl-sasl-mechanism-plain"></a>

### Mechanism PLAIN <a href="#protocol-sasl_ssl-sasl-mechanism-plain" id="protocol-sasl_ssl-sasl-mechanism-plain"></a>

Encrypted communication and basic username and password for authentication.

{% tabs %}
{% tab title="Secrets in Plaintext" %}
{% code title="provisioning.yaml" %}

```yaml
kafka:
- name: kafka
  version: 1
  tags: [my-tag]
  configuration:
    kafkaBootstrapServers:
      value:
        - SASL_SSL://your.kafka.broker.0:9092
        - SASL_SSL://your.kafka.broker.1:9092
    protocol: 
      value: SASL_SSL
    sslTruststore:
      file: kafka-truststore.jks
    sslTruststorePassword: 
      value: truststorePassword
    sslKeystore:
      file: kafka-keystore.jks
    sslKeyPassword: 
      value: keyPassword
    sslKeystorePassword: 
      value: keystorePassword
    saslMechanism: 
      value: PLAIN
    saslJaasConfig:
      value: |
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="your-username"
        password="your-password";      
```

{% endcode %}
{% endtab %}

{% tab title="Secrets as Environment Vars" %}

<pre class="language-yaml" data-title="provisioning.yaml"><code class="lang-yaml"><strong>kafka:
</strong>- name: kafka
  version: 1
  tags: [my-tag]
  configuration:
    kafkaBootstrapServers:
      value:
        - SASL_SSL://your.kafka.broker.0:9092
        - SASL_SSL://your.kafka.broker.1:9092
    protocol: 
      value: SASL_SSL
    sslTruststore:
      file: kafka-truststore.jks
    sslTruststorePassword: 
      value: ${SSL_KEYSTORE_PASSWORD}
    sslKeystore:
      file: kafka-keystore.jks
    sslKeyPassword: 
      value: ${SSL_KEYSTORE_PASSWORD}
    sslKeystorePassword: 
      value: ${SSL_KEYSTORE_PASSWORD}
    saslMechanism: 
      value: PLAIN
    saslJaasConfig:
      value: ${SASL_JAAS_CONFIG}
</code></pre>

{% endtab %}
{% endtabs %}

### Mechanism GSSAPI <a href="#protocol-sasl_ssl-sasl-mechanism-gssapi" id="protocol-sasl_ssl-sasl-mechanism-gssapi"></a>

In order to use Kerberos authentication, a **Kerberos Connection** should be created beforehand.

```yaml
kafka:
- name: kafka
  version: 1
  tags: [my-tag]
  configuration:
    kafkaBootstrapServers:
      value:
        - SASL_SSL://your.kafka.broker.0:9092
        - SASL_SSL://your.kafka.broker.1:9092
    protocol: 
      value: SASL_SSL
    sslTruststore:
      file: kafka-truststore.jks
    sslTruststorePassword: 
      value: truststorePassword
    sslKeystore:
      file: kafka-keystore.jks
    sslKeyPassword: 
      value: keyPassword
    sslKeystorePassword: 
      value: keystorePassword  
    saslMechanism: 
      value: GSSAPI
    saslJaasConfig:
      value: |
        com.sun.security.auth.module.Krb5LoginModule required
        useKeyTab=true
        storeKey=true
        useTicketCache=false
        serviceName=kafka
        principal="my-principal@DOMAIN.COM";      
    keytab:
      file: /path/to/kafka-keytab.keytab
```

## SASL Plaintext <a href="#protocol-sasl_plaintext-sasl-mechanism-scram-sha-256" id="protocol-sasl_plaintext-sasl-mechanism-scram-sha-256"></a>

No SSL encrypted of communication, credentials communicated to Kafka in clear text.

### Mechanism SCRAM-SHA-256 <a href="#protocol-sasl_plaintext-sasl-mechanism-scram-sha-256" id="protocol-sasl_plaintext-sasl-mechanism-scram-sha-256"></a>

```yaml
kafka:
- name: kafka
  version: 1
  tags: [my-tag]
  configuration:
    kafkaBootstrapServers:
      value:
        - SASL_PLAINTEXT://your.kafka.broker.0:9092
        - SASL_PLAINTEXT://your.kafka.broker.1:9092
    protocol: 
      value: SASL_PLAINTEXT
    saslMechanism: 
      value: SCRAM-SHA-256
    saslJaasConfig: 
      value: |
        org.apache.kafka.common.security.scram.ScramLoginModule required
        username="your-username"
        password="your-password";  
```

### Mechanism SCRAM-SHA-512 <a href="#protocol-sasl_plaintext-sasl-mechanism-scram-sha-256" id="protocol-sasl_plaintext-sasl-mechanism-scram-sha-256"></a>

```yaml
kafka:
- name: kafka
  version: 1
  tags: [my-tag]
  configuration:
    kafkaBootstrapServers:
      value:
        - SASL_PLAINTEXT://your.kafka.broker.0:9092
        - SASL_PLAINTEXT://your.kafka.broker.1:9092
    protocol: 
      value: SASL_PLAINTEXT
    saslMechanism: 
      value: SCRAM-SHA-512
    saslJaasConfig: 
      value: |
        org.apache.kafka.common.security.scram.ScramLoginModule required
        username="your-username"
        password="your-password";      
```


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://docs.lenses.io/latest/deployment/configuration/agent/automation/kafka/apache-kafka.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
