
Streaming SQL scaling

Lenses provides Kafka Streaming SQL in multiple execution modes

Scaling Kafka SQL Processing 


IN_PROC is the default execution mode: processors run locally, inside the Lenses application. This mode does not scale well and puts the stability of Lenses itself at risk, so it is recommended only for testing.

CONNECT is the execution mode that removes these limitations and provides availability guarantees and scalability. Lenses can deploy your Lenses SQL processors in Kafka Connect. Kafka Connect provides a distributed, fault-tolerant and scalable framework as part of the core Apache Kafka distribution. We advise dedicating a Kafka Connect cluster to running LSQL stream processing.

KUBERNETES is an execution mode that provides scalability by deploying Lenses SQL runners into Kubernetes clusters. Lenses can deploy and monitor SQL runner deployments created through Lenses or with existing tools such as Helm or kubectl.

Kafka Connect 

Kafka Connect provides scalable, fault-tolerant, distributed processing by forming a cluster of workers. The cluster exposes endpoints to which Lenses submits the processor configuration; from that point Kafka Connect persists the configuration and distributes work across the workers. Upon a restart, Lenses recovers the status, configuration and metrics of any Lenses SQL connectors found in the configured clusters, so processing of data in your topologies continues even while Lenses is offline. Lenses also identifies any connectors created outside of Lenses at runtime and starts tracking them to give you visibility.
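
The same information is also available directly from the Kafka Connect REST API. As a quick sketch (the worker address matches the example configuration later on this page, and the processor name is purely illustrative), you can see what a cluster is running with:

# list every connector registered in the Connect cluster
curl http://localhost:8083/connectors

# inspect the status and task assignment of one connector (name is an example)
curl http://localhost:8083/connectors/my-sql-processor/status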

To scale the number of processor instances in or out, we simply instruct Kafka Connect to increase or decrease the number of tasks across the cluster. The Lenses UI provides a simple way to deploy Lenses SQL processors and scale them:

  1. Create a new processor and select the cluster to deploy to
  2. Compose your SQL statement
  3. Set the parallelization, i.e. how many tasks/application instances to run
  4. Give the processor a name
  5. Deploy

Lenses will check the validity of the SQL statement and, if valid, create the connector instance and start monitoring its behavior.
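
Behind the scenes these steps translate into a standard Kafka Connect connector submission, where the parallelization becomes the connector's tasks.max. A minimal sketch, assuming the example Connect endpoint used elsewhere on this page and purely illustrative processor, topic and SQL names:

# register a new Lenses SQL processor with 2 tasks (name, topics and SQL are examples only)
curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors -d '{
  "name": "my-sql-processor",
  "config": {
    "connector.class": "com.landoop.connect.SQL",
    "tasks.max": "2",
    "sql": "INSERT INTO target_topic SELECT * FROM source_topic",
    "sql.bootstrap.servers": "localhost:9092",
    "sql.schema.registry.url": "http://localhost:8081",
    "sql.state.store.dir": "logs/lenses-kafka-streams-state"
  }
}'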

Lenses supports the following Connector functionality:

  • CREATE - Register and create a new connector
  • PAUSE - Pause the connector and tasks
  • START - Start a paused connector
  • DELETE - Remove a connector

Note: Updating an existing connector is not directly supported. The underlying KStream application cannot be updated in place, and an update would more than likely break the schema compatibility of the target insert table; the connector can, however, be scaled.
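
These actions map onto the standard Kafka Connect REST endpoints, so they can also be driven outside of Lenses if needed. A sketch, with a purely illustrative connector name:

# pause a running processor
curl -X PUT http://localhost:8083/connectors/my-sql-processor/pause

# start (resume) a paused processor
curl -X PUT http://localhost:8083/connectors/my-sql-processor/resume

# delete the processor
curl -X DELETE http://localhost:8083/connectors/my-sql-processor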

Lenses Configuration 

To configure Lenses for CONNECT execution mode:

  1. Edit the lenses.conf file and set the SQL execution mode to CONNECT
  2. Add one or more connect-distributed endpoints for each of your Lenses SQL enabled clusters in the lenses.connect.clusters configuration option.

The resulting lenses.conf should look like this:

lenses.connect.clusters = [{name: "sql-cluster", url: "http://localhost:8083", statuses: "connect-statuses", config: "connect-configs", offsets: "connect-offsets" }]
....
# Set up Lenses SQL processing engine
lenses.sql.execution.mode = "CONNECT"
lenses.sql.state.dir = "logs/lenses-sql-kstream-state"
lenses.sql.monitor.frequency = 5000
lenses.sql.connect.connector.class = "com.landoop.connect.SQL"
lenses.sql.sample.default = 2 // Sample 2 messages every 200 msec
lenses.sql.sample.window = 200

This configuration tells Lenses that the processor execution mode is CONNECT and where to find the Lenses SQL enabled Connect clusters. The connector workers send health-check metrics to a metrics topic every few seconds.

Lenses will scan the Connect clusters specified in the lenses.connect.clusters option for the Lenses SQL connector class and make them available for selection when submitting processors. You can check that the SQL runner is correctly picked up with the kafka-connect-cli.

    ~|⇒ connect-cli plugins
    Class name: com.landoop.connect.SQL, Type: source, Version: 0.0.3
    Class name: org.apache.kafka.connect.file.FileStreamSinkConnector, Type: sink, Version: 0.11.0.0-cp1
    Class name: org.apache.kafka.connect.file.FileStreamSourceConnector, Type: source, Version: 0.11.0.0-cp1
    ~|⇒
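
If you do not have the CLI to hand, the Connect REST API exposes the same listing; the worker address below assumes the example configuration above:

# list the connector plugins installed on a Connect worker
curl http://localhost:8083/connector-plugins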

Warning: When scaling out with CONNECT, the lenses.sql.state.dir must be created on all workers in any SQL enabled Connect Cluster! This maps to the connect.sql.state.store.dir connector option when used with Lenses.
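
A minimal sketch, assuming the default path from the lenses.conf example above, to be run on every worker host:

# create the KStreams state directory on each Connect worker
mkdir -p logs/lenses-sql-kstream-state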

Kafka Connect installation 

The connector needs to be available to each worker in the Kafka Connect cluster intended for SQL. The best way to achieve this is via the isolated classpath loader introduced into Connect in Kafka version 0.11.

  1. Create a folder called plugins/lib and place the Lenses SQL Connector jar inside
  2. Set the plugin.path in the worker properties file to the location of the jar
  3. Restart the Connect worker.
#  create folder
mkdir -p plugins/lib

# copy in the jar
cp lenses-sql-runners-x.x.x-all.jar plugins/lib

# add the plugin path to the worker properties file (plugin.path should only be set once)
echo "plugin.path=$PWD/plugins/lib" >> config/connect-distributed.properties

# restart the workers
bin/connect-distributed.sh config/connect-distributed.properties

If you are using Kafka version 0.10.x, where plugin.path classloader isolation is not available, place the connector first on the classpath instead:

export CLASSPATH=lenses-sql-runners-x.x.x-all.jar

Connector Configuration 

The connector requires a minimal set of configurations which are handled for you when submitting requests via Lenses.

Key | Description | Type | Importance
sql.bootstrap.servers | Kafka brokers to bootstrap the clients | string | high
sql.schema.registry.url | The URL of the Schema Registry, including the protocol, i.e. http:// | string | high
sql.state.store.dir | Location of the KStreams RocksDB directory | string | high
sql | SQL query to execute as a KStreams application | string | medium
sql.lenses.id | A Lenses-specific ID to track the connector | string | medium
sql.metrics.topic | The topic to write connector metrics to | string | medium
sql.metric.return.frequency | Frequency in msec at which state and metrics are sent to the metrics topic | long | medium
sql.enable.metrics | Enable state and metrics reporting to the Lenses metrics topic | boolean | medium
sql.status.topic | Status backing topic of the Connect cluster, used to determine whether the connector has been paused; the Connect framework does not expose this at runtime | string | high

The following default values are used if not provided:

Key | Default value
sql.bootstrap.servers | localhost:9092
sql.schema.registry.url | http://localhost:8081
sql.state.store.dir | logs/lenses-kafka-streams-state
sql.lenses.id | lenses.connect.id.${UUID.randomUUID()}
sql.metrics.topic | _kafka_lenses_metrics
sql.metric.frequency | 5000
sql.enable.metrics | true
sql.status.topic | connect-statuses

Note: Metrics reported to the connect.sql.metrics.topic come directly from the KStream instance running in each task, not from the regular consumer and producer JMX.
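
To see what the runner tasks report, a simple sketch (assuming the default metrics topic from the table above and the stock console consumer) is to tail the metrics topic:

# watch the state and metrics records emitted by the SQL runner tasks
# (topic name is the default from the table above)
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic _kafka_lenses_metrics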

Kubernetes installation 

Kubernetes, a container orchestration engine, provides the perfect platform to run streaming microservices. It can ensure that a configured number of application instances, or pods, are running, and can scale them up or down accordingly.

In our next release we are going to extend the execution modes to allow deploying Lenses SQL processors into Kubernetes, but Enterprise customers can already deploy the Lenses SQL Connector into Kubernetes using our Docker image. We recommend using Helm to manage and deploy the connector.

Helm 

Lenses utilizes Helm (https://github.com/kubernetes/helm), a package manager for Kubernetes which allows you to set out in configuration which image you want, the container specs, the application environment, and the Kubernetes labels and annotations that allow for monitoring.

A list of existing Kafka Helm Charts is available on GitHub. The Lenses SQL processor chart, available for Enterprise users, is packaged in the Connector release.

To deploy the Helm chart, edit the values.yaml accordingly or set the values at the command line. The values.yaml contains all the options previously described.

# Add the repo for other connector charts
helm repo add landoop https://datamountaineer.github.io/helm-charts/

# Install with values.yaml in dry run mode
helm install ./helm --dry-run --debug

# Install
helm install ./helm

# Install and override with different values from a file
helm install -f myvalues.yaml ./helm

# Install and override with different values from command line
helm install --set connect.sql.app.id=landoop ./helm

Kafka Connect topics in Kubernetes 

When deploying into Kubernetes the Chart is configured to have backing topics per deployment with the following format:

    connect_{{ .Values.clusterName }}_statuses
    connect_{{ .Values.clusterName }}_offsets
    connect_{{ .Values.clusterName }}_configs

Ensure the following:

The status topic used for monitoring the connector is also set to connect_{{ .Values.clusterName }}_statuses.

Ensure that these topics have an infinite retention period (retention.ms=-1) to avoid losing statuses, configurations or offsets on restart, i.e. after Kafka has removed the entries.

Note: The connect-configs topic must always be a compacted topic.
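
As a sketch, the per-deployment backing topics can be pre-created with the appropriate settings using the stock Kafka tooling; the cluster name my-cluster, the partition counts and the ZooKeeper address below are examples only:

# pre-create the backing topics; configs must be compacted, all get infinite retention
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic connect_my-cluster_configs \
  --partitions 1 --replication-factor 3 --config cleanup.policy=compact --config retention.ms=-1
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic connect_my-cluster_offsets \
  --partitions 25 --replication-factor 3 --config cleanup.policy=compact --config retention.ms=-1
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic connect_my-cluster_statuses \
  --partitions 5 --replication-factor 3 --config cleanup.policy=compact --config retention.ms=-1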