SQL Statements

This section introduces all the supported commands. The table query (or bound query) has been developed to support the typical SQL commands of a relational database: SELECT, CREATE, INSERT, DROP, TRUNCATE and DELETE, alongside the non-standard SHOW TABLES, DESCRIBE TABLE, DESCRIBE FORMATTED, SHOW QUERIES, SHOW ALL QUERIES and KILL QUERY. This document specifies the syntax and semantics for all the statements mentioned above. If you are familiar with standard SQL, picking up Lenses SQL should be straightforward; it provides only a few extensions to ANSI SQL.

Create a Table

Before storing any data, a user has to create a table. The syntax for creating a Kafka topic (table) is as follows:

CREATE TABLE $Table($field $fieldType[, $field $fieldType, ...])
FORMAT ($keyStorageFormat, $valueStorageFormat)
[PROPERTIES(partitions=$partitionCount, replication=$replication, compacted=true/false)]

The CREATE statement has the following parts:

  • CREATE TABLE - Instructs the construction of a table.
  • $Table - The actual name given to the table.
  • Schema - Constructed as a list of (field, type) tuples, it describes the data each record in the table contains.
  • FORMAT - Defines the storage format. Since the table is backed by an Apache Kafka topic, both the Key and the Value formats are required. Valid values are STRING, INT, LONG, JSON, AVRO.
  • PROPERTIES - Specifies the number of partitions the final Kafka topic should have, the replication factor required for high availability (it cannot exceed the number of available Kafka brokers) and whether the topic should be compacted.

A compacted Kafka topic is a special type of topic with a finer-grained retention mechanism that retains the last update record for each key. Once compaction has completed, a compacted topic contains a full snapshot of the final record value for every record key, not just the recently changed keys. Compacted topics are useful for in-memory services, persistent data stores, reloading caches, etc. For more details on the subject, see the Kafka documentation.

Given the aforementioned theory, here is an example of creating a table for storing customer entries:

CREATE TABLE customer (id string, address.line string, address.city string, address.postcode int, email string)
FORMAT (string, json)
PROPERTIES (partitions=1, compacted=true)

Executing the statement creates the topic. Best practice dictates using Avro as a storage format over the other formats. In that case, the Key can still be stored as STRING (although nothing stops the user from setting it to AVRO), but the Value should be AVRO. Adapting the previous query, the new statement is as follows:

CREATE TABLE customer_avro (id string, address.line string, address.city string, address.postcode int, email string)
FORMAT (string, avro)
PROPERTIES (partitions=1, compacted=true)
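
Neither of the examples above sets the replication factor, which PROPERTIES also accepts. As a minimal sketch (customer_events is a hypothetical table name; remember the replication factor cannot exceed the number of available brokers):

CREATE TABLE customer_events (id string, action string)
FORMAT (string, avro)
PROPERTIES (partitions=3, replication=1, compacted=false)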

List All Tables

A system could have many tables, and a user could be interested in just listing their names. Therefore, the following syntax is supported:

SHOW TABLES
/* The output for tables mapping to Kafka topics

name            internal  replicas  partitions
customer        false     1         1
customer_avro   false     1         1
*/

Table Schema

For each table, Lenses SQL allows the user to get its schema. The schema describes the data each record in the table contains. The syntax for the statement is as follows:

DESCRIBE TABLE $tableName

The $tableName should be the name of the table to describe. Given the two tables created earlier, a user can run the following SQL to get the information on each:

DESCRIBE TABLE customer;

DESCRIBE TABLE customer_avro;
/*
The output:
_key                        String
_value.address.postcode     Int
_value.address.city         String
_value.address.line         String
_value.email                String
_value.id                   String
*/

Table Extended Information

Since a table is associated with a Kafka topic, and the table information goes beyond the data schema, Lenses SQL supports another statement allowing the user to retrieve more information on a table. This statement returns the actual Kafka topic properties. This type of query is expected to be used mainly by developers and DevOps looking after the health of the infrastructure:

DESCRIBE FORMATTED $Table

Taking the tables referred to so far, a user can execute the following:

DESCRIBE FORMATTED customer_avro

/* Output:
# Column Name                           # Data Type
_key                                    String
_value.address.postcode                 Int
_value.address.city                     String
_value.address.line                     String
_value.email                            String
_value.id                               String

# Config Key                            # Config Value
cleanup.policy                          compact
compression.type                        producer
delete.retention.ms                     86400000
file.delete.delay.ms                    60000
flush.messages                          9223372036854775807
flush.ms                                9223372036854775807
index.interval.bytes                    4096
max.message.bytes                       1000012
message.format.version                  1.1-IV0
message.timestamp.difference.max.ms     9223372036854775807
message.timestamp.type                  CreateTime
min.cleanable.dirty.ratio               0.5
min.compaction.lag.ms                   0
min.insync.replicas                     1
preallocate                             false
retention.bytes                         2147483648
retention.ms                            604800000
segment.bytes                           1073741824
segment.index.bytes                     10485760
segment.jitter.ms                       0
segment.ms                              604800000
unclean.leader.election.enable          false
*/

Deleting a Table

Being able to drop an existing table is a common request. Maybe a large table is not needed anymore or, in the case of a development environment, being a good citizen and cleaning up after yourself should be the norm. To remove a table, a user would need to run:

DROP TABLE $Table

If the Lenses user's permissions allow it, running the SQL statement for the customer or customer_avro tables results in the underlying Kafka topics being removed.
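
For example, were the customer table no longer needed, dropping it (and its underlying topic) would be:

DROP TABLE customer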

Inserting Data

A table is created to store data. As a result, Lenses SQL allows you to use the ANSI SQL command to store new records in a table. A typical scenario is most likely found during development and less often in a production environment. However, even in production, if the system uses a table and its records to trigger actions (Kafka control topics are a common scenario), being able to insert a record instructing a microservice to replay the entire day's records could come in handy.

The first approach to insert new records uses the following syntax:

INSERT INTO $Table(column1[, column2, column3])
VALUES(value1[, value2, value3])
  • $Table - The name of the table to insert the data into
  • Columns - The target columns to populate with data. Adding a new record does not require filling all the available columns; however, in the case of Avro-stored Key/Value pairs, the user needs to make sure a value is specified for all the required Avro fields.
  • VALUES - The set of values to insert. It has to match the list of columns provided, including their data types.

Reinstating the two tables, customer and customer_avro, a user can execute these statements to insert records into both:

INSERT INTO customer_avro (_key, id, address.line, address.city, address.postcode, email)
VALUES ('andy.perez','andy.perez', '260 5th Ave', 'New York', 10001, 'andy.perez@lenses.io');

INSERT INTO customer_avro (_key, id, address.line, address.city, address.postcode, email)
VALUES ('alexandra.smith','alexandra.smith', '8448 Pin Oak Avenue','San Francisco', 90044, 'alexandra.smith@lenses.io');

INSERT INTO customer (_key, id, address.line, address.city, address.postcode, email)
VALUES ('maria.wood','maria.wood', '698 E. Bedford Lane','Los Angeles', 90044, 'maria.wood@lenses.io');

INSERT INTO customer (_key, id, address.line, address.city, address.postcode, email)
VALUES ('david.green', 'david.green', '4309 S Morgan St', 'Chicago', 60609, 'david.green@lenses.io');
/* Output:
true
true
true
true
*/

Running these four INSERT statements gives each topic two records. The SELECT statement is explained shortly; with it, a user can easily validate the result.
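
For instance, jumping slightly ahead, counting the records in each table (the count function also appears in the truncate example later on) would confirm the inserts:

SELECT count(*) FROM customer;

SELECT count(*) FROM customer_avro;
/* Each query is expected to return 2 */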

There is a second way to insert data into a table: copying the records from another table. In this case, the syntax is as follows:

INSERT INTO $TABLE1
SELECT * | column1[, column2, ...]
FROM $Table2
[WHERE $condition]
[LIMIT N]

Since the next subchapter discusses querying tables, it may be best to first get familiar with the SELECT statement and then return to this subchapter. However, a reader already familiar with the concept should find the following example straightforward.

A user needing to copy data into a target topic, where the records are returned from a SELECT statement, is quite a common scenario. The SELECT query can be as simple as a wildcard * or as complex as a select from a join while applying aggregation. To keep this example simple, let us copy all the records from the customer table into the customer_avro one. Executing the following SQL code achieves that:

INSERT INTO customer_avro
SELECT *
FROM customer
/* Output:
Insert wrote 2 records
*/
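
The SELECT part does not have to be a wildcard. As a sketch built from the syntax above, a filtered copy which only moves the New York customers (reusing the predicate from the querying examples below) and caps the number of records would be:

INSERT INTO customer_avro
SELECT *
FROM customer
WHERE address.city = 'New York'
LIMIT 100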

Querying a Table

The powerful SELECT statement allows the user to quickly get the data they are looking for. The full syntax is described below.

[ SET max.size = 1000000;]
[ SET max.query.time = 5000;]
[ SET max.zero.polls = 5;]
[ SET {any.kafka.consumer.setting} = '$value';]
SELECT select_expr [, select_expr ...]
FROM $Table1
    [JOIN $Table2 ON $JoinCondition]
[WHERE $condition]
[GROUP BY $GroupingExpr]
[LIMIT N]

A SELECT statement has a few parts to it:

  • SET - Extends the select context by setting variables which impact the execution.
  • SELECT - Defines the parts of the record to return, or function calls.
  • FROM - Indicates the table from which to retrieve rows.
  • JOIN - Indicates the type of join to execute between $Table1 and $Table2.
  • $JoinCondition - A set of conditions under which two records, one from $Table1 and the other from $Table2, can be combined.
  • WHERE - A set of condition(s) the record must satisfy.
  • GROUP BY - Indicates the records should be combined based on the SQL expression which follows.
  • $GroupingExpr - A SQL expression which could use fields or even function calls. The result of this expression drives the grouping criteria.
  • LIMIT - A restriction on how many records to return.

A wildcard select returns the entire row, whereas if specific fields are selected, only those parts of the row data are returned. The statement is not limited to selecting fields (nested and array elements are included) but also allows calling functions, aggregating values (count, sum, min, max, etc.) or even joining two tables.
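
As a minimal sketch of these clauses working together (assuming the tables created earlier), the following bounds the amount of data scanned via SET and counts customers per city via GROUP BY:

SET max.size = 1000000;

SELECT address.city, count(*) as customers
FROM customer_avro
GROUP BY address.city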

Considering the previous step where records were created for the customer and customer_avro tables, a user can run the following query to retrieve them back:

SELECT * FROM customer_avro;

SELECT * FROM customer;

SELECT *
FROM customer_avro
WHERE address.city = 'New York';

SELECT _key as id, address.line as address, address.postcode as code
FROM customer_avro
WHERE address.city = 'New York';

Examples of the many types of statements a user can run can be found in the Learn in 10 minutes chapter.

Truncating a Table

At times, a requirement to wipe all the records in a table can pop up. In the case of a Kafka topic, if records not aligned with the expected schema have landed in the topic, the user might request for the entire topic data to be deleted. The syntax for such a case is as follows:

TRUNCATE TABLE $Table

where $Table is the name of the table to delete all the records from. This operation is only supported on non-compacted topics, which is a Kafka design restriction; the options for compacted topics are covered in the Note below. Since the customer table was created with compacted=true, it first needs to be rebuilt as non-compacted before it can be truncated.
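
A minimal sketch of that rebuild, reusing the statements covered earlier and the table's original schema:

DROP TABLE customer

CREATE TABLE customer (id string, address.line string, address.city string, address.postcode int, email string)
FORMAT (string, json)
PROPERTIES (partitions=1, compacted=false)

With the table rebuilt as non-compacted, the truncate can be performed as follows: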

TRUNCATE TABLE customer;

/* SELECT count(*) FROM customer returns 0 after the previous statement */

Note

Truncating a compacted Kafka topic is not supported, which is an Apache Kafka restriction. A user could drop and recreate the table, or insert a record with a null Value for each unique key in the topic.

Deleting Records

ANSI SQL supports a construct for deleting records, and the Lenses SQL engine has adopted it as well. When it comes to a table representing a topic in Apache Kafka, there are two behaviors:

  • If the topic is not compacted, DELETE expects an offset to delete records up to. A query deleting records from the customer table looks like the following:
-- Delete records across all partitions up to the offset 10
DELETE FROM customer WHERE _meta.offset <=10

-- Delete records from a specific partition
DELETE FROM customer WHERE _meta.offset <=10 AND _meta.partition = 2
  • If the topic is compacted, DELETE expects the record Key to be provided. For a compacted topic, a delete translates to inserting a record with the existing Key but a null Value. For the customer_avro topic (which has the compacted flag on), a delete operation for a specific customer identifier would look like this:
DELETE FROM customer_avro
WHERE _key = 'andy.perez'

The delete is, in effect, an insert operation. Until compaction kicks in and completes, there will be at least one record with the Key used earlier, but the latest (or last) record will have its Value set to null.
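
A user can observe this with a query like the following sketch; the exact rows returned depend on whether compaction has already run:

SELECT *
FROM customer_avro
WHERE _key = 'andy.perez'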

Manage Your Queries

With many users actively running queries, an administrator might be directly interested in seeing, at any given time, the running queries and who is running them. If a bad query is identified, the administrator might want to stop it. Here is the command for visualizing all in-progress queries:

SHOW QUERIES

The output of this command lists all the queries: their code, the user, the time they started, the bytes read and the records scanned. Say a user has an in-flight query and the admin runs the command described above; the generated output could look similar to the following:

/*
 username      query                                            bytes_read   id                                     status    started                    records_scanned
 mike.jones    "SELECT * FROM cc_payments WHERE amount < 10"    35192        002a244f-ba4f-40cc-aea7-0a4b7feba4bc   Running   2018-10-25T10:11:30.826Z   367
*/

If the query should be stopped, the admin user can execute the following statement:

KILL QUERY "002a244f-ba4f-40cc-aea7-0a4b7feba4bc"

A useful feature supported by the engine is showing not only the in-flight queries but also the recently run ones. The engine keeps the information related to completed queries for 30 minutes. To list all running and recently completed queries, a user would need to execute:

SHOW ALL QUERIES