Multiple topics


In this tutorial, we will see how to read data from multiple topics, process it as needed, and write the results to as many output topics as we need, all with a single SQL Processor.

Setting up our example 

Let’s assume that we have a topic (game-sessions) that contains data regarding remote gaming sessions by users.

Each gaming session will contain:

  • The points the user achieved throughout the session
  • Metadata information regarding the session:
    • The country where the game took place
    • The language the user played the game in

The above structure represents the value of each record in our game-sessions topic.

Additionally, each record is keyed by user details:

  • A pid, or player id, representing this user uniquely
  • Some additional denormalised user details:
    • a name
    • a surname
    • an age

In light of the above, a record might look like the following (in JSON for simplicity):

{
  "key":{
    "pid": 1,
    "name": "Billy",
    "surname": "Lagrange",
    "age": 35
  },
  "value":{
    "points": 5,
    "sessionMetadata": {
      "country": "Italy",
      "language": "IT"
    }
  }
}

Finally, let’s assume we also have a second topic, user-details, which is normalised and compacted. It is keyed by an int matching the pid from game-sessions, and its value contains user information such as the address and the length of membership on the platform.

A record in user-details might look like the following (again in JSON for simplicity):

{
  "key": 1,
  "value":{
    "fullName": "Billy Lagrange",
    "memberYears": 3,
    "address": {
      "country": "Italy",
      "street": "Viale Monza 5",
      "city": "Milan"
    }
  }
}
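
Because user-details is compacted and keyed by pid, it can be read as a lookup table that holds the latest value for each key. The plain-Python sketch below illustrates only that idea and is not Lenses code: the latest_by_key helper is a hypothetical name, and the second record in the log is a made-up update added purely to show the overwrite.

```python
from typing import Any, Dict, Iterable, Tuple

Record = Tuple[int, Dict[str, Any]]  # (key, value)


def latest_by_key(records: Iterable[Record]) -> Dict[int, Dict[str, Any]]:
    """Collapse a keyed log into a table: later records overwrite earlier ones."""
    table: Dict[int, Dict[str, Any]] = {}
    for key, value in records:
        table[key] = value
    return table


# The first record mirrors the example above; the second is a hypothetical update.
log = [
    (1, {"fullName": "Billy Lagrange", "memberYears": 3}),
    (1, {"fullName": "Billy Lagrange", "memberYears": 4}),  # hypothetical
]

user_table = latest_by_key(log)
print(user_table[1]["memberYears"])  # prints 4
```

Reading the topic this way is what makes it suitable as the lookup side of a join keyed by pid.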

We can replicate such structures using SQL Studio and the following query:

CREATE TABLE game-sessions(
    _key.pid int
    , _key.name string
    , _key.surname string
    , _key.age int
    , points double
    , sessionMetadata.country string
    , sessionMetadata.language string
)
FORMAT (avro, avro);

CREATE TABLE user-details(
    fullName string
    , memberYears int
    , address.country string
    , address.street string
    , address.city string
) FORMAT (int, avro);

We can then use SQL Studio again to insert the data we will use in the rest of the tutorial:

INSERT INTO game-sessions(
    _key.pid
    , _key.name
    , _key.surname
    , _key.age
    , points
    , sessionMetadata.country
    , sessionMetadata.language
) VALUES
(1, 'Billy', 'Lagrange', 35, 5, 'Italy', 'IT'),
(1, 'Billy', 'Lagrange', 35, 30, 'Italy', 'IT'),
(1, 'Billy', 'Lagrange', 35, 0, 'Italy', 'IT'),
(2, 'Maria', 'Rossi', 27, 50, 'Italy', 'IT'),
(2, 'Maria', 'Rossi', 27, 10, 'Italy', 'IT'),
(3, 'Jorge', 'Escudero', 27, 10, 'Spain', 'ES'),
(4, 'Juan', 'Suarez', 22, 80, 'Mexico', 'ES'),
(5, 'John', 'Bolden', 40, 10, 'USA', 'EN'),
(6, 'Dave', 'Holden', 31, 30, 'UK', 'EN'),
(7, 'Nigel', 'Calling', 50, 5, 'UK', 'EN'),
(2, 'Maria', 'Rossi', 27, 10, 'UK', 'EN'),
(1, 'Billy', 'Lagrange', 35, 50, 'Italy', 'IT'),
(3, 'Jorge', 'Escudero', 27, 16, 'Spain', 'ES'),
(4, 'Juan', 'Suarez', 22, 70, 'Mexico', 'ES'),
(5, 'John', 'Bolden', 40, 10, 'USA', 'EN'),
(6, 'Dave', 'Holden', 31, 50, 'Italy', 'IT'),
(6, 'Dave', 'Holden', 31, 70, 'Spain', 'ES'),
(2, 'Maria', 'Rossi', 27, 70, 'Italy', 'IT'),
(1, 'Billy', 'Lagrange', 35, 50, 'Italy', 'IT')
;

INSERT INTO user-details(
    _key
    , fullName
    , memberYears
    , address.country
    , address.street
    , address.city
) VALUES
(1, 'Billy Lagrange', 3, 'Italy', 'Viale Monza 5', 'Milan'),
(2, 'Maria Rossi', 1, 'Italy', 'Stazione Termini', 'Rome'),
(3, 'Jorge Escudero', 5, 'Spain', '50 Passeig de Gracia', 'Barcelona'),
(4, 'Juan Suarez', 0, 'Mexico', 'Plaza Real', 'Tijuana'),
(5, 'John Bolden', 2, 'USA', '100 Wall Street', 'New York'),
(6, 'Dave Holden', 1, 'UK', '25 Bishopsgate', 'London'),
(7, 'Nigel Calling', 6, 'UK', '10 Queen Anne Street', 'Brighton')
;

Multiple transformations all in one go 

Let’s imagine that, given the above data, we are given the following requirements:

  • For each country in game-sessions, create a record with the count of games played from that country. Write the results to the games-per-country topic.
  • For each record in game-sessions, reshape the record to remove everything from the key besides pid. Additionally, add the user’s memberYears to the value. Write the results to the games-sessions-normalised topic.

We can obtain the above with the following query:

SET defaults.topic.autocreate=true;
SET commit.interval.ms='1000';

WITH userDetailsTable AS(
  SELECT TABLE *
  FROM user-details
);

WITH joinedAndNormalised AS(
  SELECT STREAM
    gs.*
    , ud.memberYears
  FROM game-sessions AS gs JOIN userDetailsTable AS ud 
        ON (gs._key.pid = ud._key)
);

INSERT INTO games-per-country
SELECT STREAM
  COUNT(*) AS gamesPlayed
FROM game-sessions
GROUP BY sessionMetadata.country;

INSERT INTO games-sessions-normalised
SELECT STREAM *
FROM joinedAndNormalised;
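
As a sanity check on the aggregation, the counts can be reproduced offline. The following is a plain-Python sketch, not Lenses code: it lists the country of each of the 19 sample game sessions in insertion order and tallies them, which is the value the GROUP BY query would be expected to reach once every sample record has been processed.

```python
from collections import Counter

# Country of each of the 19 sample game sessions, in insertion order.
countries = [
    "Italy", "Italy", "Italy", "Italy", "Italy",
    "Spain", "Mexico", "USA", "UK", "UK",
    "UK", "Italy", "Spain", "Mexico", "USA",
    "Italy", "Spain", "Italy", "Italy",
]

# Equivalent of COUNT(*) ... GROUP BY sessionMetadata.country on a finite input.
games_per_country = Counter(countries)

for country, games_played in games_per_country.items():
    print(country, games_played)
```

With the sample data this gives 9 games for Italy, 3 each for Spain and the UK, and 2 each for Mexico and the USA. A streaming processor emits updated counts as records arrive, so intermediate values may also appear in the output topic before these totals.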

The result of this processor in the UI will be a processor graph similar to the following:

Processor graph for the above query

Finally, the content of the output topics games-per-country and games-sessions-normalised can now be inspected in the Lenses Explore screen:

Content of `games-per-country` topic
Content of `games-sessions-normalised` topic
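
The expected shape of a games-sessions-normalised record can also be sketched offline. The plain-Python example below follows the requirement as stated (key reduced to pid, memberYears added to the value) rather than any Lenses internals. The normalise helper is a hypothetical name, and dropping sessions with no matching user assumes inner-join behaviour.

```python
from typing import Any, Dict, Optional, Tuple

# memberYears per pid, taken from the user-details sample data.
member_years: Dict[int, int] = {1: 3, 2: 1, 3: 5, 4: 0, 5: 2, 6: 1, 7: 6}


def normalise(
    session_key: Dict[str, Any], session_value: Dict[str, Any]
) -> Optional[Tuple[int, Dict[str, Any]]]:
    """Keep only pid in the key and add memberYears to the value.

    Returns None when no user matches (assumed inner-join behaviour).
    """
    pid = session_key["pid"]
    if pid not in member_years:
        return None
    return pid, {**session_value, "memberYears": member_years[pid]}


# First row of the game-sessions sample data.
key = {"pid": 1, "name": "Billy", "surname": "Lagrange", "age": 35}
value = {"points": 5, "sessionMetadata": {"country": "Italy", "language": "IT"}}

new_key, new_value = normalise(key, value)
print(new_key)                   # prints 1
print(new_value["memberYears"])  # prints 3
```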

Conclusion 

In this tutorial, we learned how to read data from multiple topics, combine and process it in different ways, and save the results to as many output topics as needed.

For more details about the specific operations used within this tutorial, please refer to the Aggregations and Joins sections of the user documentation.

Good luck and happy streaming!

--
Last modified: September 15, 2024