Lenses SQL Engine

LSQL is a powerful SQL engine for Apache Kafka allowing people familiar with SQL to use Kafka at ease. It uses Apache Calcite industry standard SQL parser, and supports both batch and streaming queries. With the former, you can run ad-hoc SELECT queries to get an insight on the data already present in your Kafka topics. The latter gives you an intuitive syntax to aggregate, join and transform Kafka streams. You do not need to be a Kafka Streams expert to take full advantage of its capabilities.

The SQL engine supports Json and Avro payloads and comes with built-in support for nested fields selection, proper decimal handling for Avro, string functions, arithmetic support for integer/long/double/float/decimal and furthermore it allows you to address fields in both the key and value part of a Kafka message. For those not familiar with Apache Kafka, each message is made of a key and a value.

Why LSQL for Apache Kafka

The interest and usage of Apache Kafka are continuously growing. Many use cases involve moving data and applying transformations, aggregations, and look-ups on route to a target system such as a data store. The adoption, however, was hindered by users lack of experience with Java. Lenses SQL was the first SQL engine for Apache Kafka, that has enabled Data scientists, developers, business analysts to build robust solutions leveraging Kafka Streams and a few lines of SQL.

Traditional BI departments, that have been dealing with big and fast flowing data streams for many years, are well versed in running SQL on data pipelines. Often these teams are working on projects that involve migration of legacy systems to the cloud. They have the business knowledge and SQL technical skills. LSQL enables them to quickly start using Kafka and the wider streaming ecosystem around it.

Leveraging LSQL, you can focus on the business problems to be solved and you end up spending less time on frameworks, tools and infrastructure.

Topics and Messages

There is so much information available for Apache Kafka on the internet that going into a lot of detail will take the focus away from the subject. But, we need to build a bit of context to fully understand why and how LSQL will increase productivity.

Kafka is a streaming platform using messaging semantics. A Kafka cluster manages a collection of streams named topics. A stream of data is divided horizontally into partitions. The partitions are replicated and distributed through the Kafka cluster to achieve high availability.

A Kafka record (message) is composed of:

  • key
  • value
  • metadata
  • time-stamp
  • topic
  • partition
  • offset

As a result, we allow the user to select any of them within LSQL. If you want to select a field from the key part the syntax is _key.id or even _key.* to use the entire key value. A detail explanation is provided in the next chapter.

To extract maximum performance, Apache Kafka is not aware of the actual message content. All it knows is the message is made of two byte arrays. The advised approach is to use Apache Avro to handle the payload serialization step. By using Avro, the payload schema is enforced and furthermore, schema evolution is provided out of the box (there is a requirement for Schema management and Confluent Schema Registry tends to be the de-facto tool). However, we have seen a lot of software systems using Apache Kafka relying on JSON(JavaScript Object Notation) or Google Protobuf for the serialization.

Important

Apache Kafka does not know about the content of a message!

LSQL knows how to handle the following payload structures (also known as decoder types):

  • BYTES
  • JSON
  • AVRO
  • STRING/WSTRING
  • INT/WINT
  • LONG/WLONG
  • PROTOBUF* (coming soon)

When using LSQL the user has to provide the decoders type. The SQL engine supports specific keywords for this: _ktype and _vtype. The W… decoders are for Windowed keys. When aggregating based on a key and over the time window the resulting Kafka message key contains the actual key (INT/LONG/STRING) plus the timestamp (epoch time).

Syntax

Lenses SQL uses MySQL syntax for dual purposes:

  • read data from a topic [and partition/-s] while applying the required filters and functions on fields.
  • build and run an Apache Kafka Streams application by using a simple SQL like syntax.
Name Description Optional Type
_key

Is used as a prefix and instructs LSQL to pick up the field from the Kafka message key part.
YES  
_ktype

Specifies what decoder to use for the key component when reading from the given topic.
See above for the decode types list.
NO STRING
_vtype

Specifies what decoder to use for the value component when reading from the given topic.
See above for the decode types list.
NO STRING
_partition

Specifies which partitions to read from. It can be _partition=2 or _partition in (1,2,3).
If used in a SELECT field list it returns the Kafka message partition number.
YES INT
_offset

Specifies which offset to read from. It can be bounded by a specific range or a specific value.
If _offset=123 one message is read. The user can restrict further by specifying lower/upper
bound.``_offset >= 123 AND _offset < 999``. If used in a select gives back the Kafka offset.
YES INT
_ts

Restricts the messages read based on their timestamp. Similar to offset it can be bounded
by a specific range or a specific value _ts=1543366 or a range _ts > 123 AND _ts < 125.
Please consult Apache Kafka documentation for timestamp information. If used in a SELECT
statement it returns the Kafka message timestamp.
YES LONG

Keywords

The SQL grammar has predefined keywords:

  • timestamp
  • partition
  • topic
  • date

These keywords can still be used as fields, but they must be escaped. It is recommended as with most SQL dialects to escape field and topic names. Below is an example of such query:

SELECT
  `partition`
  , `timestamp`
FROM `target_topic`

Important

Always escape the topic name via `. If you don’t and the topic contains non-alphanumeric characters the parsing will fail.

Browsing Topics

SELECT Syntax

Retrieves records from a Kafka topic and enables the selection of one or many columns from each message on the target topic. The full syntax of the SELECT statement is described below:

[ SET `max.bytes` = 1000000;]
[ SET `max.time` = 5000;]
[ SET `max.zero.polls` = 5;]
[ SET `{any.kafka.consumer.setting}`= `value`;]
SELECT select_expr [, select_expr ...]
FROM `Topic_Reference`
WHERE _ktype = KEY_TYPE
AND _vtype = VALUE_TYPE
[AND FILTERS]
[LIMIT N]

Aliasing is not allowed in this scenario. Since there is only one source topic, all the select_expr are automatically qualified to that topic.

Control execution

To control the resources involved and the time to execute the query, LSQL engine allows to set:

Name Description Usage
max.bytes
The maximum amount of data to return
Default is 20MB
SET `max.bytes` = 20000000;
will set a max of 20MB to be returned
max.time
The maximum amount of time the query
is allowed to run in msec.
Default is 1 hour
SET `max.time` = 60000;
sets a one minute query limit
max.zero.polls
The maximum number of Kafka Consumer
poll operations returning zero records before
the query is completed.
Default is 8
SET `max.zero.polls` = 5;
sets a maximum of 5 calls returning 0.
LIMIT N
The maximum of records to return.
Default is 10000
SELECT *
FROM payments
WHERE _ktype = BYTES
AND _vtype = AVRO
LIMIT 100;

A real scenario, while browsing the data, is for the query to reach the end of the topic and none of the query boundaries to have been reached. In this scenario, the query will continue running until one of the limits is reached.

The max.zero.polls covers scenarios where all the data in the topic has been processed and none of the limits have been reached. This setting will short-circuit, for example, a query running for 1 hour, if for 8 (default) consecutive Kafka polls, there are no new records.

Sample scenario: Max time is set to 5 minutes and all the records have been processed in 30 seconds; then there is no reason to wait 4 minutes and 30 seconds before the query is completed.

Tune the underlying consumer

On very rare occasions, when browsing the data, there might be a requirement to tune the underlying consumer parameters. LSQL allows setting the consumer properties. For example, to set the consumer fetch.max.wait.ms is as simple as:

SET `fetch.max.wait.ms`= `250`;
SELECT *
FROM `payments`
WHERE _ktype = JSON
AND _vtype = AVRO
LIMIT 10

Value Fields

The previous section has given a quick overview of what a Kafka message is composed of. To select fields within the value part of a message, the user has to just use the field name. For example, let’s assume the following message structure flows through your Kafka topic.

{
  "user": "a user id",
  "region" : "Europe",
  "url" : "someurl",
  "timestamp": 1503256766
}

To select the field named region from the message it is as simple as writing the query below:

SELECT region
FROM `web-traffic`
WHERE _ktype = BYTES
AND _vtype = AVRO

Notice the query specifies the format type for both key and value components of a Kafka message. The result of such a query will be a new Avro record containing only one field: region.

If the format of the source topic is JSON the result would be a Json containing one field.

Important

When a JSON decoder type is used, it is expected the payload is the bytes representation of a Json object.

Not all the payloads stored in a Kafka message have a linear structure. Nested structures are quite a common scenario and LSQL has full support for selecting nested fields.

Consider the message structure below for the message value component:

{
  "name": "Rick",
  "address": {
    "street": {
      "name": "Rock St"
    },
    "street2": {
      "name": "Sunset Boulevard"
    },
    "city": "MtV",
    "state": "CA",
    "zip": "94041",
    "country": "USA"
  }
}

We would want to flatten the structure. To do so, the following query can be used:

SELECT
    name
    , address.street.name AS address1
    , address.street2.name AS address
    , city
    , state
    , zip
    , country
FROM `target_topic`
WHERE _ktype = BYTES
AND _vtype = AVRO

The result of such query statement will be a new Avro record containing seven fields: name, address1, address2, city, state, zip and country. The same applies in case of a Json payload:

SELECT
    name
    , address.street.name AS address1
    , address.street2.name AS address2
    , city
    , state
    , zip
    , country
FROM `target_topic`
WHERE _ktype = BYTES
AND _vtype = JSON

Key Fields

Many times, especially in IoT systems, device information is in the message key and it is not replicated in the message value part. LSQL allows selecting the entire key (for primitive type: Int/Long/Double/String) or a specific field in case of Avro/Json structures. To do that all it is required is to use the _key prefix.

Imagine a scenario where various metrics are obtained from an electrical device and the message key part structure looks like this:

{
    "deviceId": 192211,
    "model":"AX14c1"
}

To select the device id, the following query can be written:

SELECT
    _key.deviceId
    , meterValue
    , ...
FROM `metrics`
WHERE _ktype = BYTES
AND _vtype = AVRO

Note

Prefix the field with _key if you want to select a field which is part of the key. Do not prefix with value/_value the fields you want to select from the Kafka message value part!.

Selecting Special Fields

Sometimes, a Kafka message payload will contain fields which are special keywords in the grammar (they might be function names or Apache Calcite specific grammar tokens) or they might start with a non-alphanumeric character. Selecting them as they are will lead to errors. For example:

SELECT
    system.name
    , @timestamp
    ,...
FROM `metrics`
WHERE _ktype = BYTES
AND _vtype = JSON

To get the above working the fields need to be escaped. LSQL is based on MySQL syntax, therefore the escaping character is `. The syntax correctly handling the two fields in:

