[
https://issues.apache.org/jira/browse/KAFKA-19943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18045027#comment-18045027
]
Matthias J. Sax commented on KAFKA-19943:
-----------------------------------------
I understand the problem. It just seems to be a kinda rare case you have at
hand. And for your case, increasing `delete.retention.ms` might cause unwanted
overhead.
The question is really, how could we address it (if we can at all...).
From the KS side, we could maybe try to also store a timestamp in the checkpoint
file, and try to use it to reason about the situation (I assume we could get
the `delete.retention.ms` config from the broker), and if we find that the
checkpoint is too old, we pro-actively delete the local RocksDB. But with
potentially unsynchronized clocks across different nodes (we have seen
deployments in which the application's local clock is basically just a random
number – I am not talking about clock drift...), this could cause other
issues...
For this timestamp-based approach, users might be able to implement it
themselves too?
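A rough sketch of what such a user-side check could look like, assuming the application writes its own marker file next to the state directory and decides at startup whether the local state is still trustworthy. Everything here is hypothetical (the `StaleStateGuard` class, the marker file name, the safety margin); it is not an existing Kafka Streams feature, and `deleteRetention` would have to be looked up from the changelog topic config or passed in by hand. Because the marker is written and read on the same node, it avoids comparing clocks across nodes, but it still breaks if that node's clock jumps.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;

// Hypothetical user-side helper; nothing here is part of the Kafka Streams API.
final class StaleStateGuard {

    // Marker file name is an assumption made for this sketch.
    static final String MARKER_FILE = ".last-alive-ms";

    private StaleStateGuard() { }

    // Record that the application was alive at nowMs
    // (call periodically while running and on clean shutdown).
    static void touch(Path stateDir, long nowMs) throws IOException {
        Files.createDirectories(stateDir);
        Files.write(stateDir.resolve(MARKER_FILE),
                Long.toString(nowMs).getBytes(StandardCharsets.UTF_8));
    }

    // Returns true if the local state may have missed tombstones
    // that log compaction has already dropped from the changelog.
    static boolean isStale(Path stateDir, long nowMs,
                           Duration deleteRetention, Duration safetyMargin) throws IOException {
        Path marker = stateDir.resolve(MARKER_FILE);
        if (!Files.exists(marker)) {
            // Unknown age: be conservative. Wiping an empty directory costs nothing.
            return true;
        }
        String content = new String(Files.readAllBytes(marker), StandardCharsets.UTF_8).trim();
        long lastAliveMs = Long.parseLong(content);
        long ageMs = nowMs - lastAliveMs;
        long limitMs = deleteRetention.minus(safetyMargin).toMillis();
        // A negative age means the local clock jumped backwards,
        // so the marker cannot be trusted either.
        return ageMs < 0 || ageMs >= limitMs;
    }
}
```

If `isStale(...)` returns true, the application could call `KafkaStreams#cleanUp()` before `start()` to wipe the local state and restore from the changelog. The price is a full restore whenever the check fires, including the first start after the marker is introduced.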
One other idea: KS already has a local "background state cleaner thread"
running, which deletes old task directories. Maybe we could just do a simple
fix, and do an initial run directly at startup? Doesn't sound ideal either
though, as it might be too aggressive, given the current default
`state.cleanup.delay.ms` of just 10 minutes. But we could also increase this
default cleanup delay.
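For reference, the delay is an ordinary Streams config, so users can already change it today. A minimal sketch, assuming plain `java.util.Properties`: the key is the one behind `StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG`, while the `CleanupDelaySketch` class, the method name, and the sanity check against the changelog's `delete.retention.ms` are made up for illustration.

```java
import java.util.Properties;

// Hypothetical helper; only the config key itself comes from Kafka Streams.
final class CleanupDelaySketch {

    private CleanupDelaySketch() { }

    // Builds the cleanup-delay part of a Streams config and rejects values that
    // would let an abandoned task directory outlive the changelog's tombstones.
    static Properties streamsProps(long cleanupDelayMs, long changelogDeleteRetentionMs) {
        if (cleanupDelayMs >= changelogDeleteRetentionMs) {
            throw new IllegalArgumentException(
                    "state.cleanup.delay.ms should stay below delete.retention.ms");
        }
        Properties props = new Properties();
        props.setProperty("state.cleanup.delay.ms", Long.toString(cleanupDelayMs));
        return props;
    }
}
```

The check only covers instances that keep running after a task moves away; a node that was shut down for longer than `delete.retention.ms` is not helped by it.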
{quote}If we're running KS application in K8S with N VMs without shared drive,
it's possible that application was running on VM#1 and VM#2 -> then for some
time (more than {_}*delete.retention.ms*{_}) on VM#3 and VM#2 -> then once
again VM#1 and VM#2 and it's possible that VM#1 will see zombies
{quote}
This state cleaner thread would also prevent the issue from this example,
assuming that all VMs are running all the time – if a task gets migrated off a
VM, after 10 minutes, the corresponding task directory gets deleted to free up
disk space.
I guess a cleaner way could be to let brokers track some "delete
offset/watermark" that clients can query. And if this "delete
offset/watermark" is larger than the checkpointed offset, we know we need to
delete the local RocksDB and seek-to-beginning.
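The client-side decision would then be trivial. A sketch, assuming a watermark that means "highest offset at which a tombstone has been removed by compaction"; brokers do not expose anything like this today, and the exact semantics, the `DeleteWatermarkSketch` class, and the enum names are all hypothetical.

```java
// Hypothetical sketch; no such watermark exists in the broker protocol today.
final class DeleteWatermarkSketch {

    enum RestoreAction { RESUME_FROM_CHECKPOINT, WIPE_AND_RESTORE_FROM_BEGINNING }

    private DeleteWatermarkSketch() { }

    // If a tombstone beyond our checkpoint was already compacted away, the local
    // store may contain keys whose deletion we can never observe anymore.
    static RestoreAction decide(long checkpointedOffset, long deleteWatermark) {
        return deleteWatermark > checkpointedOffset
                ? RestoreAction.WIPE_AND_RESTORE_FROM_BEGINNING
                : RestoreAction.RESUME_FROM_CHECKPOINT;
    }
}
```

Unlike the timestamp idea, this compares only offsets, so it would not depend on any clock.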
Any other ideas?
> Stale values in State Store after tombstone was compacted
> ---------------------------------------------------------
>
> Key: KAFKA-19943
> URL: https://issues.apache.org/jira/browse/KAFKA-19943
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 3.9.1, 4.1.1
> Reporter: Uladzislau Blok
> Priority: Major
>
> h3. *Summary*
> When a Kafka Streams application with a local *state store* (backed by
> RocksDB) restarts after a period exceeding the changelog topic's
> {*}{{delete.retention.ms}}{*}, it can lead to previously deleted entities
> "magically" reappearing. This happens because the *tombstones* required to
> mark these deletions are no longer present in the compacted changelog topic.
> ----
> h3. *Details and Observed Behavior*
> The issue typically occurs in environments without *shared storage* for state
> stores (like Kubernetes with local volumes) after a *failover* or prolonged
> shutdown.
> * *Original Instance:* An entity is processed and subsequently
> {*}deleted{*}. A *tombstone* (a record with a null value) is written to the
> state store's compacted changelog topic.
> * *Downtime/Failover:* The original instance is shut down, and a new
> instance (or pod) starts after a period longer than the changelog topic's
> {{{}delete.retention.ms{}}}.
> * *Tombstone Removal:* Since the tombstone has aged past
> {{{}delete.retention.ms{}}}, the Kafka broker removes it during log
> compaction.
> * *Restart and Rehydration:*
> ** If *RocksDB files are not present* -> The new instance starts with its
> own, empty local RocksDB. It begins to *rebuild* its state store by consuming
> the compacted changelog topic.
> ** If {*}RocksDB files are present{*}, Kafka Streams starts to rebuild state
> based on the local checkpoint. This is fine until it encounters entities
> older than the configured {{delete.retention.ms}}
> * *The Bug:* The deleted entity's key, while removed from the changelog, may
> still exist in the local RocksDB of the _old_ (now failed-over) instance.
> Critically, if the old instance was running a long time ago, the key/value
> pair might have existed _before_ the deletion. Since the *tombstone* is gone,
> there is nothing in the changelog to tell the new instance to *delete* that
> key. From my POV, in this case *local files can't be the source of truth*.
> * *Symptom:* The previously deleted entity is unexpectedly revived in the
> new state store. We observed this because a {*}punctuator{*}, which scans the
> {*}entire state store{*}, began processing these revived, outdated entities.
>
> ----
> h3. *Reproduce issue*
> I was able to reproduce the issue while doing local testing with a state store
> and an aggressive compaction config.
> Entire changelog topic:
> {code:java}
> /opt/kafka/bin $ ./kafka-console-consumer.sh --bootstrap-server
> localhost:9092 --topic ks-state-store-issue-1-example-state-store-changelog
> --property "print.key=true" --from-beginning
> 10 string10
> 12 string12
> 6 null
> 9 null
> 3 m
> 2 b
> 7 c
> 5 null
> 11 null
> 13 string13
> 4 y
> 10 null
> 3 g
> 2 m
> 7 we
> 4 o
> 7 jh
> 7 yt
> 7 vbx
> 7 kgf
> 7 cbvn {code}
> There is no entity with key: *1*
> Application logs:
> {code:java}
> 15:29:27.311
> [ks-state-store-issue-1-473580d9-4588-428b-a01e-8b5a9dbddf56-StreamThread-1]
> WARN org.bloku.SaveAndLogProcessor - Read from state store KV: KeyValue(13,
> string13)
> 15:29:27.311
> [ks-state-store-issue-1-473580d9-4588-428b-a01e-8b5a9dbddf56-StreamThread-1]
> WARN org.bloku.SaveAndLogProcessor - Read from state store KV: KeyValue(4, o)
> 15:29:27.608
> [ks-state-store-issue-1-473580d9-4588-428b-a01e-8b5a9dbddf56-StreamThread-2]
> WARN org.bloku.SaveAndLogProcessor - Read from state store KV: KeyValue(1,
> n) {code}
> *Read from state store KV: KeyValue(1, n)*
--
This message was sent by Atlassian Jira
(v8.20.10#820010)