Streaming SQL scaling
Lenses provides Kafka Streaming SQL in multiple execution modes.
Scaling Kafka SQL Processing
IN_PROC is the default execution mode; processors are executed locally within Lenses. This can have scalability issues, poses a risk to the running application, and can affect stability. IN_PROC is recommended only for testing.
CONNECT is the execution mode that addresses these limitations and provides availability guarantees and scalability. Lenses can deploy your Lenses SQL processors in Kafka Connect, which provides a distributed, fault-tolerant, and scalable framework as part of the core Apache Kafka distribution. We advise allocating a dedicated Kafka Connect cluster for running LSQL stream processing.
KUBERNETES is an execution mode that provides scalability by deploying Lenses SQL runners into Kubernetes clusters. Lenses can deploy and monitor SQL runner deployments created through Lenses or through existing tools such as Helm or kubectl.
Kafka Connect
Kafka Connect provides scalable, fault-tolerant, distributed processing by forming a cluster of workers. The cluster exposes endpoints to which Lenses submits the processor configuration. From that point, Kafka Connect persists the configuration and distributes work to the cluster workers. Upon a restart, Lenses recovers the status, configuration, and metrics of any Lenses SQL connectors found in the configured clusters; this ensures that processing of data in your topologies continues even if Lenses is offline. Lenses will also identify any connectors created outside of Lenses at runtime and start tracking them to provide visibility.
To scale the number of processor applications in or out, we simply instruct Kafka Connect to decrease or increase the number of tasks across the cluster. The Lenses UI provides a simple way to deploy and scale Lenses SQL processors:
- Create a new processor and select the cluster to deploy to
- Compose your SQL statement
- Set the parallelization, i.e. how many tasks/application instances to run
- Give the processor a name
- Deploy
Lenses will check the validity of the SQL statement and, if valid, create the connector instance and start monitoring its behavior.
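Behind the scenes, submitting a processor is a standard connector create request against the Kafka Connect REST API. A minimal sketch, assuming a worker at http://localhost:8083; the processor name and SQL statement here are hypothetical examples, and `tasks.max` is the standard Connect option controlling parallelization:

```shell
# Sketch: create a Lenses SQL connector via the Connect REST API.
# Name and SQL are illustrative; Lenses builds this request for you.
curl -s -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "my-sql-processor",
    "config": {
      "connector.class": "com.landoop.connect.SQL",
      "tasks.max": "2",
      "sql": "INSERT INTO target SELECT * FROM source"
    }
  }'
```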
Lenses supports the following connector functionality:
- CREATE - Register and create a new connector
- PAUSE - Pause the connector and tasks
- START - Start a paused connector
- DELETE - Remove a connector
Note: Updating an existing connector is not directly supported. The KStream application cannot be updated in place, and an update would most likely break the schema compatibility of the target insert table; the connector can, however, be scaled.
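These lifecycle operations map to the standard Kafka Connect REST endpoints, so the same actions can be performed manually against a worker (the processor name below is hypothetical):

```shell
# Pause the connector and its tasks (PAUSE)
curl -X PUT http://localhost:8083/connectors/my-sql-processor/pause

# Resume a paused connector (START)
curl -X PUT http://localhost:8083/connectors/my-sql-processor/resume

# Remove the connector (DELETE)
curl -X DELETE http://localhost:8083/connectors/my-sql-processor
```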
Lenses Configuration
To configure Lenses for the CONNECT execution mode:
- Edit the lenses.conf file and set the SQL execution mode to CONNECT
- Add one or more connect-distributed endpoints for each of your Lenses SQL enabled clusters in the lenses.connect.clusters configuration option
The resulting lenses.conf should look like this:
lenses.connect.clusters = [{name: "sql-cluster", url: "http://localhost:8083", statuses: "connect-statuses", config: "connect-configs", offsets: "connect-offsets" }]
....
# Set up Lenses SQL processing engine
lenses.sql.execution.mode = "CONNECT"
lenses.sql.state.dir = "logs/lenses-sql-kstream-state"
lenses.sql.monitor.frequency = 5000
lenses.sql.connect.connector.class = "com.landoop.connect.SQL"
lenses.sql.sample.default = 2 // Sample 2 messages every 200 msec
lenses.sql.sample.window = 200
This configuration tells Lenses that the processor execution mode is CONNECT and where to find the Lenses SQL enabled Connect clusters. The connector workers send health-check metrics to a metrics topic every few seconds.
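If you want to inspect these health-check messages yourself, you can tail the metrics topic with the standard console consumer. The topic name below is the default; adjust it to match your sql.metrics.topic setting:

```shell
# Tail the metrics topic that the SQL connector tasks report into
kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic _kafka_lenses_metrics \
  --from-beginning
```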
Lenses will scan the Connect clusters specified in the lenses.connect.clusters option for the Lenses SQL connector class and make them available for selection when submitting processors. You can check that the SQL runner is correctly picked up with the kafka-connect-cli.
~|⇒ connect-cli plugins
Class name: com.landoop.connect.SQL, Type: source, Version: 0.0.3
Class name: org.apache.kafka.connect.file.FileStreamSinkConnector, Type: sink, Version: 0.11.0.0-cp1
Class name: org.apache.kafka.connect.file.FileStreamSourceConnector, Type: source, Version: 0.11.0.0-cp1
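Alternatively, if you do not have the CLI installed, the Connect REST API exposes the same information on each worker:

```shell
# List the connector plugins installed on a worker;
# expect an entry whose class is com.landoop.connect.SQL if the runner is present
curl -s http://localhost:8083/connector-plugins
```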
Warning: When scaling out with CONNECT, the lenses.sql.state.dir must be created on all workers in any SQL enabled Connect cluster! This maps to the connect.sql.state.store.dir connector option when used with Lenses.
Kafka Connect installation
The connector needs to be available to each worker in the Kafka Connect cluster intended for SQL. The best way to achieve this is via the isolated classpath loader introduced into Connect in Kafka version 0.11.
- Create a folder called plugins/lib and place the Lenses SQL Connector jar inside
- Set the plugin.path in the worker properties file to the location of the jar
- Restart the Connect worker
# create folder
mkdir -p plugins/lib
# copy in the jar
cp lenses-sql-runners-x.x.x-all.jar plugins/lib
# add the plugin path to the worker properties file, ensure this is the only plugin.path entry
echo "plugin.path=$PWD/plugins/lib" >> config/connect-distributed.properties
# restart the workers
bin/connect-distributed.sh config/connect-distributed.properties
If you are using Kafka version 0.10.x, the plugin.path classloader isolation is not available; instead, place the connector first on the classpath:
export CLASSPATH=lenses-sql-runners-x.x.x-all.jar
Connector Configuration
The connector requires a minimal set of configurations which are handled for you when submitting requests via Lenses.
Key | Description | Type | Importance |
---|---|---|---|
sql.bootstrap.servers | Kafka brokers to bootstrap the clients | string | high |
sql.schema.registry.url | The URL of the Schema Registry, including the protocol, i.e. http:// | string | high |
sql.state.store.dir | Location of the KStreams RocksDB directory | string | high |
sql | SQL query to execute as a KStreams application | string | medium |
sql.lenses.id | A Lenses-specific ID to track the connector | string | medium |
sql.metrics.topic | The topic to write connector metrics to | string | medium |
sql.metric.return.frequency | Frequency in msec at which to send state and metrics to the metrics topic | long | medium |
sql.enable.metrics | Enable state and metrics reporting to the Lenses metrics topic | boolean | medium |
sql.status.topic | Status backing topic of the Connect cluster, used to determine whether the connector has been paused; the Connect framework does not expose this at runtime | string | high |
The following default values are used if not provided:
Key | Default value |
---|---|
sql.bootstrap.servers | localhost:9092 |
sql.schema.registry.url | http://localhost:8081 |
sql.state.store.dir | logs/lenses-kafka-streams-state |
sql.lenses.id | lenses.connect.id.${UUID.randomUUID()} |
sql.metrics.topic | _kafka_lenses_metrics |
sql.metric.return.frequency | 5000 |
sql.enable.metrics | true |
sql.status.topic | connect-statuses |
Note: Metrics reported to the connect.sql.metrics.topic come directly from the KStream instance running in each task, not from the regular consumer and producer JMX.
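Putting the options together, a hand-written runner configuration might look like the following sketch. All values are illustrative (Lenses fills these in for you when it submits a processor); only the keys come from the tables above:

```shell
# Write an illustrative SQL runner configuration as JSON, in the shape
# accepted by the Connect REST API. Name, SQL, and endpoints are hypothetical.
cat > sql-processor.json <<'EOF'
{
  "name": "my-sql-processor",
  "config": {
    "connector.class": "com.landoop.connect.SQL",
    "tasks.max": "2",
    "sql": "INSERT INTO target SELECT * FROM source",
    "sql.bootstrap.servers": "broker-1:9092,broker-2:9092",
    "sql.schema.registry.url": "http://schema-registry:8081",
    "sql.state.store.dir": "logs/lenses-kafka-streams-state",
    "sql.metrics.topic": "_kafka_lenses_metrics",
    "sql.enable.metrics": "true",
    "sql.status.topic": "connect-statuses"
  }
}
EOF
# Then submit it to a worker, e.g.:
#   curl -s -X POST -H "Content-Type: application/json" \
#        --data @sql-processor.json http://localhost:8083/connectors
```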
Kubernetes installation
Kubernetes, a container orchestration engine, provides the perfect platform to run streaming microservices. It can ensure that a configured number of application instances, or pods, are running, and scale them up or down accordingly.
In our next release we are going to extend the ExecutionMode to allow deploying Lenses SQL processors into Kubernetes, but Enterprise customers can already deploy the Lenses SQL Connector into Kubernetes using our Docker image. We recommend using Helm to manage and deploy the connector.
Helm
Lenses will utilize Helm (https://github.com/kubernetes/helm), a package manager for Kubernetes, which allows you to define in configuration which image you want, the container specs, the application environment, and the labels and annotations in Kubernetes that allow for monitoring.
A list of existing Kafka Helm Charts is available on GitHub. The Lenses SQL processor connector chart, available to Enterprise users, is packaged in the connector release.
To deploy the Helm chart, edit the values.yaml accordingly or set the values at the command line. The values.yaml contains all the options previously described.
# Add the repo containing other connector charts
helm repo add landoop https://datamountaineer.github.io/helm-charts/
# Install with values.yaml in dry run mode
helm install ./helm --dry-run --debug
# Install
helm install ./helm
# Install and override with different values from a file
helm install -f myvalues.yaml ./helm
# Install and override with different values from command line
helm install --set connect.sql.app.id=landoop ./helm
Kafka Connect topics in Kubernetes
When deploying into Kubernetes the Chart is configured to have backing topics per deployment with the following format:
connect_{{ .Values.clusterName }}_statuses
connect_{{ .Values.clusterName }}_offsets
connect_{{ .Values.clusterName }}_configs
Ensure the following:
- The status topic used for monitoring the connector is also set to connect_{{ .Values.clusterName }}_statuses
- These topics have an infinite retention period, retention.ms=-1, to avoid losing statuses, configurations, or offsets on restart, i.e. because Kafka has removed the entries
Note: The connect-configs topic must always be a compacted topic.
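The retention and compaction requirements above can be applied when the backing topics are created. A sketch using the standard Kafka CLI, assuming a hypothetical clusterName of sql-cluster and a recent Kafka version (older CLIs use --zookeeper instead of --bootstrap-server); partition and replication counts are illustrative:

```shell
# Create the backing topics for a Connect cluster named "sql-cluster",
# with infinite retention and compaction so nothing is lost on restart
for suffix in statuses offsets configs; do
  kafka-topics.sh --create --bootstrap-server localhost:9092 \
    --topic "connect_sql-cluster_${suffix}" \
    --partitions 5 --replication-factor 3 \
    --config retention.ms=-1 --config cleanup.policy=compact
done
```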