SELECT
    `system`.name
    , `@timestamp`
    ,...
FROM `metrics`
WHERE _ktype = BYTES
AND _vtype = JSON

Functions

LSQL comes with support for string and arithmetic and logical functions. The table below contains the list of supported functions.

Name Description Usage
ANONYMIZE
Obfuscates the data, in order to protect it. If the anonymized
value is not provided it will use * for String and 0 for Numbers
anonymize(expression [,Anonymized value])
CAST
Enables conversion of values from one data type to another
cast(dt as int)
COALESCE
Returns the first non-null expression in the expression list.
You must specify two expressions
coalesce(value, prevValue)
CONCATENATE
Returns the string representation of concatenating each expression
in the list. Null fields are left out
concatenate(weather.main, 'fixed', dt,temp.eve)
LEN
Returns the length of a string. LEN calculates length
using characters as defined by UTF-16
len(name)
POSITION Return the position of the first occurrence of substring position('math' in fieldA)
REPLACE
Returns string with every occurrence of search_string
replaced with replacement_string
replace(fieldA, 'math', 'mathematics')
SUBSTRING Return a portion of the string, beginning at the given position substring(field, 2)
TRIM Removes leading and trailing spaces trim(field)
LOWER Returns the argument in lowercase lower(name)
UPPER Returns the argument in uppercase upper(name)
ABS
Returns the absolute value of an expression
it has to evaluate to a number type
abs(field1 + field2)
CEIL
Returns the absolute value of an expression
it has to evaluate to a number type
ceil(field1)
FLOOR Return the largest value not greater than the argument floor(field1)
‘/’
Divides one number by another (an arithmetic operator)
Each expression has to evaluate to a number
a / b
‘-‘
Subtracts one number from another (an arithmetic operator)
Each expression has to evaluate to a number
a - b
‘*’
Multiplies one number from another (an arithmetic operator)
Each expression has to evaluate to a number
a * b
‘+’
Adds one number to another (an arithmetic operator)
Each expression has to evaluate to a number
a + b
‘-‘ (negative)
Returns the negative of the value of a numeric expression
(a unary operator). The expression has to evaluate to a number
a * b
POW
Returns expression1 raised to the expression2 power.
Each expression has to evaluate to a number
pow(a, b)
SQRT
Returns the square root of expression.
The expression has to evaluate to a number
sqrt(a)
COUNT
Returns the number of records read
ONLY FOR STREAMING when grouping is involved
SELECT count(*)
MAX
Returns maximum value of expression.
ONLY FOR STREAMING when grouping is involved
max(field1)
MIN
Returns minimum value of expression.
ONLY FOR STREAMING when grouping is involved
min(field1)
SUM
Returns the sum of expression for each record read.
ONLY FOR STREAMING when grouping is involved
sum(field1)
IF
Evaluates the boolean result for condition. If true it will
return the value of expr1; otherwise, it evaluates and
returns the value of expr2
if(condition, expr1,expr2)
LPAD
Prepends the value of padExpr to the value of strExpr
until the total length is lengthExpr
LPAD(strExpr, lengthExpr, padExpr)
RPAD
Appends the value of padExpr to the value of strExpr
until the total length is lengthExpr
LPAD(strExpr, lengthExpr, padExpr)
EPOCH_TO_DATETIME
Converts an epoch into a datetime yyyy-MM-dd'T'HH:mm:ss.SSSZ
strExpr needs to be a LONG value containing the
milliseconds since 1 January 1970 00:00:00)
EPOCH_TO_DATETIME(strExpr)
EPOCH_TO_DATE
Converts an epoch into a date yyyy-MM-dd
The strExpr needs to resolve to INT/LONG value. If the
value is an INT it is expected to be the day represented
as days since epoch
EPOCH_TO_DATE(strExpr)
DATETIME_TO_EPOCH
Converts a string representation of a datetime into epoch value.
For Avro records the new field will be of type Long with the
Timestamp logical type attached and the epoch value.
For Json records the epoch will be expressed as LONG.
The first parameter strExpr needs to be a STRING value.
The second parameter pattern must be a DateTime Format
DATETIME_TO_EPOCH(strExpr, pattern)
DATE_TO_EPOCH
Converts a string representation of a date into epoch value.
For Avro records, this will emit a field with a logical type of
Date. This means the output value is an INT and it represents
the days since Epoch. For Json records the resulted value is the
epoch expressed as LONG. The first parameter strExpr needs to
resolve to a STRING value. The second parameter pattern,
must be a Date Format
DATE_TO_EPOCH(strExpr, pattern)
CONVERT_TIME
Converts the string format of a date [and time]. The first
parameter strExpr needs be a STRING value. The second
parameter fromPattern is the incoming date[and time] format.
The last parameter represents the target date[and time] format.
CONVERT_TIME(strExpr, fromPattern, toPattern)
CURDATE
CURRENT_DATE
Provides the current date value. For Json records the result
is a human readable date value represented as yyyy-MM-dd.
For Avro records the resulting field will be of type INT and
will have the logical type DATE attached
CURDATE()/CURRENT_DATE()
CURRENT_DATETIME
NOW
CURTIME
Provides the current date and time. For Json records the
result is a human readable date and time value represented
as yyyy-MM-dd'T'HH:mm:ss.SSSZ. For Avro records the resulting
field will be of type LONG and will have the
the logical type TIMESTAMP attached
NOW()/CURTIME()/CURRENT_DATETIME()

Where Clause

Querying for data and filtering out records based on various criteria is a common scenario, hence LSQL has support for complex filter expression.

String Filter

If your field is of type string, any of the operators below can be used as part of the predicate describing which records to select and which ones to drop.

Operation Description
!=

Checks inequality. When you want to filter all records with field != 'ABC'
= Checks for equality. When you want to filter all records with field == 'ABC'
< Checks for less than. When you want to filter all records with field < 'ABC'
<= Checks for less than or equal. When you want to filter all records with field <= 'ABC'
> Checks for greater than. When you want to filter all records with field > 'ABC'
>= Checks for greater than or equal. When you want to filter all records with field >= 'ABC'
like

