This page describes a tutorial for access headers via SQL Studio.
Since version 0.11, Apache Kafka supports message headers. Headers are a typical concept in messaging systems like JMS and transport systems like TCP or HTTP.
They can be used for routing, filtering, and annotation. The header can contain pairs of key=value
, where the key
is always a string(text), and the value
is an array of bytes.
Given that the actual value is an array of bytes, the developer who produces the messages knows whether this array of bytes represents a string, an int, a long, or a double. Lenses can visualize and use the record headers via SQL queries.
By default, Lenses fetches the headers of messages and displays them on the UI.
To view the value of a specific header you can run:
This page describes a tutorial for querying data with SQL Studio.
When querying Kafka topic data with SQL such as
a full scan will be executed, and the query processes the entire data on that topic to identify all records that match the transaction ID.
If the Kafka topic contains a billion 50KB messages - that would require querying 50 GB of data. Depending on your network capabilities, brokers’ performance, any quotas on your account, and other parameters, fetching 50 GB of data could take some time! Even more, if the data is compressed. In the last case, the client has to decompress it before parsing the raw bytes to translate into a structure to which the query can be applied.
No. Apache Kafka does not have the full indexing capabilities in the payload (indexes typically come at a high cost even on an RDBMS / DB or a system like Elastic Search), however, Kafka indexes the metadata.
The only filters Kafka is supporting are topic, partition and offsets or timestamp.
If we specify in our query that we are only interested in partition 1, and for the sake of example the above Kafka topic has 50 x partitions. Then Lenses will automatically push this predicate down, meaning that we will only need to scan 1GB instead of 50GB of data.
If we specify the offset range and the partition, we would only need to scan the specific range of 100K messages resulting in scanning 5MB.
The above will query only the data added to the topic up to 1 hour ago. Thus we would query just 10MB.
The above will query only the data that have been added to the Kafka topic on a specific day. If we are storing 1,000 days of data, we would query just 50MB.
Lenses provides a set of rules for
termination control
resource protection
query management
Adding a LIMIT 10 in the SQL query will result in the SQL terminating early, as soon as 10 x messages have been discovered. It’s not a perfect solution as we might never find 10 x messages, and thus perform a full scan.
Add LIMIT to your query to have quick and efficient query completion
One can control the maximum time a SQL query will run for. The admin can set up a default value, and a user can override it.
One can control the maximum bytes the SQL query will fetch. The admin can set up a default value, but a more advanced user can override it.
The above will make sure the query terminates after 5 seconds of reaching the end of the topic. The admin can set up a default value. The idea is that there is no reason to keep polling if we have exhausted the entire topic.
The complete set of SQL Queries on Apache Kafka are currently being executed under a specific client-id lenses.sql.engine
and an admin can apply a global Kafka quota to restrict the maximum total network I/O.
By adding a Quota on your Kafka cluster under the lenses.sql.engine
CLIENT name, you can also control the global network I/O that is allocated to all users querying Kafka data with SQL.
An admin can view all active queries with:
and control them, i.e., stop a running query with the following statement
This page describes a tutorial on how to work with JSON in Lenses SQL.
In this tutorial, we will see how to use Lenses SQL to process JSON strings using JsonPath, a sort of XPath for JSON objects.
In Lenses SQL you can use a JSON_EXTRACT_FIRST()
and JSON_EXTRACT_ALL()
to navigate and transform JSON strings.
We have a topic http_logs
that collects the details of HTTP calls to a microservice. Basic details of the request are stored, like the URL
and the HTTP
method used. The payload of the requests is stored as well as a string.
We can create the topic and insert some example data through SQL Studio:
The HTTP
method and the URL
used for the request are stored in the method
and url
fields respectively, while the optional payload, and its content-type, are stored in the body
and content_type
fields.
As you can imagine the logs contained in this topic are quite generic, and different endpoints may have different content-type
s for their body. For this reason the best the system can do is storing the payload as a simple string, whenever that is possible.
This comes with some drawbacks: since the data is a simple string, and it is not structured, it is not possible to inspect it as we would do with a normal AVRO
/JSON
object.
Fortunately Lenses SQL offers a couple of handful functions that make our life easier in these kind of scenarios.
Our first task is to find the username of users created with a call to POST /users
.
To do that we can use JSON_EXTRACT_FIRST(json_string, pattern)
, one of the string functions available in Lenses SQL. The first argument of the function is the string representing the JSON
we want to manipulate. The second is a string representing a JsonPath.
JsonPath is a powerful way to traverse and extract elements from a JSON
object. Explaining the full details of goes beyond the scope of this article, but in general it can be thought as a JSON
version of XPath
, the standard used to select elements from an XML document.
A nice way to try and test if your JsonPaths are doing what you intended, is using the JsonPath online evaluator.
In our case, we would like to extract the name of the user just created. The simple path $.username
will do it!
Let’s try to use it in a SELECT
that we can run in SQL Studio:
That query will produce the results
As you can see we have two entries for juno
. That’s because the user was first created, and then modified later, with a PUT
call.
Also, there are some null
values. This is because JSON_EXTRACT_FIRST
was not able to extract the username, either because the payload was not valid JSON
, or because the field was not found.
We can fix this restricting our query to user creation calls:
We have now only valid results:
All Lenses SQL functions can be used in any part of the query. Thus JSON_EXTRACT_FIRST
can be used in the projections, where, and group bys.
For example, you can run the query
to retrieve max
’s e-mail:
So far we had fun using JSON_EXTRACT_FIRST
, but we have not talked yet about its bigger brother, JSON_EXTRACT_ALL
.
JSON_EXTRACT_ALL(json_string, pattern)
works like JSON_EXTRACT_FIRST
, except that it will return all the values that match the pattern
. The results will be returned in an array, and when no results are found the empty array will be returned.
Let’s make use of it, extracting all the contact types used at the moment of the creation of the user:
Running the query above we get what we desired:
JSON_EXTRACT_FIRST()
and JSON_EXTRACT_ALL()
are available also in the Streaming Engine, like most Lenses SQL functions.
Let’s say we want another topic continuously filled with the contact types used for user creations. We also want each record containing a single username-contact type pair. To achieve that we can take the query of the last example and adapt it a bit, using a Lateral Join:
JSON_EXTRACT_FIRST()
and JSON_EXTRACT_ALL()
simplifies your life every time you have to deal with JSON
that is represented as a string value of a field in your topic.
The use of JsonPath
make them very powerful and even complex operations are easily representable with it.
This page describes a tutorial for deleting data in compacted topics from SQL Studio.
In this example, we will show how we can use Lenses to delete records from a compacted topic which stores users' calling information based on a different topic that stores the users’ response to a “do you want to be contacted back” survey.
With the following code, we can create a compacted topic that holds user call information:
Notice we add cleanup.policy=compact
to tell Lenses we want the topic to be compacted. The remaining properties try to force compaction to happen often so that we can easily visualize the result (this should not be used in production though).
We start by adding some records to our user_info topic:
Which we can see by inspecting the topic.
We additionally add a second topic which will hold information regarding users' response to the survey:
As previously, we added some records to contact_survey
Using the following:
We are essentially issuing a delete command for all users who said they didn’t want to be contacted.
Looking at our user_calls
topic again, we can see the newly inserted records with a null
value, but our original records are still there… How so?
Due to some internals of Kafka, log compaction doesn’t always work immediately but in this case, by adding an extra record we can force it to happen:
Looking at the data inside our topic, we can now see that users who responded that they didn’t want to be contacted are no longer part of our topic; the tombstone records (the ones with a null value) will stay around for as long as our retention policy specifies and will eventually be removed, leaving us with a topic with only users that want to be contacted.