Manage Kafka topics


This section introduces all the supported commands for managing a Kafka topic.

Lenses supports the typical SQL commands supported by a relational database:

  • CREATE
  • DROP
  • TRUNCATE
  • DELETE
  • SHOW TABLES
  • DESCRIBE TABLE
  • DESCRIBE FORMATTED

Create a table (CREATE) 

CREATE TABLE 
    table_name(
        $field $fieldType [, $field $fieldType,...]
    )
FORMAT ($keyStorageFormat, $valueStorageFormat)
[PROPERTIES(
        partitions= *, 
        replication=$replication, 
        compacted=true/false)
];

The CREATE statement has the following parts:

  • CREATE TABLE - Instructs the construction of a table
  • $Table - The actual name given to the table.
  • Schema - Constructed as a list of (field, type) tuple, it describes the data each record in the table contains
  • FORMAT - Defines the storage format. Since it is an Apache Kafka topic, both the Key and the Value formats are required. Valid values are STRING, INT, LONG, JSON, AVRO.
  • PROPERTIES - Specifies the number of partitions the final Kafka topic should have, the replication factor in order to ensure high availability (it cannot be a number higher than the current Kafka Brokers number) and if the topic should be compacted.

A Kafka topic which is compacted is a special type of topic with a finer-grained retention mechanism that retains the last update record for each key.

A compacted topic (once the compaction has been completed) contains a full snapshot of the final record values for every record key and not just the recently changed keys. They are useful for in-memory services, persistent data stores, reloading caches, etc.

For more details on the subject, you should look at Kafka Documentation.

Example:

CREATE TABLE customer (
        id STRING 
        , address.line STRING 
        , address.city STRING, 
        , address.postcode INT 
        , email STRING
    )
FORMAT (string, json)
PROPERTIES (
    partitions=1, 
    compacted=true
);

Best practices dictate to use Avro as a storage format over other formats. In this case, the key can still be stored as STRING but the value can be Avro.

CREATE TABLE customer_avro (
        id STRING
        , address.line STRING
        , address.city STRING 
        , address.postcode int 
        , email string
    )
FORMAT (string, avro)
PROPERTIES (
    partitions=1, 
    compacted=true
);

List all tables (SHOW TABLES) 

To list all tables:

SHOW TABLES;

Table schema (DESCRIBE TABLE) 

To examine the schema an metadata for a topic:

DESCRIBE TABLE $tableName

The $tableName should contain the name of the table to describe.

Given the two tables created earlier, a user can run the following SQL to get the information on each table:

DESCRIBE TABLE customer_avro

the following information will be displayed:

/* Output:
# Column Name                           # Data Type
_key                                    String
_value.address.postcode                 Int
_value.address.city                     String
_value.address.line                     String
_value.email                            String
_value.id                               String

# Config Key                            # Config Value
cleanup.policy                          compact
compression.type                        producer
delete.retention.ms                     86400000
file.delete.delay.ms                    60000
flush.messages                          9223372036854775807
flush.ms                                9223372036854775807
index.interval.bytes                    4096
max.message.bytes                       1000012
message.format.version                  1.1-IV0
message.timestamp.difference.max.ms     9223372036854775807
message.timestamp.type                  CreateTime
min.cleanable.dirty.ratio               0.5
min.compaction.lag.ms                   0
min.insync.replicas                     1
preallocate                             false
retention.bytes                         2147483648
retention.ms                            604800000
segment.bytes                           1073741824
segment.index.bytes                     10485760
segment.jitter.ms                       0
segment.ms                              604800000
unclean.leader.election.enable          false
*/

Deleting a table 

To drop a table:

DROP TABLE $Table;

System virtual tables (__tables, __fields) 

Lenses provides a set of virtual tables that contain information about all the fields in all the tables.

Using the virtual table, you can quickly search for a table name but also see the table type.

The __table has a table_name column containing the table name, and a table_type column describing the table type (system, user, etc).

SELECT * FROM __tables;

SELECT *
FROM __tables
WHERE table_name LIKE '%customer%';

To see all the tables fields select from the _fields virtual table.

SELECT * 
FROM __fields;

SELECT * 
FROM __fields
WHERE table_name LIKE '%customer%'
--
Last modified: September 15, 2024