This page describes a tutorial on how to work with JSON in Lenses SQL.
In this tutorial, we will see how to use Lenses SQL to process JSON strings using JsonPath, a sort of XPath for JSON objects.
In Lenses SQL you can use JSON_EXTRACT_FIRST() and JSON_EXTRACT_ALL() to navigate and transform JSON strings.
We have a topic http_logs that collects the details of HTTP calls to a microservice. Basic details of the request are stored, like the URL and the HTTP method used. The payload of the request is also stored, as a string.
We can create the topic and insert some example data through SQL Studio:
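The original tutorial's statements are not reproduced here; the following is a minimal sketch, assuming a string key, JSON-as-string payloads, and the fields described below (method, url, content_type, body). The exact DDL and INSERT syntax may need adapting to your Lenses version:

```sql
-- Create the topic: string key, JSON value holding the request details
CREATE TABLE http_logs(
    _key string,
    method string,
    url string,
    content_type string,
    body string
)
FORMAT (string, json);

-- A user is created and later modified with a PUT call
INSERT INTO http_logs(_key, method, url, content_type, body) VALUES
('r1', 'POST', '/users', 'application/json',
 '{"username": "juno", "email": "juno@example.com", "contacts": [{"type": "email"}, {"type": "phone"}]}');
INSERT INTO http_logs(_key, method, url, content_type, body) VALUES
('r2', 'PUT', '/users/juno', 'application/json',
 '{"username": "juno", "email": "juno@new-example.com"}');
-- Another user is created
INSERT INTO http_logs(_key, method, url, content_type, body) VALUES
('r3', 'POST', '/users', 'application/json',
 '{"username": "max", "email": "max@example.com", "contacts": [{"type": "email"}]}');
-- A request whose payload is not JSON
INSERT INTO http_logs(_key, method, url, content_type, body) VALUES
('r4', 'GET', '/health', 'text/plain', 'OK');
```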
The HTTP method and the URL used for the request are stored in the method and url fields respectively, while the optional payload, and its content-type, are stored in the body and content_type fields.
As you can imagine, the logs contained in this topic are quite generic, and different endpoints may have different content-types for their body. For this reason, the best the system can do is store the payload as a simple string, whenever that is possible.
This comes with some drawbacks: since the data is a simple string, and it is not structured, it is not possible to inspect it as we would do with a normal AVRO/JSON object.
Fortunately, Lenses SQL offers a couple of handy functions that make our life easier in these kinds of scenarios.
Our first task is to find the username of users created with a call to POST /users.
To do that we can use JSON_EXTRACT_FIRST(json_string, pattern), one of the string functions available in Lenses SQL. The first argument of the function is the string representing the JSON we want to manipulate. The second is a string representing a JsonPath.
JsonPath is a powerful way to traverse and extract elements from a JSON object. Explaining its full details goes beyond the scope of this article, but in general it can be thought of as a JSON version of XPath, the standard used to select elements from an XML document.
A nice way to try and test if your JsonPaths are doing what you intended, is using the JsonPath online evaluator.
In our case, we would like to extract the name of the user just created. The simple path $.username will do it!
Let’s try to use it in a SELECT that we can run in SQL Studio:
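A sketch of such a query:

```sql
SELECT JSON_EXTRACT_FIRST(body, '$.username') AS username
FROM http_logs;
```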
Running that query, we will see two entries for juno. That’s because the user was first created, and then modified later with a PUT call.
Also, there are some null values. This is because JSON_EXTRACT_FIRST was not able to extract the username, either because the payload was not valid JSON, or because the field was not found.
We can fix this by restricting our query to user creation calls:
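A sketch, filtering on the method and url fields:

```sql
SELECT JSON_EXTRACT_FIRST(body, '$.username') AS username
FROM http_logs
WHERE method = 'POST' AND url = '/users';
```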
We now have only valid results:
All Lenses SQL functions can be used in any part of the query. Thus JSON_EXTRACT_FIRST can be used in projections, WHERE clauses, and GROUP BYs.
For example, you can run the following query to retrieve max’s e-mail:
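A sketch, using JSON_EXTRACT_FIRST both in the projection and in the WHERE clause (the $.email path assumes the payload shape sketched earlier):

```sql
SELECT JSON_EXTRACT_FIRST(body, '$.email') AS email
FROM http_logs
WHERE method = 'POST'
  AND url = '/users'
  AND JSON_EXTRACT_FIRST(body, '$.username') = 'max';
```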
So far we had fun using JSON_EXTRACT_FIRST, but we have not yet talked about its bigger brother, JSON_EXTRACT_ALL.
JSON_EXTRACT_ALL(json_string, pattern) works like JSON_EXTRACT_FIRST, except that it will return all the values that match the pattern. The results will be returned in an array, and when no results are found the empty array will be returned.
Let’s make use of it, extracting all the contact types used at the moment of the creation of the user:
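A sketch, assuming the creation payload contains a contacts array whose elements have a type field (this shape is an assumption):

```sql
SELECT
    JSON_EXTRACT_FIRST(body, '$.username') AS username,
    JSON_EXTRACT_ALL(body, '$.contacts[*].type') AS contact_types
FROM http_logs
WHERE method = 'POST' AND url = '/users';
```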
Running the query above we get what we desired:
JSON_EXTRACT_FIRST() and JSON_EXTRACT_ALL() are also available in the Streaming Engine, like most Lenses SQL functions.
Let’s say we want another topic continuously filled with the contact types used for user creations. We also want each record containing a single username-contact type pair. To achieve that we can take the query of the last example and adapt it a bit, using a Lateral Join:
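A sketch of such a processor; the output topic name and the payload shape are assumptions:

```sql
SET defaults.topic.autocreate=true;

INSERT INTO user_creation_contacts
SELECT STREAM
    JSON_EXTRACT_FIRST(body, '$.username') AS username,
    contact_type
FROM http_logs
    LATERAL JSON_EXTRACT_ALL(body, '$.contacts[*].type') AS contact_type
WHERE method = 'POST' AND url = '/users';
```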
JSON_EXTRACT_FIRST() and JSON_EXTRACT_ALL() simplify your life every time you have to deal with JSON that is represented as a string value of a field in your topic.
The use of JsonPath makes them very powerful, and even complex operations are easily expressed with it.
This page describes a tutorial for querying data with SQL Studio.
When querying Kafka topic data with SQL that filters on a payload field, such as a transaction ID, a full scan will be executed: the query processes the entire data on that topic to identify all records that match.
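For example, a query along these lines (topic and field names are assumptions) has to touch every record in the topic:

```sql
SELECT *
FROM transactions
WHERE transaction_id = 'tx-0001';
```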
If the Kafka topic contains a billion 50KB messages, that would require querying 50 GB of data. Depending on your network capabilities, the brokers’ performance, any quotas on your account, and other parameters, fetching 50 GB of data could take some time! Even more so if the data is compressed: in that case, the client also has to decompress it before parsing the raw bytes into a structure the query can be applied to.
No. Apache Kafka does not have full indexing capabilities on the payload (indexes typically come at a high cost, even on an RDBMS or a system like Elasticsearch); however, Kafka does index the metadata.
The only filters Kafka supports are topic, partition, and offsets or timestamps.
If we specify in our query that we are only interested in partition 1, and, for the sake of example, the above Kafka topic has 50 partitions, then Lenses will automatically push this predicate down, meaning that we will only need to scan 1 GB instead of 50 GB of data.
If we specify the offset range and the partition, we would only need to scan that specific range of 100K messages, resulting in scanning 5 MB.
A predicate on the record timestamp can restrict the query to the data added to the topic up to 1 hour ago; in that case we would query just 10 MB.
Similarly, we can query only the data that has been added to the Kafka topic on a specific day. If we are storing 1,000 days of data, we would query just 50 MB.
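Hedged sketches of such metadata predicates, assuming the _meta.partition, _meta.offset and _meta.timestamp fields exposed by Lenses; the literal values are illustrative:

```sql
-- Only one partition out of 50
SELECT * FROM transactions WHERE _meta.partition = 1;

-- A specific offset range on one partition
SELECT *
FROM transactions
WHERE _meta.partition = 1
  AND _meta.offset >= 100000
  AND _meta.offset <= 200000;

-- Only records with a timestamp in the last hour (epoch milliseconds)
SELECT * FROM transactions WHERE _meta.timestamp >= 1595520000000;
```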
Lenses provides a set of rules for:
termination control
resource protection
query management
Adding a LIMIT 10 to the SQL query will result in the query terminating early, as soon as 10 messages have been found. It’s not a perfect solution, as we might never find 10 matching messages and thus still perform a full scan.
Add LIMIT to your query for quick and efficient query completion.
One can control the maximum time a SQL query will run for, the maximum bytes the query will fetch, and how long the query keeps polling after reaching the end of the topic (there is no reason to keep polling once the entire topic has been exhausted). For each of these, the admin can set up a default value, and a more advanced user can override it.
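A sketch of how these limits can be set per query; the setting keys below (max.query.time, max.size, max.idle.time) are assumptions and may differ in your Lenses version:

```sql
SET max.query.time = '30s';  -- stop the query after 30 seconds of running
SET max.size = '50mb';       -- stop after fetching 50 MB
SET max.idle.time = '5s';    -- stop 5 seconds after reaching the end of the topic

SELECT * FROM transactions WHERE _meta.partition = 1;
```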
All SQL queries on Apache Kafka are currently executed under a specific client-id, lenses.sql.engine, and an admin can apply a global Kafka quota to restrict the maximum total network I/O.
By adding a quota on your Kafka cluster under the lenses.sql.engine CLIENT name, you can control the global network I/O that is allocated to all users querying Kafka data with SQL.
An admin can view all active queries and control them, i.e., stop a running query, as sketched below.
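A hedged sketch; the exact statements may differ between Lenses versions:

```sql
-- List the currently running SQL queries
SHOW QUERIES;

-- Stop a running query, given its id
KILL QUERY '<query-id>';
```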
This page describes a tutorial for deleting data in compacted topics from SQL Studio.
In this example, we will show how we can use Lenses to delete records from a compacted topic which stores users' calling information based on a different topic that stores the users’ response to a “do you want to be contacted back” survey.
With the following code, we can create a compacted topic that holds user call information:
Notice we add cleanup.policy=compact to tell Lenses we want the topic to be compacted. The remaining properties try to force compaction to happen often, so that we can easily visualize the result (this should not be used in production, though).
We start by adding some records to our user_calls topic, which we can then see by inspecting the topic.
We additionally add a second topic which will hold information regarding users' responses to the survey.
As previously, we add some records to contact_survey.
Using the following query, we are essentially issuing a delete command for all users who said they didn’t want to be contacted:
Looking at our user_calls topic again, we can see the newly inserted records with a null value, but our original records are still there… How so?
Due to some internals of Kafka, log compaction doesn’t always happen immediately, but in this case we can force it by adding an extra record:
Looking at the data inside our topic, we can now see that users who responded that they didn’t want to be contacted are no longer part of our topic; the tombstone records (the ones with a null value) will stay around for as long as our retention policy specifies and will eventually be removed, leaving us with a topic with only users that want to be contacted.
This page describes a tutorial to rekey data in a Kafka topic with Lenses SQL Processors.
Sometimes you have a topic that is almost exactly what you need, except that the key of the record requires a bit of massaging.
In Lenses SQL you can use the special SELECT ... as _key syntax to quickly re-key your data.
In our example, we have a topic containing events coming from temperature sensors.
Each record contains the sensor’s measured temperature, the time of the measurement, and the id of the sensor. The key is a unique string (for example, a UUID) that the upstream system assigns to the event.
You can replicate the example by creating a topic in SQL Studio:
We can also insert some example data to do our experiments:
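A minimal sketch of both steps; the topic name, field names, formats and sample values are assumptions:

```sql
-- String key (the upstream UUID), JSON value
CREATE TABLE temperature_events(
    _key string,
    sensor_id string,
    temperature double,
    event_time long
)
FORMAT (string, json);

INSERT INTO temperature_events(_key, sensor_id, temperature, event_time) VALUES
('0df4a5d0-6ad2-4b76-8eb0-9e79d53a4c29', 'sensor-1', 21.5, 1595520000000);
INSERT INTO temperature_events(_key, sensor_id, temperature, event_time) VALUES
('8a6f0f30-21a5-46b7-9f0c-6a9f3c5d2e11', 'sensor-2', 19.8, 1595520005000);
```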
You can explore the topic in Lenses to check the content and the shape of what you just inserted.
Let’s say that what you need is that same stream of events, but the record key should be the sensor_id instead of the UUID.
With the special SELECT ... as _key syntax, a few lines are enough to define our new re-keying processor:
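A sketch of the re-keying processor, reusing the assumed names above; depending on your Lenses version you may also need to project the value fields explicitly:

```sql
SET defaults.topic.autocreate=true;

INSERT INTO temperature_events_by_sensor
SELECT STREAM sensor_id AS _key
FROM temperature_events;
```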
The query above will take the sensor_id from the value of the record and put it as the new key. All the value fields will remain untouched:
Maybe the sensor_id is not enough, and for some reason you also need the hour of the measurement in the key. In this case, the key will become a composite object with two fields: the sensor_id and the event_hour:
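A sketch; deriving event_hour with integer arithmetic on an epoch-milliseconds timestamp is an illustrative assumption:

```sql
SET defaults.topic.autocreate=true;

INSERT INTO temperature_events_by_sensor_and_hour
SELECT STREAM
    sensor_id            AS _key.sensor_id,
    event_time / 3600000 AS _key.event_hour
FROM temperature_events;
```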
As you can see, you can build composite objects in Lenses with ease by just listing all the structure’s fields, one after the other.
In the last example, the _key output storage format will be inferred automatically by the system as JSON. If you need more control, you can use the STORE AS clause before the SELECT.
The following example will create a topic like the previous one, but where the keys will be stored as AVRO:
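A sketch, adding the STORE clause to the previous processor:

```sql
SET defaults.topic.autocreate=true;

INSERT INTO temperature_events_by_sensor_and_hour_avro
STORE KEY AS AVRO VALUE AS AVRO
SELECT STREAM
    sensor_id            AS _key.sensor_id,
    event_time / 3600000 AS _key.event_hour
FROM temperature_events;
```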
Happy re-keying!
This page describes a tutorial to control AVRO record names and namespaces with Lenses SQL Processors.
When writing output as AVRO, Lenses creates schemas for you, automatically generating AVRO record names.
In this tutorial we will learn how to override the default record naming strategy.
In Lenses SQL you can use a SET statement to control the record name and namespace generated for the AVRO schema.
We are going to create and populate a topic that we will later use in a couple of SQL Processors.
In SQL Studio, create the topic running the following query:
For the purposes of our tutorial, it is enough to insert a single record:
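A sketch of both steps; the topic name (mytopic), its fields, and the sample values are assumptions:

```sql
CREATE TABLE mytopic(
    _key string,
    field1 string,
    field2 int,
    field3 string
)
FORMAT (string, avro);

INSERT INTO mytopic(_key, field1, field2, field3) VALUES
('key1', 'a-value', 42, 'another-value');
```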
We are now going to create a processor that will show the default behavior of AVRO record naming in Lenses.
The processor does not do much; it just reshapes the fields of the original topic, putting some of them in a nested field:
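A sketch of such a reshaping processor, nesting two of the assumed fields:

```sql
SET defaults.topic.autocreate=true;

INSERT INTO mytopic_2
SELECT STREAM
    field1,
    field2 AS nested.field2,
    field3 AS nested.field3
FROM mytopic;
```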
We then start the processor. Lenses will create the new topic mytopic_2, and a new schema will be created in the Schema Registry as soon as the first (and only) record is processed.
If we inspect the value schema of mytopic_2, we see that this is the one generated:
As we can see, each record type has a name (it is mandatory in AVRO), and Lenses has generated those names automatically for us (record, record0, record1, etc.).
We are now going to see how to override that default behavior.
Let’s create and start the new processor with the following SQL. Notice how we added the new SET statements to the query:
These settings are telling Lenses to set the root record name and namespace to the values specified.
If we now check the value schema for mytopic_3, we can see that the root record element now has the name myRecordName and the namespace myNamespace.
Notice how the settings did not affect nested records.
If the key of the generated topic has AVRO format as well, you can use the following analogous settings to control the key record name and namespace:
A setting like the one we used before for the value schema will affect all the topics used by the processor. If you want to target a single topic instead, you can use the topic-specific version:
The setting above will override the record name only for topic mytopic_3. Other topics will not be affected and will keep using the default naming strategy.
This section showcases recipes and sample scenarios for using Lenses.
This page describes a tutorial for accessing headers via SQL Studio.
Since version 0.11, Apache Kafka supports message headers. Headers are a typical concept in messaging systems like JMS and transport systems like TCP or HTTP.
They can be used for routing, filtering, and annotation. Headers contain key=value pairs, where the key is always a string (text), and the value is an array of bytes.
Given that the actual value is an array of bytes, the developer who produces the messages knows whether this array of bytes represents a string, an int, a long, or a double. Lenses can visualize and use the record headers via SQL queries.
By default, Lenses fetches the headers of messages and displays them on the UI.
To view the value of a specific header you can run:
Tutorial on how to change the format of data in a Kafka topic from JSON to AVRO with Lenses SQL Processors.
In this example, we will show how to create an AVRO topic from an existing JSON topic.
For this to work, Lenses has to know what the source topic schema is. Lenses can do this in one of three ways:
through direct user action where the schema is manually set
through inference; Lenses will try to infer the schema of a topic by looking at the topic data
and lastly, if the topic is created through Lenses, the schema will be automatically set
With the following SQL we can create our initial JSON topic:
to which we can add data using:
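A sketch of both steps; the field names and sample values are assumptions, while the FORMAT clause makes both facets JSON:

```sql
-- Create the topic with JSON key and value
CREATE TABLE car_speed_events_json(
    _key.plate string,
    speedMph int,
    sensor string
)
FORMAT (json, json);

-- Add a couple of events
INSERT INTO car_speed_events_json(_key.plate, speedMph, sensor) VALUES
('XYZ-123', 55, 'sensor-1');
INSERT INTO car_speed_events_json(_key.plate, speedMph, sensor) VALUES
('ABC-987', 72, 'sensor-2');
```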
It can be quickly verified that the format of our newly created topic is JSON for both key and value by searching for our topic car_speed_events_json in the Explore view:
To create a new topic with format AVRO, we can create a processor that will copy the data from our original topic to a new topic changing the format in the process.
To do this, we start by going to “SQL Processor”, clicking “New SQL Processor” and defining our processor with the following code:
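A sketch of the processor; the output topic name is taken from the rest of the tutorial, and the * projection simply copies the records:

```sql
SET defaults.topic.autocreate=true;

INSERT INTO car_speed_events_avro
STORE KEY AS AVRO VALUE AS AVRO
SELECT STREAM *
FROM car_speed_events_json;
```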
Notice the addition of STORE KEY AS AVRO VALUE AS AVRO. This statement will tell our processor which format we want each facet (key or value) to be stored as.
Hitting “Create New Processor” will start a new processor.
We can see the events were added, and from now on Lenses will keep pushing any new events added to car_speed_events_json into car_speed_events_avro.
We can also verify that the format of our new topic is AVRO for both the key and value facets:
This page describes a tutorial to filter records in a Kafka topic with Lenses SQL Processors.
Filtering messages and copying them to a topic can be achieved using the WHERE clause.
In our example, we have a topic where our application registers bank transactions.
We have a topic called payments where records have this shape:
We can replicate such a structure running the following query in SQL Studio:
Each event has a unique string key generated by the upstream system.
We can again use SQL Studio to insert some data to play with:
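A sketch of both the topic creation and the sample data; field names, formats and values are assumptions:

```sql
CREATE TABLE payments(
    _key string,
    amount double,
    currency string,
    time long
)
FORMAT (string, json);

INSERT INTO payments(_key, amount, currency, time) VALUES
('tx-0001', 7500.0, 'USD', 1595520000000);
INSERT INTO payments(_key, amount, currency, time) VALUES
('tx-0002', 150.0, 'EUR', 1595606400000);
INSERT INTO payments(_key, amount, currency, time) VALUES
('tx-0003', 9000.0, 'GBP', 1595529000000);
```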
Let’s assume we need to detect significant transactions that will be then fed into our anti-fraud system.
We want to copy those transactions into a new topic, maintaining the content of the records as it is.
For our first example, we will use a simple predicate to filter transactions with an amount larger than 5000, regardless of the currency.
Lenses SQL supports all the common comparison operators to compare values, so for our goal it is enough to use a WHERE statement with a >= condition:
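A sketch of the processor, using the assumed field names above:

```sql
SET defaults.topic.autocreate=true;

INSERT INTO large_payments
SELECT STREAM *
FROM payments
WHERE amount >= 5000;
```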
Checking the records emitted by the processor, we see that we got the transactions we were looking for.
Because of the * projection, the records’ content has not changed.
Not all currencies are the same, so we would like to add a specific threshold for each currency. As a first cut, we combine multiple conditions with ANDs and ORs:
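A sketch; the per-currency thresholds are illustrative:

```sql
SET defaults.topic.autocreate=true;

INSERT INTO large_payments_by_currency
SELECT STREAM *
FROM payments
WHERE (currency = 'USD' AND amount >= 5000)
   OR (currency = 'EUR' AND amount >= 4500)
   OR (currency = 'GBP' AND amount >= 4000);
```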
As an improvement, we want to capture the threshold for each currency in a single expression. We will use a CASE statement for that:
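A sketch using a CASE expression, with the same illustrative thresholds:

```sql
SET defaults.topic.autocreate=true;

INSERT INTO large_payments_case
SELECT STREAM *
FROM payments
WHERE amount >= (CASE
                   WHEN currency = 'USD' THEN 5000
                   WHEN currency = 'EUR' THEN 4500
                   WHEN currency = 'GBP' THEN 4000
                   ELSE 5000
                 END);
```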
Running it, we get the expected results:
In this section, we will find all the transactions that happened during the (UTC) night. To do that we can use one of our many date and time functions.
You will also see how to use a CAST expression to convert from one type to another.
Checking the output, we can see that only one transaction satisfied our predicate:
Let’s imagine that we have to build some intelligence around all the payments we process, but we do not have the capacity, nor the need, to process all of them.
We decided then to build a reduced copy of the payments topic, with only 10% of the original records.
To do that we are going to use our RANDINT function:
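A sketch of the sampling predicate; the constant is the maximum 32-bit signed integer, and ABS is assumed to be available alongside RANDINT:

```sql
SET defaults.topic.autocreate=true;

INSERT INTO payments_sample
SELECT STREAM *
FROM payments
WHERE (CAST(ABS(RANDINT()) AS DOUBLE) / 2147483647) < 0.1;
```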
RANDINT generates a random integer; we take its absolute value to make sure it is positive, and we then normalise the result by dividing it by the maximum possible integer, getting an (almost) uniform sample of numbers between 0 and 1.
We have to CAST to double along the way; otherwise, the division would be between integers, and the result would always be 0.
This page describes a tutorial to enrich a Kafka topic using Lenses SQL Processors.
In this article, we will be enriching customer call events with their customer details.
Enriching data streams with extra information by performing an efficient lookup is a common scenario for streaming SQL on Apache Kafka.
Topics involved:
customer_details: messages contain information about the customer
customer_call_details: messages contain information about calls
To simplify our testing process and manage to run the above example in less than 60 seconds, we will be using SQL to create and populate the two Apache Kafka topics:
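A sketch of the setup in SQL Studio; every field name below is an assumption made for illustration:

```sql
-- Customer details, keyed by a customer id
CREATE TABLE customer_details(
    _key int,
    name string,
    country string
)
FORMAT (int, avro);

INSERT INTO customer_details(_key, name, country) VALUES (1, 'Anna', 'Italy');
INSERT INTO customer_details(_key, name, country) VALUES (2, 'Bob', 'UK');

-- Call events, referencing the customer id
CREATE TABLE customer_call_details(
    _key string,
    customer_id int,
    duration_seconds int
)
FORMAT (string, avro);

INSERT INTO customer_call_details(_key, customer_id, duration_seconds) VALUES ('c1', 1, 120);
INSERT INTO customer_call_details(_key, customer_id, duration_seconds) VALUES ('c2', 2, 45);
```

The enrichment itself is then a SQL Processor joining each call with the customer's details; the join-on-key syntax may vary slightly between Lenses versions:

```sql
SET defaults.topic.autocreate=true;

INSERT INTO customer_call_details_enriched
SELECT STREAM
    c.customer_id,
    c.duration_seconds,
    d.name,
    d.country
FROM customer_call_details AS c
    INNER JOIN customer_details AS d
        ON c.customer_id = d._key;
```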
This page describes a tutorial to change the shape (fields) of data in a Kafka topic using Lenses SQL Processors.
In this tutorial, we will see how to use Lenses SQL to alter the shape of your records.
In Lenses SQL you can quickly reshape your data with a simple SELECT and some built-in functions.
We will learn how to
put value fields into the key
lift key fields into the value
call functions to transform your data
build nested structures for your keys/values
unwrap singleton objects into primitive types
In our example, we are getting data from speed sensors from a speed car circuit.
The upstream system registers speed measurement events as records in a Kafka topic.
An example of such an event is the following:
We can replicate such a structure running the following query in SQL Studio:
Each event is keyed by a unique string generated by the upstream system.
We can again use SQL Studio to insert some data to play with:
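A sketch of both the topic creation and the sample data; field names, formats and values are assumptions:

```sql
CREATE TABLE car_speed_events(
    _key string,
    car_id string,
    speedMph double,
    sensor_id string,
    event_time long
)
FORMAT (string, json);

INSERT INTO car_speed_events(_key, car_id, speedMph, sensor_id, event_time) VALUES
('e1', 'car-1', 142.0, 'sensor-7', 1595520000000);
INSERT INTO car_speed_events(_key, car_id, speedMph, sensor_id, event_time) VALUES
('e2', 'car-2', 138.5, 'sensor-3', 1595520005000);
```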
In this section, we are only interested in the speed of single cars, and we do not care about all the other fields.
We want the car id, which is now part of the record Value, to become the new key (using the special as _key syntax). We also want the car speed to be the new record Value.
To achieve that we can create a new SQL Processor using some simple projections.
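A sketch of such a processor:

```sql
SET defaults.topic.autocreate=true;

INSERT INTO car_speeds
SELECT STREAM
    car_id AS _key,
    speedMph
FROM car_speed_events;
```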
Checking the records emitted by the processor, we see that the value of each record is an object wrapping the single speedMph field.
We want to avoid that intermediate wrapping of speedMph inside an object. To do that we can tell Lenses to unwrap the value with the special as _value syntax, saving some bytes and CPU cycles:
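The same sketch, unwrapping the value:

```sql
SET defaults.topic.autocreate=true;

INSERT INTO car_speeds_unwrapped
SELECT STREAM
    car_id   AS _key,
    speedMph AS _value
FROM car_speed_events;
```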
Now the shape of the records is what we had in mind:
This time we want to do some more complex manipulation: we want to convert the speed from Mph to Kmph, and we also want to build a nice string describing the event.
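A sketch; the wording of the description string is illustrative, and the STRING target type of the CAST may be named differently in your Lenses version:

```sql
SET defaults.topic.autocreate=true;

INSERT INTO car_speeds_described
SELECT STREAM
    speedMph * 1.60934 AS speedKmph,
    CONCATENATE(
        'car ', car_id,
        ' was going at ', CAST(speedMph * 1.60934 AS STRING), ' km/h'
    ) AS description
FROM car_speed_events;
```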
In this case, we are using CONCATENATE to concatenate multiple strings, CAST to convert an expression to another type, and *, the usual multiplication operator.
If we check the resulting records, we can see that we obtained the shape we were looking for. Please note that the keys have been left untouched:
In this last example, we will show how to create composite keys and values in our projections.
We want both the sensor id and the event_time as the record Key. For the record Value, we want the car_id and the speed, expressed both as Mph and Kmph.
Lenses SQL allows us to use nested aliases to build nested structures: you just have to put some dots in your aliases.
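A sketch of such a projection:

```sql
SET defaults.topic.autocreate=true;

INSERT INTO car_speed_events_reshaped
SELECT STREAM
    sensor_id          AS _key.sensor_id,
    event_time         AS _key.event_time,
    car_id             AS car_id,
    speedMph           AS speed.mph,
    speedMph * 1.60934 AS speed.kmph
FROM car_speed_events;
```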
The resulting shape of the record is what we were aiming for:
Happy re-shaping!
This page describes a tutorial to unwrap a complex data type in a Kafka topic using Lenses SQL Processors.
In this example, we will show how Lenses can be used to transform complex data types into simple primitive ones.
We start this tutorial by creating a topic which will hold information regarding visits to our website:
Firstly we’ll add some data to our newly created topic:
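A sketch of both steps; the topic name, field names (the page visited and the time spent on it) and AVRO formats are assumptions:

```sql
CREATE TABLE page_visits(
    _key.visitor string,
    page string,
    time_spent_seconds int
)
FORMAT (avro, avro);

INSERT INTO page_visits(_key.visitor, page, time_spent_seconds) VALUES
('alice', '/home', 12);
INSERT INTO page_visits(_key.visitor, page, time_spent_seconds) VALUES
('bob', '/pricing', 47);
```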
For example, let’s say we’re interested in sending this data to a service that analyses the time spent on a page and how it changes over time.
This system has a caveat, though: it only accepts data where keys are specified as strings and values are specified as integers.
Rather than having to reimplement our analysis system, we can create a SQL Processor that will continuously send data to a new topic in a format the target system can work with:
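A sketch of the processor, using the assumed names above:

```sql
SET defaults.topic.autocreate=true;

INSERT INTO page_visit_durations
SELECT STREAM
    page               AS _key,
    time_spent_seconds AS _value
FROM page_visits;
```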
Notice the addition of the as _key and as _value aliases; these tell Lenses to “unwrap” the values, effectively making Lenses write them as primitive types (string and integer respectively) instead of (in this particular case) Avro objects.
Lenses will also automatically infer the format of each topic facet; in this case it sets them to STRING and INT respectively.
This page describes a tutorial on how to work with array data in your Kafka topics using Lenses SQL Processors.
In this tutorial, we will see how to use Lenses SQL to extract, manipulate and inspect the single elements of an array.
In Lenses SQL you can use a LATERAL JOIN to treat the elements of an array as a normal field.
You will learn to:
Extract the single elements of an array with a single LATERAL join.
Extract elements of a multi-level array with nested LATERAL joins.
Use arbitrarily complex array expressions as the right-hand side of a LATERAL join.
In this example, we are getting data from a sensor.
Unfortunately, the upstream system registers the readings in batches, while what we need is a single record for every reading.
An example of such a record is the following:
Notice how each record contains multiple readings, inside the readings array field.
We can replicate such a structure running the following query in SQL Studio:
We can again use SQL Studio to insert some data to play with:
What we want to obtain is a new topic, readings, where each record contains a single reading, together with its meter_id. Considering the first record, we expect to explode it into four different records, one for each reading:
In Lenses SQL you can easily achieve that with the special LATERAL syntax.
You can create a processor defined as:
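A sketch of the processor; the output topic name follows the tutorial, and meter_id and readings are the fields described above:

```sql
SET defaults.topic.autocreate=true;

INSERT INTO readings
SELECT STREAM
    meter_id,
    reading
FROM batched_readings
    LATERAL readings AS reading;
```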
The magic happens in batched_readings LATERAL readings as reading. With that we are basically saying:
for each record in batched_readings,
for each element inside the readings array of that record,
build a new record with all the fields of the original batched_readings record, plus an extra reading field, that will contain the value of the current element of readings.
We can then use in the SELECT both the original fields and the new reading field.
If you save the processor and run it, you will see that it will emit the records we expected.
One of the powerful features of a LATERAL join is that the expression you put in the LATERAL can be used as a normal field. This means that you can then use it, for example, also in a WHERE or in a GROUP BY.
In this section we will see how to filter records generated by a LATERAL using a normal WHERE.
We want to modify our previous processor in order to emit only the readings greater than 95.
To do that, it is enough to use reading as if it were a normal field, in the WHERE section:
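A sketch:

```sql
SET defaults.topic.autocreate=true;

INSERT INTO readings_above_95
SELECT STREAM
    meter_id,
    reading
FROM batched_readings
    LATERAL readings AS reading
WHERE reading > 95;
```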
Running the processor, we get the expected records:
This example is similar to the previous one. The only difference is that the readings are now stored in batches of batches: as you can see, nested_readings is an array whose elements are arrays of integers.
We can again use SQL Studio to insert some data:
We would like to define a processor that emits the same records as the previous one.
In this case though, we are dealing with nested_readings, which is a multi-level array, so a single LATERAL join is not enough; but nesting a LATERAL inside another will do the job:
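A sketch of the nested LATERAL joins:

```sql
SET defaults.topic.autocreate=true;

INSERT INTO readings_from_nested
SELECT STREAM
    meter_id,
    reading
FROM batched_readings_nested
    LATERAL nested_readings AS readings
    LATERAL readings AS reading;
```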
This is roughly what happens in the FROM clause:
We first unwrap the first level of the array, doing batched_readings_nested LATERAL nested_readings as readings.
At that point, readings will be an array of integers.
We can then use it in the outer ... LATERAL readings as reading join, and the single integers will finally be extracted and made available as reading.
In this section we will see how it is possible to use any expression as the right-hand side of a LATERAL join, as long as it evaluates to an array.
We have a table where the meter readings are split into two columns, readings_day and readings_night.
Let’s insert the same data as the first example, but where the readings are split across the two columns.
To extract the readings one by one, we first need to concatenate the two arrays readings_day and readings_night. We can achieve that using flatten.
We can then use the concatenated array in a LATERAL join, and the resulting processor will emit the records we expected.
This page describes how to control event time for data in your Kafka topics with Lenses SQL Processors.
Every message in Kafka comes with a timestamp, and Lenses Engine Streaming mode uses that by default when doing time-dependent operations, like aggregations and joins.
Sometimes though that timestamp is not exactly what you need, and you would like to use a field in the record value or key as the new timestamp.
In Lenses SQL you can use the special EVENTTIME BY ... syntax to control records’ timestamps.
In our toy example, we have a simple topic where electricity meter readings events are collected:
We can also insert some example data to do our experiments:
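A sketch of the setup; the field names are assumptions and are reused in the processor sketches below:

```sql
CREATE TABLE electricity_events(
    _key string,
    customer_id int,
    reading double,
    event_time long
)
FORMAT (string, avro);

INSERT INTO electricity_events(_key, customer_id, reading, event_time) VALUES
('e1', 1, 5.1, 1595520000000);
INSERT INTO electricity_events(_key, customer_id, reading, event_time) VALUES
('e2', 1, 5.4, 1595520005000);
INSERT INTO electricity_events(_key, customer_id, reading, event_time) VALUES
('e3', 1, 5.7, 1595520010000);
```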
If you query the events, you can see that Kafka sets a timestamp for each record. That timestamp is, in our case, the time when the record was inserted. As you can see, it is totally unrelated to the event_time field we have in the payload.
We would like to transform our original stream of events, aggregating events with a hopping window of 10s width and an increment of 5s, computing the average for each window.
You can create a new processor that streams those averages, using the special WINDOW BY ... syntax:
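A sketch, under the assumed field names above; the exact hopping-window syntax (HOP 10s,5s) may vary slightly between Lenses versions:

```sql
SET defaults.topic.autocreate=true;

INSERT INTO electricity_events_avg
SELECT STREAM
    customer_id,
    AVG(reading) AS avg_reading
FROM electricity_events
WINDOW BY HOP 10s,5s
GROUP BY customer_id;
```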
For customer 1, we have three events in input, with a 5s delay between them, so we expect four output events for that customer, since 4 is the number of hopping windows involved.
But checking the emitted records, we see that only two are produced.
This is because, by default, windowing operations work on the record timestamp, and in our case all the timestamps are pretty much the same, and they coincide with the time the records were inserted.
Fortunately, we can change this behavior using the special EVENTTIME BY ... syntax, specifying an expression to be used as a timestamp:
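The same sketch with EVENTTIME BY added; its exact position in the query may vary slightly between Lenses versions:

```sql
SET defaults.topic.autocreate=true;

INSERT INTO electricity_events_avg_eventtime
SELECT STREAM
    customer_id,
    AVG(reading) AS avg_reading
FROM electricity_events
EVENTTIME BY event_time
WINDOW BY HOP 10s,5s
GROUP BY customer_id;
```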
As you can see, the results have been windowed using event_time as the timestamp:
This page describes a tutorial joining Kafka topics with Lenses SQL Processors.
Imagine you are the next Amazon, and you want to track the orders and shipment events to work out which orders have been shipped and how long it took. In this case, there will be two data streams, one for each event type, and the resulting stream will answer the questions above.
Enriching two streams of data requires a sliding window join. The events are said to be “close” to each other if the difference between their timestamps is up to the time window specified.
Topics involved:
orders: messages contain information about a customer order
shipments: messages contain information about the shipment
The query combines the data from orders and shipments if the orders are processed within 24 hours. Resulting records contain the order and shipment identifier, and the time between the order was registered to the time it was shipped.
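A sketch of such a processor; the field names (orderId, shipmentId, the two timestamps) are assumptions, and the WITHIN clause expresses the 24-hour window:

```sql
SET defaults.topic.autocreate=true;

INSERT INTO orders_and_shipments
SELECT STREAM
    o.orderId,
    s.shipmentId,
    s.shippedAt - o.placedAt AS time_to_ship_ms
FROM orders AS o
    INNER JOIN shipments AS s
        ON o.orderId = s.orderId
WITHIN 24h;
```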
To simplify our testing process and manage to run the above example in less than 60 seconds, we will be using SQL to create and populate the two Apache Kafka topics:
The output seen in the next screenshot shows two records. For the order with the o2 identifier, there is no shipments entry because it has not been processed. For the order with identifier o3, the shipment happened after one day.
Let’s switch to the Snapshot engine by navigating to the SQL Studio menu item. With the entries in both topics, we can write the following query to see which data is joinable without the window interval:
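A sketch of the Snapshot join, under the same assumed field names:

```sql
SELECT
    o.orderId,
    s.shipmentId,
    s.shippedAt - o.placedAt AS time_to_ship_ms
FROM orders AS o
    INNER JOIN shipments AS s
        ON o.orderId = s.orderId;
```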
These are the results for the non-streaming query (i.e., Snapshot)
Running the query returned three records, but you can see the order o3 was processed two days after it was placed. Let’s apply the sliding window restriction for the Snapshot query by adding a filter to only match those records having their timestamp difference within a day.
Now the result matches the one from the Streaming query.
In this tutorial you learned how to join two Streams together using a sliding window. You achieved all the above using the Lenses SQL engine.
Good luck and happy streaming!
This page describes a tutorial to aggregate Kafka topic data into a table using Lenses SQL Processors.
In this tutorial, we will see how data in a table can be aggregated continuously using GROUP BY and how the aggregated results are emitted downstream.
In Lenses SQL you can read your data as a TABLE and quickly aggregate over it using the GROUP BY clause and SELECT TABLE.
Let’s assume that we have a topic (game-sessions) containing data regarding remote gaming sessions by users.
Each gaming session will contain:
the points the user achieved throughout the session
Metadata information regarding the session:
The country where the game took place
The language the user played the game in
The above structure represents the value of each record in our game-sessions topic.
Additionally, each record will be keyed by user information, including the following:
A pid, or player id, representing this user uniquely
Some additional denormalised user details:
a name
a surname
an age
Putting denormalised data in keys is not something that should be done in a production environment.
In light of the above, a record might look like the following (in JSON for simplicity):
We can replicate such a structure using SQL Studio and the following query:
We can then use SQL Studio again to insert the data we will use in the rest of the tutorial:
Now we can start processing the data we have inserted above.
Let’s imagine that we want to keep a running count of how many users are in a given country. To do this, we can assume that a user is currently in the same country where his last game took place.
We can achieve the above with the following query:
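A sketch of the processor; the sessionMetadata.country path follows the structure described above:

```sql
SET defaults.topic.autocreate=true;

INSERT INTO `groupby-table-country`
SELECT TABLE
    COUNT(*) AS gamersCount
FROM `game-sessions`
GROUP BY sessionMetadata.country;
```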
The content of the output topic, groupby-table-country, can now be inspected in the Lenses Explore screen and it will look similar to this:
The key results to notice here are the ones for Spain and the UK:
Spain is 2 because Jorge and Dave had their last game played there.
UK is 1 because, while Nigel had his only game played there, Dave initially played from the UK, but then from Italy and finally from Spain. Dave’s contribution was, therefore, subtracted from the UK count value.
The last point from above is the main difference (and power) of Tables vs. Streams: they represent the latest state of the world for each of their keys, so any aggregation will apply only to that latest data.
Given what a Table is, it will have by definition only a single value for any given key, so doing GROUP BY _key on a Table is a pointless operation: it will always only generate 1-element groups.
We can expand on the example from the previous section, imagining that our requirement was extended.
Just as before, we want to calculate statistics based on the current country of a user, as defined in Example 1, but now we want to know all the following:
count how many users are in a given country
what is the total amount of points these users achieved
what is the average amount of points these users achieved
All of the above can be achieved with the following query:
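A sketch, combining the COUNT, SUM and AVG functions mentioned below:

```sql
SET defaults.topic.autocreate=true;

INSERT INTO `groupby-table-country-multi`
SELECT TABLE
    COUNT(*)    AS gamersCount,
    SUM(points) AS totalPoints,
    AVG(points) AS avgPoints
FROM `game-sessions`
GROUP BY sessionMetadata.country;
```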
The content of the output topic, groupby-table-country-multi, can now be inspected in the Lenses Explore screen and it will look similar to this:
One thing to highlight here is that the functions we are using in this query (COUNT, SUM, and AVG) all support aggregating over Tables. However, that is not true of all functions; check the Lenses SQL documentation to find out which functions support Tables and which ones only support Streams.
We will cover one final scenario where we want to filter some data within our aggregation.
There are two possible types of filtering we might want to do when it comes to aggregations:
Pre-aggregation: we want some rows to be ignored by the grouping, so they will not be part of the calculation done by aggregation functions. In these scenarios, we will use the WHERE clause.
Post-aggregation: we want to filter the aggregation results themselves so that those aggregated records that meet some specified condition are not emitted at all. In these scenarios, we will use the HAVING clause.
Let’s see an example.
We want to calculate the statistics from Example 2, but grouping by the session language. Here we will again make the assumption that a user’s language is represented only by his latest recorded game session.
Additionally, we are only interested in languages used by players who don’t achieve a high total of points (we might want to focus our marketing team’s effort there, to keep them entertained). Finally, we are aware that some users have been using VPNs to access our platform, so we want to exclude some records from our calculations if a given user appeared to have played from a given country.
For the sake of this example, we will:
Show statistics for languages with total points lower than 100
Ignore sessions that Dave made from Spain (because we know he was not there)
The query for all of the above is:
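A sketch; the path to the user's name inside the record key (_key.name) is an assumption:

```sql
SET defaults.topic.autocreate=true;

INSERT INTO `groupby-table-language-filtered`
SELECT TABLE
    COUNT(*)    AS gamersCount,
    SUM(points) AS totalPoints,
    AVG(points) AS avgPoints
FROM `game-sessions`
WHERE NOT (_key.name = 'Dave' AND sessionMetadata.country = 'Spain')
GROUP BY sessionMetadata.language
HAVING SUM(points) < 100;
```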
The content of the output topic, groupby-table-language-filtered, can now be inspected in the Lenses Explore screen and it will look similar to this:
Notice that IT (which is the only language that has 120 points in total) appears in the output but without any data in the value section.
This is because aggregations are Tables, and the key IT used to be present (while it was lower than 100), but then it was removed. Deletion is expressed, in Tables, by setting the value section of a record to null, which is what we are seeing here.
In this tutorial, you learned how to use aggregation over Tables to:
group by arbitrary fields, based on the latest state of the world
calculate multiple results in a single processor
filter both the data that is to be aggregated and the data that will be emitted as a result of the aggregation itself
Good luck and happy streaming!
This page describes a tutorial to use multiple Kafka topics in a Lenses SQL Processor.
In this tutorial, we will see how we can read data from multiple topics, process it as needed, and write the results to as many output topics we need, all by using a single SQL Processor.
Let’s assume that we have a topic (game-sessions) that contains data regarding remote gaming sessions by users.
Each gaming session will contain:
the points the user achieved throughout the session
Metadata information regarding the session:
The country where the game took place
The language the user played the game in
The above structure represents the value of each record in our game-sessions topic.
Additionally, each record is keyed by user details, including the following:
A pid, or player id, representing this user uniquely
Some additional denormalised user details:
a name
a surname
an age
In light of the above, a record might look like the following (in JSON for simplicity):
Finally, let’s assume we also have another, normalised, compacted topic, user-details, keyed by an int matching the pid from topic game-sessions and containing user information like address and period of membership to the platform.
In light of the above, a record might look like the following (in JSON for simplicity):
We can replicate such structures using SQL Studio and the following query:
We can then use SQL Studio again to insert the data we will use in the rest of the tutorial:
Let’s imagine that, given the above data, we are given the following requirements:
For each country in game-sessions, create a record with the count of games played from that country. Write the results to the games-per-country topic.
For each record in game-sessions, reshape the records to remove everything from the key besides pid. Additionally, add the user’s memberYears to the value. Write the results to the games-sessions-normalised topic.
We can obtain the above with the following query:
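A sketch of a single processor with two INSERT statements; the key path (_key.pid), the memberYears field and the join-on-key syntax are assumptions:

```sql
SET defaults.topic.autocreate=true;

-- Requirement 1: count of games per country
INSERT INTO `games-per-country`
SELECT TABLE
    COUNT(*) AS gamesCount
FROM `game-sessions`
GROUP BY sessionMetadata.country;

-- Requirement 2: strip the key down to pid and enrich with memberYears
INSERT INTO `games-sessions-normalised`
SELECT STREAM
    g._key.pid AS _key,
    g.points,
    g.sessionMetadata,
    u.memberYears
FROM `game-sessions` AS g
    INNER JOIN `user-details` AS u
        ON g._key.pid = u._key;
```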
The result of this processor in the UI will be a processor graph similar to the following:
Finally, the content of the output topics games-per-country and games-sessions-normalised can now be inspected in the Lenses Explore screen:
In this tutorial, we learned how to read data from multiple topics, combine it, process it in different ways, and save it to as many output topics as needed.
Good luck and happy streaming!
This page describes a tutorial to perform time windowed aggregations on Kafka topic data with Lenses SQL Processors.
In this tutorial we will see how data in a Stream can be aggregated continuously using GROUP BY over a time window and how the results are emitted downstream.
In Lenses SQL you can read your data as a STREAM and quickly aggregate over it using the GROUP BY clause and SELECT STREAM.
Let’s assume that we have a topic (game-sessions) that contains data regarding remote gaming sessions by users.
Each gaming session will contain:
the points the user achieved throughout the session
Metadata information regarding the session:
The country where the game took place
The startAt, the date and time the game commenced
The endedAt, the date and time the game finished
The above structure represents the value of each record in our game-sessions topic.
Additionally, each record will be keyed by user information, including the following:
A pid, or player id, representing this user uniquely
Some additional denormalised user details:
a name
a surname
an age
Keep in mind this is just an example in the context of this tutorial. Putting denormalised data in keys is not something that should be done in a production environment.
In light of the above, a record might look like the following (in JSON for simplicity):
We can replicate such a structure using SQL Studio and the following query:
We can then use SQL Studio again to insert the data we will use in the rest of the tutorial:
The time a game started and completed is expressed in epoch time. To see the human-readable values, run this query:
Now we can start processing the data we have inserted above.
One requirement could be to count how many games each user has played every 10 seconds.
We can achieve the above with the following query:
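A sketch; the exact tumbling-window syntax (TUMBLE 10s) may vary slightly between Lenses versions:

```sql
SET defaults.topic.autocreate=true;

INSERT INTO games_per_user_every_10_seconds
SELECT STREAM
    COUNT(*) AS occurrences
FROM `game-sessions`
WINDOW BY TUMBLE 10s
GROUP BY _key;
```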
The content of the output topic, games_per_user_every_10_seconds, can now be inspected and eventually it will look similar to this:
As you can see, the keys of the records did not change, but their value is the result of the specified aggregation. The gamer Billy Lagrange has two entries because he played three games: the first two fall in the window between 2020-07-23 17:08:00 and 2020-07-23 17:08:10 (exclusive), and the third in the window between 2020-07-23 17:08:10 (inclusive) and 2020-07-23 17:08:20 (exclusive).
You might have noticed that games_per_user_every_10_seconds has been created as a compacted topic, and that is by design.
All aggregations result in a Table because they maintain a running, fault-tolerant state of the aggregation, and when the result of an aggregation is written to a topic, the topic will need to reflect these semantics (which is what a compacted topic does).
We can expand on the example from the previous section. We now want to know, for each country on a 10 seconds interval, the following:
count how many games were played
what are the top best 3 results
All the above can be achieved with the following query:
The content of the output topic, games_per_country_every_10_seconds, can now be inspected by running a query on it in SQL Studio.
There are 2 entries for Italy, since there is one game played at 2020-07-23 18:08:11. Also, notice that the other entry for Italy has 4 occurrences and 3 max points. The reason for the 4 occurrences is down to 4 games, two each from Billy Lagrange and Maria Rossi, within the 10 seconds time window between 2020-07-23 18:08:00 and 2020-07-23 18:08:10 (exclusive).
In this tutorial you learned how to use aggregation over Streams to:
group by the current key of a record
group by a field in the input record
use a time window to define the scope of the aggregation
Good luck and happy streaming!
This page describes a tutorial to aggregate Kafka topic data into a stream using Lenses SQL Processors.
In this tutorial we will see how data in a stream can be aggregated continuously using GROUP BY and how the aggregated results are emitted downstream.
In Lenses SQL you can read your data as a STREAM and quickly aggregate over it using the GROUP BY clause and SELECT STREAM.
Let’s assume that we have a topic (game-sessions) that contains data regarding remote gaming sessions by users.
Each gaming session will contain:
the points the user achieved throughout the session
Metadata information regarding the session:
The country where the game took place
The language the user played the game in
The above structure represents the value of each record in our game-sessions topic.
Additionally, each record will be keyed by user information, including the following:
A pid, or player id, representing this user uniquely
Some additional denormalised user details:
a name
a surname
an age
Keep in mind this is just an example in the context of this tutorial. Putting denormalised data in keys is not something that should be done in a production environment.
In light of the above, a record might look like the following (in JSON for simplicity):
We can replicate such a structure using SQL Studio and the following query:
We can then use SQL Studio again to insert the data we will use in the rest of the tutorial:
Now we can start processing the data we have inserted above.
One requirement could be to count how many games each user has played. Additionally, we want to ensure that, should new data come in, it will update the calculations and return the up to date numbers.
We can achieve the above with the following query:
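A sketch of the processor:

```sql
SET defaults.topic.autocreate=true;

INSERT INTO `groupby-key`
SELECT STREAM
    COUNT(*) AS gamesPlayed
FROM `game-sessions`
GROUP BY _key;
```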
The content of the output topic, groupby-key, can now be inspected in the Lenses Explore screen and it will look similar to this:
As you can see, the keys of the records did not change, but their value is the result of the specified aggregation.
You might have noticed that groupby-key
has been created as a compacted topic, and that is by design.
All aggregations result in a Table because they maintain a running, fault-tolerant, state of the aggregation and when the result of an aggregation is written to a topic, then the topic will need to reflect these semantics (which is what a compacted topic does).
We can expand on the example from the previous section. We now want to know, for each user, the following:
count how many games the user has played
what are the user’s best 3 results
what is the user’s average of points
All the above can be achieved with the following query:
The content of the output topic, groupby-key-multi-aggs, can now be inspected in the Lenses Explore screen, and it will look similar to this:
Our analytics skills are so good that we are now asked for more. We now want to calculate the same statistics as before, but grouping together players that played from the same country and used the same language.
Here is the query for that:
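A sketch showing the multi-field grouping and the sessionLanguage projection discussed below; the top-3-results aggregate from the previous section is omitted here:

```sql
SET defaults.topic.autocreate=true;

INSERT INTO `groupby-country-and-language`
SELECT STREAM
    COUNT(*)                 AS gamesPlayed,
    AVG(points)              AS avgPoints,
    sessionMetadata.language AS sessionLanguage
FROM `game-sessions`
GROUP BY sessionMetadata.country, sessionMetadata.language;
```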
The content of the output topic, groupby-country-and-language, can now be inspected in the Lenses Explore screen and it will look similar to this:
Notice how we projected sessionMetadata.language as sessionLanguage in the query.
We could do that because sessionMetadata.language is part of the GROUP BY clause.
Lenses SQL only supports Full Group By mode, so if a projected field is not part of the GROUP BY clause, the query will be invalid.
One final scenario we will cover in this tutorial is when we want to filter some data within our aggregation.
There are two possible types of filtering we might want to do when it comes to aggregations:
Pre-aggregation: we want some rows to be ignored by the grouping, so they will not be part of the calculation done by aggregation functions. In these scenarios we will use the WHERE clause.
Post-aggregation: we want to filter the aggregation results themselves, so that those aggregated records which meet some specified condition are not emitted at all. In these scenarios we will use the HAVING clause.
Let’s see an example.
We want to calculate the usual statistics from the previous scenarios, but grouping by the session language only.
However, we are interested only in languages that are used a small number of times (we might want to focus our marketing team’s effort there); additionally, we are aware that some users have been using VPNs to access our platform, so we want to exclude some records from our calculations if a given user appeared to have played from a given country.
For the sake of this example, we will:
Show statistics for languages that are used less than 9 times
Ignore sessions that Dave made from Spain (because we know he was not there)
The query for all the above is:
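A sketch; as before, the path to the user's name inside the key (_key.name) is an assumption:

```sql
SET defaults.topic.autocreate=true;

INSERT INTO `groupby-language-filtered`
SELECT STREAM
    COUNT(*)    AS gamesPlayed,
    AVG(points) AS avgPoints
FROM `game-sessions`
WHERE NOT (_key.name = 'Dave' AND sessionMetadata.country = 'Spain')
GROUP BY sessionMetadata.language
HAVING COUNT(*) < 9;
```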
The content of the output topic, groupby-language-filtered, can now be inspected in the Lenses Explore screen and it will look similar to this:
Notice that IT (which is the only language that has 9 sessions in total) appears in the output but without any data in the value section.
This is because aggregations are Tables, and the key IT used to be present (while it was lower than 9), but then it was removed. Deletion is expressed, in Tables, by setting the value section of a record to null, which is what we are seeing here.
In this tutorial you learned how to use aggregation over Streams to:
group by the current key of a record
calculate multiple results in a single processor
group by a combination of different fields of the input record
filter both the data that is to be aggregated and the data that will be emitted by the aggregation itself
You achieved all the above using Lenses SQL engine.
You can now proceed to learn about more complex scenarios like aggregation over Tables and windowed aggregations.
Good luck and happy streaming!