Multiple topics

In this tutorial, we will see how to read data from multiple topics, process it as needed, and write the results to as many output topics as we need, all using a single SQL Processor.

Setting up our example 

Let’s assume that we have a topic (game-sessions) that contains data regarding remote gaming sessions by users.

Each gaming session will contain:

  • the points the user achieved throughout the session
  • Metadata information regarding the session:
    • The country where the game took place
    • The language the user played the game in

The above structure represents the value of each record in our game-sessions topic.

Additionally, each record is keyed by user details:

  • A pid, or player id, representing this user uniquely
  • Some additional denormalised user details:
    • a name
    • a surname
    • an age

In light of the above, a record might look like the following (in JSON for simplicity):

    {
      "key": {
        "pid": 1,
        "name": "Billy",
        "surname": "Lagrange",
        "age": 30
      },
      "value": {
        "points": 5,
        "sessionMetadata": {
          "country": "Italy",
          "language": "IT"
        }
      }
    }

Finally, let’s assume we also have another, normalised, compacted topic user-details, keyed by an int matching the pid from topic game-sessions and containing user information like address and period of membership to the platform.

In light of the above, a record might look like the following (in JSON for simplicity):

    {
      "key": 1,
      "value": {
        "fullName": "Billy Lagrange",
        "memberYears": 3,
        "address": {
          "country": "Italy",
          "street": "Viale Monza 5",
          "city": "Milan"
        }
      }
    }

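Because user-details is compacted, Kafka retains at least the latest record for each key, so the topic can be treated as a lookup table from pid to user details. The following is a minimal sketch of that idea in plain Python, not Lenses or Kafka code; the `materialise` helper and the second record (a later update for the same key) are hypothetical and used only for illustration.

```python
# Hypothetical illustration: a compacted topic viewed as a key -> latest value table.
def materialise(records):
    """Fold ordered (key, value) records into a table where the last value per key wins."""
    table = {}
    for key, value in records:
        table[key] = value
    return table


records = [
    (1, {"fullName": "Billy Lagrange", "memberYears": 3}),
    (1, {"fullName": "Billy Lagrange", "memberYears": 4}),  # made-up later update for key 1
]

user_details = materialise(records)
print(user_details[1]["memberYears"])  # prints 4
```

Only one entry per key survives, which is what makes such a topic a natural fit for the table side of a join.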
We can replicate such structures using SQL Studio and the following query:

CREATE TABLE game-sessions(
    _key.pid int
    , _key.name string
    , _key.surname string
    , _key.age int
    , points double
    , sessionMetadata.country string
    , sessionMetadata.language string
)
FORMAT (avro, avro);

CREATE TABLE user-details(
    fullName string
    , memberYears int
    , address.country string
    , address.street string
    , address.city string
) FORMAT (int, avro);

We can then use SQL Studio again to insert the data we will use in the rest of the tutorial:

INSERT into game-sessions(
    _key.pid
    , _key.name
    , _key.surname
    , _key.age
    , points
    , sessionMetadata.country
    , sessionMetadata.language
) VALUES
(1, 'Billy', 'Lagrange', 35, 5, 'Italy', 'IT'),
(1, 'Billy', 'Lagrange', 35, 30, 'Italy', 'IT'),
(1, 'Billy', 'Lagrange', 35, 0, 'Italy', 'IT'),
(2, 'Maria', 'Rossi', 27, 50, 'Italy', 'IT'),
(2, 'Maria', 'Rossi', 27, 10, 'Italy', 'IT'),
(3, 'Jorge', 'Escudero', 27, 10, 'Spain', 'ES'),
(4, 'Juan', 'Suarez', 22, 80, 'Mexico', 'ES'),
(5, 'John', 'Bolden', 40, 10, 'USA', 'EN'),
(6, 'Dave', 'Holden', 31, 30, 'UK', 'EN'),
(7, 'Nigel', 'Calling', 50, 5, 'UK', 'EN'),
(2, 'Maria', 'Rossi', 27, 10, 'UK', 'EN'),
(1, 'Billy', 'Lagrange', 35, 50, 'Italy', 'IT'),
(3, 'Jorge', 'Escudero', 27, 16, 'Spain', 'ES'),
(4, 'Juan', 'Suarez', 22, 70, 'Mexico', 'ES'),
(5, 'John', 'Bolden', 40, 10, 'USA', 'EN'),
(6, 'Dave', 'Holden', 31, 50, 'Italy', 'IT'),
(6, 'Dave', 'Holden', 31, 70, 'Spain', 'ES'),
(2, 'Maria', 'Rossi', 27, 70, 'Italy', 'IT'),
(1, 'Billy', 'Lagrange', 35, 50, 'Italy', 'IT');

INSERT into user-details(
    _key
    , fullName
    , memberYears
    , address.country
    , address.street
    , address.city
) VALUES
(1, 'Billy Lagrange', 3, 'Italy', 'Viale Monza 5', 'Milan'),
(2, 'Maria Rossi', 1, 'Italy', 'Stazione Termini', 'Rome'),
(3, 'Jorge Escudero', 5, 'Spain', '50 Passeig de Gracia', 'Barcelona'),
(4, 'Juan Suarez', 0, 'Mexico', 'Plaza Real', 'Tijuana'),
(5, 'John Bolden', 2, 'USA', '100 Wall Street', 'New York'),
(6, 'Dave Holden', 1, 'UK', '25 Bishopsgate', 'London'),
(7, 'Nigel Calling', 6, 'UK', '10 Queen Anne Street', 'Brighton');

Multiple transformations all in one go 

Let’s imagine that, for the above data, we have the following requirements:

  • For each country in game-sessions, create a record with the count of games played from that country. Write the results to the games-per-country topic.
  • For each record in game-sessions, reshape the record to remove everything from the key besides pid. Additionally, add the user’s memberYears to the value. Write the results to the games-sessions-normalised topic.

We can obtain the above with the following query:

SET defaults.topic.autocreate=true;

WITH userDetailsTable AS(
  SELECT TABLE *
  FROM user-details
);

WITH joinedAndNormalised AS(
  SELECT STREAM
    gs.*
    , ud.memberYears
  FROM game-sessions AS gs JOIN userDetailsTable AS ud 
        ON (gs._key.pid = ud._key)
);

INSERT INTO games-per-country
SELECT STREAM
  COUNT(*) AS gamesPlayed
FROM game-sessions
GROUP BY sessionMetadata.country;

INSERT INTO games-sessions-normalised
SELECT STREAM *
FROM joinedAndNormalised;
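The second requirement amounts to a lookup of each session against the user-details table, keeping only pid as the key. The sketch below models that in plain Python under stated assumptions: it is not Lenses code, the `normalise` helper is hypothetical, inner-join semantics are assumed, and the exact shape of the records Lenses writes may differ. The sample values are taken from the data inserted earlier.

```python
# Plain-Python sketch of the stream-to-table join; not Lenses code.
# user_details plays the role of the compacted user-details topic (pid -> value).
user_details = {
    1: {"fullName": "Billy Lagrange", "memberYears": 3},
    2: {"fullName": "Maria Rossi", "memberYears": 1},
}

# Two sample game-sessions records as (key, value) pairs.
game_sessions = [
    ({"pid": 1, "name": "Billy", "surname": "Lagrange", "age": 35},
     {"points": 5, "sessionMetadata": {"country": "Italy", "language": "IT"}}),
    ({"pid": 2, "name": "Maria", "surname": "Rossi", "age": 27},
     {"points": 50, "sessionMetadata": {"country": "Italy", "language": "IT"}}),
]


def normalise(key, value, table):
    """Keep only pid as the key and add memberYears from the matching table row."""
    details = table.get(key["pid"])
    if details is None:
        return None  # assumed inner join: sessions without a matching user are dropped
    return key["pid"], {**value, "memberYears": details["memberYears"]}


joined = (normalise(k, v, user_details) for k, v in game_sessions)
normalised = [record for record in joined if record is not None]
for new_key, new_value in normalised:
    print(new_key, new_value)
```

Each output record is keyed by the bare pid and carries the original session value plus memberYears, which is the reshaping the requirement describes.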

The result of this processor in the UI will be a processor graph similar to the following:

Processor graph for the above query

Finally, the content of the output topics games-per-country and games-sessions-normalised can now be inspected in the Lenses Explore screen:

Content of `games-per-country` topic
Content of `games-sessions-normalised` topic
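As a cross-check of what games-per-country should eventually hold for the sample data, the per-country totals can be computed by hand. The sketch below does so in plain Python (not Lenses code), listing the country of each of the 19 inserted sessions in order. A streaming processor may also emit intermediate counts as records arrive, so the values here are the final totals per country only.

```python
from collections import Counter

# Country of each of the 19 sample game-sessions records, in insertion order.
countries = [
    "Italy", "Italy", "Italy", "Italy", "Italy", "Spain", "Mexico", "USA", "UK", "UK",
    "UK", "Italy", "Spain", "Mexico", "USA", "Italy", "Spain", "Italy", "Italy",
]

# Equivalent in spirit to COUNT(*) grouped by sessionMetadata.country.
games_per_country = Counter(countries)
for country, games_played in games_per_country.items():
    print(country, games_played)
# Italy 9
# Spain 3
# Mexico 2
# USA 2
# UK 3
```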


In this tutorial, we learned how to read data from multiple topics, combine it, process it in different ways, and save the results to as many output topics as needed.

For more details about the specific operations used within this tutorial, please refer to the Aggregations and Joins sections of the user documentation.

Good luck and happy streaming!

Last modified: July 17, 2024