Checks for prefix/contains/postfix. When you want to filter all records with the field containing the text:
field like '%ABC%'. To check for prefix field like 'ABC%'. To check for postfix field like '%ABC'.
Comparison is case insensitive!.

Let’s assume you have an audit topic where you store changes made to data in your system and you want to see all the changes made by people whose name contains Alex. To achieve that the following query can be used;

SELECT *
FROM audit
WHERE username LIKE '%Alex%'
AND _ktype = BYTES
AND _vtype = AVRO

Null Check

Some of the fields in the payload can be null (or missing in case of Json). Using the operations below a query can filter out any records where a field is null or not null:

Operation Description
is null Checks the value is null. When you want to filter all records with field is null
is not null Checks the value is not null. When you want to filter all records with field is not null
SELECT *
FROM `visitorTracking`
WHERE _ktype = BYTES
AND _vtype = JSON
AND location.country IS NOT NULL

SELECT *
FROM `visitorTracking`
WHERE _ktype = BYTES
AND _vtype = JSON
AND location.country IS NULL

Filter on Numbers

LSQL allows you to apply filters for fields of number type: integer, short, byte, long, double, float, decimal. The equality comparison for float and double happens within a precision range. Java uses a subset of IEEE 754 standard to represent floating point numbers. Some floating point numbers, for example, 1/3, cannot be represented exactly using float and double in Java. As a result, equality needs to consider an epsilon. LSQL uses 0.000001 for the value of the epsilon. If the two operands differ by less than the epsilon they are considered to be equal.

Operation Description
!= Checks for not equal. When you want to filter all records with field != 2
= Checks for equality. When you want to filter all records with field == 'ABC'
< Checks for less than. When you want to filter all records with field < 123.4
<= Checks for less than or equal. When you want to filter all records with field <= 1000
> Checks for greater than. When you want to filter all record with field > -10
>= Checks for greater than or equal. When you want to filter all records with field >= 122.122315
SELECT *
FROM topic
WHERE _ktype = BYTES
AND _vtype = AVRO
AND location.latitude < -10

Filter on Partition

Sometimes a user will want to be able to look at one partition or a subset of partitions when applying a SELECT statement. Then the query needs to use the _partition keyword to instruct which ones. For example:

SELECT
    score
    , userId
    , sessionToken
FROM livescore
WHERE _ktype = BYTES
AND _vtype = JSON
AND _partition = 1

--or

SELECT
    score
    , userId
    , sessionToken
FROM livescore
WHERE _ktype = BYTES
AND _vtype = JSON
AND _partition in (2,3)

Filter on Timestamp

LSQL allows for filtering by timestamp to enable selection of time windows you are interested in. For allowing such filtering LSQL uses the _ts filter.

Let’s say we want to see all the messages on a topic and partition where their timestamp is greater than 1501593142000L - epoch time.

SELECT
    tradeId
    , isin
    , user
FROM `systemxtrades`
WHERE _ktype = BYTES
AND _vtype = AVRO
AND _partition = 1
AND _ts > 1501593142000

This is the same as if you were writing:

SELECT
    tradeId
    , isin
    , user
FROM `systemxtrades`
WHERE _ktype = BYTES
AND _vtype = AVRO
AND _partition = 1
AND _ts > '2017-08-01 13:12:22'

Important

LSQL understands timestamp date format following this rule: yyyy-MM-dd HH:mm:ss[.SSS]

Boolean filter

There are scenarios where checking on a boolean field value is a requirement. The SQL allows you to do so by simply specifying: is true or is false. Let’s say we have a topic where credit cards metadata is stored and we want to only pick those ones which have been blocked. Running the query below will allow you to achieve such functionality:

SELECT *
FROM credit_cards
WHERE _ktype=AVRO AND _vtype=AVRO AND blocked is true

IF Function

Sometimes it is required to pick a value based on a certain condition being met. The IF functions supports this scenario. Below is an example of using it:

