Applications

This page describes managing Kafka Connectors, SQL Processors and External Apps in Lenses.

Connectors

Deploy and manage Kafka Connectors.

SQL Processors

Deploy and manage SQL Processors for realtime data processing.

External Applications

Connect and monitor your custom apps and topology.

Connectors

This page describes how Lenses integrates with Kafka Connect to create, manage, and monitor connectors via multiple connect clusters.

Overview

Learn how to manage Kafka Connectors in Lenses.

Sources

Learn about Lenses Kafka Connect source plugins.

Sinks

Learn about Lenses Kafka Connect sink plugins.

Secret Providers

Learn about Lenses Kafka Connect secret provider plugins.

Overview

This page describes an overview of managing Kafka Connectors in Lenses.

For documentation about the available Lenses Apache 2.0 Connectors, see the Stream Reactor documentation.

To connect your Connect Clusters see provisioning.

Lenses connects to Connect Clusters via the Kafka Connect APIs. You can deploy connectors outside of Lenses, and Lenses will still be able to see and manage them.

You can connect Lenses to one or more Kafka Connect clusters. Once connected, Lenses will list the available Connector plugins installed in each Cluster. Additionally, Connectors can automatically be restarted and alert notifications sent.

Listing Connectors

To list the deployed connectors go to Environments->[Your Environment]->Workspace->Connectors. Lenses will display a list of connectors and their status.

View Connector details

Once a connector has been created, selecting the connector allows us to:

  1. View its configuration

  2. Update its configurations (Action)

  3. View individual task configurations

  4. View metrics

  5. View exceptions.

View a Connector as Code

To view the YAML specification as Code, select the Code tab in the Connector details page.

Download a Connector as Code

To download the YAML specification, click the Download button.

Creating a Connector

To create a new connector go to Environments->[Your Environment]->Workspace->Connectors->New Connectors.

Select the Connect Cluster you want to use and Lenses will display the plugins installed in the Connect Cluster.

Connectors are searchable by:

  1. Type

  2. Author

After selecting a connector, enter the configuration of the connector instance. Lenses will show the documentation for the currently selected option.

To deploy and start the connector, click Create.

Create a Connector as Code

Creation of a Connector as code can be done via either

  1. Selecting Environments->[Your Environment]->Connectors->Configure Connector->Configure As Code from the main connector page, or

  2. Selecting a Connect Cluster and Connector, then the Code tab

Both options allow for direct input of a Connector's YAML specification or uploading an existing file.

Managing a Connector's lifecycle

Connectors can be stopped, restarted, and deleted via the Actions button.

Sources

This page describes the available Apache 2.0 Source Connectors from Lenses. Lenses can also work with any other Kafka Connect Connector.

Lenses supports any Connector implementing the Connect APIs; bring your own or use community connectors.

Enterprise support is also offered for connectors in the Stream Reactor project, managed and maintained by the Lenses team.

AWS S3

Load data from AWS S3 including restoring topics.

Azure Data Lake Gen2

Load data from Azure Data Lake Gen2 including restoring topics.

Azure Event Hubs

Load data from Azure Event Hubs into Kafka topics.

Azure Service Bus

Load data from Azure Service Bus into Kafka topics.

Cassandra

Load data from Cassandra into Kafka topics.

GCP PubSub

Load data from GCP PubSub into Kafka topics.

GCP Storage

Load data from GCP Storage including restoring topics.

FTP

Load data from files on FTP servers into Kafka topics.

JMS

Load data from JMS topics and queues into Kafka topics.

MQTT

Load data from MQTT into Kafka topics.

Sinks

This page describes the available Apache 2.0 Sink Connectors from Lenses. Lenses can also work with any other Kafka Connect Connector.

Lenses supports any Connector implementing the Connect APIs; bring your own or use community connectors.

Enterprise support is also offered for connectors in the Stream Reactor project, managed and maintained by the Lenses team.

AWS S3

Sink data from Kafka to AWS S3 including backing up topics and offsets.

Azure CosmosDB

Sink data from Kafka to Azure CosmosDB.

Azure Data Lake Gen2

Sink data from Kafka to Azure Data Lake Gen2 including backing up topics and offsets.

Azure Event Hubs

Sink data from Kafka to Azure Event Hubs.

Azure Service Bus

Sink data from Kafka to Azure Service Bus topics and queues.

Cassandra

Sink data from Kafka to Cassandra.

Elasticsearch

Sink data from Kafka to Elasticsearch.

GCP PubSub

Sink data from Kafka to GCP PubSub.

GCP Storage

Sink data from Kafka to GCP Storage.

HTTP Sink

Sink data from Kafka to an HTTP endpoint.

InfluxDB

Sink data from Kafka to InfluxDB.

JMS

Sink data from Kafka to JMS.

MongoDB

Sink data from Kafka to MongoDB.

MQTT

Sink data from Kafka to MQTT.

Redis

Sink data from Kafka to Redis.

Secret Providers

This page describes the available Apache 2.0 Connect Secret Providers from Lenses.

You are not limited to Lenses Secret Providers, you are free to use your own.

AWS Secret Manager

Secure Kafka Connect secrets with AWS Secret Manager.

Azure KeyVault

Secure Kafka Connect secrets with Azure KeyVault.

Environment Variables

Secure Kafka Connect secrets with environment variables.

Hashicorp Vault

Secure Kafka Connect secrets with Hashicorp Vault.

AES256

Secure Kafka Connect secrets with AES256 encryption.

SQL Processors

This page describes creating and managing SQL Processors in Lenses.

SQL Processors continuously process data in Kafka topics. Under the hood, the Kafka Streams APIs are combined with the internal Lenses application deployment framework. A user gets a seamless experience for creating scalable processing components to filter, aggregate, join, and transform Kafka topics, which can scale natively to Kubernetes.

SQL Processors are continuous queries: they process data as it arrives and react to events, e.g. a payment of $5 made to a coffee shop. The data is active, the query is passive.

SQL Processors offer:

  • A no-code, stand-alone application executing a given Lenses SQL query on current and future data

  • Query graph visualisation

  • Fully integrated experience within Lenses

  • ACLs and Monitoring functionality out-of-the-box

  • Ability to scale up and down workflows via Kubernetes

SQL Processors are long-lived applications that continuously process data. When used in Kubernetes mode (recommended), they are deployed via Lenses as Kubernetes deployments, separate from the Lenses instance.

Creating a SQL Processor

To create SQL Processors go to Environments->[Your Environment]->Workspace->Apps->New App->SQL Processor.

  1. Enter a name

  2. Enter your SQL statement; the editor will help you with IntelliSense (a minimal example is sketched below)

  3. Optionally specify a description, tag and ProcessorID (consumer group ID)

  4. Select the deployment target; in Kubernetes mode this will be a Kubernetes cluster and namespace.

If the configuration is valid, the SQL Processor will be created. Processors are not started automatically; click Start in the Actions menu to deploy and start the Processor.
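
For example, a minimal statement for step 2 might look like the following sketch; the topic names payments and large-payments and the field amount are illustrative and assumed to exist in your environment:

SET defaults.topic.autocreate=true;

INSERT INTO large-payments
SELECT STREAM *
FROM payments
WHERE amount > 100;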

Starting a SQL Processor

To start a Processor, select Start from the Action menu.

Viewing SQL Processors

Select the SQL processor. Lenses will show an overview of the health of the processor.

Selecting a Runner will display further information about each individual runner.

The metadata in the Summary tab also contains the consumer group ID (Processor ID).

The SQL tab shows the SQL Streaming statement the processor is running. A visual representation is shown in the Data Flow tab.

The Configuration tab shows the low-level settings.

Scaling a SQL Processor

To scale a processor, select Scale from the Actions menu and enter the desired number of runners. In Kubernetes mode, the runners are pods.

Stopping a SQL Processor

Select Stop in the Actions Menu.

Snippets

Lenses provides helpful snippets for common scenarios. Select the snippet from the Help section.

Processor ID vs Application ID

ProcessorID is the public unique identifier for an SQL processor. It is customizable, meaning that you, as a user, have control over it and can set it to a value of your choosing, subject to the restrictions below.

Restrictions on custom ProcessorIDs:

  • They have to be unique across all processors

  • Match the following regex: ^[a-zA-Z0-9\-\._]+

    • Only letters, numbers and -, _ and . are allowed.

    • It has to start with a letter or a number.

    • It cannot be empty.

One important aspect of the ProcessorID is that it is used as the Kafka consumer group identifier. That means that, in practice, this is the value that allows an SQL processor to build its consumer group and coordinate record ingestion from Kafka between all Processor replicas. Consequently, if the ProcessorID of a given SQL processor is changed, that processor will restart consuming messages from the beginning of the existing records in the topic.

The ApplicationID is the Lenses unique identifier; it is automatically created by Lenses and cannot be customized.

This identifier is unique across all application types; it does not matter if it's an SQL processor or a different (new) sort of future application.

Lenses uses the ApplicationID to manage applications. This means that, when Starting, Stopping or Scaling an application, Lenses will use this attribute to pick the right instance.

Concepts

This page describes the concepts behind Lenses SQL Processors for joining, aggregating and filtering data in Kafka.

SQL Processors see data as an unbounded sequence of independent events. An Event in this context is a datum of information; the smallest element of information that the underlying system uses to communicate. In Kafka’s case, this is a Kafka record/message.

Two parts of the Kafka record are relevant:

  • Key

  • Value

These are referred to as facets by the engine. These two components can hold any type of data, and Kafka itself is agnostic about the actual storage format for either of them. SQL Processors interpret records as (key, value) pairs and expose several ways to manipulate these pairs.

Queries are applications: SQL Processors

As mentioned above, queries that are meant to be run on streaming data are treated as stand-alone applications. These applications, in the context of the Lenses platform, are referred to as SQL Processors.

A SQL Processor encapsulates a specific Lenses SQL query, its details and everything else Lenses needs to be able to run the query continuously.

Schemas must be available for Structured Data

To support features like:

  • Inference of output schemas

  • Creation-time validation of input query

  • Selections

  • Expressions

Lenses SQL Engine Streaming mode needs to have up-to-date schema information for all structured topics that are used as input in a given query. In this context, structured means topics that are using complex storage formats like AVRO or JSON.

INSERT INTO daily-item-purchases-stats
SELECT STREAM
    itemId
    , COUNT(*) AS dailyPurchases
    , AVG(price / quantity) AS average_per_unit
FROM purchases
WINDOW BY TUMBLE 1d
GROUP BY itemId;

For the above query, for example, the purchases topic will need to have its value set to a structured format and a valid schema will need to already have been configured in Lenses. In that schema, the fields itemId, price and quantity must be defined, the latter two with a numerical type.

These requirements ensure the Engine will always be in a position to know what kind of data it will be working with, guaranteeing at the same time that all obvious errors are caught before a query is submitted.

M-N topologies

The UI allows us to visualise any SQL Processor out of the box.

This visualisation helps to highlight that Lenses SQL fully supports M-N topologies.

What this means is that multiple input topics can be read at the same time, their data manipulated in different ways and then the corresponding results stored in several output topics, all as part of the same Processor’s topology.

This means that all processing can be done in one go, without having to split parts of a topology into different Processors (which could result in more data being stored and shuffled by Kafka).
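
A minimal sketch of such an M-N topology, assuming input topics orders and payments and auto-created output topics (all names are illustrative): two inputs are read and two outputs are written by a single Processor.

SET defaults.topic.autocreate=true;

WITH ordersStream AS (SELECT STREAM * FROM orders);
WITH paymentsStream AS (SELECT STREAM * FROM payments);

INSERT INTO orders-archive
SELECT STREAM * FROM ordersStream;

INSERT INTO payments-archive
SELECT STREAM * FROM paymentsStream;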

Expressions in Lenses SQL

An expression is any part of a Lenses SQL query that can be evaluated to a concrete value (not to be confused with a record value).

In a query like the following:

INSERT INTO target-topic
SELECT STREAM
    CONCAT('a', 'b') AS result1
    , (1 + field1) AS result2
    , field2 AS result3
    , CASE
        WHEN field3 = 'Robert' THEN 'It''s bobby'
        WHEN field3 = 'William' THEN 'It''s willy'
        ELSE 'Unknown'
      END AS who_is_it
FROM input-topic
WHERE LENGTH(field2) > 5;

CONCAT('a', 'b'), (1 + field1) and field2 are all expressions whose values will be projected onto the output topic, whereas LENGTH(field2) > 5 is an expression whose values will be used to filter out input records.

SQL Processors and Kafka Streams

SQL Processors are built on top of Kafka Streams, enriching it with an implementation of Lenses SQL that fits well with the architecture and design of Kafka Streams. When executed, each processor runs a Kafka Streams instance.

Consumer groups

Each SQL Processor has an application ID which uniquely identifies it within Lenses. The application ID is used as the Kafka Streams application ID which in turn becomes the underlying Kafka Consumer(s) group identifier.

Scaling

Scaling up or down the number of runners automatically adapts and rebalances the underlying Kafka Streams application in line with the Kafka group semantics.

The advantages of using Kafka Streams as the underlying technology for SQL Processors are several:

  • Kafka Streams is an enterprise-ready, widely adopted and understood technology that integrates natively with Kafka

  • Using consumer group semantics allows leveraging Kafka’s distribution of workload, fault tolerance and replication out of the box

Data as a flow of events: Streams

A stream is probably the most fundamental abstraction that SQL Processors provide, and it represents an unbounded sequence of independent events over a continuously changing dataset.

Let’s clarify the key terms in the above definition:

  • event: an event, as explained earlier, is a datum, that is a (key, value) pair. In Kafka, it is a record.

  • continuously changing dataset: the dataset is the totality of all data described by every event received so far. As such, it is changed every time a new event is received.

  • unbounded: this means that the number of events changing the dataset is unknown and it could even be infinite

  • independent: events don’t relate to each other and, in a stream, they are to be considered in isolation

The main implication of this is that stream transformations (i.e. operations that preserve the stream semantics) are stateless, because the only thing they need to take into account is the single event being transformed. Most Projections fall within this category.

Stream Example

To illustrate the meaning of the above definition, imagine that the following two events are received by a stream:

("key1", 10)
("key1", 20)

Now, if the desired operation on this stream was to sum the values of all events with the same key (this is called an Aggregation), the result for "key1" would be 30, because each event is taken in isolation.

Finally, compare this behaviour with that of tables, as explained below, to get an intuition of how these two abstractions are related but different.

Stream syntax

Lenses SQL streaming supports reading a data source (e.g. a Kafka topic) into a stream by using SELECT STREAM.

SELECT STREAM *
FROM input-topic;

The above example will create a stream that will emit an event for each record, including future ones.

Data as a snapshot of the state of the world: Tables

While a stream is useful to have visibility to every change in a dataset, sometimes it is necessary to hold a snapshot of the most current state of the dataset at any given time.

This is a familiar use-case for a database and the Streaming abstraction for this is aptly called table.

For each key, a table holds the latest version of its value received, which means that upon receiving events for keys that already have an associated value, the existing values will be overwritten.

A table is sometimes referred to as a changelog stream, to highlight the fact that each event in the stream is interpreted as an update.

Given its nature, a table is intrinsically a stateful construct, because it needs to keep track of what it has already seen. The main implication of this is that table transformations will consequently also be stateful, which in this context means that they will require local storage and data being copied.

Additionally, tables support delete semantics. An input event with a given key and a null value will be interpreted as a signal to delete the (key, value) pair from the table.

Finally, a table requires the key of every input event to be non-null. To avoid issues, tables will ignore and discard input events that have a null key.

Table example

To illustrate the above definition, imagine that the following two events are received by a table:

("key1", 10)
("key1", 20)

Now, if the desired operation on this table was to sum the values of all events with the same key (this is called an Aggregation), the result for key1 would be 20, because (key1, 20) is interpreted as an update.

Finally, compare this behaviour with that of streams, as explained above, to get an intuition of how these two abstractions are related but different.

Table syntax

Lenses SQL Streaming supports reading a data source (e.g. a Kafka topic) into a table by using SELECT TABLE.

SELECT TABLE *
FROM input-topic;

The above example will create a table that will treat each event on input-topic, including future ones, as updates.

Tables and compacted topics

Given the semantics of tables, and the mechanics of how Kafka stores data, the Lenses SQL Streaming will set the cleanup.policy setting of every new topic that is created from a table to compact, unless explicitly specified otherwise.

What this means is that the data on the topic will be stored with semantics more closely aligned to those of a table (in fact, tables in Kafka Streams use compacted topics internally). For further information regarding the implications of this, it is advisable to read the official Kafka Documentation about cleanup.policy.

The duality between streams and tables

Streams and tables have significantly different semantics and use cases, but one interesting observation is that they are strongly related nonetheless.

This relationship is known as stream-table duality. It is described by the fact that every stream can be interpreted as a table, and similarly, a table can be interpreted as a stream.

  • Stream as Table: A stream can be seen as the changelog of a table. Each event in the stream represents a state change in the table. As such, a table can always be reconstructed by replaying all events of a stream, in order.

  • Table as Stream: A table can be seen as a snapshot, at a point in time, of the latest value received for each key in a stream. As such, a stream can always be reconstructed by iterating over each (Key, Value) pair and emitting it as an event.

To clarify the above duality, let’s use a chess game as an example.

Kafka table and stream duality

On the left side of the above image, a chessboard at a specific point in time during a game is shown. This can be seen as a table where the key is a given piece and the value is its position. Also, on the right-hand side, there is the list of moves that culminated in the positioning described on the left; it should be obvious that this can be seen as a stream of events.

The idea formalised by the stream-table duality is that, as it should be clear from the above picture, we can always build a table from a stream (by applying all moves in order).

It is also always possible to build a stream from a table. In the case of the chess example, a stream could be made where each element represents the current state of a single piece (e.g. w: Q h3).
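
In Lenses SQL, the two readings of the same topic correspond directly to the two SELECT forms introduced earlier; a minimal sketch, assuming an illustrative topic chess-moves keyed by piece:

-- the changelog view: every move as an independent event
WITH movesStream AS (SELECT STREAM * FROM chess-moves);

-- the snapshot view: the latest position of each piece
WITH boardTable AS (SELECT TABLE * FROM chess-moves);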

This duality is very important because it is actively used by Kafka (as well as several other storage technologies), for example, to replicate data and data stores and to guarantee fault tolerance. It is also used to translate table and stream nodes within different parts of a query.

SQL Processors and schemas: a proactive approach

One of the main goals of the SQL engine is to use all the information available at the time a SQL Processor is created to catch problems, suggest improvements and prevent errors. It is more efficient and less frustrating to surface an issue during registration rather than at some unpredictable moment in the future, at runtime, possibly generating corrupted data.

The SQL engine actively checks the following during the registration of a processor:

  • Validation of all user inputs

  • Query lexical correctness

  • Query semantics correctness

  • Existence of the input topics used within the query

  • User permissions to all input and output topics

  • Schema alignment between fields and topics used within the query

  • Format alignment between data written and output topics, if the latter already exist

When all the above checks pass, the Engine will:

  • Generate a SQL Processor able to execute the user’s query

  • Generate and save valid schemas for all output topics to be created

  • Monitor the processor and make such metrics available to Lenses

The Engine takes a principled and opinionated approach to schemas and typing information. For example, where there is no schema information for a given topic, that topic’s fields will not be available to the Engine, even if they are present in the data. Likewise, if a field in a topic is a string, it cannot be used as a number without explicitly CASTing it.
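
For instance, if a field arrives as a string but needs to be treated as a number, it must be converted explicitly; a minimal sketch (topic and field names are illustrative):

INSERT INTO target-topic
SELECT STREAM
    CAST(amount AS INT) AS amount_int
FROM input-topic;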

The Engine’s approach allows it to support naming and reusing parts of a query multiple times. This can be achieved using the dedicated statement WITH.

SET defaults.topic.autocreate=true;
SET commit.interval.ms='1000';
SET enable.auto.commit=false;
SET auto.offset.reset='earliest';

WITH countriesStream AS (
  SELECT STREAM *
  FROM countries
);

WITH merchantsStream AS (
  SELECT STREAM *
  FROM merchants
);


WITH merchantsWithCountryInfoStream AS (
  SELECT STREAM
    m._key AS l_key
    , CONCAT(surname, ', ', name) AS fullname
    , address.country
    , language
    , platform
  FROM merchantsStream AS m
        JOIN countriesStream AS c
            ON m.address.country = c._key  
  WITHIN 1h
);

WITH merchantsCorrectKey AS(
  SELECT STREAM
    l_key AS _key
    , fullname
    , country
    , language
    , platform
  FROM merchantsWithCountryInfoStream
);

INSERT INTO currentMerchants
SELECT STREAM *
FROM merchantsCorrectKey;

INSERT INTO merchantsPerPlatform
SELECT TABLE
  COUNT(*) AS merchants
FROM merchantsCorrectKey
GROUP BY platform;

The WITHs allow whole sections of the query to be reused and manipulated independently by successive statements, all while maintaining schema and format alignment and correctness. The reason why this is useful is that it allows queries to split their processing flow without having to redefine parts of the topology. This, in turn, means that less data needs to be read and written to Kafka, improving performance.

This is just an example of what SQL Processors can offer because of the design choices taken and the strict rules implemented at query registration.

Projections

This page describes using projections for data in Kafka with Lenses SQL Processors.

A projection represents the ability to project a given input value onto a target location in an output record. Projections are the main building block of SELECT statements.

A projection is composed of several parts: a source section that selects a specific value, and a target section that describes where, with what name and in what structure that value will appear in the output.

In the below query:

INSERT INTO target-topic
SELECT STREAM
    CONCAT('a', 'b') AS result1
    , field4
    , (1 + field1) AS _key.a
    , _key.field2 AS result3
    ,  5 + 7 AS constantField
    ,  CASE
        WHEN field3 = 'Robert' THEN 'It''s bobby'
        WHEN field3 = 'William' THEN 'It''s willy'
        ELSE 'Unknown'
       END AS who_is_it
FROM input-topic;

These are projections:

  • CONCAT('a', 'b') as result1

  • field4

  • (1 + field1) as _key.a

  • _key.field2 as result3

  • 5 + 7 as constantField

  • CASE … as who_is_it

It is worth highlighting that projections themselves are stateless. While the calculation of the source value could be stateful (depending on the type of expression being evaluated), the act of making the value available in the output is not a stateful operation.

Syntax

The precise syntax of a projection is:

 <expression> [as [[<facet>.]<alias>] | [<facet>]]
|------------||----------------------------------|
    source                  target

In the above, [] indicates optional sections and | is to be read as OR.

  • Source:

    • <expression>: the source section must consist of a valid SQL expression, which will be evaluated to the value to be projected.

  • Target: this section is optional. When missing, the output target facet will be _value and the <alias> will be defaulted to the string representation of the source expression.

    • as: this is the Lenses SQL keyword that makes specifying the target of the projection explicit.

    • [[<facet>.]<alias>] | [<facet>]: this nested section, which must be specified whenever as is used, is composed of two mutually exclusive sub-sections.

      • [<facet>.]<alias>: an alias is a string that will be used as the field name in the output record. Optionally, the output facet where the field will be projected (e.g. _key or _value) can also be specified.

      • <facet>: this form specifies that the result of the projection will make up the whole of the indicated facet of the output record (e.g. the whole key or the whole value).

The above syntax highlights something important about the relationship between projections and facets: a projection can have a source facet and will always have a target facet and while they are related, they might not always be the same.

In the above example query:

  • field4 is a projection from value facet to value facet

  • (1 + field1) as _key.a is a projection from the value facet to the key facet

  • _key.field2 as result3 is a projection from the key facet to the value facet

  • 5 + 7 as constantField is a projection to value facet but with no source facet, as the expression does not depend on the input record

This makes SQL projections very expressive, but it also means that attention needs to be paid when facets are manipulated explicitly. Details, edge cases and implications of this will be discussed below.

Source of a Projection

Projection from a _value field

This is the default type of projection when a field is selected within the source expression. Unless otherwise specified, the source facet of a projection will always be _value.

In light of the above, the following query contains only projections that read from the _value facet of the source:

INSERT INTO target-topic
SELECT STREAM
    CONCAT('--> ', _value.field2, ' <--')
    , field1
    , LENGTH(field4)
    , _value.field1 AS aliased
    , 5 + field3
FROM input-topic;

Also, notice that field1 and _value.field1 as aliased are reading the exact same input field and in the former case the _value facet is implicit.

Projection from a key field

A projection can also access a selected field on the key facet.

INSERT INTO target-topic
SELECT STREAM
    CONCAT('--> ', _key.field2, ' <--')
    , _key.field1
    , LENGTH(_key.field4)
    , 5 + _key.field3
FROM input-topic;

The above query contains only projections that read from the _key facet of the source.

This kind of projection behaves exactly the same as any other projection, but because of specific interactions with other mechanics in Lenses SQL Engine Streaming mode, they can’t be used in the same query with Aggregations or Joins.

Projection from whole facets

All examples of projections described until now focused on selecting a field from either the key or the value of an input record. However, a projection can also read a whole facet and project it to the output.

INSERT INTO target-topic
SELECT STREAM
    _key AS old-key
    , _value AS old-value
FROM input-topic;

In the above query, there are two projections:

  • _key as old-key: This is projecting the whole key of input-topic onto a field old-key on target-topic

  • _value as old-value: This is projecting the whole value of input-topic onto a field old-value on target-topic

For more details about the rules around using aliases (as done in the above example), see Rules for nested aliases below.

This can be useful when the input source uses a primitive storage format for either one or both facets, but it is desirable to map such facets to named fields within a more complex output structure, as could be the case in the above query. That said, projections from whole facets are supported for all storage formats, not only the primitive ones.

Mix and match

As it should be clear from all the examples on this page so far, projections can be freely mixed within a single SELECT statement; the same query can have many projections, some of which could be reading from the key of the input record, some others from the value and yet others returning literal constants.

Lenses SQL is designed to support this mixed usage and to calculate the appropriate resulting structure given the schemas of all the projections’ inputs.

Wildcard projections

Lenses SQL assigns a special meaning to * when used as a projection.

Unqualified Wildcard projection

When * is used without any further qualification, it is interpreted as an instruction to project all fields from _key and _value to the output.

INSERT INTO target-topic
SELECT STREAM *
FROM input-topic;

The result of this query is that target-topic will have exactly the same fields, schema and data as input-topic.

Qualified wildcard projection

When * is explicitly qualified, the meaning becomes more precise and it will limit the fields to be selected to only the ones belonging to the qualified source (and optionally facet).

INSERT INTO target-topic
SELECT STREAM
    i1.*
    , i2.field1
FROM input-topic1 AS i1 JOIN input-topic2 AS i2
WITHIN 1h;

The above shows how a qualified wildcard projection can be used to target all the fields of a specific source. Additionally, a qualified wildcard can be used alongside other normal projections (e.g. i2.field1).

Target of a Projection

The target of a projection is the location within the output record where the result of the projection’s expression is going to be mapped. As previously mentioned, Lenses SQL uses the keyword as to explicitly control this.

Using as, it is possible to:

  • Assign an alias to the projected field in the result. For example, field1 as aliased-field1 is reading field1 and projecting its value to a field called aliased-field1. Notice that, as no facet information is specified, this will be targeting the value of the output record.

  • Project directly to nested fields within structures

INSERT INTO target-topic
SELECT STREAM
    field1 AS x.a
    , field2 AS x.b
FROM input-topic;

The above query will result in a field x that is a structure that contains two fields a and b.

  • Control the facet to which the field is projected. For example, field1 as _key.field1 is reading field1 (from the value facet) and projecting to a field with the same name on the key of the output. Depending on the source being projected, doing this might have important implications.

  • Project over the whole target value or key. For example, field1 as _value is reading field1 and projecting it over the whole value of the output. One important thing about this is that Lenses SQL allows only one projection of this kind per facet per query. What this means is that the following query would be invalid, because only one projection can target _value within the same query:

INSERT INTO target-topic
SELECT STREAM  
    field1 AS _value
    , field2 AS _value
FROM input-topic;
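
A valid variant keeps at most one whole-facet projection per facet, for example one targeting _value and one targeting _key (a sketch reusing the same illustrative fields):

INSERT INTO target-topic
SELECT STREAM
    field1 AS _value
    , field2 AS _key
FROM input-topic;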

Rules for nested aliases

In order to avoid potential errors, Lenses defines the following rules for defining aliases:

  • Alias can’t add new fields to non-struct properties.

    • e.g: an_int AS foo.bar, field1 AS foo.bar.field1 (since bar will be an INT we can’t define a property field1 under it)

  • Aliases cannot override previously defined properties.

    • e.g: field1 AS foo.bar.field1, field2 as foo.bar (setting foo.bar with the contents of field2 would override the value of field1)

  • Fields cannot have duplicated names.

    • e.g: field1 AS foo.bar, field2 as foo.bar (setting foo.bar with the contents of field2 would override its previous content)

Implications of projecting on key

Projecting on Key is a feature that can be useful in situations where it is desirable to quickly change the key of a Table or a Stream, maybe in preparation for further operations (e.g. joins etc…). This feature is sometimes referred to as re-keying within the industry.

However, one important implication of using this feature is that Kafka uses the key to determine in what partition a record must be stored; by changing the key of the record, the resulting partitioning of the output topic might differ from that of the input topic. While there is nothing wrong with this, it is something that must be understood clearly when using this feature.

Filtering

Sometimes it is desirable to limit the input records to be projected based on some predicate.

For example, we might want to project field1 and field2 of input-topic onto output-topic, but only if field3 contains a specific value.

INSERT INTO target-topic
SELECT STREAM  
    field1
    , field2
FROM input-topic
WHERE field3 = 'select_me';

This is what the WHERE clause is used for: filtering the input dataset by some predicate, applying the rest of the query only to records that match the predicate.

The syntax for this clause is simply WHERE <expression>, where <expression> is a valid arbitrarily nested Lenses SQL boolean expression.

INSERT INTO target-topic
SELECT STREAM  
    field1
    , field2
FROM input-topic
WHERE (field3 = 'select_me' AND LENGTH(CONCAT(field1, field2)) >= 5)
    OR field4 = (field5 + 5);

Projections and Storage Formats

Projections have a close relationship with the storage format of their target.

By default, if a query contains more than one projection for the same facet, then that facet’s storage format will be a structure (which type of structure exactly depends on other factors that are not relevant here).

INSERT INTO target-topic
SELECT STREAM  
    field1 as result1
    , field2
FROM input-topic;

The above query will make target-topic’s value storage format a structure (e.g. AVRO or JSON) with two fields named: result1 and field2. The storage format for target-topic’s key will be the same as the input-topic’s, as there are no projections targeting that facet.

The storage format of the output can be explicitly changed by a projection, however. This will often be the case when a projection on a whole facet is used. Consider the following query:

INSERT INTO target-topic
SELECT STREAM  
    field1 AS result1
    , field2 AS _key
FROM input-topic;  

In this case, target-topic’s value storage format will still be a structure, but its key will depend on field2’s schema. For example, if field2 is a string, then target-topic’s key will be changed to STRING (assuming it was not STRING already). The same behavior applies to the _value facet.

One example where this can be relevant is when a projection is used to map the result of a single field in the target topic. Consider the following query:

INSERT INTO target-topic
SELECT STREAM field1 AS _value
FROM input-topic;

This query will project field1 on the whole value facet, and this will result in a change of storage format as well. This behaviour is quite common when a single projection is used, because more often than not the desired output in such a scenario will be the content of the field rather than a structure with a single field.

Joins

This page describes how to join data in Kafka with Lenses SQL Processors.

Joins allow rows from different sources to be combined.

Lenses allows two sources of data to be combined based either on the equality of their _key facet or using a user-provided expression.

A query using joins looks like a regular query apart from the definition of its source and in some cases the need to specify a window expression:

SELECT (STREAM|TABLE)
  <projection>
FROM
  <sourceA> [LEFT|RIGHT|INNER|OUTER] JOIN
  <sourceB> [ON <joinExpression>]
  [WITHIN <duration>]
WHERE
  <filterExpression>;

  • projection: the projection of a join expression differs very little from a regular projection. The only important consideration is that, since data is selected from two sources, some fields may be common to both. The syntax table.field is recommended to avoid this type of problem.

  • sourceA/sourceB : the two sources of data to combine.

  • window: only used if two streams are joined. Specifies the interval of time within which to search for matching records.

  • joinExpression: a boolean expression that specifies how the combination of the two sources is calculated.

  • filterExpression: a filter expression specifying which records should be filtered.

Join types

When two sources of data are combined it is possible to control which records to keep when a match is not found:

Disclaimer: The following examples do not take into consideration windowing and/or table materialization concerns.

Customers

_key | id | name
1    | 1  | John
2    | 2  | Frank

Orders

_key | customer_id | item
1    | 1           | Computer
2    | 1           | Mouse
3    | 3           | Keyboard

Inner join

WITH customersTable AS (SELECT TABLE * FROM customers);
WITH ordersStream AS (SELECT STREAM * FROM orders);

INSERT INTO result
SELECT STREAM
    customersTable.name
   , ordersStream.item
 FROM
    ordersStream JOIN customersTable
        ON customersTable.id = ordersStream.customer_id;

This join type will only emit records where a match has occurred.

name | item
John | Computer
John | Mouse

(Notice there is no order with customer_id = 2, nor is there a customer with id = 3, so these two rows are not present in the result.)

Left join

WITH customersTable AS (SELECT TABLE * FROM customers);
WITH ordersStream AS (SELECT STREAM * FROM orders);

INSERT INTO result
SELECT STREAM
    customersTable.name
    , ordersStream.item
 FROM
    ordersStream LEFT JOIN customersTable
        ON customersTable.id = ordersStream.customer_id;

This join type selects all the records from the left side of the join regardless of a match:

name | item
John | Computer
John | Mouse
null | Keyboard

(Notice all the rows from orders are present, but since there is no customer with id = 3, no name can be set.)

Right join

WITH customersTable AS (SELECT TABLE * FROM customers);
WITH ordersStream AS (SELECT STREAM * FROM orders);

INSERT INTO result
SELECT TABLE
    customersTable.name
    , ordersStream.item
 FROM
    customersTable RIGHT JOIN ordersStream
        ON customersTable.id = ordersStream.customer_id;

A right join can be seen as a mirror of a LEFT JOIN. It selects all the records from the right side of the join regardless of a match:

name | item
John | Computer
John | Mouse
null | Keyboard

Outer Join

WITH customersStream AS (SELECT STREAM * FROM customers);
WITH ordersStream AS (SELECT STREAM * FROM orders);

INSERT INTO result
SELECT TABLE
    customersStream.name
    , ordersStream.item
 FROM
    ordersStream OUTER JOIN customersStream
        ON customersStream.id = ordersStream.customer_id;

An outer join can be seen as the union of left and right joins. It selects all records from the left and right side of the join regardless of a match happening:

name  | item
John  | Computer
John  | Mouse
null  | Keyboard
Frank | null

Matching expression (ON)

By default, if no ON expression is provided, the join will be evaluated based on the equality of the _key facet. This means that the following queries are equivalent:

SELECT TABLE *
FROM customers JOIN orders;

SELECT TABLE *
FROM customers JOIN orders
    ON customers._key = orders._key;

When an expression is provided, however, there are limitations regarding what kind of expressions can be evaluated.

Currently, the following expression types are supported:

  • Equality expressions using equality (=) with one table on each side:

    • customers.id = order.user_id

    • customers.id - 1 = order.user_id - 1

    • substr(customers.name, 5) = order.item

  • Any boolean expression which references only one table:

    • len(customers.name) > 10

    • substr(customer.name,1) = "J"

    • len(customer.name) > 10 OR customer_key > 1

  • Allowed expressions mixed together using an AND operator:

    • customers._key = order.user_id AND len(customers.name) > 10

    • len(customers.name) > 10 AND substr(customer.name,1) = "J"

    • substr(customers.name, 5) = order.item AND len(customer.name) > 10 OR customer_key > 1

Any expressions not following the rules above will be rejected:

  • More than one table is referenced on each side of the equality operator

    • concat(customer.name, item.name) = "John"

    • customer._key - order.customer_id = 0

  • A boolean expression not separated by an AND that references more than one table:

    • customer._key = 1 OR customer._key = order.customer_id
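
Putting the rules above together, a join that combines a key-equality expression with a single-table predicate (reusing the illustrative customers and orders sources from earlier) might look like this sketch:

WITH customersTable AS (SELECT TABLE * FROM customers);
WITH ordersStream AS (SELECT STREAM * FROM orders);

INSERT INTO result
SELECT STREAM
    customersTable.name
    , ordersStream.item
FROM
    ordersStream JOIN customersTable
        ON customersTable.id = ordersStream.customer_id
           AND LENGTH(customersTable.name) > 3;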

Windowing: stream to stream joins (WITHIN )

When two streams are joined Lenses needs to know how far away in the past and in the future to look for a matching record.

This approach is called a “Sliding Window” and works like this:

Customers

arrival | id | name
t = 6s  | 1  | Frank
t = 20s | 2  | John

Purchases

arrival | item     | customer_id
t = 10s | Computer | 1
t = 11s | Keyboard | 2

SELECT STREAM
     customers.name
    , orders.item
FROM
    customers LEFT JOIN orders
        ON customers.id = orders.customer_id
WITHIN 5s;

At t=10s, when the Computer record arrives, and at t=11s, when the Keyboard record arrives, only one customer (Frank) can be found within the given time window (the specified window is 5s, so for the record at t=10s the window is [10-5, 10+5]s).

This means that the following would be the result of running the query:

name  | item
Frank | Computer
null  | Keyboard

Note: John will not match the Keyboard purchase since t=20s is not within the Keyboard record's window interval [11-5, 11+5]s.

Non-windowed joins (stream to table and table to stream)

When streaming data, records can be produced at different rates and even out of order. This means that often a match may not be found because a record hasn’t arrived yet.

The following shows an example of a join between a stream and a table where the purchase information is made available before the customers' information.

(Notice that the purchase of a “Keyboard” by customer_id = 2 is produced before the record with the customer details is.)

Customers

arrival | id | name
t = 0s  | 1  | Frank
t = 10s | 2  | John

Purchases

arrival | item     | customer_id
t = 0s  | Computer | 1
t = 1s  | Keyboard | 2

Running the following query:

WITH customersTable AS (SELECT TABLE * FROM customers);

SELECT STREAM
    customersTable.name
   , orders.item
FROM
    orders LEFT JOIN customersTable
        ON customersTable.id = orders.customer_id;

would result in the following:

name  | item
Frank | Computer
null  | Keyboard

If later, the record for customer_id = 2 is available:

arrival | id | name
t = 10s | 2  | John

a record would be emitted with the result now looking like the following:

name  | item
Frank | Computer
null  | Keyboard
John  | Keyboard

Notice that “Keyboard” appears twice, once for the situation where the data is missing and another for when the data is made available.

This scenario will happen whenever a Stream is joined with a Table using a non-inner join.

Table/Stream joins compatibility table

The following table shows which combinations of table/stream joins are available:

Left   | Right  | Allowed types    | Window   | Result
Stream | Stream | All              | Required | Stream
Table  | Table  | All              | No       | Table
Table  | Stream | RIGHT JOIN       | No       | Stream
Stream | Table  | INNER, LEFT JOIN | No       | Stream

Key decoder types

In order to evaluate a join between two sources, the key facet for both sources has to share the same initial format.

If the formats are not the same, the join can't be evaluated. To address this issue, an intermediate topic can be created with the correct format using a STORE AS statement. This newly created topic can then be used as the new source.

Co-partitioning

In addition to the aforementioned constraint, when joining it is required that the partition counts of both sources be the same.

When a mismatch is found, an additional step will be added to the join evaluation in order to guarantee an equal number of partitions between the two sources. This step will write the data from the source topic with a smaller count of partitions into an intermediate one.

This newly created topic will match the partition count of the source with the highest partition count.

In the topology view, this step will show up as a Repartition Node.

ON expressions and key change

Joining two topics is only possible if the two sources used in the join share the same key shape and decoder.

When an ON statement is specified, the original key facet will have to change so that it matches the expression provided in the ON statement. Lenses will do this calculation automatically. As a result, the key schema of the result will not be the same as either one of the sources. It will be a Lenses calculated object equivalent to the join expression specified in the query.

Nullability

As discussed when addressing join types, some values may have null values when non-inner joins are used.

Due to this fact, fields that may have null values will be typed as the union of null and their original type.

Joining more than 2 sources

Within the same query, joins may only be evaluated between two sources.

When a join between more than two sources is required, multiple queries can be combined using a WITH statement:

WITH customerOrders AS (
 SELECT TABLE
    customers.name
   , orders.item
   , orders._key AS order_id
 FROM
    customers INNER JOIN orders
        ON orders.customer_id = customers.id
);

INSERT INTO target
SELECT TABLE *
FROM customerOrders INNER JOIN deliveryAddress
    ON customerOrders.order_id = deliveryAddress.order_id;

Joining and Grouping

In order to group the results of a join, one just has to provide a GROUP BY expression after the join expression.

SET defaults.topic.autocreate=true;
WITH customersTable AS (SELECT TABLE * FROM customers);
WITH ordersStream AS (SELECT STREAM * FROM orders);

WITH joined AS (
    SELECT STREAM
        customersTable.name
        , ordersStream.item
     FROM
        ordersStream JOIN customersTable
            ON customersTable.id = ordersStream.customer_id
    GROUP BY customersTable.name
);

Stream-Table/Table-Stream joins: table materialization

Customers

emitted | processed | id | name
t = 0s  | t = 20s   | 1  | Frank

Purchases

arrival | processed | item     | customer_id
t = 0s  | t = 10s   | Computer | 1

WITH customersStream AS (SELECT TABLE * FROM customers);
WITH ordersStream AS (SELECT STREAM * FROM orders);

INSERT INTO result
SELECT TABLE
    customersStream.name
   , ordersStream.item
 FROM
    ordersStream OUTER JOIN customersStream
        ON customersStream.id = ordersStream.customer_id;

When a join between a table and a stream is processed, Lenses will, for each stream input record (orders in the example above), look for a matching record in the specified table (customers).

Notice that the record with Frank’s purchase information is processed at t = 10s at which point Frank’s Customer information hasn’t yet been processed. This means that no match will be found for this record.

At t=20s however, the record with Frank’s customer information is processed; this will only trigger the emission of a new record if an Outer Join is used.

Filter optimizations

There are some cases where filter expressions can help optimize a query. A filter can be broken down into multiple steps so that some can be applied before the join node is evaluated. This type of optimization reduces the number of records going into the join node and consequently increases its speed.

For this reason, in some cases, filters will show up before the join node in the topology.
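
For example, in the sketch below (reusing the illustrative customers and orders sources), the predicate on ordersStream.item only references one side of the join, so it can be applied before the join node:

WITH customersTable AS (SELECT TABLE * FROM customers);
WITH ordersStream AS (SELECT STREAM * FROM orders);

INSERT INTO result
SELECT STREAM
    customersTable.name
    , ordersStream.item
FROM
    ordersStream JOIN customersTable
        ON customersTable.id = ordersStream.customer_id
WHERE LENGTH(ordersStream.item) > 5;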

Lateral Joins

This page describes how to use lateral joins for data in Kafka with Lenses SQL Processors.

With Lateral Joins you can combine a data source with any array expression. As a result, you will get a new data source, where every record of the original one will be joined with the values of the lateral array expression.

Assume you have a source where elements is an array field:

field1 | field2 | elements
a      | 1      | [1, 2]
b      | 2      | [3, 4, 5]
c      | 3      | [6]

Then a Lateral Join of source with elements is a new table, where every record of source will be joined with all the single items of the value of elements for that record:

field1 | field2 | elements  | element
a      | 1      | [1, 2]    | 1
a      | 1      | [1, 2]    | 2
b      | 2      | [3, 4, 5] | 3
b      | 2      | [3, 4, 5] | 4
b      | 2      | [3, 4, 5] | 5
c      | 3      | [6]       | 6

In this way, the single elements of the array become available and can be used as a normal field in the query.

Syntax

A query using lateral joins looks like a regular query apart from the definition of its source:

SELECT (STREAM|TABLE)
  <projection>
FROM
  <source> LATERAL
  <lateralArrayExpression> AS <lateralAlias>
WHERE
  <filterExpression>;

  • projection: as in a single-table select, all the fields from <source> will be available in the projection. In addition to that, the special field <lateralAlias> will be available.

  • source: the source of data. Note: it is not possible to specify a normal join as a source of a lateral join. This limitation will be removed in the future.

  • lateralArrayExpression: any expression that evaluates to an array. Fields of <source> are available for defining this expression.

  • filterExpression: a filter expression specifying which records should be filtered.

Single Lateral Joins

Assume you have a topic batched_readings populated with the following records:

batched_readings

_key | meter_id | readings
a    | 1        | [100, 80, 95, 91]
b    | 2        | [87, 93, 100]
c    | 1        | [88, 89, 92, 94]
d    | 2        | [81]

As you can see, readings is a field containing arrays of integers.

We define a processor like this:

INSERT INTO readings
SELECT STREAM
    meter_id,
    reading
 FROM
    batched_readings
    LATERAL readings AS reading
WHERE
    reading > 90;

The processor will emit the following records:

_key | meter_id | reading
a    | 1        | 100
a    | 1        | 95
a    | 1        | 91
b    | 2        | 93
c    | 1        | 92
c    | 1        | 94

Things to notice:

  • We used the aliased lateral expression reading both in the projection and in the WHERE.

  • The _key for each emitted record is the one of the original record. As usual, you can change this behavior by projecting on the key with a projection like expression AS _key (see the sketch after this list).

  • batched_readings records with keys a and b have been split into multiple records. That’s because they contain multiple readings greater than 90.

  • Record d disappeared, because it has no readings greater than 90
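
A sketch of the re-keying variant mentioned above, where each emitted record is keyed by the extracted reading instead of the original key:

INSERT INTO readings
SELECT STREAM
    meter_id
    , reading AS _key
FROM
    batched_readings
    LATERAL readings AS reading
WHERE
    reading > 90;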

Multiple Lateral Joins

It is possible to use multiple LATERAL joins in the same FROM clause.

Assume you have a topic batched_nested_readings populated with the following records:

batched_nested_readings

_key | meter_id | nested_readings
a    | 1        | [[100, 80], [95, 91]]
b    | 2        | [[87], [93, 100]]
c    | 1        | [[88, 89], [92, 94]]
d    | 2        | [[81]]

Notice how nested_readings contains arrays of arrays of integers.

To get the same results as the previous example, we use a first lateral join to unpack the first level of nested_readings into an array that we call readings. We then define a second lateral join on readings to extract the single values:

INSERT INTO readings
SELECT STREAM
    meter_id,
    reading
 FROM
    batched_nested_readings
    LATERAL nested_readings AS readings
    LATERAL readings AS reading
WHERE
    reading > 90;

Complex Lateral expressions

In the previous example we used a simple field as the <lateralArrayExpression>. In this section we will see how any array expression can be used instead.

Assume you have a topic day_night_readings populated with the following records:

day_night_readings

_key | meter_id | readings_day | readings_night
a    | 1        | [100, 80]    | [95, 91]
b    | 2        | [87, 93]     | [100]
c    | 1        | [88]         | [89, 92, 94]
d    | 2        | [81]         | []

We can make use of Array Functions to lateral join day_night_readings on the concatenation of the two readings fields:

INSERT INTO readings
SELECT STREAM
    meter_id,
    reading
 FROM
    day_night_readings
    LATERAL flatten([readings_day, readings_night]) AS reading
WHERE
    reading > 90;

The processor defined this way will emit the following records:

_key | meter_id | reading
a    | 1        | 100
a    | 1        | 95
a    | 1        | 91
b    | 2        | 93
c    | 1        | 92
c    | 1        | 94

Aggregations

This page describes how to aggregate data in Kafka with Lenses SQL Processors.

Aggregations are stateful transformations that group an unbounded set of inputs into sub-sets and then aggregate each of these sub-sets into a single output; they are stateful because they need to maintain the current state of the computation between the application of each input.

To group a given input dataset into sub-sets, a key function needs to be specified; the result of applying this key function to an input record will be used as a discriminator (sometimes called a pivot) to determine in what sub-set each input record is to be bucketed.

The specific transformation that each aggregation performs is described by the Aggregated Functions used in the input query.

Aggregations match table semantics

Notice that the behaviour described above is precisely what a Table does. For any given key, the state will be continuously updated as new events with the given key are received. In the case of Aggregations, new events are represented by input records in the original dataset that map to a given key, therefore ending up in one bucket or another.

Whenever Aggregations are used, the result will be a Table. Each entry will have the key set to the grouping discriminator, and the value set to the current state of computation for all input records matching the key.

Syntax

The complete syntax for aggregations is:

SELECT (STREAM | TABLE)
  <aggregated projection1>
    [, aggregated projection2] ... [, aggregated projectionN]
    [, projection1] ... [, projectionM]
FROM
  <source>
[WINDOW BY <window description>]
GROUP BY <expression>
;

The specific syntactical elements of the above are:

  • (STREAM | TABLE): specifies if the <source> is to be interpreted as a stream or a table.

  • <aggregated projection1>: a projection is aggregated when its source contains an Aggregated Function (e.g. COUNT(*) as x, CAST(COUNT(*) as STRING) as stringed).

  • [, aggregated projection2] ... [, aggregated projectionN]: a query can contain any number of additional aggregated projections after the first mandatory one.

  • [, projection1] ... [, projectionM]: a query can contain any number of common, non-aggregated, projections. Streaming only supports full GROUP BY mode. This means that fields that are not part of the GROUP BY clause cannot be referenced by non-aggregated projections.

  • <source>: a normal source, like a topic or the result of a WITH statement.

  • [WINDOW BY <window description>]: this optional section can only be specified if STREAM is used. It allows us to describe the windowing that will be applied to the aggregation.

  • GROUP BY <expression>: the result of evaluating <expression> will be used to divide the input values into different groups. These groups will be the input for each aggregated projection specified. The <expression>’s result will become the key for the table resulting from the query.
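
Putting these elements together, a windowed variant that keeps a non-aggregated projection of the grouping field might look like the following sketch (it reuses the gaming-sessions topic described in the Semantics section below; the 1h tumbling window is illustrative):

INSERT INTO hourly-country-stats
SELECT STREAM
    country
    , COUNT(*) AS sessions
    , AVG(points) AS average_points
FROM gaming-sessions
WINDOW BY TUMBLE 1h
GROUP BY country;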

Specific rules for aggregated projections

Most of the rules and syntax described for Projections apply to aggregated projections as well, but there are some additional syntactical rules due to the specific nature of Aggregations.

  • Aliasing rules mostly work the same, but it is not possible to project on the key facet; COUNT(*) as _key.a or SUM(x) as _key are therefore not allowed.

  • At least one aggregated projection must be specified in the query.

  • Projections using an unqualified key facet as the source are not allowed. _key.a or COUNT(_key.b) are forbidden because _key is unqualified, but <source>._key.a and COUNT(<source>._key.b) are supported.
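For instance, assuming a hypothetical topic input-topic whose key is a structure containing a field b and whose value contains a field field1, the key facet can be used in an aggregated projection only when qualified with the source name, as in this sketch:

INSERT INTO target-topic
SELECT STREAM
    COUNT(input-topic._key.b) AS key_count
FROM input-topic
GROUP BY field1;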

Grouping and storage format for key

INSERT INTO target-topic
SELECT STREAM  
    COUNT(*) AS records
FROM input-topic
GROUP BY field1;

As previously mentioned, the GROUP BY is used to determine the key of the query’s result; the above query will group all records in input-topic by the value of field1 in each record, and target-topic’s key will be the schema of field1.

Just like in the case of the Projections, the Streaming mode takes an opinionated approach here and will simplify the result schema and Storage Format in the case of single field structures.

In the case above, assuming for example that field1 is an integer, target-topic’s key will not be a structure with a single integer field1 field, but rather just the value field1; the resulting storage format is going to be INT, and the label field1 will be just dropped.

In case the above behaviour is not desirable, specifying an explicit alias will allow us to override it.

INSERT INTO target-topic
SELECT STREAM  
    COUNT(*) AS records
FROM input-topic
GROUP BY field1 AS keep_me;

This will result in target-topic's key being a structure with a field keep_me, with the same schema as field1. The corresponding Storage Format will match the input format of input-topic, AVRO or JSON.

Semantics

An example will help clarify how aggregations work, as well as how they behave depending on the semantics of the input dataset they are being applied to.

Example scenario

Assume that we have a Kafka topic (gaming-sessions) containing these records:

| Offset | Key   | Value                          |
| ------ | ----- | ------------------------------ |
| 0      | billy | {points: 50, country: "uk"}    |
| 1      | billy | {points: 90, country: "uk"}    |
| 2      | willy | {points: 70, country: "uk"}    |
| 3      | noel  | {points: 82, country: "uk"}    |
| 4      | john  | {points: 50, country: "usa"}   |
| 5      | dave  | {points: 30, country: "usa"}   |
| 6      | billy | {points: 90, country: "spain"} |

This data describes a series of gaming sessions performed by players. Each gaming session records the player (used as the Key), the points achieved, and the country where the game took place.

Aggregating a Stream

Let’s now assume that what we want to calculate is the total points achieved by players in a given country, as well as the average points per game. One way to achieve the desired behaviour is to build a Stream from the input topic. Remember that this means that each event will be considered in isolation.

INSERT INTO target-topic
SELECT STREAM
    SUM(points) AS total_points
    , AVG(points) AS average_points
FROM gaming-sessions
GROUP BY country

Explanations for each element of this syntax can be found below, but very briefly, this builds a Stream from gaming-sessions, grouping all events by country (e.g. all records with the same country will be aggregated together) and finally calculating the total (total_points) and the average (average_points) of all points for a given group.

The final result in target-topic will be (disregarding intermediate events):

| Offset | Key   | Value                                     |
| ------ | ----- | ----------------------------------------- |
| 3      | uk    | {total_points: 292, average_points: 73}   |
| 5      | usa   | {total_points: 80, average_points: 40}    |
| 6      | spain | {total_points: 90, average_points: 90}    |

The results are calculated from the totality of the input results because in a Stream, each event is independent and unrelated to any other.

Aggregating a Table

We now want to calculate something similar to what we obtained before, but we want to keep track only of the last session played by a player, as it might give us a better snapshot of both the performances and locations of players worldwide. The statistics we want to gather are the same as before: total and average of points per country.

The way to achieve the above requirement is simply by reading gaming-sessions into a Table, rather than a Stream, and aggregating it.

INSERT INTO target-topic
SELECT TABLE
    SUM(points) AS total_points
    , AVG(points) AS average_points
FROM gaming-sessions
GROUP BY country

The final result in target-topic will be (disregarding intermediate events):

| Key   | Value                                   |
| ----- | --------------------------------------- |
| uk    | {total_points: 152, average_points: 76} |
| usa   | {total_points: 80, average_points: 40}  |
| spain | {total_points: 90, average_points: 90}  |

Compare this with the behaviour from the previous scenario; the key difference is that the value for uk includes only willy and noel, and that’s because the last event moved billy to the spain bucket, removing all data regarding him from his original group.

Aggregation functions for Tables

The previous section described the behaviour of aggregations when applied to Tables, and highlighted how aggregations not only need to be able to add the latest values received to the current state of a group, but also need to be able to subtract an obsolete value that might have just been reassigned to a new group. As we saw above, this is easy to do in the case of SUM and AVG.

However, consider what would happen if we wanted to add new statistics to the ones calculated above: the maximum points achieved by a player in a given country.

In the Stream scenario, this can be achieved by simply adding MAXK(points,1) as max_points to the query.

INSERT INTO target-topic
SELECT STREAM
    SUM(points) AS total_points
    , AVG(points) AS average_points
    , MAXK(points,1) AS max_points
FROM gaming-sessions
GROUP BY country

The result in target-topic will then be (disregarding intermediate events):

| Offset | Key   | Value                                                    |
| ------ | ----- | -------------------------------------------------------- |
| 3      | uk    | {total_points: 292, average_points: 73, max_points: 90}  |
| 5      | usa   | {total_points: 80, average_points: 40, max_points: 50}   |
| 6      | spain | {total_points: 90, average_points: 90, max_points: 90}   |

In the Table scenario, however, things are different. We know that the final event moves billy from uk to spain, so we need to subtract from uk all information related to billy. In the case of SUM and AVG that's possible, because subtracting billy's points from the current value of the aggregation will return the correct result.

But that’s not possible for MAXK. MAXK(points, 1) only keeps track of 1 value, the highest seen so far, and if that’s removed, what value should take its place? The aggregation function cannot inspect the entire topic data to search for the correct answer. The state the aggregation function has access to is that single number, which now is invalid.

This problem explains why some aggregated functions can be used on Streams and Tables (e.g. SUM), while others can be used only on Streams (e.g. MAXK).

The key factor is usually whether a hypothetical subtraction operation would need access to all previous inputs to calculate its new value (like MAXK) or just the aggregated state (like SUM).

Windowed aggregations

A common scenario that arises in the context of aggregations is the idea of adding a time dimension to the grouping logic expressed in the query. For example, one might want to group all input records by a given field that were received within 1 hour of each other.

To express the above, Lenses SQL Streaming supports windowed aggregations by adding a WINDOW BY clause to the query. Given their semantics, tables cannot be aggregated using a window, because it would not make sense: a table represents the latest state of a set of (Key, Value) pairs, not a series of events interspersed over a time continuum, so trying to window it is not a sensible operation.
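For example, a sketch of a windowed aggregation (reusing the gaming-sessions topic from the earlier examples, with a hypothetical output topic) that averages points per country over 1-hour windows could look like this:

INSERT INTO hourly-country-points
SELECT STREAM
    country
  , AVG(points) AS average_points
FROM gaming-sessions
WINDOW BY TUMBLE 1h
GROUP BY country;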

Filtering aggregated queries

Filtering the input into aggregated queries is similar to filtering non-aggregated ones. When using a WHERE <expression> statement, where <expression> is a valid SQL boolean expression, all records that do not match the predicate will be left out.

However, aggregations add a further dimension to what it might be desirable to filter.

We might be interested in filtering based on some conditions of the groups themselves; for example, we might want to count all input records that have a given value of field1, but only if the total is greater than 3. In this case, WHERE would not help, because it has no access to the groups nor to the results of the aggregated projections. The query below is what is needed.

INSERT INTO target-topic
SELECT STREAM
    COUNT(*) as sessions
FROM gaming-sessions
GROUP BY country
HAVING sessions > 3;

The above query uses the HAVING clause to express a filter at a grouping level. Using this feature it is possible to express a predicate on the result of aggregated projections and filter out the output records that do not satisfy it.

Only aggregated projections specified in the SELECT clause can be used within the HAVING clause.

Time & Windows

This page describes the time and windowing of data in Kafka with Lenses SQL Processors.

A data stream is a sequence of events ordered by time. Each entry contains a timestamp component, which aligns it on the time axis.

Kafka provides the source for the data streams, and the Kafka message comes with the timestamp built in. This is used by the Push Engines by default. One thing to consider is that, from a time perspective, the stream records can be out of order: two Kafka records R1 and R2, with R1 appearing before R2, do not necessarily respect the rule that R1's timestamp is smaller than R2's timestamp.

Timestamps are required to perform time-dependent operations for streams - like aggregations and joins.

Timestamp semantics

A record's timestamp value can have three distinct meanings. Kafka allows configuring a topic's timestamp meaning via the log.message.timestamp.type setting; the two supported values are CreateTime and LogAppendTime.

Event time

When a record is created at the source, the producer is responsible for setting its timestamp. The Kafka producer does this automatically, which is aligned with the CreateTime configuration mentioned earlier.

Ingestion time

At times, the data source timestamp is not available. When the topic timestamp type is set to LogAppendTime, the Kafka broker will attach the timestamp at the moment it writes the record to the topic.

Processing time

The timestamp will be set to the time the record was read by the engine, ignoring any previously set timestamps.

Control the timestamp

Sometimes, when the data source is not under direct control, it might be that the record’s timestamp is actually embedded in the payload, either in the key or the value.

Lenses SQL Streaming allows specifying where to extract the timestamp from within the record by using EVENTTIME BY.

...
SELECT STREAM ...
FROM input-topic
EVENTTIME BY <selection>
...

where <selection> is a valid selection.

Here are a few examples of how to use the syntax to take the timestamp from the record value facet:

SELECT STREAM *
FROM <source>
EVENTTIME BY startedAt;
...

// this is identical with the above; _value qualifies to the record Value component
SELECT STREAM *
FROM <source>
EVENTTIME BY _value.startedAt;
...

// `details` here is a structure and `startedAt` a nested field
SELECT STREAM *
FROM <source>
EVENTTIME BY details.startedAt;

...

For those scenarios when the timestamp value lives within the record key, the syntax is similar:

SELECT STREAM *
FROM <source>
EVENTTIME BY _key.startedAt;
...

// `details` here is a structure and `startedAt` a nested field
SELECT STREAM *
FROM <source>
EVENTTIME BY _key.details.startedAt;
...

Output timestamp

All records produced by the Lenses SQL Streaming will have a timestamp set and its value will be one of the following:

  1. For direct transformations, where the output record is a straightforward transformation of the input, the input record timestamp will be used.

  2. For aggregations, the timestamp of the latest input record being aggregated will be used.

  3. In all other scenarios, the timestamp at which the output record is generated will be used.

Time windows

Some stream processing operations, like joins or aggregations, require distinct time boundaries called windows. Each time window has a start, an end, and therefore a duration. Performing aggregations over a time window means that only the records which fall within the time window boundaries are aggregated together. Records might arrive out of order, after the window end has passed, but they will still be associated with the correct window.

Types

There are three time window types available at the moment: hopping, tumbling and session.

Duration types

When defining a time window size, the following types are available:

| Duration | Description           | Example |
| -------- | --------------------- | ------- |
| ms       | time in milliseconds. | 100ms   |
| s        | time in seconds.      | 10s     |
| m        | time in minutes.      | 10m     |
| h        | time in hours.        | 10h     |

Hopping window

These are fixed-size, overlapping windows. They are characterised by a duration and a hop interval; the hop interval specifies how far a window moves forward in time relative to the previous one.

Since the windows can overlap, a record can be associated with more than one window.

Use this syntax to define a hopping window:

WINDOW BY HOP <duration_time>,<hop_interval>
INSERT INTO <target>
SELECT STREAM
    country
  , COUNT(*) AS occurrences
  , MAXK_UNIQUE(points,3) AS maxpoints
  , AVG(points) AS avgpoints
FROM <source>
EVENTTIME BY startedAt
WINDOW BY HOP 5m,1m
GROUP BY country
;

Tumbling window

They are a particularisation of hopping windows, where the duration and hop interval are equal. This means that two windows can never overlap, therefore a record can only be associated with one window.

WINDOW BY TUMBLE <duration_time>

Duration time takes the same unit types as described earlier for hopping windows.

INSERT INTO <target>
SELECT STREAM
    country
  , COUNT(*) AS occurrences
  , MAXK_UNIQUE(points,3) AS maxpoints
  , AVG(points) AS avgpoints
FROM <source>
EVENTTIME BY startedAt
WINDOW BY TUMBLE 5m
GROUP BY country
;  

Session window

Unlike the other two window types, this window size is dynamic and driven by the data. Similar to tumbling windows, these are non-overlapping windows.

A session window is defined as a period of activity separated by a specified gap of inactivity. Any record whose timestamp occurs within the inactivity interval of an existing session is considered part of that session. When a record arrives and its timestamp falls outside of the session gap, a new session window is created and the record will belong to it.

A new session window starts if the last record that arrived is further back in time than the specified inactivity gap. Additionally, different session windows might be merged into a single one if an event is received that falls in between two existing windows, and the resulting windows would then overlap.

To define a session window the following syntax should be used:

WINDOW BY SESSION <inactivity_interval>

The inactivity interval can take the time unit type seen earlier for the hopping window.

INSERT INTO <target>
SELECT STREAM
    country
  , COUNT(*) AS occurrences
FROM $source
WINDOW BY SESSION 1m
GROUP BY country

Session windows are tracked on a per-key basis. This means windows for different keys will likely have different durations. Even for the same key, the window duration can vary.

User behaviour analysis is an example of when to use session windows. They enable metrics such as user visit counts, customer conversion funnels, or event flows.


Late arrival

It is quite common to see records belonging to one window arriving late, that is after the window end time has passed. To accept these records the notion of a grace period is supported. This means that if a record timestamp falls within a window W and it arrives within W + G (where G is the grace interval) then the record will be processed and the aggregations or joins will update. If, however, the record comes after the grace period then it is discarded.

To control the grace interval use this syntax:

...
WINDOW BY HOP 5m,1m GRACE BY 2h

...
WINDOW BY TUMBLE 5m GRACE BY 2h

...
WINDOW BY SESSION 1m GRACE BY 2h

The default grace period is 24 hours. Until the grace period elapses, the window is not actually closed.
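Putting it together, a sketch of a windowed aggregation with an explicit grace period (topic and field names are placeholders) might look like the following; late records arriving up to 10 minutes after the window end will still update it:

INSERT INTO <target>
SELECT STREAM
    country
  , COUNT(*) AS occurrences
FROM <source>
EVENTTIME BY startedAt
WINDOW BY TUMBLE 5m GRACE BY 10m
GROUP BY country
;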

Storage format

This page describes the storage formats of data in Kafka supported by Lenses SQL Processors.

The output storage format depends on the sources. For example, if the incoming data is stored as JSON, then the output will be JSON as well. The same applies when Avro is involved.

When using a custom storage format, the output will be JSON.

At times, it is required to control the resulting Key and/or Value storage. If the input is JSON, for example, the output for the streaming computation can be set to Avro.

Another scenario involves Avro source(-s), and a result which projects the Key as a primitive type. Rather than using the Avro storage format to store the primitive, it might be required to use the actual primitive format.

Syntax

Controlling the storage format can be done using the following syntax:

INSERT INTO <target>
STORE
  KEY AS <format>
  VALUE AS <format>
  ...

There is no requirement to always set both the Key and the Value. Maybe only the Key or maybe only the Value needs to be changed. For example:

 INSERT INTO <target>
 STORE KEY AS <format>
 ...

 //or

 INSERT INTO <target>
 STORE VALUE AS <format>
 ...

Consider a scenario where the input data is stored as Avro and an aggregation on a field yields an INT; to use the primitive INT storage rather than the Avro INT storage, set the Key format to INT:

INSERT INTO <target>
STORE KEY AS INT
SELECT TABLE
    SUM(amount) AS total
FROM <source>
GROUP BY CAST(merchantId AS int)

Here is an example of the scenario of having JSON input source(s), but an Avro-stored output:

INSERT INTO <target>
STORE
  KEY AS AVRO
  VALUE AS AVRO
SELECT STREAM
    _key.cId AS _key.cId
    , CONCAT(_key.name, "!") AS _key.name
    , pId
    , CONCAT("!", name) AS name
    , surname
    , age  
FROM <source>

Validation

Changing the storage format is guarded by a set of rules. The following table describes how storage formats can be converted for the output.

| From \ To | INT | LONG | STRING | JSON | AVRO | XML | Custom/Protobuf |
| --------- | --- | ---- | ------ | ---- | ---- | --- | --------------- |
| INT | = | yes | yes | no | yes | no | no |
| LONG | no | = | yes | no | yes | no | no |
| STRING | no | no | = | no | yes | no | no |
| JSON | If the JSON storage contains integer only | If the JSON storage contains integer or long only | yes | = | yes | no | no |
| AVRO | If the Avro storage contains integer only | If the Avro storage contains integer or long only | yes | yes | = | no | no |
| XML | no | no | no | yes | yes | no | no |
| Custom (includes Protobuf) | no | no | no | yes | yes | no | no |

Time/Session window validations

Time windowed formats follow similar rules to the ones described above with the additional constraint that Session Windows(SW) cannot be converted into Time Windows (TW) nor vice-versa.

| From \ To | SW[B] | TW[B] |
| --------- | ----- | ----- |
| SW[A] | yes, if format A is compatible with format B | no |
| TW[A] | no | yes, if format A is compatible with format B |

Example: Changing the storage format from TWAvro to TWJson is possible since they’re both TW formats and Avro can be converted to JSON.

Example: Changing the storage format from TWString to TWJson is not possible since, even though they’re both TW formats, String formats can’t be written as JSON.

XML, as well as any custom format, is only supported as an input format. By default, Lenses will process these formats by translating them to JSON and writing the output as JSON (AVRO is also supported if a STORE clause is explicitly set).
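For instance, assuming a source topic whose values are stored as XML, a sketch that forces the output to be written as Avro instead of the default JSON would be:

INSERT INTO <target>
STORE VALUE AS AVRO
SELECT STREAM *
FROM <xml-source>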

Nullability

This page describes handling null values in Kafka data with Lenses SQL Processors.

Null values are used as a way to express a value that isn’t yet known.

Null values can be found in the data present in existing sources, or they can be the product of joining data using non-inner joins.

The schema of nullable types is represented as a union of the field type and a null value:

{
  "type": "record",
  "name": "record",
  "namespace": "example",
  "doc": "A schema representing a nullable type.",
  "fields": [
    {
      "name": "property",
      "type": [
        "null",
        "double"
      ],
      "doc": "A property that can be null or a double."
    }
  ]
}

Using nullable types

Working with null values can create situations where it’s not clear what the outcome of an operation is.

One example of this would be the following:

null * 1 = ?
null + 1 = ?

Looking at the two expressions above, one may be tempted to solve the problem by saying "null is 1 when multiplying and 0 when summing", meaning the following would be the evaluation result:

null * 1 = 1
null + 1 = 1

However, taking a third expression, (null + 1) * null, and applying the distributive property of multiplication shows that this rule creates inconsistencies:

(null + 1) * null = (null + 1) * null <=>
null * null + 1 * null = (null + 1) * null <=>
1 * 1 + 1 * 1 = (0 + 1) * 1 <=>
1 + 1 = 1 <=>
2 = 1  // not valid

With the intent of avoiding scenarios like the above, where a computation may have different results based on the evaluation approach taken, most operations in Lenses do not allow the use of nullable types.

Address nullability

Lenses provides the following tools to address nullability in a flexible way:

COALESCE

  • Coalesce: A function that allows specifying a list of fields to be tried until the first non-null value is found.

    • Note: the coalesce function won’t verify if a non-nullable field is provided so an error may still be thrown if all the provided fields are null

    • e.g. COALESCE(nullable_fieldA, nullable_fieldB, 0)

AS_NON_NULLABLE

  • AS_NON_NULLABLE: a function that changes the type of a property from nullable to non-nullable.

    • Note: This function is unsafe and will throw an error if a null value is passed. It should only be used if there’s a guarantee that the value won’t ever be null (for instance if used in a CASE branch where the null case has been previously handled or if the data has previously been filtered and the null values removed).

    • e.g: AS_NON_NULLABLE(nullable_field)

AS_NON_NULLABLE and CASE

  • AS_NON_NULLABLE with CASE: A type-checked construct equivalent to using coalesce:

    • e.g:

CASE
    WHEN a_nullable_field IS NULL THEN 0
    ELSE AS_NON_NULLABLE(a_nullable_field)
END

AS_NULLABLE

The AS_NULLABLE function is the inverse transformation of the AS_NON_NULLABLE version. This function allows a non-nullable field type to be transformed into a nullable type. It can be used to insert data into existing topics where the schema of the target field is nullable.
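As a sketch of how these tools can be combined (the topic names and the nullable field reading are hypothetical), the following query produces a non-nullable value both via COALESCE and via the equivalent CASE construct:

INSERT INTO readings-clean
SELECT STREAM
    COALESCE(reading, 0) AS reading_or_default
  , CASE
      WHEN reading IS NULL THEN 0
      ELSE AS_NON_NULLABLE(reading)
    END AS reading_checked
FROM readings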

Settings

This page describes using settings in Lenses SQL Processors to process data in Kafka.

The SET syntax allows customizing the behaviour for the underlying Kafka Consumer/Producer, Kafka Streams (including RocksDB parameters), topic creation and error handling.

The general syntax is:

SET <setting_name>=<setting_value>;

Kafka topics

SQL processors can create topics that are not present. There are two levels of settings: generic (or default) settings applying to all target topics, and specific (or topic-related) settings that allow a distinct setup for a given topic; for example, one of the output topics may require a different partition count or replication factor than the defaults.

To set the defaults follow this syntax:

SET defaults.topic.<topic_setting_key> = <value>;
| Key | Type | Description |
| --- | ---- | ----------- |
| autocreate | BOOLEAN | Creates the topic if it does not exist already. |
| partitions | INT | Controls the target topic partitions count. If the topic already exists, this will not be applied. |
| replication | INT | Controls the topic replication factor. If the topic already exists, this will not be applied. |
| - | - | Each Kafka topic allows a set of configuration parameters to be set. For example, cleanup.policy can be set like this: SET defaults.topic.cleanup.policy='compact,delete'; |
| key.avro.record | STRING | Controls the output record Key schema name. |
| key.avro.namespace | STRING | Controls the output record Key schema namespace. |
| value.avro.record | STRING | Controls the output record Value schema name. |
| value.avro.namespace | STRING | Controls the output record Value schema namespace. |

All the keys applicable for defaults are valid for controlling the settings for a given topic. Controlling the settings for a specific topic can be done via:

SET topic.<topic_name>.<topic_setting_key>=<value>;
SET topic.market_risk.cleanup.policy='compact,delete';

--escaping the topic name if it contains . or - or other non-alpha numeric
SET topic.`market.risk`.cleanup.policy='compact,delete';
SET topic.`market-risk`.cleanup.policy='compact,delete';
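For example, a sketch that lets a processor create its output topics with custom defaults, and a different partition count for one specific topic (all values are illustrative only):

SET defaults.topic.autocreate = true;
SET defaults.topic.partitions = 3;
SET defaults.topic.replication = 2;
SET topic.market_risk.partitions = 6;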

Error handling

The streaming engine allows users to define how errors are handled when writing to or reading from a topic.

Both sides can be set at once by doing:

SET error.policy= '<error_policy>';

or individually as described in the sections below.

Reading Errors

Data being processed might be corrupted or not aligned with the topic format (maybe you expect an Avro payload but the raw bytes represent a JSON document). Setting what happens in these scenarios can be done like this:

SET error.policy.read= '<error_policy>';

Writing Errors

While data is being written multiple errors can occur (maybe there were some network issues). Setting what happens in these scenarios can be done like this:

SET error.policy.write= '<error_policy>';

There are three possible values to control the behaviour.

| Value | Description |
| ----- | ----------- |
| continue | Allows the application to carry on. The problem will be logged. |
| fail | Stops the application. The application will be in a failed (error) state. |
| dlq | Allows the application to continue, but it will send the payload to a dead-letter topic. It requires dead.letter.queue to be set. The default value for dead.letter.queue is lenses.sql.dlq. |

When dlq is used, the dead.letter.queue setting is required. Its value is the target topic where the problematic records will be sent.

SET dead.letter.queue = '<dead_letter_topic>';
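As an illustration, a sketch that sends unreadable input records to a custom dead-letter topic while failing hard on write errors (the topic name is a placeholder):

SET error.policy.read = 'dlq';
SET error.policy.write = 'fail';
SET dead.letter.queue = 'my_processor_dlq';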

Kafka Streams Consumer and Producer settings

Using the SET syntax, the underlying Kafka Streams and Kafka Producer and Consumer settings can be adjusted.

SET <setting_key>=<value>;

| Key | Type | Description |
| --- | ---- | ----------- |
| processing.guarantee | STRING | The processing guarantee that should be used. Possible values are AT_LEAST_ONCE (default) and EXACTLY_ONCE. Exactly-once processing requires a cluster of at least three brokers by default, which is the recommended setting for production. |
| commit.interval.ms | LONG | The frequency with which to save the position of the processor. If processing.guarantee is set to EXACTLY_ONCE, the default value is 100, otherwise the default value is 30000. This setting directly impacts the behaviour of Tables, as it controls how often they emit events downstream. An event will be emitted only every commit.interval.ms, so intermediate events received by the table in between will not be visible downstream. |
| poll.ms | LONG | The amount of time in milliseconds to block waiting for input. |
| cache.max.bytes.buffering | LONG | Maximum number of memory bytes to be used for buffering across all threads. It has to be at least 0. Default value is 10 * 1024 * 1024. |
| client.id | STRING | An ID prefix string used for the client IDs of the internal consumer, producer and restore-consumer clients. |
| num.standby.replicas | INT | The number of standby replicas for each task. Default value is 0. |
| num.stream.threads | INT | The number of threads to execute stream processing. Default value is 1. |
| max.task.idle.ms | LONG | Maximum amount of time a stream task will stay idle when not all of its partition buffers contain records, to avoid potential out-of-order record processing across multiple input streams. |
| buffered.records.per.partition | INT | Maximum number of records to buffer per partition. Default is 1000. |
| connections.max.idle.ms | LONG | Close idle connections after the number of milliseconds specified by this config. |
| receive.buffer.bytes | LONG | The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default will be used. |
| reconnect.backoff.ms | LONG | The base amount of time to wait before attempting to reconnect to a given host. This avoids repeatedly connecting to a host in a tight loop. This backoff applies to all connection attempts by the client to a broker. |
| reconnect.backoff.max.ms | LONG | The maximum amount of time in milliseconds to wait when reconnecting to a broker that has repeatedly failed to connect. If provided, the backoff per host will increase exponentially for each consecutive connection failure, up to this maximum. After calculating the backoff increase, 20% random jitter is added to avoid connection storms. Default is 1000. |
| retries | INT | Setting a value greater than zero will cause the client to resend any request that fails with a potentially transient error. Default is 0. |
| retry.backoff.ms | LONG | The amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios. Default is 100. |
| send.buffer.bytes | LONG | The size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is -1, the OS default will be used. Default is 128 * 1024. |
| state.cleanup.delay.ms | LONG | The amount of time in milliseconds to wait before deleting state when a partition has migrated. |

Alongside the keys above, the Kafka consumer and producer settings can also be tweaked.

SET session.timeout.ms=120000;
SET max.poll.records = 20000;

Some of the configurations for the consumer and the producer have the same name. At times, there may be a requirement to distinguish between them. To do that, prefix the keys with consumer or producer.

SET consumer.<duplicate_config_key>=<value_1>;
SET producer.<duplicate_config_key>=<value_2>;
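For instance, receive.buffer.bytes exists for both the consumer and the producer; a sketch that sets the two independently (the values are illustrative only):

SET consumer.receive.buffer.bytes = 65536;
SET producer.receive.buffer.bytes = 131072;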

RocksDB

Stateful data flow applications might require, on rare occasions, some of the parameters for the underlying RocksDB to be tweaked.

To set the properties, use:

SET rocksdb.<key> = <value>;
| Key | Type | Description |
| --- | ---- | ----------- |
| rocksdb.table.block.cache.size | LONG | Sets the amount of cache in bytes that will be used by RocksDB. If the cache size is non-positive, the cache will not be used. Default: 8M |
| rocksdb.table.block.size | LONG | Approximate size of user data packed per block. Default: 4K |
| rocksdb.table.block.cache.compressed.num.shard.bits | INT | Controls the number of shards for the compressed block cache. |
| rocksdb.table.block.cache.num.shard.bits | INT | Controls the number of shards for the block cache. |
| rocksdb.table.block.cache.compressed.size | LONG | Size of the compressed block cache. If 0, then block_cache_compressed is set to null. |
| rocksdb.table.block.restart.interval | INT | Sets the block restart interval. |
| rocksdb.table.block.cache.size.and.filter | BOOL | Indicates whether to put index/filter blocks into the block cache. If not specified, each 'table reader' object will pre-load the index/filter block during table initialization. |
| rocksdb.table.block.checksum.type | STRING | Sets the checksum type to be used with this table. Available values: kNoChecksum, kCRC32c, kxxHash. |
| rocksdb.table.block.hash.allow.collision | BOOL | Influences the behaviour when kHashSearch is used. If false, stores a precise prefix-to-block range mapping; if true, does not store the prefix and allows prefix hash collisions (less memory consumption). |
| rocksdb.table.block.index.type | STRING | Sets the index type to be used with this table. Available values: kBinarySearch, kHashSearch. |
| rocksdb.table.block.no.cache | BOOL | Disables the block cache. If set to true, no block cache will be used. Default: false |
| rocksdb.table.block.whole.key.filtering | BOOL | If true, places whole keys in the filter (not just prefixes). This must generally be true for gets to be efficient. Default: true |
| rocksdb.table.block.pinl0.filter | BOOL | Indicates whether to pin L0 index/filter blocks to the block cache. If not specified, defaults to false. |
| rocksdb.total.threads | INT | The maximum number of threads RocksDB should use. |
| rocksdb.write.buffer.size | LONG | Sets the number of bytes the database will build up in memory (backed by an unsorted log on disk) before converting to a sorted on-disk file. |
| rocksdb.table.block.size.deviation | INT | Used to close a block before it reaches the configured block_size. If the percentage of free space in the current block is less than this specified number and adding a new record to the block would exceed the configured block size, then this block will be closed and the new record will be written to the next block. Default is 10. |
| rocksdb.compaction.style | STRING | Available values: LEVEL, UNIVERSAL, FIFO |
| rocksdb.max.write.buffer | INT |  |
| rocksdb.base.background.compaction | INT |  |
| rocksdb.background.compaction.max | INT |  |
| rocksdb.subcompaction.max | INT |  |
| rocksdb.background.flushes.max | INT |  |
| rocksdb.log.file.max | LONG |  |
| rocksdb.log.fle.roll.time | LONG |  |
| rocksdb.compaction.auto | BOOL |  |
| rocksdb.compaction.level.max | INT |  |
| rocksdb.files.opened.max | INT |  |
| rocksdb.wal.ttl | LONG |  |
| rocksdb.wal.size.limit | LONG |  |
| rocksdb.memtable.concurrent.write | BOOL |  |
| rocksdb.os.buffer | BOOL |  |
| rocksdb.data.sync | BOOL |  |
| rocksdb.fsync | BOOL |  |
| rocksdb.log.dir | STRING |  |
| rocksdb.wal.dir | STRING |  |
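For example, a sketch that raises the block cache to 16 MB and caps the number of RocksDB threads (values are illustrative only):

SET rocksdb.table.block.cache.size = 16777216;
SET rocksdb.total.threads = 2;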

External Applications

This page describes external applications in Lenses.

External applications are your custom applications that are deployed outside of Lenses, for example a Java Kafka Streams or Python Application.

You can register these in two ways:

Cover

Java SDK

Cover

HTTP Rest Call

Registering via SDK

This page describes using the Lenses Java SDK to register your own applications with Lenses.

A Java SDK is available to register applications such as Kafka Streams, Akka Streams, and Spark Structured Streaming apps. You can find full working examples on GitHub.

Registering via REST

This page describes using the Lenses Rest endpoints to register your own applications with Lenses.

Each app can be registered with a set of endpoints that return the status of each app’s running instances. These endpoints are called runners and allow Lenses to get the running instances’ status by pinging these endpoints. The app state is consolidated based on the individual runner’s state, which is periodically checked.

Each runner can only be in one of the following states:

  • RUNNING - When the health-check endpoints return 200 HTTP Status Code

  • UNKNOWN - When the health-check endpoints return anything other than 200 HTTP Status Code

Lenses consolidates the individual runners’ statuses into a single app state:

  • RUNNING - All Runners are RUNNING

  • WARNING - At least 1 Runner is UNKNOWN

  • UNKNOWN - No information about the Runners is available, for example because they are unreachable

Registering a new application

To add your external apps to Lenses, you can currently use the API by doing a POST to api/v1/apps/external endpoint.

curl -X POST \
  <LENSES_HOST_URL>/api/v1/apps/external \
  -H 'Content-Type: application/json' \
  -H 'X-Kafka-Lenses-Token: <LENSES_AUTH_TOKEN>' \
  -d '{
    "name": "Example_Producer_App",
    "metadata": {
        "version": "1.0.0",
        "owner": "Lenses",
        "deployment": "K8s",
        "tags": [
            "fraud",
            "detection",
            "app"
        ]
    },
    "input": [{"name": "fraud_input_topic"}],
    "output": [{"name": "fraud_output_topic"}],
    "runners": [{"url": "<YOUR_HEALTH_CHECK_URL_1>", "name": "Example_Runner"}]
}'

Viewing your application in Lenses

External apps appear alongside the other application types in Lenses. Go to Workspace->Apps to view your application.

In the application details page, you can see additional information about each application, and also information about each runner, along with metadata information, such as Description, and auditing information such as Created At, Created By and Modified At, Modified By.

Scaling your application runners

You can add runners with a POST request to api/v1/apps/external. The body of the new request should be the same as the original registration; only the array under the runners key needs to change.

curl -X POST \
  <LENSES_HOST_URL>/api/v1/apps/external \
  -H 'Content-Type: application/json' \
  -H 'X-Kafka-Lenses-Token: <LENSES_AUTH_TOKEN>' \
  -d '{
    "name": "Example_Producer_App",
    "metadata": {
        "version": "1.0.0",
        "owner": "Lenses",
        "deployment": "K8s",
        "tags": [
            "fraud",
            "detection",
            "app"
        ]
    },
    "input": [{"name": "fraud_input_topic"}],
    "output": [{"name": "fraud_output_topic"}],
    "runners": [{"url": "<YOUR_HEALTH_CHECK_URL_2>", "name": "Example_Runner"}]
}'

You can also remove runners with a DELETE request.

curl -X DELETE <LENSES_HOST_URL>/api/v1/apps/external/Example_Producer_App/runners \
-H 'Content-Type: application/json' \
-H 'X-Kafka-Lenses-Token: <LENSES_AUTH_TOKEN>' \
-d '{ "runners":["<YOUR_HEALTH_CHECK_URL_1>", "<YOUR_HEALTH_CHECK_URL_2>"]}'

Remove your Application

You can remove an app by making a DELETE request to /api/v1/apps/external/{name}. Removing an app from Lenses only drops Lenses' visibility of that specific app.

curl -X DELETE <LENSES_HOST_URL>/api/v1/apps/external/Example_Producer_App \
-H 'Content-Type: application/json' \
-H 'X-Kafka-Lenses-Token: <LENSES_AUTH_TOKEN>'

Alternatively, you can use the Lenses UI to remove the App.