# Kafka

{% hint style="success" %}
If deploying with Helm put the **connections** YAML under **provisioning** in the values file.
{% endhint %}

## PLAINTEXT <a href="#protocol-plaintext-with-jmx-metrics" id="protocol-plaintext-with-jmx-metrics"></a>

With PLAINTEXT, there's no encryption and no authentication when connecting to Kafka.

The only required fields are:

* **`kafkaBootstrapServers`** - a list of bootstrap servers (brokers).\
  It is recommended to add as many brokers (if available) as convenient to this list for fault tolerance.
* **`protocol`** - depending on the protocol, other fields might be necessary (see examples for other protocols)

In following example JMX metrics for Kafka Brokers are configured too, assuming that all brokers expose their JMX metrics using the same port (9581), without SSL and authentication.

```yaml
kafka:
- name: Kafka
  version: 1
  tags: ["optional-tag"]
  configuration:
    kafkaBootstrapServers:
      value:
        - PLAINTEXT://your.kafka.broker.0:9092
        - PLAINTEXT://your.kafka.broker.1:9092
    protocol: 
      value: PLAINTEXT
    # all metrics properties are optional
    metricsPort: 
      value: 9581
    metricsType: 
      value: JMX
    metricsSsl: 
      value: falseSSL 
```

## SSL <a href="#protocol-plaintext-with-jmx-metrics" id="protocol-plaintext-with-jmx-metrics"></a>

With SSL the connection to Kafka is encrypted. You can also uses SSL and certificates to authenticate users against Kafka.

A truststore (with password) might need to be set explicitly if the global truststore of Lenses does not include the Certificate Authority (CA) of the brokers.

If **TLS** is used for authentication to the brokers in addition to encryption-in-transit, a key store (with passwords) is required.

```yaml
kafka:
- name: Kafka
  version: 1
  tags: ["optional-tag"]
  configuration:
    kafkaBootstrapServers:
      value:
        - SSL://your.kafka.broker.0:9092
        - SSL://your.kafka.broker.1:9092
    protocol: 
      value: SSL
    sslTruststore:
      file: /path/to/truststore.jks
    sslTruststorePassword: 
      value: truststorePassword
    sslKeystore:
      file: /path/to/keystore.jks
    sslKeyPassword: 
      value: keyPassword
    sslKeystorePassword: 
      value: keystorePassword
```

## SASL\_PLAINTEXT vs SASL\_SSL <a href="#sasl_plaintext-vs-sasl_ssl" id="sasl_plaintext-vs-sasl_ssl"></a>

There are 2 SASL-based protocols to access Kafka Brokers: `SASL_SSL` and `SASL_PLAINTEXT`. They both require **SASL mechanism** and **JAAS Configuration** values. What is different is if:

1. The transport layer is encyrpted (SSL)
2. The SASL mechanisn for authentication (PLAIN, AWS\_MSK\_IAM, GSSAPI).

In addition to this, there might be a keytab file required, depending on the SASL mechanism (for example when using GSSAPI mechanism, most often used for Kerberos).

In order to use Kerberos authentication, a **Kerberos \_Connection**\_ should be created beforehand.

Apart from that, when encryption-in-transit is used (with `SASL_SSL`), a trust store might need to be set explicitly if the global trust store of Lenses does not include the CA of the brokers.

Following are a few examples of SASL\_PLAINTEXT and SASL\_SSL

## SASL\_SSL <a href="#protocol-sasl_ssl-sasl-mechanism-plain" id="protocol-sasl_ssl-sasl-mechanism-plain"></a>

### PLAIN <a href="#protocol-sasl_ssl-sasl-mechanism-plain" id="protocol-sasl_ssl-sasl-mechanism-plain"></a>

Encrypted communication and basic username and password for authentication.

```yaml
kafka:
- name: Kafka
  version: 1
  tags: ["optional-tag"]
  configuration:
    kafkaBootstrapServers:
      value:
        - SASL_SSL://your.kafka.broker.0:9092
        - SASL_SSL://your.kafka.broker.1:9092
    protocol: 
      value: SASL_SSL
    sslTruststore:
      file: /path/to/truststore.jks
    sslTruststorePassword: 
      value: truststorePassword
    sslKeystore:
      file: /path/to/keystore.jks
    sslKeyPassword: 
      value: keyPassword
    sslKeystorePassword: 
      value: keystorePassword
    saslMechanism: 
      value: PLAIN
    saslJaasConfig:
      value: |
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="your-username"
        password="your-password";      
```

### AWS\_MSK\_IAM <a href="#protocol-sasl_ssl-sasl-mechanism-aws_msk_iam" id="protocol-sasl_ssl-sasl-mechanism-aws_msk_iam"></a>

