SQL Processors

A SQL Processor continuously processes a data stream. Because a stream never ends, certain kinds of queries over live data (aggregates and joins) need a way to bound the data they operate on.

Live data can be windowed in the following ways:

  • Using Hopping time windows
  • Using Tumbling time windows
  • Using Sliding windows
  • Using Session windows

All of the above must be used with a GROUP BY clause. Keep in mind that not every type of window is applicable to every kind of data. The results of the queries are written to new Kafka topics, as defined in each query.

On this page we present continuous processors for the first three types of windows. As session windows are a bit more complex, they will be covered in a separate blog post in the near future.

Your first SQL Processor

In this SQL processor we are going to use a sliding window for processing the var_log_broker Kafka topic.

The continuous SQL query will be the following:

SET autocreate=true;

INSERT INTO sliding_logs
SELECT COUNT(*) AS total_number
FROM log_files_as_json
GROUP BY SLIDING(5, m);

This query counts the number of records that fall inside a 5-minute sliding window.
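To illustrate what a sliding count computes, here is a rough Python sketch of a trailing 5-minute (300-second) count over a stream of event timestamps. This is only an illustration of the windowing idea, not the actual engine: the exact window boundaries (inclusive vs. exclusive) used by the SQL processor are an assumption here.

```python
from collections import deque

def sliding_count(timestamps, size_s):
    """For each event timestamp (seconds, ascending), count how many
    events fall inside the trailing window of size_s seconds."""
    window = deque()
    counts = []
    for t in timestamps:
        window.append(t)
        # Evict timestamps that have fallen out of the trailing window.
        # Boundary choice (<=) is an assumption for this sketch.
        while window[0] <= t - size_s:
            window.popleft()
        counts.append(len(window))
    return counts

# Events at 0s, 100s, 200s, 400s with a 300-second window:
# the event at 400s only shares its window with the one at 200s.
print(sliding_count([0, 100, 200, 400], 300))  # → [1, 2, 3, 2]
```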

The generated output will be similar to the following when presented in RAW format:

[
    {
        "key": "AAAAAAAAAWncdEBA",
        "value": 1744,
        "metadata": {
            "timestamp": 1554182059027,
            "__keysize": 12,
            "__valsize": 8,
            "partition": 0,
            "offset": 0
        }
    },
    {
        "key": "AAAAAAAAAWnceNQg",
        "value": 330,
        "metadata": {
            "timestamp": 1554182396837,
            "__keysize": 12,
            "__valsize": 8,
            "partition": 0,
            "offset": 1
        }
    },

Your second SQL Processor

The second SQL processor example will use a Hopping time window and calculate the minimum and maximum length of the processed log entries.

The query that will be used is:

SET autocreate=true;

INSERT INTO hopping_time_logs
SELECT
    MIN(LENGTH(the_text)) AS minimum
    , MAX(LENGTH(the_text)) AS maximum
FROM log_files_as_json
GROUP BY HOP(30, s, 10, s);

The hopping time window is 30 seconds long, with an advance interval of 10 seconds. Because the advance interval is smaller than the window size, consecutive windows overlap and a single record can belong to more than one window.
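With a 30-second window advancing every 10 seconds, each record falls into up to size/advance = 3 overlapping windows. A minimal Python sketch of that membership rule, assuming windows start at non-negative multiples of the advance interval (an assumption; the engine's alignment may differ):

```python
def hopping_windows(ts, size, advance):
    """Return the start times of all hopping windows containing ts.
    Windows start at non-negative multiples of `advance` and span
    `size` time units: a window [s, s + size) contains ts."""
    starts = []
    s = (ts // advance) * advance      # latest window starting at or before ts
    while s >= 0 and s > ts - size:    # window [s, s + size) still covers ts
        starts.append(s)
        s -= advance
    return sorted(starts)

# A record at t=25s belongs to the windows starting at 0s, 10s and 20s.
print(hopping_windows(25, 30, 10))  # → [0, 10, 20]
```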

The generated output will be similar to the following when presented in RAW format:

[
    {
        "key": "AAAAAAAAAWncd5ug",
        "value": {
            "minimum": 0,
            "maximum": 4675
        },
        "metadata": {
            "timestamp": 1554182049021,
            "__keysize": 12,
            "__valsize": 28,
            "partition": 0,
            "offset": 0
        }
    },
    {
        "key": "AAAAAAAAAWncd8Kw",
        "value": {
            "minimum": 0,
            "maximum": 4675
        },
        "metadata": {
            "timestamp": 1554182059027,
            "__keysize": 12,
            "__valsize": 28,
            "partition": 0,
            "offset": 1
        }
    },

Your third SQL Processor

This time the SQL processor will use a tumbling window.

A tumbling window is a hopping window whose window size is equal to its advance interval. Since tumbling windows never overlap, a data record will belong to one and only one window.

SET autocreate=true;

INSERT INTO tumbling_time_logs
SELECT
    MIN(LENGTH(the_text)) AS minimum,
    MAX(LENGTH(the_text)) AS maximum
FROM log_files_as_json
GROUP BY TUMBLE(30, s);
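Since tumbling windows never overlap, assigning a record to its window reduces to a floor operation. A minimal Python sketch, assuming windows are aligned to multiples of the window size (an assumption for illustration):

```python
def tumbling_window(ts, size):
    """A record belongs to exactly one tumbling window: the one whose
    start is the largest multiple of `size` not exceeding ts."""
    start = (ts // size) * size
    return (start, start + size)  # window interval [start, start + size)

# With a 30-second tumbling window, a record at t=45s
# lands in the window [30, 60).
print(tumbling_window(45, 30))  # → (30, 60)
```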

The generated output will be similar to the following when presented in RAW format:

[
    {
        "key": "AAAAAAAAAWncd+nA",
        "value": {
            "minimum": 0,
            "maximum": 4675
        },
        "metadata": {
            "timestamp": 1554182059027,
            "__keysize": 12,
            "__valsize": 28,
            "partition": 0,
            "offset": 0
        }
    },
    {
        "key": "AAAAAAAAAWncejOw",
        "value": {
            "minimum": 77,
            "maximum": 219
        },
        "metadata": {
            "timestamp": 1554182219934,
            "__keysize": 12,
            "__valsize": 28,
            "partition": 0,
            "offset": 1
        }
    },