Delete data from compacted topics


In this example, we show how to use Lenses to delete records from a compacted topic. The compacted topic stores users’ call information, and the records to delete are selected from a second topic that stores each user’s response to a “do you want to be contacted back” survey.

### Creating a compacted topic

With the following code, we can create a compacted topic that holds user call information:

```sql
CREATE TABLE user_calls (
    _key.user_id string
  , user.name string
  , user.phone_number string
  , user.address.door string
  , user.address.street string
  , call.duration int
  , call.satisfaction string
)
FORMAT(AVRO,AVRO)
PROPERTIES(
    cleanup.policy=compact
    , min.cleanable.dirty.ratio=0.01
    , segment.ms=100
    , retention.ms=100
    , segment.bytes=400
);
```

Notice that we set cleanup.policy=compact to tell Lenses we want the topic to be compacted. The remaining properties use very small values to force segments to roll and compaction to run often, so that we can easily see the result. These values should not be used in production.
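
To make the role of min.cleanable.dirty.ratio more concrete, the following Python sketch models, in simplified form, how a log becomes eligible for cleaning once its uncompacted ("dirty") share exceeds the threshold. The function names and byte counts are illustrative assumptions, not Kafka or Lenses APIs, and the real log cleaner takes further settings into account.

```python
def dirty_ratio(clean_bytes: int, dirty_bytes: int) -> float:
    """Share of the log, by size, that has not been compacted yet."""
    total = clean_bytes + dirty_bytes
    return dirty_bytes / total if total else 0.0


def is_cleanable(clean_bytes: int, dirty_bytes: int, min_dirty_ratio: float) -> bool:
    """Simplified model: the log is cleaned once its dirty ratio exceeds the threshold."""
    return dirty_ratio(clean_bytes, dirty_bytes) > min_dirty_ratio


# Hypothetical log in which 10% of the bytes are still uncompacted.
with_tutorial_setting = is_cleanable(900, 100, min_dirty_ratio=0.01)
with_kafka_default = is_cleanable(900, 100, min_dirty_ratio=0.5)
```

With the tutorial's value of 0.01 this log qualifies for cleaning almost immediately, while with Kafka's default of 0.5 it would have to wait until half of it is dirty.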

We start by adding some records to our user_calls topic:

```sql
INSERT INTO user_calls(
    _key.user_id
    , user.name
    , user.phone_number
    , user.address.door
    , user.address.street
    , call.duration
    , call.satisfaction
) VALUES
("user_1", "John Smith","202-555-0195", "002", "Pratt Avenue", 60, "10"),
("user_2", "Mark Richards","202-245-2345", "765", "East Avenue", 15,"7"),
("user_3", "Timothy Hamilton ","202-333-0195", "002", "Rock Avenue", 12,"5"),
("user_4", "Mark Hamilton ","202-333-0195", "002", "Rock Avenue", 12,"5"),
("user_5", "John Richards ","202-333-0123", "002", "Rock Avenue", 12,"5"),
("user_6", "Timothy Hamilton ","202-333-1295", "002", "Rock Avenue", 12,"5"),
("user_7", "J. Hamilton ","202-444-1195", "002", "Rock Avenue", 12,"5"),
("user_8", "Mark Richards","202-245-2345", "765", "East Avenue", 15,"7"),
("user_9", "Mark Arnold","202-245-2345", "765", "East Avenue", 15,"7")
;
```

We can see the new records by inspecting the topic:

Lenses - Topic data view.

We additionally add a second topic, which will hold the users’ responses to the survey:

```sql
CREATE TABLE contact_survey(
    _key.request_id string
    , user_id string
    , request_date string
    , response boolean
)
FORMAT(AVRO,AVRO);
```

As before, we add some records to contact_survey:

```sql
INSERT INTO contact_survey(
    _key.request_id
    , user_id
    , request_date
    , response
) VALUES
("survey01", "user_1", "2020-06-01", false),
("survey02", "user_2", "2020-06-01", true),
("survey03", "user_3", "2020-06-01", true),
("survey04", "user_4", "2020-06-01", true),
("survey05", "user_5", "2020-06-01", false),
("survey06", "user_6", "2020-06-01", false),
("survey07", "user_7", "2020-06-01", false),
("survey08", "user_8", "2020-06-01", false),
("survey09", "user_9", "2020-06-01", false)
;
```

### Deleting data using processors

Next, we create a processor with the following code:

```sql
INSERT INTO user_calls
SELECT STREAM
    user_id as _key.user_id
    , null as _value
FROM contact_survey
WHERE response = false;
```

Each record this processor writes has a user’s ID as its key and a null value (a tombstone), so we are essentially issuing a delete command for all users who said they didn’t want to be contacted.
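
The effect of these null-valued records can be sketched in Python. This is a minimal model of the semantics and not the Lenses or Kafka implementation: `tombstones_for` and `compact` are hypothetical helpers, and the records are cut down to a single field.

```python
from typing import Dict, List, Optional, Tuple

# A record is (key, value); a value of None models a tombstone.
Record = Tuple[str, Optional[dict]]


def tombstones_for(survey: List[dict]) -> List[Record]:
    """Mirror the processor: one null-valued record per user who answered false."""
    return [(row["user_id"], None) for row in survey if row["response"] is False]


def compact(log: List[Record]) -> List[Record]:
    """Keep only the latest record for each key, in offset order."""
    latest: Dict[str, Tuple[int, Optional[dict]]] = {}
    for offset, (key, value) in enumerate(log):
        latest[key] = (offset, value)
    survivors = sorted(latest.items(), key=lambda item: item[1][0])
    return [(key, value) for key, (_, value) in survivors]


user_calls: List[Record] = [
    ("user_1", {"name": "John Smith"}),
    ("user_2", {"name": "Mark Richards"}),
]
survey = [
    {"user_id": "user_1", "response": False},
    {"user_id": "user_2", "response": True},
]

user_calls += tombstones_for(survey)
compacted = compact(user_calls)
```

After compaction, user_1 is represented only by its tombstone, while user_2 keeps its original record.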

Looking at our user_calls topic again, we can see the newly inserted records with a null value, but our original records are still there… How so?

Lenses - Topic data view.

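Kafka’s log cleaner never compacts the active segment (the one currently being written to), so the tombstones we just wrote cannot take effect while they sit in that segment. Because the topic was created with segment.ms=100, adding one more record forces the active segment to roll, after which compaction can run: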
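
The following Python sketch illustrates why the extra record matters, assuming the usual Kafka behaviour that the log cleaner skips the active (last) segment. The `clean` function and the segment layout are illustrative assumptions, not actual Kafka code.

```python
from typing import List, Optional, Tuple

Record = Tuple[str, Optional[str]]  # (key, value); None models a tombstone
Segment = List[Record]


def clean(segments: List[Segment]) -> List[Segment]:
    """Compact every closed segment; leave the last (active) segment untouched."""
    *closed, active = segments
    # Position of the latest record per key, looking at closed segments only.
    last_seen = {}
    for i, segment in enumerate(closed):
        for j, (key, _) in enumerate(segment):
            last_seen[key] = (i, j)
    cleaned = [
        [rec for j, rec in enumerate(segment) if last_seen[rec[0]] == (i, j)]
        for i, segment in enumerate(closed)
    ]
    # Drop segments that ended up empty, then re-attach the active segment.
    return [segment for segment in cleaned if segment] + [active]


# The tombstone sits in the active segment, so nothing is removed yet.
before_roll = clean([[("user_1", "John Smith")], [("user_1", None)]])

# A later record rolls the segment; the tombstone is now in a closed segment.
after_roll = clean(
    [[("user_1", "John Smith")], [("user_1", None)], [("user_10", "John Smith")]]
)
```

In `before_roll` the original user_1 record survives. In `after_roll` only the tombstone and the new user_10 record remain.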

```sql
INSERT INTO user_calls(
    _key.user_id
    , user.name
    , user.phone_number
    , user.address.door
    , user.address.street
    , call.duration
    , call.satisfaction
) VALUES
("user_10", "John Smith","202-555-0195", "002", "Pratt Avenue", 60, "10");
```

Looking at the data inside our topic, we can now see that the users who responded that they didn’t want to be contacted are no longer part of it. The tombstone records (the ones with a null value) stay around for as long as the topic’s delete.retention.ms setting specifies (24 hours by default) and are eventually removed, leaving us with a topic that contains only users who want to be contacted.
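
The eventual removal of tombstones can be modelled as a time-based filter. In Kafka this window is governed by the topic setting delete.retention.ms, which defaults to 24 hours. The sketch below is a simplification: `purge_expired_tombstones` is a hypothetical helper, and tombstone age is measured from a record timestamp, which is not exactly how the log cleaner tracks it.

```python
from typing import List, Optional, Tuple

DAY_MS = 24 * 60 * 60 * 1000  # Kafka's default delete.retention.ms

# (key, value, timestamp_ms); a value of None models a tombstone.
TimedRecord = Tuple[str, Optional[str], int]


def purge_expired_tombstones(
    records: List[TimedRecord], now_ms: int, delete_retention_ms: int = DAY_MS
) -> List[TimedRecord]:
    """Keep live records, plus tombstones still inside the retention window."""
    return [
        (key, value, ts)
        for key, value, ts in records
        if value is not None or now_ms - ts < delete_retention_ms
    ]


records: List[TimedRecord] = [("user_2", "Mark Richards", 0), ("user_1", None, 0)]
shortly_after = purge_expired_tombstones(records, now_ms=1_000)
a_day_later = purge_expired_tombstones(records, now_ms=DAY_MS)
```

Shortly after the delete, the tombstone for user_1 is still visible. A day later only the live record for user_2 is left.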

Lenses - Topic data view.
--
Last modified: September 15, 2024