When Lenses is running inside AWS and is connecting to an Amazon’s Managed Kafka (MSK) instance, IAM can be used for authentication.

```yaml
kafka:
- name: Kafka
  version: 1
  tags: ["optional-tag"]
  configuration:
    kafkaBootstrapServers:
      value:
       - SASL_SSL://your.kafka.broker.0:9098
       - SASL_SSL://your.kafka.broker.1:9098
    protocol: SASL_SSL
    saslMechanism: 
      value: AWS_MSK_IAM
    saslJaasConfig:
      value: software.amazon.msk.auth.iam.IAMLoginModule required;
    additionalProperties:
      value:
        sasl.client.callback.handler.class: "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
```

### GSSAPI <a href="#protocol-sasl_ssl-sasl-mechanism-gssapi" id="protocol-sasl_ssl-sasl-mechanism-gssapi"></a>

In order to use Kerberos authentication, a **Kerberos \_Connection**\_ should be created beforehand.

```yaml
kafka:
- name: Kafka
  version: 1
  tags: ["optional-tag"]
  configuration:
    kafkaBootstrapServers:
      value:
        - SASL_SSL://your.kafka.broker.0:9092
        - SASL_SSL://your.kafka.broker.1:9092
    protocol: 
      value: SASL_SSL
    sslTruststore:
      file: /path/to/truststore.jks
    sslTruststorePassword: 
      value: truststorePassword
    sslKeystore:
      file: /path/to/keystore.jks
    sslKeyPassword: 
      value: keyPassword
    sslKeystorePassword: 
      value: keystorePassword  
    saslMechanism: 
      value: GSSAPI
    saslJaasConfig:
      value: |
        com.sun.security.auth.module.Krb5LoginModule required
        useKeyTab=true
        storeKey=true
        useTicketCache=false
        serviceName=kafka
        principal="my-principal@DOMAIN.COM";      
     keytab:
       file: /path/to/keytab.jks
```

## SASL\_PLAINTEXT <a href="#protocol-sasl_plaintext-sasl-mechanism-scram-sha-256" id="protocol-sasl_plaintext-sasl-mechanism-scram-sha-256"></a>

No SSL encrypted of communication, credentials communicated to Kafka in clear text.

### SCRAM-SHA-256 <a href="#protocol-sasl_plaintext-sasl-mechanism-scram-sha-256" id="protocol-sasl_plaintext-sasl-mechanism-scram-sha-256"></a>

```yaml
kafka:
- name: Kafka
  version: 1
  tags: ["optional-tag"]
  configuration:
    kafkaBootstrapServers:
      value:
        - SASL_PLAINTEXT://your.kafka.broker.0:9092
        - SASL_PLAINTEXT://your.kafka.broker.1:9092
    protocol: 
      value: SASL_PLAINTEXT
    saslMechanism: 
      value: SCRAM-SHA-256
    saslJaasConfig: 
      value: |
        org.apache.kafka.common.security.scram.ScramLoginModule required
        username="your-username"
        password="your-password";      
```

### SCRAM-SHA-512 <a href="#protocol-sasl_plaintext-sasl-mechanism-scram-sha-256" id="protocol-sasl_plaintext-sasl-mechanism-scram-sha-256"></a>

```yaml
kafka:
- name: Kafka
  version: 1
  tags: ["optional-tag"]
  configuration:
    kafkaBootstrapServers:
      value:
        - SASL_PLAINTEXT://your.kafka.broker.0:9092
        - SASL_PLAINTEXT://your.kafka.broker.1:9092
    protocol: 
      value: SASL_PLAINTEXT
    saslMechanism: 
      value: SCRAM-SHA-256
    saslJaasConfig: 
      value: |
        org.apache.kafka.common.security.scram.ScramLoginModule required
        username="your-username"
        password="your-password";    
```

## Advanced Client Configuration <a href="#advanced-consumerproducer-configuration" id="advanced-consumerproducer-configuration"></a>

Lenses interacts with your Kafka Cluster via Kafka Client API. To override the default behavior use `additionalProperties`.

By default there shouldn’t be a need to use additional properties, use it only if really necessary, as a wrong usage might brake the communication with Kafka.

Lenses SQL processors uses the same Kafka connection information provided to Lenses.

```yaml
kafka:
- name: Kafka
  version: 1
  tags: ["optional-tag"]
  configurationObject:
    kafkaBootstrapServers:
      value:
       - PLAINTEXT://your.kafka.broker.0:9092
    protocol: 
      value: PLAINTEXT
    additionalProperties:
      value:
        isolation.level: "read_committed"
        acks: "all"
        ssl.endpoint.identification.algorithm: "https"
```


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://docs.lenses.io/latest/devx/5.5/deployment/installation/automation/provisioning-examples/kafka.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
