4.3

SQL on Connect

SQL Connect Mode 

SQL Processors can run as a Kafka Connect plugin (connector). The SQL Processor connector requires Kafka Connect 2.5 * (Confluent 5.5).

Within Lenses configuration (lenses.conf) the SQL execution mode in Lenses should be set to CONNECT. Also a SQL state directory must be provided. This will be local to the Connect workers, ensure that each worker has write access to it.

lenses.sql.execution.mode = CONNECT
# This directory will be used in the Connect workers' side.
# They must have write access.
lenses.sql.state.dir = "/var/run/lenses-sql-kstream-state"

An AES-256 key must be set for the Connect Clusters in lenses.conf. They key length must be 32 bytes, which corresponds to 32 ASCII characters. The Secret Provider plugin must be added to the Connect cluster .

lenses.kafka.connect.clusters = [
 {
   name: "SQL-Connect-Cluster",
   urls: [
     { url:"http://CONNECT_HOST_1:8083" },
     { url:"http://CONNECT_HOST_2:8083" }
   ],
   statuses: "connect-status",
   configs : "connect-configs",
   offsets : "connect-offsets",
   aes256.key: "0123456789abcdef0123456789abcdef"
 }
]

A dedicated Connect Cluster is recommended for SQL processors for enhanced reliability.

Install Lenses SQL Connector 

Each Kafka Connect worker must have the Lenses SQL Connect plugin installed. Connector installation is the same as any other Kafka Connect plugin and typically is the Connect cluster administrator’s job.

Plugins are added under the plugin.path of each worker. Each plugin should be in its own subdirectory.

As an example, the Connect worker’s configuration may include the line below, which sets the plugin.path to /usr/share/connectors.

plugin.path=/usr/share/connectors

To install the SQL connector, download the connector archive (lenses-sql-connect-vX.Y.Z.tar.gz) from the client area , extract it, and copy all files under the connector directory to /usr/share/connectors/lenses-sql-streaming. Then restart the Connect worker.

mkdir -p /usr/share/connectors/lenses-sql-streaming
tar -xzf lenses-sql-connect.tar.gz \
    -C /usr/share/connectors/lenses-sql-streaming \
    --wildcards */connector/* --strip-components=2

If the Kafka cluster or Schema Registry require authentication, the Secret Provider plugin must also be installed.

UDF and Serde 

When using User Defined Functions (UDF) or custom de/serializers (SERDE), copy the UDF and SERDE jar files in the SQL connector’s directory.

cp /path/to/udf-and-serde-jars/* /usr/share/connectors/lenses-sql-streaming

Kerberos 

Working with Kerberos requires a Kerberos configuration file (krb5.conf) which needs to be provided at runtime, a limitation imposed by the Java Virtual Machine (JVM).

If the Connect Workers are already authenticating to the Brokers with Kerberos (SASL/GSSAPI), or the Connect Worker machine is set with a valid Kerberos configuration at /etc/krb5.conf no action is required.

When this is not the case, the Connect cluster administrator should provide a valid krb5.conf via one of the following methods:

  1. Add the krb5.conf at the default location /etc/krb5.conf.
  2. Add the krb5.conf at a custom location and use the KAFKA_OPTS environment variable to point the Connect workers to its location.
    KAFKA_OPTS="-Djava.security.krb5.conf=/path/to/krb5.conf"
    

Install Secret Provider Connect 

Authentication to Kafka Brokers and Schema Registry requires credentials in the form of passwords (e.g private key password) and files (e.g keytab file). Lenses will not transfer sensitive information unencrypted. In order to work with secured setups, installation and configuration of Lenses.io’s Secret Provider plugin (also see documentation ) is required. Lenses supports the Aes256DecodingProvider of the plugin.

The plugin’s encryption is symmetric —the same AES-256 key must be set on both Lenses and Connect workers.

The Secret Provider plugin must be downloaded and added under its own subdirectory to the plugin.path of each Connect worker the same way as the Lenses SQL plugin.

wget https://github.com/lensesio/secret-provider/releases/download/2.1.6/secret-provider-2.1.6-all.jar \
     -O /usr/share/connectors/secret-provider-2.1.6-all.jar

Each Connect worker should be configured with the AES-256 set in Lenses, a 32-byte string which usually is a 32 ASCII character string.

config.providers=aes256
config.providers.aes256.class=io.lenses.connect.secrets.providers.Aes256DecodingProvider
config.providers.aes256.param.aes256.key=0123456789abcdef0123456789abcdef
config.providers.aes256.param.file.dir=/var/run/connect

More information can be found at AES256 Secret Provider documentation .

The AES-256 key must also be set at lenses.conf for each Connect cluster used for SQL Processors.

lenses.kafka.connect.clusters = [
 {
   name: "SQL-Connect-Cluster",
   urls: [
     { url:"http://CONNECT_HOST_1:8083" },
     { url:"http://CONNECT_HOST_2:8083" }
   ],
   statuses: "connect-status",
   configs : "connect-configs",
   offsets : "connect-offsets",
   aes256.key: "0123456789abcdef0123456789abcdef"
 }
]

Troubleshooting 

Cluster Name list empty for Processor 

If, when creating a SQL processor, there aren’t any clusters available to choose from please verify:

Kafka Connect supported versions 

Lenses SQL Connector 4.3.x supports Kafka 2.5 and 2.6.

It may support higher versions (e.g 2.7+) if they are build for Scala 2.13 — which is the default for Kafka since 2.6 but other builds are available for download from Apache Kafka’s website. Older versions (i.e 2.3, 2.4) may be supported if they are at their latest patch releases, due to a breaking change to jackson’s library version between patches.

In any case, versions other than Kafka 2.5 and 2.6 are eligible for best-effort support only.

Lenses SQL Connector 4.3.x is built for both Scala 2.12 and Scala 2.13, and it is the users’ responsibility to install the correct Connector plugin for their Connect cluster. Also, be aware that Lenses 4.3.x is the last version of Lenses for which the SQL Connector will be compiled for Scala 2.12; future releases will be compiled for Scala 2.13 only, unless stated otherwise.