Configuring SQL Processors

The Lenses SQL engine has been designed to allow both topic browsing and stream processing with SQL. The SQL streaming engine supports three execution modes: IN_PROC, CONNECT and KUBERNETES. The last two are available to Enterprise clients and offer fault-tolerant, performant streaming applications built via Lenses SQL.

To configure the execution mode, update the lenses.sql.execution.mode option.

In Process Mode

IN_PROC is the default execution mode; to use it, set lenses.sql.execution.mode to IN_PROC. It targets development environments, or production setups where stream processing is kept to a minimum.

# Set up Lenses SQL processing engine
lenses.sql.execution.mode = "IN_PROC" // "CONNECT" // "KUBERNETES"
lenses.sql.state.dir = "logs/lenses-sql-kstream-state"

Lenses stores the internal state of the applications in the above folder. If Lenses restarts, it picks up the state from that folder and continues processing. When running on Kubernetes, if a restart occurs on a host where the state directory is not present, the state is first rebuilt before message processing starts.

Kafka Connect Mode

This mode requires a Kafka Connect cluster at version 2.1 or higher. It is possible to use a newer version of Connect with an older version of a Kafka cluster. The Lenses SQL processor is added as a plugin (connector) to the Connect cluster.

As a best practice for this execution mode, we advise allocating a dedicated Connect cluster that is not shared with other connectors (sinks or sources) you might use.

Note

To run the Lenses SQL processors on a KAFKA CONNECT cluster, read the section below. To execute in KUBERNETES, see the Kubernetes Mode section.

To configure Lenses for CONNECT execution mode:

  1. Edit the lenses.conf file and set the SQL execution mode to CONNECT
  2. Add one or more connect-distributed endpoints for each of your Lenses SQL enabled clusters in the
lenses.kafka.connect.clusters configuration option.

The resulting lenses.conf should look like this:

lenses.kafka.connect.clusters = [
    {
        name: "sql_cluster",
        urls: [
            {
                url:"http://localhost:8083",
                jmx: "localhost:19555"
            }
        ],
        statuses: "connect-statuses",
        configs: "connect-configs",
        offsets: "connect-offsets"
    }
]
....
# Set up Lenses SQL processing engine
lenses.sql.execution.mode = "CONNECT"
lenses.sql.state.dir = "logs/lenses-sql-kstream-state"

This configuration tells Lenses that the processor execution mode is CONNECT, and also which Connect clusters are enabled to run Lenses SQL.

Warning

When scaling out with CONNECT, the lenses.sql.state.dir folder must be created on all workers in any SQL-enabled Connect cluster! It maps internally to the connect.sql.state.store.dir option of the connector.
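
As a sketch, preparing the state directory on each worker could look like the following; the path and the user running the Connect worker are illustrative:

# Run on every worker of the SQL-enabled Connect cluster.
# The directory must be writable by the user running Connect (here: 'connect').
sudo mkdir -p /var/lib/lenses-sql-kstream-state
sudo chown -R connect:connect /var/lib/lenses-sql-kstream-state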

Installation

The connector is a collection of JAR files that needs to be available to each worker in the Kafka Connect Cluster intended for SQL. The recommended way to add the connector plugin to each Kafka Connect worker is via the isolated classpath loader (plugin.path option).

Note

Please note that the current Lenses SQL 2.3.0 connector requires a Kafka Connect cluster at version 2.1.

Tip

A supplementary video for the Lenses SQL Connector installation is also available.

The installation of the connector is the same as for any Kafka Connect plugin and typically is the job of the Kafka Connect cluster administrator. The procedure is described below. The path used can be replaced as needed.

  1. Download the connector archive (lenses-sql-connect-vX.Y.Z.tar.gz) from the client area and copy it to each Connect worker’s disk.

  2. Extract the archive under /opt:

    sudo tar xvf lenses-sql-connect-vX.Y.Z.tar.gz -C /opt
    
  3. Edit the Connect worker’s configuration and append the connector’s directory, /opt/lenses-sql-connect-vX.Y.Z, to the plugin.path entry. E.g.:

    plugin.path=/usr/share/connectors,/opt/lenses-sql-connect-vX.Y.Z
    
  4. Restart the Connect worker.

Note

Kafka Connect scans all directories in plugin.path for connectors. If a connector is a single JAR (e.g., a fat JAR), it may be added to any of the plugin path directories. If the connector is a collection of JARs (the connector itself plus its dependencies), they must all be placed inside a subdirectory of one of the plugin path directories. Hence in our case, the plugin path directory we use is /opt/lenses-sql-connect-vX.Y.Z, whilst all the JAR files live under the subdirectory /opt/lenses-sql-connect-vX.Y.Z/connector.
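
As a sanity check, the resulting layout on a worker should look roughly like the sketch below (the listing output is illustrative):

$ ls /opt/lenses-sql-connect-vX.Y.Z
connector
$ ls /opt/lenses-sql-connect-vX.Y.Z/connector
# ... the connector JAR and its dependency JARs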

Lenses automatically scans the Connect clusters specified in lenses.kafka.connect.clusters and identifies whether the Lenses SQL connector is available. Multiple Lenses SQL enabled Connect clusters can be specified. When a Lenses SQL connector is created, the user interface will require selecting the target Connect cluster. You can verify that the SQL runner has been picked up correctly in the Lenses Connectors page, as it should also be listed among the available source connectors in the New Connector screen.
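
You can also verify from the command line by asking a worker for its installed plugins via the Kafka Connect REST API; the worker URL below matches the earlier example configuration:

# The Lenses SQL runner should appear in the returned plugin list
curl -s http://localhost:8083/connector-plugins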

Broker Authentication

If the Kafka cluster requires authentication via SASL or SSL, some extra steps are needed for the SQL connector to operate. As these settings are tied to the authentication settings of Lenses, it is best to follow the instructions in the Lenses configuration for broker authentication, which include the settings for the connector.

For SASL/GSSAPI (Kerberos) authentication, the settings are propagated from the Connect worker itself. It is important to provide the JAAS configuration file via the environment instead of as a configuration key for Connect. The same applies to the Kerberos configuration file (krb5.conf) if the system default is not used. As an example, to set up Kafka Connect with a JAAS file, this variable export is required:

export KAFKA_OPTS="-Djava.security.auth.login.config=/path/to/jaas.conf"
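
If a non-default krb5.conf is needed as well, it can be passed through the same variable; both paths are illustrative:

export KAFKA_OPTS="-Djava.security.auth.login.config=/path/to/jaas.conf -Djava.security.krb5.conf=/path/to/krb5.conf"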

For authentication via SSL certificates there are two prerequisites. First, the Connect cluster should be set up with SSL. Second, the keystore and truststore files need to be available on all the workers, at the same path as they are for the Lenses application itself. As an example, if Lenses is configured like below:

lenses.kafka.settings.consumer.ssl.keystore.location   = /var/private/ssl/client.keystore.jks
lenses.kafka.settings.consumer.ssl.truststore.location = /var/private/ssl/client.truststore.jks
lenses.kafka.settings.producer.ssl.keystore.location   = /var/private/ssl/client.keystore.jks
lenses.kafka.settings.producer.ssl.truststore.location = /var/private/ssl/client.truststore.jks

Then these files should be present in the exact same path for each Connect worker:

/var/private/ssl/client.keystore.jks
/var/private/ssl/client.truststore.jks

Custom Serde

If a custom serde is required for the SQL processors in Connect mode, the serde libraries (JAR files) should be added to the same directory as the Lenses SQL connector’s JARs.
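
For example, assuming a hypothetical serde JAR named my-serde.jar and the installation path used above:

# Copy the serde next to the connector JARs, then restart the worker
# so the plugin scanner picks it up
sudo cp my-serde.jar /opt/lenses-sql-connect-vX.Y.Z/connector/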

Kubernetes Mode

To enable execution of Lenses SQL processors on Kubernetes, change lenses.sql.execution.mode to KUBERNETES. Additionally, Lenses requires access to a kubectl config file, and Kubernetes requires access to Landoop’s Container Registry.

lenses.sql.execution.mode = "KUBERNETES"

# kubernetes configuration
lenses.kubernetes.config.file = "/home/lenses/.kube/config"
lenses.kubernetes.service.account = "default"
#lenses.kubernetes.processor.image.name = "" # Only needed if you use a custom image
#lenses.kubernetes.processor.image.tag = ""  # Only needed if you use a custom image

Note

If you are deploying Lenses into Kubernetes, set lenses.kubernetes.config.file to an empty string. Lenses will use the token of the pod it is running in to autoconfigure the connectivity to the Kubernetes API server.
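
In that case the relevant lenses.conf entry is simply:

lenses.kubernetes.config.file = ""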

The Docker images for the Lenses SQL runners are hosted in the Landoop container registry. Kubernetes requires an image pull secret to be set up for each namespace you wish to deploy the Lenses SQL runners to.

Enterprise customers will be provided with credentials to access the registry. For each namespace you wish to deploy to, the script bin/configure-image-secret can be run to set up the image pull secret:

./configure-image-secret landoop lenses-sql gce-credentials.json username@example.com https://eu.gcr.io default

The options for the script are, in ordinal position:

argument        | Description
context         | Kubectl context to use
namespace       | Namespace to create the secret in
json_key_path   | The path to the GCE service account user credential file
email           | The email to use, required for creating a docker-registry secret in Kubernetes
gcr_registry    | The Google container registry URL
service_account | The Kubernetes service account to patch. Optional; if not set, the ‘default’ service account in the namespace is patched

If you are not using the default service account, you need to set the correct service account via the lenses.kubernetes.service.account configuration entry. This tells Lenses to deploy the pods using that service account.
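
For example (the service account name is illustrative):

lenses.kubernetes.service.account = "lenses-sql-sa"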

Broker Authentication

If the Kafka cluster requires authentication via SASL or SSL, some extra steps are needed for the SQL processor to operate. As these settings are tied to the authentication settings of Lenses, it is best to follow the instructions in the Lenses configuration for broker authentication, which include the settings for the processors.

Note

SQL Processors are deployed with SASL/SSL only if lenses.kubernetes.processor.security.protocol is set to SASL_PLAINTEXT, SASL_SSL or SSL.

Lenses can be configured with SSL and SASL settings for the SQL processors in the main lenses.conf file. Lenses loads all SSL and SASL settings starting with the key lenses.kubernetes.processor. A Kubernetes secret, labeled with lenses.io/app.type: lenses-secret, will be created for each namespace. This secret is then used by the processor pods to mount the JKS files, keytab, krb5.conf and sasl.jaas.config accordingly, and to set the environment variables for the processor to use. Below are the minimum requirements; additional SSL and SASL Java configurations can be added, prefixed with the lenses.kubernetes.processor.kafka.settings key.

lenses.kubernetes.processor.kafka.settings.security.protocol        = SSL
lenses.kubernetes.processor.kafka.settings.ssl.truststore.location  = /var/private/ssl/client.truststore.jks
lenses.kubernetes.processor.kafka.settings.ssl.truststore.password  = test1234
lenses.kubernetes.processor.kafka.settings.ssl.keystore.location    = /var/private/ssl/client.keystore.jks
lenses.kubernetes.processor.kafka.settings.ssl.keystore.password    = test1234
lenses.kubernetes.processor.kafka.settings.ssl.key.password         = test1234

# Defines all the keys used for the settings above which contain sensitive information
lenses.kubernetes.processor.kafka.protected.settings=[]

# Defines all the keys used for the settings above which are files and therefore need to be mounted
lenses.kubernetes.processor.kafka.protected.file.settings=[]

Note

Lenses automatically adds ssl.truststore.password, ssl.keystore.password, ssl.key.password to lenses.kubernetes.processor.kafka.protected.settings and ssl.truststore.location and ssl.keystore.location to lenses.kubernetes.processor.kafka.protected.file.settings.

For security.protocol values that include SASL you must also provide the following (a configuration sketch follows this list):

  1. lenses.kubernetes.processor.jaas which is the path to a jaas.conf file for the processors to use.
  2. lenses.kubernetes.processor.kafka.keytab which is the path to the Kerberos keytab if sasl.mechanism is GSSAPI
  3. lenses.kubernetes.processor.krb5 which is the path to the Kerberos krb5 file if sasl.mechanism is GSSAPI
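
A minimal sketch of these entries in lenses.conf for GSSAPI; all paths are illustrative:

lenses.kubernetes.processor.jaas         = "/etc/lenses/jaas.conf"
lenses.kubernetes.processor.kafka.keytab = "/etc/lenses/kafka.keytab"
lenses.kubernetes.processor.krb5         = "/etc/lenses/krb5.conf"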

JKS, jaas.conf, keytab and the krb5 files are mounted in the pod at /mnt/secrets/krbjaas.

Important

If you are using GSSAPI, the keytab entry in your jaas.conf file must be /mnt/secrets/krbjaas/keytab!
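
For illustration, a GSSAPI jaas.conf honouring this path could look like the sketch below; the principal is a placeholder:

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="/mnt/secrets/krbjaas/keytab"
  principal="lenses-sql@EXAMPLE.COM";
};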

Custom Serde

If custom serdes are required, they should be embedded in a new Lenses SQL processor Docker image. The template below may be used for the custom image. To build a custom Docker image, create a directory processor-docker and, under that, a subdirectory named serde.

mkdir -p processor-docker/serde

Once created, copy your serde jar files under processor-docker/serde. Then create the file processor-docker/Dockerfile with contents:

FROM eu.gcr.io/lenses-container-registry/lenses-sql-processor:latest

ADD serde /opt/serde
ENV LENSES_SQL_RUNNERS_SERDE_CLASSPATH_OPTS=/opt/serde

Build the Docker image.

cd processor-docker
docker build -t example/lsql-processor .
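
Then tag and push the image to your own registry; the registry host and tag are illustrative:

docker tag example/lsql-processor registry.example.com/lsql-processor:1.0
docker push registry.example.com/lsql-processor:1.0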

Once the image is available in your registry, configure Lenses to use it (lenses.conf):

lenses.kubernetes.processor.image.name = "your/image-name"
lenses.kubernetes.processor.image.tag = "your-tag"

Processor Configuration

When not deployed via Lenses, the connector or the Kubernetes processor requires a minimal set of configurations. These are handled for you when submitting requests via Lenses.

Key | Description | Type | Importance
sql.bootstrap.servers | Kafka brokers to bootstrap the clients | string | high
sql.schema.registry.url | The URL of the schema registry, including the protocol, i.e. http | string | high
sql.state.store.dir | Location for the KStreams RocksDB directory | string | high
sql | The Lenses SQL query to execute in the KStream | string | high
sql.app.id | The Kafka consumer group | string | medium
sql.metrics.topic | The topic to write connector metrics to | string | medium
sql.metric.frequency | Frequency in msec to send state and metrics to the metrics topic | long | medium
sql.enable.metrics | Enable state and metrics reporting to the Lenses metrics topic | boolean | medium
sql.status.topic | Status backing topic of the Connect cluster, used to identify when the connector has been paused; the Connect framework does not expose this at runtime | string | high
sql.extras | Specific connection settings as JSON; used mainly for SSL/Kerberized clusters (CONNECT mode only) | string | medium

The following default values are used if not provided:

Key | Default value
sql.bootstrap.servers | localhost:9092
sql.schema.registry.url | http://localhost:8081
sql.state.store.dir | logs/lenses-kafka-streams-state
sql.lenses.id | lenses.connect.id.${UUID.randomUUID()}
sql.metrics.topic | _kafka_lenses_metrics
sql.metric.frequency | 5000
sql.enable.metrics | true
sql.status.topic | connect-statuses
sql.extras |
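
As an illustration, a minimal CONNECT-mode properties sketch using the keys above; the broker addresses, topic names and SQL statement are placeholders:

sql.bootstrap.servers=broker-1:9092,broker-2:9092
sql.schema.registry.url=http://schema-registry:8081
sql.state.store.dir=logs/lenses-sql-kstream-state
sql.app.id=my-sql-processor
sql=INSERT INTO target-topic SELECT * FROM source-topic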

Note

Metrics reported via the sql.metrics.topic come directly from the KStream instance running in each task, not from the regular consumer and producer JMX.

Deploy via Helm Charts

Helm is a package manager for Kubernetes which allows you to set, via configuration, the image, the container specs, the application environment, labels, and annotations. Helm relies on kubectl. Helm and kubectl are not part of the Lenses package and must be installed separately.

A current list of our existing Helm charts is available on our GitHub repo.

The Lenses SQL processor chart, available for Enterprise users, is packaged in the SQL runner release.

To deploy the SQL runner Helm chart, edit values.yaml accordingly or set the values via the command line.

# Add the repo with the Landoop connector charts
helm repo add landoop https://landoop.github.io/kafka-helm-charts/

# Install with values.yaml in dry run mode
helm install charts/lenses-sql-processor-runner --name my-stream --namespace lenses --dry-run --debug

# Install
helm install charts/lenses-sql-processor-runner --name my-stream --namespace lenses

# Install and override with different values from a file
helm install -f myvalues.yaml charts/lenses-sql-processor-runner --name my-stream --namespace lenses

# Install and override with different values from the command line
helm install charts/lenses-sql-processor-runner --name my-stream --namespace lenses --set sql.app.id=landoop,brokers.sslEnabled=true

Warning

Lenses will pick up and track deployments created via Helm; however, if you modify or delete them via Lenses, Helm is not aware of these changes. Future releases of Lenses will address this.

Important

The connector and Kubernetes artifacts are only available with an Enterprise license.

Helm Chart Options

Key | Description | Default
replicaCount | The number of runners/pods to deploy | 1
image.repository | The SQL runner image | eu.gcr.io/k8-engine/lenses-sql-processor
image.tag | The SQL runner image tag | 2.1
resources.limits.memory | The pod memory limit | 512Mi
resources.requests.memory | The pod memory request | 256Mi
monitoring.pipeline | An optional label to add to the deployment and pods |
monitoring.enabled | Enable monitoring by adding prometheus scrape annotations | true
monitoring.port | The port metrics are exposed on | 9102
monitoring.path | The path metrics are exposed on | /metrics
monitoring.logLevel | The Log4j log level | INFO
serviceAccount | The Kubernetes service account to deploy as | default
javaOpts | JVM options | -Xms256m -Xmx512m
sql | The Lenses SQL statement to run |
applicationId | The consumer group id to use |
metricsTopicSuffix | The suffix to add to the topic the processor reports its metrics on |
metricsFrequency | The frequency, in milliseconds, at which the runner reports its metrics | 5000
stateStoreDir | The location used by the runner for the RocksDB state store | logs/state-store
port | The port the runner exposes for status/stop/start and interactive queries | 8083
kafka | List of brokers |
schemaRegistries | List of schema registries |
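
For instance, a values.yaml sketch overriding some of these options; the nesting is inferred from the key names, and the SQL statement and application id are placeholders:

replicaCount: 2
image:
  repository: eu.gcr.io/k8-engine/lenses-sql-processor
  tag: "2.1"
applicationId: "my-stream"
sql: "INSERT INTO target-topic SELECT * FROM source-topic"
monitoring:
  enabled: true
  port: 9102
  path: /metrics
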
Bootstrap servers

kafka.bootstrapServers is a list of bootstrap servers. Multiple brokers are supported.

If you are deploying brokers inside Kubernetes, they should be deployed as a statefulset. This allows the pods to have stable network identifiers. Each pod’s address should be added as an entry. The address takes the form of:

<statefulset-name>-<pod ordinal identifier>.<service name>.<namespace>.svc.cluster.local

For example, for a statefulset of 3 replicas called broker with a headless service called broker, the addresses would be:

broker-0.broker.default.svc.cluster.local
broker-1.broker.default.svc.cluster.local
broker-2.broker.default.svc.cluster.local

Note

Brokers added or removed when scaling will not be reflected automatically; currently, Lenses requires a config update. Future releases will address this.

If you only have one broker, you can set the service name.

If your brokers are outside Kubernetes, add their host names.

Key | Description | Default
kafka.ssl.enabled | SSL is enabled on the brokers | false
kafka.ssl.truststoreFileData | The base64-encoded contents of the truststore |
kafka.ssl.keystoreFileData | The base64-encoded contents of the keystore |
kafka.ssl.truststorePassword | The truststore password |
kafka.ssl.keystorePassword | The keystore password |
kafka.sasl.enabled | SASL is enabled on the brokers | false
kafka.sasl.keyTabData | The base64-encoded contents of the keytab file, if SASL is enabled with GSSAPI |
kafka.sasl.jaasFileData | The contents of the jaas.conf file, if SASL is enabled |
kafka.sasl.mechanism | The SASL mechanism to use: GSSAPI, SCRAM or PLAIN | GSSAPI
kafka.sasl.krb5Conf | The contents of the krb5.conf file, if the SASL mechanism is GSSAPI |
kafka.bootstrapServers.name | Host name of the broker |
kafka.bootstrapServers.port | The PLAINTEXT Kafka port | 9092
kafka.bootstrapServers.sslPort | The SSL Kafka port | 9093
kafka.bootstrapServers.saslPort | The SASL_SSL Kafka port | 9094
kafka.bootstrapServers.saslPlainTextPort | The SASL_PLAINTEXT Kafka port | 9095

Example:

kafka:
  ssl:
    enabled: false
    trustStoreFileData:
    keyStoreFileData:
    trustStorePassword:
    keyStorePassword:
    keyPassword:
  sasl:
    enabled: false
    # keyTabData is the base64 encoded contents of the kerberos keytab file, if using kerberos; mounted in /mnt/secrets
    keyTabData: |-

    # jaasFileData is the contents of the kafka jaas file, mounted in /mnt/secrets
    jaasFileData: |-

    # mechanism is the sasl authentication mechanism: GSSAPI, SCRAM or PLAIN
    mechanism: "GSSAPI"
    # krb5Conf is the Kerberos config data to be mounted into /etc
    krb5Conf: |-

  bootstrapServers:
    - name: kafka
      port: 9092
      sslPort: 9093
      saslSslPort: 9094
      saslPlainTextPort: 9095

Schema Registries

schemaRegistries is a list of schema registries detailing the hostname, HTTP protocol, and port. Multiple schema registries are supported.

If you are deploying multiple schema registries for high availability inside Kubernetes, they should be deployed as a statefulset. This allows the pods to have stable network identifiers. Each pod’s address should be added as an entry. The address takes the form of:

<statefulset-name>-<pod ordinal identifier>.<service name>.<namespace>.svc.cluster.local

For example, for a statefulset of 2 replicas called schema-registry with a headless service called schema-registry, the addresses would be:

schema-registry-0.schema-registry.default.svc.cluster.local
schema-registry-1.schema-registry.default.svc.cluster.local

Note

Schema registries added or removed when scaling will not be reflected automatically; currently, Lenses requires a config update. Future releases will address this.

If you only have one schema registry, you can set the service name.

If your schema registries are outside Kubernetes, add their host names.

Key | Description | Default
schemaRegistries.enabled | Enable schema registry support | false
schemaRegistries.hosts.host | The host name of the schema registry instance |
schemaRegistries.hosts.protocol | The HTTP protocol, http or https | http
schemaRegistries.hosts.port | The port of the schema registry instance | 8081

Example:

schemaRegistries:
  enabled: true
  hosts:
    - host: schema-registry-1
      protocol: http
      port: 8081
    - host: schema-registry-2
      protocol: http
      port: 8081