SQL Statements

This section introduces all the supported commands. The Table query (or bound query) has been developed to support the typical SQL commands of a relational database: SELECT, CREATE, INSERT, DROP, TRUNCATE, DELETE, alongside the non-standard SHOW TABLES, DESCRIBE TABLE, DESCRIBE FORMATTED, SHOW QUERIES, SHOW ALL QUERIES, KILL QUERY. This document specifies the syntax and semantics of all the statements mentioned. If you’re familiar with standard SQL, picking up Lenses SQL should be straightforward; at most, it adds a few extensions to ANSI SQL.

Create a Table

Before storing any data, a user has to first create a table. The syntax for creating a table (a Kafka topic) is:

CREATE TABLE $Table($field $fieldType[, $field $fieldType, ...])

FORMAT ($keyStorageFormat, $valueStorageFormat)

[PROPERTIES(partitions=$partitionCount, replication=$replication, compacted=true/false)]

The CREATE statement has the following parts:

  • CREATE TABLE - Instructs the construction of a table
  • $Table - The actual name given to the table
  • Schema - Constructed as a list of (field, type) tuples, it describes the data each record in the table contains
  • FORMAT - Defines the storage format. Since it is an Apache Kafka topic, both the Key and the Value formats are required. Valid values are STRING, INT, LONG, JSON, AVRO.
  • PROPERTIES - Specifies the number of partitions the final Kafka topic should have, the replication factor to ensure high availability (it cannot be higher than the number of available Kafka brokers), and whether the topic should be compacted.

A compacted Kafka topic is a special type of topic with a finer-grained retention mechanism that retains the last update record for each key. A compacted topic (once the compaction has completed) contains a full snapshot of the final record value for every record key, not just the recently changed keys. Compacted topics are useful for in-memory services, persistent data stores, reloading a cache, etc. For more details on the subject, refer to the Kafka documentation.

Given the theory, here is an example of creating a table for storing customer entries:

CREATE TABLE customer (id string, address.line string, address.city string, address.postcode int, email string)
FORMAT (string, json)
PROPERTIES (partitions=1, compacted=true)

Executing the statement creates the topic. Best practice is to use Avro as a storage format over the other formats. In this case, the Key can still be stored as STRING (although nothing stops the user from setting it to Avro), but the Value should be Avro. Adapting the previous query, the new statement is:

CREATE TABLE customer_avro (id string, address.line string, address.city string, address.postcode int, email string)
FORMAT (string, avro)
PROPERTIES (partitions=1, compacted=true)

List all tables

A system could have many tables, and a user may want to simply list their names. Therefore this type of syntax is supported:

SHOW TABLES
/* The output for tables mapping to Kafka topics

name            internal replicas  partitions
customer        false       1            1
customer_avro   false       1            1
*/

Table Schema

For each table, Lenses SQL allows the user to get its schema. The schema describes the data the table retains in each record. The syntax for the statement is:

DESCRIBE TABLE $tableName

The $tableName should contain the name of the table to describe. Given the two tables created earlier, a user can run the following SQL to get the information on each table:

DESCRIBE TABLE customer;

DESCRIBE TABLE customer_avro;
/*
The output:
_key                        String
_value.address.postcode     Int
_value.address.city         String
_value.address.line         String
_value.email                String
_value.id                   String
*/

Table Extended Information

Since a table is associated with a Kafka topic, and the table information goes beyond the data schema, Lenses SQL supports another statement allowing the user to retrieve more information on the table. This statement returns the actual Kafka topic properties. This type of query is expected to be used mainly by developers and DevOps engineers looking after the health of the infrastructure.

DESCRIBE FORMATTED $Table

Taking the tables referred to so far, a user can execute the following:

DESCRIBE FORMATTED customer_avro

/* Output:
# Column Name                           # Data Type
_key                                    String
_value.address.postcode                 Int
_value.address.city                     String
_value.address.line                     String
_value.email                            String
_value.id                               String

# Config Key                            # Config Value
cleanup.policy                          compact
compression.type                        producer
delete.retention.ms                     86400000
file.delete.delay.ms                    60000
flush.messages                          9223372036854775807
flush.ms                                9223372036854775807
index.interval.bytes                    4096
max.message.bytes                       1000012
message.format.version                  1.1-IV0
message.timestamp.difference.max.ms     9223372036854775807
message.timestamp.type                  CreateTime
min.cleanable.dirty.ratio               0.5
min.compaction.lag.ms                   0
min.insync.replicas                     1
preallocate                             false
retention.bytes                         2147483648
retention.ms                            604800000
segment.bytes                           1073741824
segment.index.bytes                     10485760
segment.jitter.ms                       0
segment.ms                              604800000
unclean.leader.election.enable          false
*/

Deleting a Table

Being able to drop an existing table is a common request. Maybe a large table is not needed anymore, or, in a development environment, being a good citizen and cleaning up after yourself should be the norm. To remove a table, a user would need to run this statement:

DROP TABLE $Table

If the user’s Lenses permissions allow it, running the SQL statement for the customer/customer_avro tables results in the underlying Kafka topics being removed.

Inserting data

A table is created to store data. As a result, Lenses SQL allows you to use the ANSI SQL command to store new records in a table. A typical scenario is most likely found during development and less so in a production environment. However, even in production, if the system uses table records to trigger actions (control topics are a common Kafka scenario), being able to insert a record instructing a microservice to replay the entire day’s records can come in handy.

The first approach to insert new records uses the following syntax:

INSERT INTO $Table(column1[, column2, column3])
VALUES(value1[,value2, value3])
  • $Table - The name of the table to insert the data into
  • Columns - The target columns to populate with data. Adding a new record does not require filling all the available columns. When the Key or Value is stored as Avro, the user needs to make sure a value is specified for every required Avro field.
  • VALUES - The set of values to insert. It has to match the list of columns provided, including their types.

Returning to the two tables, customer and customer_avro, a user can execute these statements to insert records into both:

INSERT INTO customer_avro (_key, id, address.line, address.city, address.postcode, email)
VALUES ('andy.perez','andy.perez', '260 5th Ave', 'New York', 10001, 'andy.perez@lenses.io');

INSERT INTO customer_avro (_key,id, address.line, address.city, address.postcode, email)
VALUES ('alexandra.smith','alexandra.smith', '8448 Pin Oak Avenue','San Francisco', 90044, 'alexandra.smith@lenses.io');

INSERT INTO customer (_key, id, address.line, address.city, address.postcode, email)
VALUES ('maria.wood','maria.wood', '698 E. Bedford Lane','Los Angeles', 90044, 'maria.wood@lenses.io');

INSERT INTO customer (_key, id, address.line, address.city, address.postcode, email)
VALUES ('david.green', 'david.green', '4309 S Morgan St', 'Chicago', 60609, 'david.green@lenses.io');
/* Output:
true
true
true
true
*/

Running these four INSERT statements gives each topic two records. The SELECT statement is explained shortly, after which a user can easily validate the result.

There is a second way to insert data into a table, by copying the records from another table. In this case, the syntax is:

INSERT INTO $TABLE1
SELECT * | column1[, column2, ...]
FROM $Table2
[WHERE $condition]
[LIMIT N]

Since the next subchapter discusses querying a table, it may help to become familiar with the SELECT statement first and then return to this subchapter. However, if the reader is already familiar with the concept, the following example should be straightforward.

A user needing to copy data to a target topic, where the records are returned by a SELECT statement, is quite a common scenario. The SELECT query can be as simple as a wildcard * or as complex as a select from a join while applying aggregation. To keep this example simple, let’s copy all the records from the customer table into the customer_avro one. Executing the following SQL achieves that:

INSERT INTO customer_avro
SELECT *
FROM customer
/* Output:
Insert wrote 2 records
*/

Querying a Table

The powerful SELECT statement allows the user to quickly get the data they are looking for. The full syntax is described below:

[ SET max.size = 1000000;]
[ SET max.query.time = 5000;]
[ SET max.zero.polls = 5;]
[ SET {any.kafka.consumer.setting}= '$value';]
SELECT select_expr [, select_expr ...]
FROM $Table1
    [JOIN $Table2 ON $JoinCondition]
[WHERE $condition]
[GROUP BY $GroupingExpr]
[LIMIT N]

A SELECT statement has a few parts to it:

  • SET - Extends the select context by setting variables which impact the execution
  • SELECT - Defines the parts of the record to return, or function calls
  • FROM - Indicates the table from which to retrieve rows
  • JOIN - Indicates the type of join to execute between $Table1 and $Table2
  • $JoinCondition - A set of conditions under which two records, one from $Table1 and the other from $Table2, can be combined
  • WHERE - A set of conditions the record must satisfy
  • GROUP BY - Indicates the records should be combined based on the SQL expression which follows the keyword
  • $GroupingExpr - A SQL expression which can use fields or even function calls; the result of this expression drives the grouping criteria
  • LIMIT - A restriction on how many records to return
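As a sketch of how the SET variables compose with a query (reusing the customer table from earlier; the values below are purely illustrative), a bounded query can cap how much data the engine scans:

```sql
-- Stop scanning after roughly 1 MB of data or 5 seconds, whichever comes first
SET max.size = 1000000;
SET max.query.time = 5000;
SELECT *
FROM customer
LIMIT 10
```

The SET statements apply only to the query that follows them; they do not change any broker or topic configuration.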

A wildcard select returns the entire row, whereas if specific fields are selected then only parts of the row data are returned. The statement is not limited to selecting fields (nested and array elements are included) but allows calling functions, aggregating values (count, sum, min, max, etc.) or even joining two tables.

Considering the previous step where records were created for the customer and customer_avro tables, to retrieve them a user can run the following:

SELECT * FROM customer_avro;

SELECT * FROM customer;

SELECT *
FROM customer_avro
WHERE address.city = 'New York';

SELECT _key as id, address.line as address, address.postcode as code
FROM customer_avro
WHERE address.city = 'New York';
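Beyond selecting fields, aggregations can be applied as well. As an illustrative sketch (assuming the customer_avro data inserted earlier; the exact set of supported functions depends on the engine version), counting customers per city could look like this:

```sql
SELECT address.city, count(*)
FROM customer_avro
GROUP BY address.city
```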

Examples of the many types of statements a user can run can be found in the Learn in 10 minutes chapter.

Truncating a Table

At times, a requirement to wipe all the records from a table can arise. In the case of a Kafka topic, if records that do not align with the expected schema have landed on the topic, the user might want the entire topic data deleted. The syntax to use in such a case is this:

TRUNCATE TABLE $Table

where $Table is the table to delete all the records for. This operation is only supported on non-compacted topics; it is a Kafka design restriction. To wipe the data from a compacted topic, the user has two options: the first is to drop and recreate the topic, the second is to insert a null-Value record for each unique key in the topic. Rebuilding the customer table as non-compacted, the truncate translates to:

TRUNCATE TABLE customer;

/* SELECT count(*) FROM customer returns 0 after the previous statement */

Note

Truncating a compacted Kafka topic is not supported. It is an Apache Kafka restriction. A user could drop and recreate the table, or insert, for each unique key in the topic, a record with a null Value.

Deleting records

ANSI SQL supports a construct for deleting records, and the Lenses SQL engine has adopted it as well. When a table represents a topic in Apache Kafka, there are two behaviors.

  • If the topic is not compacted, then the delete expects an offset up to which records will be deleted. A query to delete records from the customer table looks like this:
-- Delete records across all partitions up to the offset 10
DELETE FROM customer WHERE _meta.offset <=10

-- Delete records from a specific partition
DELETE FROM customer WHERE _meta.offset <=10 AND _meta.partition = 2
  • If the topic is compacted, the delete expects the record Key to be provided. For a compacted topic, a delete translates to inserting a record with the existing Key but a null Value. Using the customer_avro topic (which has the compacted flag on), a delete for a specific customer identifier looks like this:
DELETE FROM customer_avro
WHERE _key = 'andy.perez'

The delete is an insert operation. Until the compaction kicks in and completes, there will be at least one record with the Key used earlier, but the latest (or last) record will have its Value set to null.
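As an illustrative check (assuming the customer_avro data inserted earlier), selecting by the deleted key before compaction has run would still surface records for that key, with the most recent one carrying a null Value:

```sql
SELECT *
FROM customer_avro
WHERE _key = 'andy.perez'
-- Until compaction runs, the latest record for this key has a null Value
```

Once compaction completes, the key disappears from the topic entirely.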

Manage Your Queries

With many users actively running queries, an administrator might be interested to see, at a given time, what queries are running and who is running them. If a bad query is identified, the admin would want to stop it. To visualize all the in-progress queries, here is the syntax to use:

SHOW QUERIES

The output of this command lists all the queries: their text, the user, the start time, and the bytes and records scanned. Say a user has an in-flight query and the admin runs the command described above; the output could look similar to the one below:

/*
 username                      query                        bytes_read               id                         status            started             records_scanned
mike.jones    "SELECT * FROM cc_payments  WHERE amount < 10"      35192       002a244f-ba4f-40cc-aea7-0a4b7feba4bc    Running     2018-10-25T10:11:30.826Z     367
*/

If the query should be stopped the admin user can execute this statement:

KILL QUERY "002a244f-ba4f-40cc-aea7-0a4b7feba4bc"

A useful feature supported by the engine is showing not only the in-flight queries but also recently run ones. The engine keeps the information related to completed queries for 30 minutes. To list all running and recently completed queries, a user would need to execute:

SHOW ALL QUERIES