Inserting and Deleting Data¶
For the SQL engine, the term table is equivalent to Kafka topic. The two are used interchangeably.
A table is created to store data. As a result, Lenses SQL allows you to utilize the ANSI SQL command to store new records into a table. A typical scenario is most likely found during development and less often in a production environment. However, even in production, if the system uses a table and its records to trigger actions (it is a common scenario to have Kafka control topics), being able to insert a record to instruct a microservice to replay the entire day records could come handy.
The first approach to insert new records uses the following syntax:
INSERT INTO $Table(column1[, column2, column3]) VALUES(value1[,value2, value3]) INSERT INTO $Table(column1[, column2, column3]) VALUES (value1[,value2, value3]), (value4[,value5, value6])
- $Table - The name of the table to insert the data into
- Columns - The target columns to populate with data. Adding a new record does not require to fill all the available columns. In the case of Avro stored Key, Value pairs, the user needs to make sure that a value is specified for all the required Avro fields.
- VALUES - The set of value to insert. It has to match the list of columns provided, including their data types.
Use multi-values insert to ensure the order of records.
Reinstating the two tables, customer and customer_avro, a user can execute these statements to insert records into both:
INSERT INTO customer_avro (_key, id, address.line, address.city, address.postcode, email) VALUES ('andy.perez','andy.perez', '260 5th Ave', 'New York', 10001, 'email@example.com'), ('alexandra.smith','alexandra.smith', '8448 Pin Oak Avenue','San Francisco', 90044, 'firstname.lastname@example.org'); INSERT INTO customer (_key, id, address.line, address.city, address.postcode, email) VALUES ('maria.wood','maria.wood', '698 E. Bedford Lane','Los Angeles', 90044, 'email@example.com'); ('david.green', 'david.green', '4309 S Morgan St', 'Chicago', 60609, 'firstname.lastname@example.org'); /* Output: true true true true /*
By running these four
INSERT statements, each topic gets two records. Shortly SELECT statement will be explained and
then a user could easily validate the result.
There is a second way to insert data into a table, which is by copying the records from another table. This records inserted are bound at the time of the query. New records added to the original table are not inserted into the target.
The syntax is as follows:
INSERT INTO $TABLE1 SELECT */column1[,column2, ...] FROM $Table2 [WHERE $condition] [LIMIT N]
A user needing to copy data to a target topic where the records are returned from a SELECT statement is quite a common scenario. The SELECT query can be as simple as a wildcard * or as complex as a select from a join while applying aggregation. To keep this example simple let us copy all the records from the customer table into customer_avro one. Executing the following SQL code achieves that:
INSERT INTO customer_avro SELECT * FROM customer /* Output: Insert wrote 2 records */
ANSI SQL supports a construct for deleting records and the Lenses SQL engine has adopted it as well. When it comes to a table representing a topic in Apache Kafka there are two behaviors.
- If the topic is not compacted, then
DELETEexpects an offset to delete records up to. A query to delete the records from the customer
table looks like the following:
-- Delete records across all partitions up to the offset 10 DELETE FROM customer WHERE _meta.offset <=10 -- Delete records from a specific partition */ DELETE FROM customer WHERE _meta.offset <=10 AND _meta.partition = 2
- If the topic is compacted, then
DELETEexpects the record Key to be provided. For a compacted topic a delete translates to inserting a record
with the existing Key but the Value has to be null. For the customer_avro topic (which has the compacted flag on), a delete operation for a specific customer identifier would look like this:
DELETE FROM customer_avro WHERE _key = 'andy.perez'
The delete is an insert operation. Until the compaction kicks in and it completes, there will be at least one record with the Key used earlier. But the latest (or last) record will have the Value set to null.
Truncating a Table¶
At times a requirement to wipe out all the records in a table could pop up. In case of a Kafka topic, if records not aligned with the expected schema have landed to the topic, the user might request for the entire topic data to be deleted. The syntax to use in such a case is as follows:
TRUNCATE TABLE $Table
where the $Table should be the table name to delete all the records for. This operation is only supported on non-compacted topics, which is a Kafka design restriction. To wipe the data from a compacted topic, the user has two options: either dropping and recreating the topic or inserting null Value records for each unique Key on the topic. After rebuilding the customer table to be non-compacted, the truncate can be performed as follows:
TRUNCATE TABLE customer; /* SELECT count(*) FROM customer returns 0 after the previous statement */
Truncating a compacted Kafka topic is not supported, which is an Apache Kafka restriction. A user could drop and recreate the table,
or insert a record with a null Value for each unique key in the topic.