SELECT IF((field1 + field2)/2 > 10 AND field3 IS NULL, field3 *10, field4+field6 * field8
FROM some_topic
WHERE _ktype=AVRO AND _vtype=AVRO AND blocked is true

Anonymize your data

At times you might not want the data published to a topic to contain sensitive information. Therefore when a SQL Processor is created you might need to anonymize some of the data. The grammar supports such a construct via the ANONYMIZE function.

SELECT anonymize(username), _key as card_number, lastTransactionTimestamp
FROM credit_cards
WHERE _ktype=AVRO AND _vtype=AVRO

Complex Filter

Applying a filter on a field sometimes is not enough. A user might want to apply algebra on a set of fields and filter based on the result, or maybe a user wants to look at a subset of a string field. LSQL allows you do do that.

SELECT *
FROM topicA
WHERE _ktype = BYTES
AND _vtype = AVRO
AND (a.d.e + b) /c > 100

LSQL allows the user to combine in the where clause fields from both key and value part. Here is an example of doing so:

SELECT *
FROM topicA
WHERE _ktype = AVRO
AND _vtype = AVRO
AND (_key.a.d.e + b) /_key.c < 100


SELECT *
FROM topicA
WHERE _ktype = BYTES
AND _vtype = AVRO
AND ((abs(fieldA) < 100 AND fieldB >= 2) OR (fieldC like '%wow'))

Track the usage

Using the configuration file, Lenses can be set to stores any LSQL query made for browsing. Alongside the query details, the information pushed to the topic also contains the user and the time it was executed.

Set the configuration entry lenses.topics.lsql.storage=_kafka_lenses_lsql_storage and it activates the functionality. Then using the topic viewer screen the storage topic can be queried to see what are the most queries run.

Stream Processing

Apache Kafka ecosystem is enhanced by the Kafka Streams API which allows for data streams processing. The framework is available for the Java runtime, therefore you need to know you Java, Kotlin, Scala, etc. to make use of it. Not anymore. A Kafka Streams flow can be described by anyone with basic SQL skills.

Streaming Syntax

LSQL second usage is defining Kafka Streams flows through a SQL-like syntax. All the information learned so far is fully applicable to all the SELECT statements written for stream processing. The syntax, which might look complex initially, looks like this:

[ SET autocreate = true;]
[ SET partitions = 1;]
[ SET replication = 2;]
[ SET `decimal.scale`= 18;]
[ SET `decimal.precision`= 38;]
[ SET `ANY KAFKA STREAMS CONFIG. See Kafka documentation StreamsConfig, ConsumerConfig and ProducerConfig` = '';]
[ SET `topic.[ANY KAFKA Log CONFIG. See LogConfig]` = '';]
[ SET `rocksdb.[RocksDB specific configurations. See the section below on RocksDB]`= '';]

INSERT INTO _TARGET_TOPIC_
[WITH
   _ID_NAME_ AS (SELECT [STREAM] ...FROM _TOPIC_ ...),
   _ID_NAME_ AS (SELECT [STREAM] ...FROM _TOPIC_ ...)
]
SELECT select_expr [, select_expr ...]
FROM _ID_NAME_ INNER JOIN _OTHER_ID_NAME_ ON join_expr
[WHERE condition_expr]
[GROUP BY group_by_expr]
[HAVING having_expr]

If you are not familiar with Apache Kafka stream processing API please follow the documentation.

Important

Streaming SQL is not your typical RDBMS SQL. Core concepts around stream processing with Apache Kafka, the duality of Table/Stream, the implication of creating a Table versus a Stream instance, etc. need to be understood first.

Using LSQL for streaming allows you to do:

  • Transformations
  • Aggregation
  • Join data

We will go through each one in detail but before we do so we need to expand on the syntax you have seen earlier.

Important

When using Avro payloads the schema needs to be present. LSQL engine does static validation against the existing schema. If the schema is missing an error will be returned.

Windowing

Windowing allows you to control how to group records which share the same key for stateful operations such as aggregations or join windows. Windows are tracked per record key. LSQL has support for the full spectrum of windowing functionality available in the Kafka Streams API.

Note

A record is discarded and will not be processed by the window if it arrives after the retention period has passed.

You can use the following types of windows in LSQL:

  • Hopping time windows. These are windows based on time intervals. They model fixed-sized, (possibly) overlapping windows. A hopping window is defined by two properties: the window’s size and its advance interval (aka “hop”). The advance interval specifies how much a window moves forward relative to the previous one. For example, you can configure a hopping window with a size 5 minutes and an advance interval of 1 minute. Since hopping windows can overlap a data record may belong to more than one such windows.
...
GROUP BY HOP(5,m,1,m)
...
  • Tumbling time windows. These are a special case of hopping time windows and, like the latter, are windows based on time intervals. They model fixed-size, non-overlapping, gap-less windows. A tumbling window is defined by a single property: the window’s size. A tumbling window is a hopping window whose window size is equal to its advance interval. Since tumbling windows never overlap, a data record will belong to one and only one window.
...
GROUP BY tumble(1,m)
...
  • Sliding windows. These express fixed-size window that slides continuously over the time axis. Here, two data records are said to be included in the same window if the difference of their timestamps is within the window size. Thus, sliding windows are not aligned to the epoch, but on the data record timestamps.
...
GROUP BY SLIDING(1,m)
...
  • Session windows. These are used to aggregate key-based events into sessions. Sessions represent a period of activity separated by a defined gap of inactivity. Any events processed that fall within the inactivity gap of any existing sessions are merged into the existing sessions. If the event falls outside of the session gap, then a new session will be created. Session windows are tracked independently across keys (e.g. windows of different keys typically have different start and end times) and their sizes vary (even windows for the same key typically have different sizes). As such session windows can’t be pre-computed and are instead derived from analyzing the timestamps of the data records.
...
GROUP BY SESSION(10,m, 5, m)
...

All the window functions allow the user to specify the time unit. Supported time windows are

Keyword Unit
MS milliseconds
S seconds
M minutes
H hours

Table or Stream

When using Apache Kafka Streams API you can build a KStream or KTable from a topic. To distinguish between them, LSQL uses the keyword STREAM. When the keyword is missing an instance of KTable is created. When the keyword is present an instance of KStream is created.

Important

Use SELECT STREAM to create a KStream instance. Use SELECT to create a KTable instance.

Important

When creating a table the Kafka Messages must have a non-null key. Otherwise, the record is ignored.

KStream Settings

LSQL allows you to define a Kafka Stream flow with SQL-like syntax. From the target topic settings to the producer/consumer settings LSQL allows the user to override the defaults. LSQL supports setting these configurations via a standard SQL pattern of setting variables via SET. For example:

SET `auto.offset.reset` = 'smallest';
SET `processing.guarantee`= 'exactly_once';  //this is for Kafka 0.11+ enabling exactly once semantics
SET `commit.interval.ms` = 1000;     //The frequency with which to save the position of the processor.

Any of the target topic specific configurations can also be specified here. Follow the Apache Kafka documentation here for a full list of topic specific configuration options. To set the configuration for the flow result topic you need to prefix the key with topic.. For example to set the cleanup policy to compact ` and to `flush.messages every 5 messages you need to configure LSQL as follows:

SET `topic.cleanup.policy`='compact';
SET `topic.flush.messages`= 5;
...

Apart from the topic, producer/consumer or Kafka stream configs, LSQL allows you to set the following:

Setting Description Type Example
autocreate
If the target topic does not exist it will create it. **If the
Kafka setup does not allow for auto topic creation the flow will fail!**
BOOLEAN SET autocreate=true
partitions
The number of partitions to create for the target topic.
Applies only when autocreate is set to true. By default is false.
INTEGER SET partitions=2
replication
How many replicas to create for the target topic. Applies
only when autocreate is set to true. By default is false.
INTEGER SET replication=3
decimal.scale
When working with Avro records where decimal type is
involved it specifies the decimal scale.
INTEGER SET `decimal.scale`=18
decimal.precision
When working with Avro records where decimal type is
involved it specifies the decimal precision.
INTEGER SET `decimal.precision`=38

Important

Each SET .. instruction needs to be followed by a semicolon:;.

Here is an example of setting the commit interval to 5 seconds and enabling exactly-once semantics (Apache Kafka 0.11+):

SET `commit.interval.ms` = 5000;
SET `processing.guarantee`= 'exactly_once';

INSERT INTO `hot_sensors`
SELECT
    ip
    , lat
    , `long`
    , (temp * 1.8 + 32) as metric
FROM  `sensors`
WHERE _ktype = 'LONG'
AND _vtype = AVRO
AND temp > 30

Note

Configuring a stream flow via code requires the following configuration keys to be set: default.key.serde and default.value.serde. LSQL takes care of this based on the values specified in the SQL so you don’t have to set them.

RocksDB Settings

Whenever the Kafka streams application requires state, it will rely on RocksDB. Configuring this key-value store breaks the pattern found in Kafka Streams API. To customize the settings one has to provide an implementation for org.apache.kafka.streams.state.RocksDBConfigSetter. LSQL covers most of the settings available, here is the entire list :

Key Type Description  
rocksdb.table.block.cache.size LONG
Set the amount of cache in bytes
that will be used by RocksDB.
If cacheSize is non-positive, then cache will not be used.
DEFAULT: 8M
 
rocksdb.table.block.size LONG Approximate size of user data packed per block. Default: 4K  
rocksdb.table.block.cache.compressed.num.shard.bits INT TableFormatConfig.setBlockCacheCompressedNumShardBits
Controls the number of shards for the
block compressed cache
rocksdb.table.block.cache.num.shard.bits INT Controls the number of shards for the block cache  
rocksdb.table.block.cache.compressed.size LONG
Size of compressed block cache. If 0,
then block_cache_compressed is set to null
 
rocksdb.table.block.restart.interval INT Set block restart interval  
rocksdb.table.block.cache.size.and.filter BOOL
Indicating if we’d put index/filter blocks to the block cache.
If not specified, each ‘table reader’ object will pre-load index/filter
block during table initialization
 
rocksdb.table.block.checksum.type STRING
Sets the checksum type to be used with this table.
Available values: kNoChecksum, kCRC32c, kxxHash.
 
rocksdb.table.block.hash.allow.collision BOOL
Influence the behavior when kHashSearch is used.
if false, stores a precise prefix to block range mapping
if true, does not store prefix and allows prefix hash collision
(less memory consumption)
 
rocksdb.table.block.index.type STRING
Sets the index type to used with this table.
Available values: kBinarySearch, kHashSearch
 
rocksdb.table.block.no.cache BOOL
Disable block cache. If this is set to true,
then no block cache should be used. Default: false
 
rocksdb.table.block.whole.key.filtering BOOL
If true, place whole keys in the filter (not just prefixes).
This must generally be true for gets to be efficient.
Default: true
 
rocksdb.table.block.pinl0.filter BOOL
Indicating if we’d like to pin L0 index/filter blocks to the block cache.
If not specified, defaults to false.
 
rocksdb.total.threads INT The max threads RocksDB should use  
rocksdb.write.buffer.size LONG
Sets the number of bytes the database will build up in memory
(backed by an unsorted log on disk) before converting to a
sorted on-disk file
 
rocksdb.table.block.size.deviation INT
This is used to close a block before it reaches the configured
‘block_size’. If the percentage of free space in the current block is less
than this specified number and adding a new record to the block will
exceed the configured block size, then this block will be closed and the
new record will be written to the next block.
Default is 10.
 
rocksdb.table.block.format.version INT
We currently have three versions:

0 - This version is currently written out by all RocksDB’s versions by default.
Can be read by really old RocksDB’s. Doesn’t support changing
checksum (default is CRC32).

1 - Can be read by RocksDB’s versions since 3.0.
Supports non-default checksum, like xxHash. It is written by RocksDB when
BlockBasedTableOptions::checksum is something other than kCRC32c. (version
0 is silently upconverted)

2 - Can be read by RocksDB’s versions since 3.10.
Changes the way we encode compressed blocks with LZ4, BZip2, and Zlib
compression. If you don’t plan to run RocksDB before version 3.10,
you should probably use this.

This option only affects newly written tables. When reading existing
tables, the information about version is read from the footer.
 
rocksdb.compaction.style STRING Available values: LEVEL, UNIVERSAL, FIFO  
rocksdb.max.write.buffer INT    
rocksdb.base.background.compaction INT    
rocksdb.background.compaction.max INT    
rocksdb.subcompaction.max INT    
rocksdb.background.flushes.max INT    
rocksdb.log.file.max LONG    
rocksdb.log.fle.roll.time LONG    
rocksdb.compaction.auto BOOL    
rocksdb.compaction.level.max INT    
rocksdb.files.opened.max INT    
rocksdb.wal.ttl LONG    
rocksdb.wal.size.limit LONG    
rocksdb.memtable.concurrent.write BOOL    
rocksdb.os.buffer BOOL    
rocksdb.data.sync BOOL    
rocksdb.fsync BOOL    
rocksdb.log.dir STRING    
rocksdb.wal.dir STRING    

All the table configurations will initialize a BlockBasedTableConfig on which the call to set the values is made. This will be followed by a call to Options.setTableFormatConfig(tableConfig).

Transformations

This is the basic and most common use case. Transforming an incoming topic to morph the messages using any of the capabilities provided by the SELECT statement. That includes:

  • Selecting specific fields
  • Applying supported functions to achieve a new field
  • Filtering the records based on your criteria.

Let’s imagine we have a topic containing sensor specific data:

{
  "device_id": 1,
  "ip": "191.35.83.75",
  "timestamp": 1447886791,
  "lat": 22,
  "long": 82,
  "scale": "Celsius",
  "temp": 22.0,
  "device_name": "sensor-AbC-193X",
  "humidity": 15,
  "zipcode": 95498
}

And we want to select only that data where the temperature is over 30 degrees Celsius. Furthermore, we want the temperature value to be expressed in Fahrenheit and we only need ip, lat, long fields from the initial data. To do so we can write this LSQL statement:

INSERT INTO `hot_sensors`
SELECT
    ip
    , lat
    , long
    , (temp * 1.8 + 32) AS metric
FROM  `sensors`
WHERE _ktype = 'LONG'
AND _vtype = AVRO
AND temp > 30

This is the simplest flow you could write and the query will end up producing records looking like this:

{
  "ip": "191.35.83.75",
  "lat": 22,
  "long": 82,
  "metric": 71.6
}

The SQL syntax allows you to access nested fields or a complex field. We can change slightly the structure above to do this. The new data looks like this:

{
  "description":"Sensor embedded in exhaust pipes in the ceilings",
  "ip":"204.116.105.67",
  "id":5,
  "temp":40,
  "c02_level":1574,
  "geo":{
    "lat":35.93,
    "long":-85.46
    }
}

First, we write the SQL to address the nested fields:

INSERT INTO `new_sensors`
SELECT
    ip
    , geo.lat
    , geo.long
    , temp
FROM  `sensors`
WHERE _ktype = 'LONG'
AND _vtype = AVRO

The result of applying this query will be Avro records with the following format:

{
   "ip":"204.116.105.67",
   "lat":35.93,
   "long":-85.46,
   "temp":40
}

If the user selects a complex field, the entire substructure is copied over. For example:

INSERT INTO `new_sensors`
SELECT
    ip
    , geo
    , temp
FROM  `sensors`
WHERE _ktype = 'LONG'
AND _vtype = AVRO

The new records will have this format:

{
  "ip":"204.116.105.67",
  "geo":{
    "lat":35.93,
    "long":-85.46
    },
  "temp":40
}

These examples are for records of type Avro, but the similar support is provided for Json payloads.

Aggregation

Typical streaming aggregation involves scenarios similar to these:

  • Counting the number of visitors on your website per region
  • Totalling amount of Foreign Exchange transactions for GBP-USD on a 15 minutes interval
  • Totalling sales made on each of the company stores every day
  • Retaining the minimum and maximum stock value on a 30 minutes interval

These are just a few examples - the list goes on. LSQL gives you a way of quickly express such aggregation over Kafka streams with either Json or Avro payloads.

Imagine a trading system needs to display the number of transactions made for each currency pair (GBPUSD is a currency exchange ticker). Such functionality can be easily achieved with a query like this:

INSERT INTO `total_transactions`
SELECT count(*) AS transaction_count
FROM `fx`
WHERE _ktype = BYTES
AND _vtype = AVRO
GROUP BY ticker

Remember this is a stateful stream so potentially, you could see the values for a ticker more than once. It depends on how many transactions flow through the Kafka topic. The result of this query could be the following:

Key Value
GBPUSD 1
CHFYEN 1
USDEUR 1
GBPUSD 3
USDEUR 5

Suppose the user needs to look only at specific tickers. There are two approaches here where the filter is applied in the WHERE clause (best for performance) or relying on HAVING clause. Both examples are covered by the queries below:

INSERT INTO `total_transactions`
SELECT count(*) AS transaction_count
FROM `fx`
WHERE _ktype = BYTES
AND _vtype = AVRO
AND ticker LIKE '%GBP%'
GROUP BY ticker

--OR

INSERT INTO `total_transactions`
SELECT count(*) as transaction_count
FROM `fx`
WHERE _ktype = BYTES
AND _vtype = AVRO
GROUP BY ticker
HAVING ticker in ('GBPUSD', 'EURDKK', 'SEKCHF')

Having clause allows the usage of any of the LSQL supported functions to achieve your filter requirements. To illustrate that we will filter all the tickers for USD and the list: GBPUSD, EURDKK, SEKCHF

INSERT INTO `total_transactions`
SELECT count(*) as transaction_count
FROM `fx`
WHERE _ktype = BYTES
AND _vtype = AVRO
GROUP BY ticker
HAVING ticker IN ('GBPUSD', 'EURDKK', 'SEKCHF') OR ticker LIKE '%USD%'

There are scenarios where grouping by the record key part. Assume the fx topic contains the ticker in the key part. In that case, the queries become:

INSERT INTO `total_transactions`
SELECT count(*) AS transaction_count
FROM `fx`
WHERE _ktype = 'STRING'
AND _vtype = AVRO
GROUP BY _key

-- OR adding a filter

INSERT INTO `total_transactions`
SELECT count(*) AS transaction_count
FROM `fx`
WHERE _ktype = 'STRING'
AND _vtype = AVRO
AND _key.* LIKE '%GBP%'
GROUP BY _key

Important

Every time GROUP BY a field is involved, the resulting key on the target topic is STRING! This is to allow joins on multiple fields.

In the version 1, LSQL does not support arithmetic on aggregation functions. By that we mean you can not do `SUM(fieldA)/count(*)`. We are looking at solutions to address this in the upcoming version(-s). Meanwhile here is how you can do it for now:

SET `auto.offset.reset`='latest';
SET autocreate = true;
SET `commit.interval.ms` = 3000;

INSERT INTO sensor_data_avg
WITH
avgStream as
(
SELECT STREAM
        COUNT(*) as total,
        SUM(temperature) AS temperatureTotal,
        SUM(humidity) AS humidityTotal,
        MIN(temperature) AS minTemperature,
        MAX(temperature) AS maxTemperature,
        MIN(humidity) AS minHumidity,
        MAX(humidity) AS maxHumidity
FROM  `sensor_data`
WHERE _ktype='STRING' AND _vtype='JSON'
GROUP BY TUMBLE(2,s),_key
)

SELECT STREAM
    temperatureTotal/total AS avgTemperature,
    humidityTotal/total AS avgHumidity,
    minTemperature,
    maxTemperature,
    minHumidity,
    maxHumidity
FROM avgStream

Notice the last SELECT statement uses the output data from the first one in order to achieve the average calculation.

Important

Doing aggregate functions (SUM/COUNT/MIN/MAX) arithmetic is not supported!

Using Window

We have shown so far simple aggregation without involving windows. That might solve some of the requirements a user has. Aggregating over a window is a very common scenario for streaming. Window support was introduced earlier; please revisit the windowing section.

Keeping the trend of IoT scenarios, imagine there is a stream of metrics information from devices across the globe. The data structure looks like this:

{
  "device_id": 2,
  "device_type": "sensor-gauge",
  "ip": "193.156.90.200",
  "cca3": "NOR",
  "cn": "Norway",
  "temp": 18,
  "signal": 26,
  "battery_level": 8,
  "timestamp": 1475600522
}

The following query allows you to count all the records received from each country on a tumbling window of 30 seconds. Such functionality can be described like this:

INSERT INTO norway_sensors_count
SELECT count(*) AS total
FROM sensors
WHERE _ktype = BYTES
AND _vtype = AVRO
GROUP BY tumble(30,s), cca3

The result would be records emitted on a 30 seconds interval and they would look similar to this:

Key Value
NOR 10
ROM 2
FRA 126
UK 312
US 289
NOR 2
FRA 16
UK 352
US 219

Note

Remember the key value will be of type String.

So far we have done counting only but LSQL provides support for SUM, MIN or MAX as well. Maybe your system processes customers orders and you want to keep computing every hour the total amount of orders over the last 24 hours.

SELECT
    product
    , SUM(amount) AS amount
FROM Orders
WHERE _ktype = BYTES
AND _vtype = AVRO
GROUP BY HOP(1, H, 1,D), product

Joins

A join operation merges two streams based on the keys of their data records. The result is a new stream.

Note

LSQL supports joins on key, but also allows the user to join based on value/key part fields. This will end up with both sides having the key of the record remapped. The key is a result of string concatenation of the fields involved

Kafka Streams supports the following join operations:

  • KStream-to-KStream Joins are always windowed joins, since otherwise the memory and state required to compute the join would grow infinitely in size. Here, a newly received record from one of the streams is joined with the other stream’s records within the specified window interval to produce one result for each matching pair. A new KStream instance representing the resulting stream of the join is returned from this operator.
  • KTable-to-KTable Joins are join operations designed to be consistent with the ones in relational databases. Here, both changelog streams are materialized into local state stores first. When a new record is received from one of the streams, it is joined with the other stream’s materialized state stores to produce one result for each matching pair. A new KTable instance is produced representing the result stream of the join, which is also a changelog stream of the represented table.
  • KStream-to-KTable Joins allow you to perform table lookups against a changelog stream (KTable) upon receiving a new record from another record stream (KStream). An example use case would be to enrich a stream of orders (KStream) with the order details(KTable). Only records received from the record stream will trigger the join and produce results, not vice versa. This results in a brand new KStream instance representing the result stream of the join.

Here is a table of joins supported by Apache Kafka Streams:

Left Operand Right Operand Inner Join Left Join Outer Join
KStream KStream Yes Yes Yes
KTable KTable Yes Yes Yes
KStream KTable Yes Yes No

LSQL supports these joins operators:

  • INNER
  • LEFT
  • OUTER
  • RIGHT

Note

RIGHT JOIN will be expressed in terms of LEFT JOIN (The two operands are swapped)

Given the table above, here is a list of joins NOT possible by default in Kafka Streams API:

  • KTable RIGHT JOIN KStream
  • KTable OUTER JOIN KStream
  • KStream RIGHT JOIN KTable

LSQL ALLOWS the user to perform these operations, however, there are some costs associated with doing so. But before more details are provided, we need to give an overview of the context at hand. We said already a RIGHT JOIN is expressed as a LEFT JOIN and as a result, the above list becomes the following:

  • KStream LEFT JOIN KTable
  • KTable OUTER JOIN KStream
  • KTable LEFT JOIN KStream

The challenge here is that a KTable can only be joined with another one. Furthermore, at the moment there is not a straightforward way to go from a KStream instance to a KTable one. The only solution is to use an intermediary topic and then build the KTable required off that topic. Of course, this will hurt performance since the data has to be written to a topic and read again to allow for the join to happen. The topology description for the flow will reflect such scenario. Given this information the above joins become:

  • KTABLE LEFT JOIN KTable
  • KTable OUTER JOIN KTABLE
  • KTABLE LEFT JOIN KTABLE

A KStream OUTER JOIN KTable, despite not having support in the Kafka Streams API, is translated to a KStream OUTER JOIN KTable.

Important

LSQL transforms the flow as required to allow for the join type to happen. Fully understand the implications of making joins which require going through an intermediary topic.

Repartition

Apache Kafka Streams API does not allow joining two streams with a different partition count. This can easily be the case in real systems. For example, with an order and order-detail topic, the partition count on latter will be smaller since traffic is lower. To allow such a join LSQL makes sure it brings the two in line. As a result, it will have to create an order-repartition (the name is just an illustration) matching the right operand version.

Such operation will have a direct impact on performance since the entire topic is copied over just to have the join. The topology viewer allows the user to see when such flow change appears.

Using WITH

As you have seen earlier, the full syntax for LSQL contains the following:

[WITH
   _ID_NAME_ AS (SELECT [STREAM] ...FROM _TOPIC_ ...),
   _ID_NAME_ AS (SELECT [STREAM] ...FROM _TOPIC_ ...)
]

This allows you to break down your query complexity. For example, let’s consider the scenario where you have a topic for static product details which you will want to join against the topic containing the orders. From the product details, you only need to store the product name. This is the SQL to use to achieve such behavior:

...
INSERT INTO ...
WITH
productTable AS
 (
   SELECT productName
   FROM `product`
   WHERE _ktype = 'STRING'
   AND _vtype = AVRO
 )

SELECT ..
FROM  ...  JOIN productTable ON ...
...

Any names registered via WITH, in the example above product, can be referenced after its definition. If your requirements are as such that you need to define multiple entries you can do so by separating all the WITH via comma. For example:

WITH
productTable as
 (
   SELECT productName
   FROM `product`
   WHERE _ktype = 'STRING'
   AND _vtype = AVRO
 ),
userTable as
 (
    SELECT firstName, secondName
    FROM `user`
    WHERE _ktype = 'LONG'
    AND _vtype = AVRO
 )

The examples define tables (which translate into instances of a KTable) but you can specify a stream (which translates to an instance of KStream) by simply adding STREAM after the SELECT:

WITH
productStream AS
 (
   SELECT STREAM productName
   FROM `product`
   WHERE _ktype = 'STRING'
   AND _vtype = AVRO
 )

Important

If the right operand is not found in the list of entities defined by WITH a Stream instance will be created. SELECT STREAM or SELECT within a join targets the left operand only.

Join on Key

When joins were introduced at the beginning of the chapter, it was stated that two records are matched when their keys are equal. Here is how you would join orders and order details for Avro records:

INSERT INTO orders_enhanced
SELECT STREAM
    o.orderNumber
    , o.status
    , SUM(od.quantityOrdered * od.priceEach) total
FROM  `order_details` AS od
    INNER JOIN `orders` AS o
        ON o._key = od._key
WHERE o._ktype = 'LONG'
AND o._vtype = AVRO
AND od._ktype = 'LONG'
AND od._vtype = AVRO
GROUP BY TUMBLE(2,s),o.orderNumber

Important

You can not join two topics when left operand value decoder differs from right operand value decoder. Joining values from different decoder types is not supported.

When joining streams the join needs to happen over a JoinWindow. The GROUP BY tumble(2,s) will be used as part of aggregation but also from it LSQL will build an instance of JoinWindow instance to use when joining the streams but before apply the grouping. The translation between Window and JoinWindow happens as described in the table below:

Window Type Join Window
tumble(duration) JoinWindows.of(duration)
hop(duration,advance) JoinWindows.of(duration).until(advance)
session(inactivity,duration) JoinWindows.of(inactivity).until(advance)
slide(duration) JoinWindows.of(duration)

If your topics have Json payloads the above query should be:

INSERT INTO orders_enhanced
SELECT STREAM
    o.orderNumber
    , o.status
    , SUM(od.quantityOrdered * od.priceEach) total
FROM  `order_details` AS od
    INNER JOIN `orders` AS o
        ON o._key = od._key
WHERE o._ktype = 'LONG'
AND o._vtype = JSON
AND od._ktype = 'LONG'
AND od._vtype = JSON
GROUP BY TUMBLE(2,s),o.orderNumber

You can still join two streams without aggregating by simply doing the following:

INSERT INTO `orders_enhanced`
SELECT STREAM
    od.orderNumber
    , od.productCode
    , od.quantityOrdered
    , od.priceEach
    , od.orderLineNumber
    , p.productName
FROM  `product` as p
    INNER JOIN `order_details` AS od
        ON p._key = od._key
WHERE p._ktype = BYTES
AND p._vtype = AVRO
AND od._ktype = BYTES
AND od._vtype = AVRO
GROUP BY TUMBLE(2,s)

Although GROUP BY is still used it is not actually applying grouping since no grouping fields were defined. If your product topic key is not Avro, you can specify like in the example above, the _ktype='BYTES'. This gives some performance benefit for not having to deserialize the LONG in the earlier example and serializing it back to the output topic.

Important

Do not use BYTES when the payload is Avro. The Avro content is retained but the Schema Registry entry on the target will not be created.

All the functions supported by LSQL can be used in the select list. However unless grouping is involved the analytic ones: SUM, MIN, MAX, COUNT are not allowed.

INSERT INTO `orders_enhanced`
SELECT STREAM
    od.orderNumber
    , od.productCode
    , od.quantityOrdered
    , od.priceEach
    , od.orderLineNumber
    , concat(od.productCode,'-',p.productName) AS productName
FROM  `order_details` AS od
    LEFT JOIN `product` as p
        ON p.productCode = od.productCode
WHERE p._ktype = BYTES
AND p._vtype = AVRO
AND od._ktype = BYTES
AND od._vtype = AVRO
GROUP BY TUMBLE(4,s)

Join on Fields

It is not always the case that the topic key is actually the value to join on. Maybe it was an oversight in initial development. but LSQL has it covered as well. It allows you to chose a field from the Kafka message value part to use during the join.

INSERT INTO `order_details`
SELECT STREAM
    o.orderNumber
    , o.status
    , o.flags
    , od.productCode
FROM  `order_details` AS od
    INNER JOIN `orders` AS o
        ON o.orderNumber = od.orderNumber
WHERE o._ktype = BYTES
AND o._vtype = AVRO
AND od._ktype = BYTES
AND od._vtype = AVRO
GROUP BY TUMBLE(2,s)

There is a trade-off here. Joining on a field like above means the stream needs to be remapped to allow for the new key. All groupings will result in a String key. The reason is LSQL allows you to join on more than one field! The key is a string concatenation of all the values involved.

Important

Joining on a value field(-s) will re-map the stream/table and the new key type will be String. Re-mapping a table has it’s cost since it will have to move the data from the KTable to a new topic and build a new instance of the table.

The standard way to handle joins with a table is to define the table via WITH. An optimal solution for joining orders with products to get the product name attached to the order looks like this:

INSERT INTO `orders_enhanced`
WITH
productTable AS
 (
   SELECT productName
   FROM `product`
   WHERE _ktype = 'STRING'
   AND _vtype = AVRO
 )
SELECT STREAM
    od.orderNumber
    , od.productCode
    , od.quantityOrdered
    , od.priceEach
    , od.orderLineNumber
    , p.productName
FROM  `order_details` AS od
    LEFT JOIN  productTable AS p
        ON p._key = od.productCode
WHERE od._ktype = BYTES
AND od._vtype = AVRO

First, a productTable is defined and it becomes the right operand for a LEFT JOIN. It is required for the od.productCode to be of type String since the key on the table is String. Also, notice _ktype is still required for order_details (od._ktype=’BYTES’). The resulting schema will have the productName field as an optional string since the right side might not be present.

Examples

Data Browsing

Additional examples of filtering JSON or AVRO Kafka messages, based on pre-conditions on either the payload of the messages, or on message metadata:

-- Select all fields from a topic with AVRO messages, filtering messages with particular value payload
SELECT *
FROM topicA
WHERE f1 IS NULL
AND f2 = 3
AND f3 < 100
AND f4 >= 2
AND f5 <= 1
AND f6 > 12.4
AND f7 LIKE '%wow'
AND _ktype = AVRO
AND _vtype = AVRO
-- Select all fields from a topic with AVRO values, while filtering on nested Avro record fields
SELECT *
FROM topicA
WHERE p1.p2.f1 IS NULL
OR (p1.p2.f2 = 3 AND p1.p2.f3 < 100 AND (p1.p2.f4 LIKE 'wow%' OR p1.p2.f5 = 'wow' ))
AND _vtype = AVRO
-- Select all fields from a topic with JSON data, while filtering on nested Json record fields
SELECT *
FROM topicA
WHERE p1.p2.f1 is NOT NULL
OR (p1.p2.f2 <> 3 AND p1.p2.f3 < 100 AND (p1.p2.f4 like 'wow%' OR p1.p2.f5 = 'wow' ))
AND _vtype = JSON
-- Select some fields (cherry-pick) from a topic with JSON data where the message timestamp
SELECT field1.fieldA
    , field2
    , field3,
    , _key.fieldM.fieldN AS N
FROM topicA
WHERE _vtype = JSON
AND _ts >= '2018-01-01 00:00:00'
-- Select fields from both the Key and the Value for messages in the first 15 minutes of 2018
SELECT field1.fieldA
    , field2
    , field3
    , _key.fieldK AS keyField
FROM topicA
WHERE _vtype = JSON
AND _ts >= '2018-01-01 00:00:00'
AND _ts <= '2018-01-01 00:00:00'

Joins

-- LEFT JOIN messages from two topics, using a 4-second tumbling window and store results into target topic
INSERT INTO `toTopic`
SELECT STREAM
    od.orderNumber
    , od.productCode
    , od.quantityOrdered
    , od.priceEach
    , od.orderLineNumber
    , CONCAT(od.productCode,'-',p.productName) AS productName
FROM  `OrdersDetailsTopic` AS od
    LEFT JOIN `ProductTopic` AS p
        ON p.productCode = od.productCode
WHERE p._ktype = BYTES
AND p._vtype = AVRO
AND od._ktype = BYTES
AND od._vtype = AVRO
GROUP BY TUMBLE(4,s)
-- RIGHT JOIN messages from two topics over a 10-minute tumbling window
INSERT INTO `toTopic`
WITH
product AS
 (
   SELECT productName
   FROM `ProductTopic`
   WHERE _ktype = 'STRING'
   AND _vtype = AVRO
 )
SELECT STREAM
    od.orderNumber
    , od.productCode
    , od.quantityOrdered
    , od.priceEach
    , od.orderLineNumber
    , p.productName
FROM  product AS p
    RIGHT JOIN `OrdersDetailsTopic` AS od
        ON p._key = od.productCode
WHERE od._ktype = BYTES
AND od._vtype = AVRO
GROUP BY TUMBLE(10,m)
-- LEFT JOIN two topics with JSon data
INSERT INTO `toTopic`
SELECT STREAM
    od.orderNumber
    , od.productCode
    , od.quantityOrdered
    , od.priceEach
    , od.orderLineNumber
    , concat(od.productCode,'-',p.productName) as productName
FROM  `OrdersDetailsTopic` AS od
    LEFT JOIN `ProductTopic` AS p
        ON p.productCode = od.productCode
WHERE p._ktype = BYTES
AND p._vtype = JSON
AND od._ktype = BYTES
AND od._vtype = JSON
GROUP BY TUMBLE(4,s)
-- Full LEFT JOIN of a stream with a table
SET `auto.offset.reset`='latest';
INSERT INTO `toTopic`
WITH
tableTelecom AS (
  SELECT *
  FROM  `telecom_data`
  WHERE _ktype = BYTES
  AND _vtype = AVRO
)

SELECT STREAM
    data.squareId
    , grid.polygon
FROM `activity` AS data
    LEFT JOIN tableTelecom AS grid
    ON data._key = grid._key
WHERE data._ktype = BYTES
AND data._vtype = AVRO

Scaling Stream Processing

Lenses leverages Kafka Streams and currently provides three execution modes to run Lenses SQL processors. Processors is our term for a Kafka Streams application.

../_images/execution_modes.png

IN_PROC is the default execution mode and the processors are executed locally within Lenses. This can have scalability issues and poses a risk to the running application, and can affect stability. IN_PROC is recommended only for testing.

CONNECT is the execution mode that solves these limitations and provides availability guarantees and scalability. Lenses can deploy your Lenses SQL processors in Kafka Connect. Kafka Connect provides a distributed, fault-tolerant and scalable framework as part of the core Apache Kafka distribution. We advise allocating a Kafka Connect cluster for the purpose of running LSQL stream processing.

KUBERNETES is an execution mode that provides scalability by deploying Lenses SQL runners into Kubernetes clusters. Lenses can deploy and monitor SQL runner deployments created through Lenses or existing tools such as Helm or kubectl.

Connect

Kafka Connect provides scalable, fault tolerant, distributed processing by forming a cluster of workers. The cluster provides endpoints to which Lenses will submit the processor configuration. From this point, Kafka Connect will persist configurations and distribute work to the cluster workers. Upon a restart Lenses will recover the status, configuration, and metrics of any Lenses SQL Connectors that are found in the configured clusters, this ensures that if Lenses is offline processing of data in your topologies continues. Lenses will also identify any connectors created outside of Lenses at runtime and start tracking them to provide you visibility.

To scale in or out the number of processor applications we can simply instruct Kafka Connect to decrease or increase the number of tasks across the cluster. The Lenses UI provides a simple way to deploy Lenses SQL processors and scale them, simply:

  1. Creating a new processor and selecting the cluster to deploy to
  2. Compose your SQL statement
  3. Set the parallelization .i.e many how tasks/application instances to run
  4. Give the processor a name
  5. Deploy

Lenses will check the validity of the SQL statement and if valid create the Connector instance and start to monitor its behavior.

Lenses supports the following Connector functionality:

  • CREATE - Register and create a new connector
  • PAUSE - Pause the connector and tasks
  • START - Start a paused connector
  • DELETE - Remove a connector

Note

Updating an existing connector is not directly supported. The KStream app cannot be updated and the update is more than likely going to break the Schema compatibility of the target insert table but it can be scaled.

When Landoop’s FAST DATA CSD is used, the Cloudera parcel lenses-sql-connect can install and provision the connector in a few seconds. More information and step-by-step instructions on how to install the parcel can be found at FAST DATA docs

Kubernetes

Kubernetes, a container orchestration engine, provides the perfect platform to run streaming microservices. It has the ability to ensure a configured number of application instances or pods are running and to scale them up or down accordingly. Via Helm charts, we provide a Dock image for the LSQL runner to deploy via CI/CD in a repeatable and audited manner.

Lenses can deploy SQL runners, recover the runners currently deployed and track and identify deployments created outside of Lenses.

The Lenses SQL Runner image accepts the same configuration options as environment variables as the Kafka Connect runner with an additional sql.port to expose a rest endpoint.

Lenses deploys SQL runners as Kubernetes deployment resources. These deployments are labeled so Lenses can identify and track any changes via either Lenses or outside such as Helm or kubectl from a CI/CD pipeline.

The following labels are attached to deployments.

Label Value Description UserDefined Resource
lenses lenses-processor Identifier of Lenses No Deployment
app
This is the name of
the processor, it must be unique.
If created via the Lenses API it will handle
this. It must also conform to
Kubernetes naming conventions,
[a-z0-9]([-a-z0-9]*[a-z0-9])?.
Yes Deployment
lenses-id
This is an auto-generated
tracking id. If created via Lenses
it will be assigned. If created outside of
Lenses it must not be set.
Lenses will assign a new id.
No Deployment
lenses-user
The username who created the deployment Yes Deployment
containerPort 8083
The SQL runners expose a Restful
API so you can start, stop, get the status and metrics
of the underlying KStream. Additional
the internal RocksDb state store as exposed
to allow interactive queries
No Pod
pipeline
A user-defined pipeline tag for monitoring Yes Pod

Warning

Altering pod and deployment labels of existing SQL runner deployments may cause issues tracking the deployments.

The following rest endpoints are exposed by the containers. You can, for example, create a Kubernetes services with pod selectors to access them and access the state store for interactive queries.

# get health
curl pod_id:8083/health

# get metrics
curl pod_id:8083/metrics

# get the metadata, the internal state stored information for interactive queries
curl pod_id:8083/metadata

# get the metadata for the specified name, the internal state store for interactive queries
curl pod_id:8083/metadata/(name)

# get the stream information, sql, runners, consumer group id, username
curl pod_id:8083/stream

# stop the stream
curl -X POST pod_id:8083/stop

# start the stream
curl -X POST pod_id:8083/start

Deployment Recovery

Lenses can recover existing SQL deployments at startup that have been previously created by Lenses. Additionally, it can also track deployments created outside of Lenses. For example, you may be using Helm to control and manage deployments in your production environment out of a CI/CD pipeline.

Connect Data Stores

Kafka Connect is the tool to use for moving data in and out of Apache Kafka. We provide a handful of 25+ Connectors. Each source or sink have their own specific connection details. Working with our clients we realized the way Connect expects configuration is not allowing for easy connector management. To give you an example let’s take a JMS Sink. We want to push data from JMS topics to Kafka Topics. For that, we need a mapping between the JMS source and Kafka target. Let’s add on top a secure JMS connection and furthermore from each JMS payload we want a few fields. The way majority of connectors are written makes a nightmare to configure and manage such instances. Keep things simple and make Kafka accessible is our motto, therefore we provide a SQL like syntax to describe the above. Part of the Lenses SQL engine we call it KCQL (Kafka Connect Query Language).

Here is an example of a KCQL syntax for moving data from Kafka to JMS:

INSERT INTO /sensors
SELECT sensorId AS id,
       minTemperature,
       maxTemperature,
       avgTemperature
FROM sensors_data
STOREAS JSON
WITHTYPE TOPIC

The SQL like syntax describes and drives the work the JMS sink will perform. It will pick the fields from the Kafka topic sensors_data and push it to a JMS topic /sensors. The resulting JMS messages payload will be JSON. Easy!

> “Keep things simple and make Kafka accessible is our motto.”

All our connectors support the following:

  • Avro and JSON support
  • Multiple Kafka topic to target mapping (for sinks)
  • Multiple sources to target Kafka topic (for sources)
  • Field selection, extraction, and filtering
  • Auto creation & auto evolution for the target storage
  • Error handling policies

KCQL Syntax

Here the full syntax our connectors share:

[INSERT|UPSERT]
 INTO   $TARGET
SELECT *|columns           (i.e col1,col2 | col1 AS column1,col2, field1.field2.field3, field1.field2.field3 as f3)
FROM   $TOPIC_NAME
  [ IGNORE field1, field2,.. ]
  [ AUTOCREATE ]
  [ WITHSTRUCTURE ]
  [ PK columns ]
  [ WITHTARGET = target]
  [ AUTOEVOLVE ]
  [ BATCH = N ]
  [ CAPITALIZE ]
  [ INITIALIZE ]
  [ PROJECTTO versionNumber]
  [ PARTITIONBY cola[,colb] ]
  [ DISTRIBUTEBY cola[,colb] ]
  [ CLUSTERBY value1[,value2] INTO bucketsNumber BUCKETS ]
  [ TIMESTAMP cola|sys_current ]
  [ TIMESTAMPUNIT = timestampUnit]
  [ WITHFORMAT TEXT|AVRO|JSON|BINARY|OBJECT|MAP ]
  [ WITHUNWRAP ]
  [ STOREAS   $YOUR_TYPE([key=value, .....]) ]
  [ WITHTAG (field1, tag2= constant, field2 as tag2) ]
  [ INCREMENTALMODE = incrementalMode ]
  [ WITHTYPE type ]
  [ WITHDOCTYPE = docType  ]
  [ WITHINDEXSUFFIX = suffix ]
  [ TTL = ttlType ]
  [ WITHCONVERTER = converterClass ]
  [ WITHJMSSELECTOR = jmsSelector ]
  [ WITHKEY (field1, field2.field3) ]
  [ KEYDELIM = value ]

Of course, the keywords, non-SQL specific, are not applicable at the same time. Each connector documentation details the keys it is using.

The SELECT mode is useful for target systems that do not support the concept of namespaces ( Key-Value stores such as HazelCast or Redis):

SELECT *|columns
FROM   $TOPIC_NAME
  [ IGNORE field1, field2,.. ]
  [ PK columns ]
  [ WITHSTRUCTURE ]
  [ WITHFORMAT  JSON|AVRO|BINARY ]
  [ WITHUNWRAP ]
  [ WITHGROUP theConsumerGroup]
  [ WITHOFFSET (offset1)(offset2) ]
  [ WITHPARTITION (partition),[(partition, offset) ]
  [ SAMPLE $RECORDS_NUMBER EVERY $SLIDE_WINDOW ]
  [ LIMIT limitValue ]
  [ STOREAS $YOUR_TYPE([key=value, .....]) ]
  [ WITHTAG (field1, tag2= constant, field2 as tag2) ]
  [ INCREMENTALMODE = incrementalMode ]
  [ WITHDOCTYPE = docType  ]
  [ WITHINDEXSUFFIX = suffix ]
  [ WITHCONVERTER = converterClass ]

Here is a Redis Connector example:

"connect.redis.sink.kcql": "INSERT INTO sensorTimeseries SELECT sensorID, temperature, humidity FROM sensorsTopic STOREAS SortedSet (score=ts)",

The above KCQL instructs the Redis Kafka Sink Connector to select the three fields and store the time-series data into a Sorted Set, using the incoming field ts for scoring each message.