Browsing Topics

SELECT Syntax

Apache Kafka retains records as raw bytes. The tools the open source project provides by default (kafka-console-consumer or kafka-avro-console-consumer) are quite limited when querying for existing data. LSQL comes to the rescue, providing a friendly SQL syntax and a user experience familiar from relational database systems.

A powerful SELECT statement allows the user to quickly get the data they are looking for. The full syntax is described below:

[ SET `max.bytes` = 1000000;]
[ SET `max.time` = 5000;]
[ SET `max.zero.polls` = 5;]
[ SET `{any.kafka.consumer.setting}`= `value`;]
SELECT select_expr [, select_expr ...]
FROM `topic_reference`
WHERE [_ktype = KEY_TYPE]
[AND _vtype = VALUE_TYPE]
[AND FILTERS]
[LIMIT N]

The most commonly used clauses of SELECT statements are these:

  • each select_expr indicates a column that you want to retrieve or a function call. There must be at least one select_expr.
  • topic_reference indicates the topic (or table by analogy) from which to retrieve rows.
  • the WHERE clause, if given, indicates the condition or conditions that a record must satisfy to be selected. where_condition is an expression that evaluates to true for each record to be selected. The statement selects all records if there is no WHERE clause. In the WHERE expression, you can use any of the functions and operators that LSQL supports, except for aggregate (summary) functions. A more detailed overview of the filter clause can be found in the Where Clause section.
  • the _ktype/_vtype can override the payload information associated with a given topic (table). You might have a topic which contains JSON for the record value part. Using WHERE _vtype=STRING will allow the engine to read the payload as a STRING value.

Aliasing is not supported in this scenario. Since there is only one source topic, all the select_expr are automatically qualified to that topic.
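Putting the clauses together, a complete browsing query might look like the sketch below. The topic name `web-traffic` and the field names are illustrative, not part of any real deployment:

```sql
-- Cap the result set size and execution time before browsing
SET `max.bytes` = 1000000;
SET `max.time` = 5000;

-- Read the value payload as JSON and filter the records
SELECT user, region
FROM `web-traffic`
WHERE _vtype = JSON
  AND region = 'Europe'
LIMIT 50
```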

Control execution

As a user, you have full control over the resources involved and the time taken to execute a query. The LSQL engine allows you to set:

  • max.bytes: the maximum amount of data to return. Default is 20 MB.
    Usage: SET `max.bytes` = 20000000; sets a maximum of 20 MB to be returned.
  • max.time: the maximum amount of time, in milliseconds, the query is allowed to run. Default is 1 hour.
    Usage: SET `max.time` = 60000; sets a one-minute query limit.
  • max.zero.polls: the maximum number of Kafka consumer poll operations returning zero records before the query is completed. Default is 8.
    Usage: SET `max.zero.polls` = 5; sets a maximum of 5 calls returning 0 records.
  • LIMIT N: the maximum number of records to return. Default is 10000.
    Usage: SELECT * FROM payments LIMIT 100;

A common scenario while browsing data is for the query to reach the end of the topic before any of the query boundaries have been reached. In this case, the query keeps running until one of the limits is hit.

The max.zero.polls setting covers scenarios where all the data in the topic has been processed and none of the limits have been reached. It short-circuits, for example, a query allowed to run for 1 hour if 8 (the default) consecutive Kafka polls return no new records.

Sample scenario: max.time is set to 5 minutes and all the records have been processed in 30 seconds; there is no reason to wait another 4 minutes and 30 seconds before the query completes.
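The three limits can be combined in a single statement; the values and the topic name below are illustrative:

```sql
-- Stop after 2 MB, 2 minutes, or 5 consecutive empty polls,
-- whichever happens first
SET `max.bytes` = 2000000;
SET `max.time` = 120000;
SET `max.zero.polls` = 5;

SELECT *
FROM `payments`
LIMIT 1000
```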

Tune the underlying consumer

On very rare occasions, when browsing the data, there might be a requirement to tune the underlying consumer parameters. LSQL allows setting the consumer properties. For example, setting the consumer fetch.max.wait.ms is as simple as:

SET `fetch.max.wait.ms`= `250`;
SELECT *
FROM `payments`
LIMIT 10

Value Fields

The previous section gave a quick overview of what a Kafka message is composed of. To select fields within the value part of a message, the user just has to use the field name. For example, assume the following message structure flows through your Kafka topic:

{
  "user": "a user id",
  "region" : "Europe",
  "url" : "someurl",
  "timestamp": 1503256766
}

To select the field named region from the message it is as simple as writing the query below:

SELECT region
FROM `web-traffic`

If the source topic stores AVRO, the result of such a query will be a new AVRO record containing only one field: region.

If the format of the source topic is JSON, the result will be a JSON object containing one field.

Important

When a JSON decoder type is used, it is expected that the payload is the byte representation of a JSON object.

Not all payloads stored in a Kafka message have a flat structure. Nested structures are quite a common scenario, and LSQL has full support for selecting nested fields.

Consider the message structure below for the message value component:

{
  "name": "Rick",
  "address": {
    "street": {
      "name": "Rock St"
    },
    "street2": {
      "name": "Sunset Boulevard"
    },
    "city": "MtV",
    "state": "CA",
    "zip": "94041",
    "country": "USA"
  }
}

Suppose we want to flatten the structure. To do so, the following query can be used:

SELECT
    name
    , address.street.name AS address1
    , address.street2.name AS address2
    , address.city
    , address.state
    , address.zip
    , address.country
FROM `target_topic`

The result of such a query statement will be a new AVRO record containing seven fields: name, address1, address2, city, state, zip and country. The same applies in the case of a JSON payload:

SELECT
    name
    , address.street.name AS address1
    , address.street2.name AS address2
    , address.city
    , address.state
    , address.zip
    , address.country
FROM `target_topic`

Key Fields

Many times, especially in IoT systems, device information is found in the message key and is not replicated in the message value part. LSQL allows selecting the entire key (for primitive types: Int/Long/Double/String) or a specific field in the case of AVRO/JSON structures. All that is required is to use the _key prefix.

Imagine a scenario where various metrics are obtained from an electrical device and the message key part structure looks like this:

{
    "deviceId": 192211,
    "model":"AX14c1"
}

To select the device id, the following query can be written:

SELECT
    _key.deviceId
    , meterValue
    , ...
FROM `metrics`

Note

Prefix a field with _key if you want to select a field which is part of the key. Do not prefix the fields you want to select from the Kafka message value part with value/_value!
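For a topic whose key is a primitive (for example an Int device identifier), the entire key can be selected with _key alone. A sketch, assuming such a key on the `metrics` topic from the example above:

```sql
SELECT
    _key AS deviceId
    , meterValue
FROM `metrics`
```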

Selecting Special Fields

Sometimes, a Kafka message payload will contain fields which are special keywords in the grammar (they might be function names or Apache Calcite specific grammar tokens), or fields which start with a non-alphanumeric character. Selecting them as they are will lead to errors. For example:

SELECT
    system.name
    , @timestamp
    ,...
FROM `metrics`

To get the above working, the fields need to be escaped. LSQL is based on MySQL syntax, therefore the escaping character is the backtick (`). The syntax correctly handling the two fields is:

SELECT
    `system`.name
    , `@timestamp`
    ,...
FROM `metrics`

Arrays

You can store multiple variables of the same type in an array data structure. You can access the array item by using the [] construct:

SELECT
    values[2] AS amount
    , ...
FROM `payments`

Arrays are zero-indexed: an array with n elements is indexed from 0 to n-1. The LSQL engine supports the array construct for JSON, AVRO, XML, CSV, Protobuf, or any other custom payload type.
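For example, assuming a payments record whose values field is an array of amounts, the first and third elements can be selected side by side (the field and topic names are illustrative):

```sql
SELECT
    values[0] AS first_amount
    , values[2] AS third_amount
FROM `payments`
LIMIT 10
```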

Audit Queries

Using the configuration file, Lenses can be set to store any LSQL query made while browsing. Alongside the query details, the information pushed to the topic also contains the user and the time the query was executed.

Set the configuration entry lenses.topics.lsql.storage=_kafka_lenses_lsql_storage to activate the functionality. Then, using the topic viewer screen, the storage topic can be queried to see which queries are run most often.
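Once activated, the audit topic can itself be browsed with LSQL. A sketch, assuming the storage topic name from the configuration entry above:

```sql
SELECT *
FROM `_kafka_lenses_lsql_storage`
LIMIT 100
```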