Update a SQL Processor


In Lenses it is not possible to directly edit a SQL Processor, for example to change its query or part of its deployment configuration.

There is a reason behind this: a change in a processor may result in a new processor that is incompatible with the previous one, producing incorrect or duplicated results.

But if you know what you are doing, and you are confident that the change will be a compatible one, there is a way to achieve this.

In this article we will first see how to update a processor, and then we will briefly analyse when it is safe to do so.

Overview of our strategy 

To emulate the editing of a processor, we will create a new one sharing the same Processor ID as the previous one.

The Processor ID will determine the consumer group of the underlying Kafka Streams application.

Sharing the same consumer group allows the new processor to start exactly from where the previous one left off.
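The idea can be sketched with a toy model in Python. This is not Lenses or Kafka code: the `committed_offsets` dictionary, the `run_processor` function and the processor IDs are hypothetical names introduced only for illustration, and the sketch assumes that a group with no committed offsets starts reading from the beginning of the topic.

```python
# Toy model: committed offsets are stored per consumer group, not per processor.
committed_offsets = {}  # group id -> offset of the next record to read


def run_processor(group_id, topic):
    """Read every record not yet committed for this group and commit the new position."""
    # Assumption: a group with no committed offsets starts from offset 0.
    start = committed_offsets.get(group_id, 0)
    consumed = topic[start:]
    committed_offsets[group_id] = len(topic)
    return consumed


topic = ["game 0", "game 1", "game 2"]

# The original processor reads the whole topic, then is stopped and deleted.
first_run = run_processor("my-processor-id", topic)

# A new record arrives while no processor is running.
topic.append("game 3")

# A replacement with the same ID resumes after the last committed offset...
same_id_run = run_processor("my-processor-id", topic)
# ...while a processor with a different ID has nothing to resume from.
new_id_run = run_processor("another-processor-id", topic)

print(same_id_run)
print(new_id_run)
```

In this model the replacement that reuses the ID only sees the record it has not processed yet, while the processor with a fresh ID reads the whole topic again.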

We will now set up a simple example and see, step by step, how to apply this strategy.

Our example 

We will assume we have a simple topic called player_results, storing the results of chess games. Each record contains information about the outcome of a game for a single player.

The _key is an identifier of the game the result refers to.

The result can be one of Win, Loss, and Draw.

Create the input topic 

From SQL Studio, create the topic and insert some data into it:

create table player_results(playerId string, result string)
FORMAT(long, protobuf);

INSERT INTO player_results(_key, playerId, result) values
    (0, "Player 1", "Win"),
    (0, "Player 2", "Loss"),
    (1, "Player 1", "Loss"),
    (1, "Player 2", "Win"),
    (2, "Player 1", "Draw"),
    (2, "Player 2", "Draw"),
    (3, "Player 3", "Win"),
    (3, "Player 1", "Loss"),
    (4, "Player 2", "Draw"),
    (4, "Player 1", "Draw");

Here we used PROTOBUF as a serialisation format, but that is not relevant for the sake of this example.

Create the processor 

We want to compute the total number of points for each player, initially following these rules:

  • 2 points for a Win
  • 1 point for a Draw
  • 0 points for a Loss

To do that, create the following processor:

SET defaults.topic.autocreate=true;

insert into player_stats
select stream 
    SUM(case
        when result = "Win" then 2
        when result = "Draw" then 1
        else 0
    end) as points
from player_results
group by playerId

Save the processor and start it.

Check the data 

After a while (it may take some time because the processor is stateful), you will see the following results in the output topic (you can check them in SQL Studio):

KEY: "Player 3"
points: 2

KEY: "Player 2"
points: 4

KEY: "Player 1"
points: 4

That is consistent with our data: Player 3 scored a single win, for a total of 2 points, while Player 1 and Player 2 each scored one win and two draws, for a total of 4 points.
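The totals can be double-checked outside Lenses with a few lines of Python. This is only a sketch that replays the inserted rows and applies the scoring rules by hand; the `records`, `POINTS` and `totals` names are introduced here for illustration.

```python
from collections import defaultdict

# The (_key, playerId, result) rows inserted into player_results.
records = [
    (0, "Player 1", "Win"), (0, "Player 2", "Loss"),
    (1, "Player 1", "Loss"), (1, "Player 2", "Win"),
    (2, "Player 1", "Draw"), (2, "Player 2", "Draw"),
    (3, "Player 3", "Win"), (3, "Player 1", "Loss"),
    (4, "Player 2", "Draw"), (4, "Player 1", "Draw"),
]

# Scoring rules of the first version of the processor.
POINTS = {"Win": 2, "Draw": 1}

totals = defaultdict(int)
for _key, player, result in records:
    # Any other result scores 0, mirroring the ELSE branch of the CASE expression.
    totals[player] += POINTS.get(result, 0)

for player in sorted(totals):
    print(player, totals[player])
# Player 1 4
# Player 2 4
# Player 3 2
```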

Update the processor 

At some point we decide to change our scoring algorithm, assigning 3 points for a victory instead of 2. The change should not affect historical data, only new games.

Since we are going to delete the processor, we first need to copy all the information we need from it, so that it can be used in the new processor.

1. Stop the existing processor 

We need to stop the processor to avoid computing new data with the old scoring algorithm. You can stop it by clicking Actions > Stop in the processor page.

2. Copy the Processor ID 

This is a mandatory step, since without the Processor ID we will lose the ability to start from where the old processor stopped.

You can find the Processor ID under the Configuration tab of the processor page.

3. Copy other details of the Processor 

Copy any other detail you may need for the creation of the new processor. In our case, we are just going to copy the SQL query.

4. Delete the processor 

Make sure you have copied everything you need, then delete the processor.

This step is necessary, since Lenses does not allow the creation of multiple processors with the same Processor ID.

5. Create the new processor 

Create a new processor with the following data:

  • Processor ID: the one you copied in step 2
  • SQL Query: the query of the previous processor, updated to assign 3 points to a victory:

SET defaults.topic.autocreate=true;

insert into player_stats
select stream
    SUM(case
        when result = "Win" then 3
        when result = "Draw" then 1
        else 0
    end) as points
from player_results
group by playerId

Save the processor and start it.

6. Insert some new data 

In SQL Studio, insert the results for a new game between Player 1 and Player 2:

INSERT INTO player_results(_key, playerId, result) values
    (5, "Player 1", "Win"),
    (5, "Player 2", "Loss");

7. Verify the data 

After a while, we should see the following data in player_stats:

KEY: "Player 3"
points: 2

KEY: "Player 2"
points: 4

KEY: "Player 1"
points: 4

KEY: "Player 1"
points: 7

KEY: "Player 2"
points: 4

As expected, the points of Player 2 and Player 3 did not change.

This means that the data was not recomputed from scratch: in that case, according to the new rules, Player 2 would have been assigned 5 points and Player 3 would have been assigned 3.

On the other hand, Player 1 scored a victory while the new scoring system was in place, gaining three more points and raising their total to 7.
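The difference between resuming from the old state and recomputing everything can be worked out with a short Python sketch. It is a plain arithmetic model, not a reproduction of how the processor stores its state; the `apply_games` helper and the variable names are hypothetical.

```python
old_rules = {"Win": 2, "Draw": 1, "Loss": 0}
new_rules = {"Win": 3, "Draw": 1, "Loss": 0}

# (playerId, result) pairs processed by the first version of the processor.
old_games = [
    ("Player 1", "Win"), ("Player 2", "Loss"),
    ("Player 1", "Loss"), ("Player 2", "Win"),
    ("Player 1", "Draw"), ("Player 2", "Draw"),
    ("Player 3", "Win"), ("Player 1", "Loss"),
    ("Player 2", "Draw"), ("Player 1", "Draw"),
]
# The game inserted after the processor was replaced.
new_games = [("Player 1", "Win"), ("Player 2", "Loss")]


def apply_games(totals, games, rules):
    """Return a copy of totals with the points of the given games added."""
    updated = dict(totals)
    for player, result in games:
        updated[player] = updated.get(player, 0) + rules.get(result, 0)
    return updated


# State left behind by the old processor.
state = apply_games({}, old_games, old_rules)

# What we observed: the old state is kept, only new games use the new rules.
resumed = apply_games(state, new_games, new_rules)

# What a full recomputation with the new rules would have produced instead.
recomputed = apply_games({}, old_games + new_games, new_rules)

print(resumed)     # {'Player 1': 7, 'Player 2': 4, 'Player 3': 2}
print(recomputed)  # {'Player 1': 8, 'Player 2': 5, 'Player 3': 3}
```

The `resumed` totals match the latest values in player_stats, which confirms that the historical games were not scored again.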

8. Enjoy the new version of the processor 

The new version of the processor has been created, and we verified that it produced the desired results.

Warning 

We saw that updating a processor, although not directly supported by Lenses, is quite an easy task.

But updating a processor is also a very delicate process. For this reason it should be done with the utmost care, and only when you are 100% sure that the change is compatible with the previous version.

Particular attention has to go to stateful processors, i.e. processors that make use of one of the following:

  • JOIN
  • GROUP BY
  • SELECT TABLE

In such cases, the processor will internally build names for its state-related internal topics. If something significant is changed in the processor query, those names may change in the new version, causing the loss of the old state.
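To see why a seemingly small change can alter those names, consider the following Python toy model. The naming scheme is an assumption made for illustration: it numbers each operator by its position in the query plan, loosely inspired by how Kafka Streams numbers auto-generated names. The `internal_topic_names` function, the operator labels and the exact name format are hypothetical, and the names Lenses really produces may differ.

```python
def internal_topic_names(processor_id, operators):
    """Return the changelog topic names a toy topology would create.

    Hypothetical scheme: every operator is numbered by its position, and
    each stateful operator gets a topic whose name embeds that number.
    """
    names = []
    for index, (operator, stateful) in enumerate(operators):
        if stateful:
            names.append(f"{processor_id}-{operator}-{index:010d}-changelog")
    return names


# Query plan of the original processor: read, aggregate (stateful), write.
original = [("SOURCE", False), ("AGGREGATE", True), ("SINK", False)]

# Changing only the expression inside the aggregation leaves the plan as is.
new_expression = [("SOURCE", False), ("AGGREGATE", True), ("SINK", False)]

# Adding a WHERE introduces a filter, shifting every operator after it.
with_where = [("SOURCE", False), ("FILTER", False), ("AGGREGATE", True), ("SINK", False)]

print(internal_topic_names("my-processor", original))
print(internal_topic_names("my-processor", new_expression))
print(internal_topic_names("my-processor", with_where))
```

In this model the first two plans produce the same topic name, so the state is found again, while the third one produces a different name and the new processor would start with an empty state.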

Safe changes 

Stateless processors: It is safe to update a stateless processor, for example by adding a WHERE clause. Since the processor is not stateful, no internal topic names are involved.

Stateful processors: In a stateful processor, the only safe change is to replace an expression with another one, as we did in our example, since such a change will not affect the names of internal topics.

Unsafe changes 

Almost everything else is an unsafe change, unless you know what the consequences of the change are and you are OK with them.

Even adding a simple WHERE clause to a stateful processor may cause some internal topics to change name, resulting in the loss of state described above.

--
Last modified: September 15, 2024