Tables

In Apache Kafka, the term table is equivalent to a topic. A Kafka record is made up of the following parts:

  • Key
  • Value
  • Timestamp
  • Topic
  • Partition
  • Offset
  • Headers (a collection of key-value pairs)

The Key and Value components can hold any type of data. For maximum performance, Kafka itself is not aware of a record's storage format or its actual content. Typically, a Key holds a primitive value: a string (a unique customer identifier, say an email), an int (a payment transaction identifier) or a long (an IoT device unique identifier). However, nothing stops the user from using complex/nested structures as keys. The same applies to the Value component.

Here are the rules for retrieving each part of a Kafka record using SQL:

  • Key - Use the _key prefix to address the Key component. Using just _key selects the entire Key value. If the Key holds complex data, use _key.fieldA to address a specific field.
  • Value - Access a field in the Value component directly, or optionally use the _value prefix. The following expressions are equivalent: SELECT firstName FROM customer and SELECT _value.firstName FROM customer. Using _value on its own (i.e. SELECT _value FROM customer) returns the entire Value component.
  • Headers - To retrieve a record header, use the _header prefix. For example, SELECT _header.lenses FROM customer returns the value of a header named lenses.
  • Metadata - All the other components of a record can be selected using the _meta prefix. For example, SELECT _meta.partition, _meta.timestamp, _meta.offset FROM customer returns the record table-partition, the record timestamp and its offset within the table-partition. A combined example follows the query below.

-- Selecting fields from the Key and Value
SELECT _key.device.id
     , _key.device.tags[0] AS model
     , temperature
     , humidity
     , geo.latitude
     , params[1].value AS battery
FROM iot_readings
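
The header and metadata prefixes can be combined with Value fields in a single query. Here is a minimal sketch assuming the customer table used above, with a header named lenses (the aliases are illustrative):

-- Selecting a Value field alongside a header and metadata entries
SELECT firstName
     , _header.lenses AS lenses_header
     , _meta.partition AS record_partition
     , _meta.offset AS record_offset
     , _meta.timestamp AS record_ts
FROM customer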

Table Schema

For each table, the SQL engine tracks and maintains its schema. Given a Kafka record, this means there is a schema for the Key and one for the Value. You can use the DESCRIBE TABLE command to quickly see the schema of a given table.

DESCRIBE TABLE payments
/*
Would yield a result like this
_key                String
_value.id           String
_value.time         String
_value.amount       decimal<8,2>
_value.currency     String
_value.creditCardId String
_value.merchantId   Long
*/
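
The field names reported by DESCRIBE TABLE can be used directly in queries. Here is a minimal sketch against the payments schema above (the filter value is illustrative):

-- Projecting the Key and a few Value fields from the payments table
SELECT _key AS payment_key
     , amount
     , currency
     , merchantId
FROM payments
WHERE currency = 'USD'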

To understand the record content, the SQL engine needs the Key and Value storage formats to be set for each topic. The formats currently supported out of the box are:

  • JSON
  • AVRO
  • XML
  • CSV
  • PROTOBUF
  • STRING
  • INT
  • LONG
  • BYTES (default)
  • Custom
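
The configured formats determine how much structure the engine can expose. With a primitive Key format such as STRING, _key returns the whole key, while a structured format such as AVRO or JSON exposes individual Key fields. The sketch below assumes the iot_readings table from the earlier example has an AVRO Key and Value; the reported types are illustrative:

DESCRIBE TABLE iot_readings
/*
Could yield a result like this
_key.device.id        String
_key.device.tags      array<String>
_value.temperature    Double
_value.humidity       Double
_value.geo.latitude   Double
*/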