[ 
https://issues.apache.org/jira/browse/KAFKA-19943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18041662#comment-18041662
 ] 

Uladzislau Blok edited comment on KAFKA-19943 at 12/1/25 11:09 AM:
-------------------------------------------------------------------

Thanks for the response. AFAIU 
StreamsConfig#windowstore.changelog.additional.retention.ms is related to 
window store only. In this case we're using just key value store (Created 
manually and accessed from custom Processor). How can we address this issue 
then? Use window store every time as a work around?

off-topic: I have proposed new solution for 
https://issues.apache.org/jira/browse/KAFKA-19430 (last comment). Could you 
please take a look? :)


was (Author: JIRAUSER309258):
Thanks for the response. AFAIA 
StreamsConfig#windowstore.changelog.additional.retention.ms is related to 
window store only. In this case we're using just key value store (Created 
manually and accessed from custom Processor). How can we address this issue 
then? Use window store every time as a work around?

off-topic: I have proposed new solution for 
https://issues.apache.org/jira/browse/KAFKA-19430 (last comment). Could you 
please take a look? :)

> Stale values in State Store after tombstone was compacted
> ---------------------------------------------------------
>
>                 Key: KAFKA-19943
>                 URL: https://issues.apache.org/jira/browse/KAFKA-19943
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.9.1, 4.1.1
>            Reporter: Uladzislau Blok
>            Priority: Major
>
> h3. *Summary*
> When a Kafka Streams application with a local *state store* (backed by 
> RocksDB) restarts after a period exceeding the changelog topic's 
> {*}{{delete.retention.ms}}{*}, it can lead to previously deleted entities 
> "magically" reappearing. This happens because the *tombstones* required to 
> mark these deletions are no longer present in the compacted changelog topic.
> ----
> h3. *Details and Observed Behavior*
> The issue typically occurs in environments without *shared storage* for state 
> stores (like Kubernetes with local volumes) after a *failover* or prolonged 
> shutdown.
>  * *Original Instance:* An entity is processed and subsequently 
> {*}deleted{*}. A *tombstone* (a record with a null value) is written to the 
> state store's compacted changelog topic.
>  * *Downtime/Failover:* The original instance is shut down, and a new 
> instance (or pod) starts after a period longer than the changelog topic's 
> {{{}delete.retention.ms{}}}.
>  * *Tombstone Removal:* Since the tombstone has aged past 
> {{{}delete.retention.ms{}}}, the Kafka broker removes it during log 
> compaction.
>  * *Restart and Rehydration:*
>  ** If *RocksDB files are not present* -> The new instance starts with its 
> own, empty local RocksDB. It begins to *rebuild* its state store by consuming 
> the compacted changelog topic.
>  ** If {*}RocksDB files are present{*}, Kafka Streams starts to rebuild state 
> based on the local checkpoint. This is fine until it encounters entities 
> older than the configured {{delete.retention.ms}}
>  * *The Bug:* The deleted entity's key, while removed from the changelog, may 
> still exist in the local RocksDB of the _old_ (now failed-over) instance. 
> Critically, if the old instance was running a long time ago, the key/value 
> pair might have existed _before_ the deletion. Since the *tombstone* is gone, 
> there is nothing in the changelog to tell the new instance to *delete* that 
> key. From my POV, in this case *local files can't be source of truth*
>  * *Symptom:* The previously deleted entity is unexpectedly revived in the 
> new state store. We observed this because a {*}punctuator{*}, which scans the 
> {*}entire state store{*}, began processing these revived, outdated entities.
>  
> ----
> h3. *Reproduce issue*
> I was able to reproduce an issue, while doing local testing with state store 
> and aggressive compaction config
> Entire changelog topic:
> {code:java}
> /opt/kafka/bin $ ./kafka-console-consumer.sh --bootstrap-server 
> localhost:9092 --topic ks-state-store-issue-1-example-state-store-changelog 
> --property "print.key=true" -- from-beginning
> 10    string10
> 12    string12
> 6    null
> 9    null
> 3    m
> 2    b
> 7    c
> 5    null
> 11    null
> 13    string13
> 4    y
> 10    null
> 3    g
> 2    m
> 7    we
> 4    o
> 7    jh
> 7    yt
> 7    vbx
> 7    kgf
> 7    cbvn {code}
>  There is no entity with key: *1*
> Application logs:
> {code:java}
> 15:29:27.311 
> [ks-state-store-issue-1-473580d9-4588-428b-a01e-8b5a9dbddf56-StreamThread-1] 
> WARN  org.bloku.SaveAndLogProcessor - Read from state store KV: KeyValue(13, 
> string13)
> 15:29:27.311 
> [ks-state-store-issue-1-473580d9-4588-428b-a01e-8b5a9dbddf56-StreamThread-1] 
> WARN  org.bloku.SaveAndLogProcessor - Read from state store KV: KeyValue(4, o)
> 15:29:27.608 
> [ks-state-store-issue-1-473580d9-4588-428b-a01e-8b5a9dbddf56-StreamThread-2] 
> WARN  org.bloku.SaveAndLogProcessor - Read from state store KV: KeyValue(1, 
> n) {code}
> *Read from state store KV: KeyValue(1, n)*



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to