Scaling Stream Processing

Lenses leverages Kafka Streams and currently provides three execution modes to run Lenses SQL processors. A processor is our term for a Kafka Streams application.

[Figure: Lenses SQL processor execution modes]

IN_PROC is the default execution mode: processors are executed locally within Lenses. This mode has scalability limitations, poses a risk to the running application, and can affect the stability of Lenses itself. IN_PROC is recommended only for testing.

CONNECT is the execution mode that addresses these limitations and provides availability guarantees and scalability. Lenses can deploy your Lenses SQL processors in Kafka Connect. Kafka Connect provides a distributed, fault-tolerant, and scalable framework as part of the core Apache Kafka distribution. We advise allocating a dedicated Kafka Connect cluster for running LSQL stream processing.

KUBERNETES is an execution mode that provides scalability by deploying Lenses SQL runners into Kubernetes clusters. Lenses can deploy and monitor SQL runner deployments created through Lenses or existing tools such as Helm or kubectl.

Connect

Kafka Connect provides scalable, fault-tolerant, distributed processing by forming a cluster of workers. The cluster provides endpoints to which Lenses submits the processor configuration. From this point, Kafka Connect persists configurations and distributes work to the cluster workers. Upon a restart, Lenses will recover the status, configuration, and metrics of any Lenses SQL connectors found in the configured clusters; this ensures that if Lenses is offline, processing of data in your topologies continues. Lenses will also identify any connectors created outside of Lenses at runtime and start tracking them to provide visibility.
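For illustration, the same information that Lenses recovers and tracks is available from the standard Kafka Connect REST API. The host and processor name below are placeholders; the default Connect REST port is 8083.

# list all connectors known to the Connect cluster
curl connect-host:8083/connectors

# inspect the status and task assignment of a single processor
curl connect-host:8083/connectors/my-lsql-processor/status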

To scale the number of processor application instances in or out, we simply instruct Kafka Connect to decrease or increase the number of tasks across the cluster. The Lenses UI provides a simple way to deploy Lenses SQL processors and scale them:

  1. Create a new processor and select the cluster to deploy to
  2. Compose your SQL statement
  3. Set the parallelization, i.e. how many tasks/application instances to run
  4. Give the processor a name
  5. Deploy

Lenses will check the validity of the SQL statement and, if it is valid, create the Connector instance and start monitoring its behavior.
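The scaling step maps onto the connector's tasks.max setting. As a minimal sketch, assuming a reachable Connect worker, a placeholder processor name, and that jq is available, the same change could be made directly against the Kafka Connect REST API; in practice the Lenses UI does this for you.

# fetch the current connector configuration
curl -s connect-host:8083/connectors/my-lsql-processor/config > config.json

# raise tasks.max to 4 and submit the full configuration back to the cluster
jq '."tasks.max" = "4"' config.json | \
  curl -s -X PUT -H "Content-Type: application/json" --data @- \
    connect-host:8083/connectors/my-lsql-processor/config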

Lenses supports the following Connector functionality:

  • CREATE - Register and create a new connector
  • PAUSE - Pause the connector and tasks
  • START - Start a paused connector
  • DELETE - Remove a connector
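These operations are normally triggered from the Lenses UI or API, but they correspond to the standard Kafka Connect REST endpoints. A sketch with placeholder host and processor names:

# pause the connector and its tasks
curl -X PUT connect-host:8083/connectors/my-lsql-processor/pause

# resume a paused connector
curl -X PUT connect-host:8083/connectors/my-lsql-processor/resume

# remove the connector
curl -X DELETE connect-host:8083/connectors/my-lsql-processor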

Note

Updating an existing connector is not directly supported. The underlying KStream application cannot be updated in place, and an update would most likely break the schema compatibility of the target insert table. A connector can, however, be scaled.

When Landoop’s FAST DATA CSD is used, the Cloudera parcel lenses-sql-connect can install and provision the connector in a few seconds. More information and step-by-step instructions on how to install the parcel can be found in the FAST DATA docs.

Kubernetes

Kubernetes, a container orchestration engine, provides the perfect platform to run streaming microservices. It can ensure that a configured number of application instances or pods are running and scale them up or down accordingly. Via Helm charts, we provide a Docker image for the LSQL runner that can be deployed via CI/CD in a repeatable and audited manner.

Lenses can deploy SQL runners, recover the runners currently deployed and track and identify deployments created outside of Lenses.

The Lenses SQL Runner image accepts, as environment variables, the same configuration options as the Kafka Connect runner, plus an additional sql.port option to expose a REST endpoint.

Lenses deploys SQL runners as Kubernetes deployment resources. These deployments are labeled so that Lenses can identify and track any changes made either through Lenses or externally, for example via Helm or kubectl from a CI/CD pipeline.

The following labels are attached to deployments.

  • lenses: value lenses-processor. Identifier of a Lenses processor. User defined: No. Resource: Deployment.
  • app: the name of the processor; it must be unique. If the processor is created via the Lenses API this is handled for you. The name must also conform to Kubernetes naming conventions, [a-z0-9]([-a-z0-9]*[a-z0-9])?. User defined: Yes. Resource: Deployment.
  • lenses-id: an auto-generated tracking id. If the processor is created via Lenses the id is assigned automatically. If it is created outside of Lenses the label must not be set; Lenses will assign a new id. User defined: No. Resource: Deployment.
  • lenses-user: the username of whoever created the deployment. User defined: Yes. Resource: Deployment.
  • containerPort: value 8083. The SQL runners expose a RESTful API so you can start, stop, and get the status and metrics of the underlying KStream. Additionally, the internal RocksDB state store is exposed to allow interactive queries. User defined: No. Resource: Pod.
  • pipeline: a user-defined pipeline tag for monitoring. User defined: Yes. Resource: Pod.
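Because the lenses label always carries the value lenses-processor, it can be used to find every SQL runner deployment in a cluster. For example (the namespace is a placeholder):

# list all SQL runner deployments and the labels Lenses uses for tracking
kubectl get deployments -l lenses=lenses-processor -n my-namespace --show-labels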

Warning

Altering pod and deployment labels of existing SQL runner deployments may cause issues tracking the deployments.

The following REST endpoints are exposed by the containers. You can, for example, create a Kubernetes service with pod selectors to access them and reach the state store for interactive queries.

# get health
curl pod_id:8083/health

# get metrics
curl pod_id:8083/metrics

# get the metadata, the internal state store information for interactive queries
curl pod_id:8083/metadata

# get the metadata for the specified name, the internal state store for interactive queries
curl pod_id:8083/metadata/(name)

# get the stream information, sql, runners, consumer group id, username
curl pod_id:8083/stream

# stop the stream
curl -X POST pod_id:8083/stop

# start the stream
curl -X POST pod_id:8083/start
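For a quick check from a workstation you can port-forward to the deployment, or, as suggested above, expose it as a service that selects its pods. The deployment name below is a placeholder.

# forward the runner's REST port to localhost (run in a separate terminal), then query it
kubectl port-forward deployment/my-lsql-processor 8083:8083
curl localhost:8083/health

# or expose the deployment as a ClusterIP service selecting its pods
kubectl expose deployment my-lsql-processor --port=8083 --target-port=8083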

Deployment Recovery

At startup, Lenses can recover existing SQL deployments that it previously created. It can also track deployments created outside of Lenses; for example, you may be using Helm to control and manage deployments in your production environment from a CI/CD pipeline.
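If deployments are created externally in this way, a quick sanity check is to confirm they carry the labels described above before relying on Lenses to track them; the deployment name below is a placeholder.

# print the labels of an externally created deployment
kubectl get deployment my-helm-release -o jsonpath='{.metadata.labels}'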