[jira] [Commented] (KAFKA-15302) Stale value returned when using store.all() with key deletion in punctuation function.
[ https://issues.apache.org/jira/browse/KAFKA-15302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17825529#comment-17825529 ] Jinyong Choi commented on KAFKA-15302:
--

[https://github.com/apache/kafka/pull/15495] We have updated the documentation for users with similar use cases.

> Stale value returned when using store.all() with key deletion in punctuation function.
> --
>
>                 Key: KAFKA-15302
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15302
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.5.1
>            Reporter: Jinyong Choi
>            Priority: Major
>
> When using the store.all() function within a punctuation function scheduled via this.context.schedule, a previous value is returned. In other words, even though the value has been updated from 1 to 2, it doesn't return 2; instead, it returns 1.
> In the provided test code, you can see the output 'BROKEN !!!'. While this doesn't occur 100% of the time, added logs make it evident that the cache is flushed during the while loop after all() is called. As a result, the named cache holds a null value, causing a value to be returned from RocksDB instead. The value observed after the .get() call then differs from the expected value. This is possibly due to RocksDB's consistent-read (snapshot) behavior, although the exact cause is not certain.
> Of course, if you perform {{store.flush()}} before {{all()}}, there won't be any errors.
> 
> * test code (forked from balajirrao and modified for this)
> [https://github.com/jinyongchoi/kafka-streams-multi-runner/|https://github.com/jinyongchoi/kafka-streams-multi-runner/tree/main]
>
> {code:java}
> private void forwardAll(final long timestamp) {
>     // System.err.println("forwardAll Start");
>     KeyValueIterator<String, Integer> kvList = this.kvStore.all();
>     while (kvList.hasNext()) {
>         KeyValue<String, Integer> entry = kvList.next();
>         final Record<String, Integer> msg = new Record<>(entry.key, entry.value, context.currentSystemTimeMs());
>         final Integer storeValue = this.kvStore.get(entry.key);
>         if (entry.value != storeValue) {
>             System.err.println("[" + instanceId + "]" + "!!! BROKEN !!! Key: " + entry.key
>                 + " Expected in stored(Cache or Store) value: " + storeValue
>                 + " but KeyValueIterator value: " + entry.value);
>             throw new RuntimeException("Broken!");
>         }
>         this.context.forward(msg);
>     }
>     kvList.close();
> }
> {code}
>
> * log file (log statements added in the streams source)
>
> {code:java}
> # console log
> sbt clean "worker/assembly"; sbt "worker/assembly"; sbt "coordinator / run 1"
> [info] welcome to sbt 1.8.2 (Ubuntu Java 11.0.20)
> ...
> [info] running Coordinator 1
> appid: 95108c48-7c69-4eeb-adbd-9d091bd84933
> [0] starting instance +1
> forwardAll Start
> [0]!!! BROKEN !!! Key: 636398 Expected in stored(Cache or Store) value: 2 but KeyValueIterator value: 1
> # log file
> ...
> 01:05:00.382 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO o.a.k.s.state.internals.NamedCache -- Named cache 0_0-Counts stats on flush: #hits=5628524, #misses=5636397, #overwrites=636397, #flushes=401
> 01:05:00.388 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO o.a.k.s.state.internals.NamedCache -- Named Cache flush dirtyKeys.size():7873 entries:7873
> 01:05:00.434 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO o.a.k.s.p.i.ProcessorStateManager -- stream-thread [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] stream-task [0_0] Flushed cache or buffer Counts
> ...
> 01:05:00.587 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO o.a.k.s.s.i.CachingKeyValueStore -- KeyValueIterator all()
> 01:05:00.588 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO o.a.k.s.state.internals.RocksDBStore -- RocksDB KeyValueIterator all
> 01:05:00.590 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO o.a.k.s.state.internals.ThreadCache -- stream-thread [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] MemoryLRUCacheBytesIterator cache all()
> 01:05:00.591 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO o.a.k.s.state.internals.NamedCache -- NamedCache allKeys() size():325771
> 01:05:00.637 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO o.a.k.s.state.internals.Nam
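The failure mode described in this issue can be modeled without Kafka. The following is a hypothetical, self-contained sketch (all class and method names are invented for illustration, not Kafka internals): a bounded write-back cache over a backing map, plus an all()-style read that snapshots the backing store (standing in for RocksDB's snapshot iterator) together with the key universe. Deleting keys during iteration evicts and flushes other dirty entries, so later keys resolve from the stale backing snapshot.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeSet;

// Hypothetical model (not Kafka code): dirty entries live in a bounded
// insertion-ordered cache and are flushed to the backing store on eviction.
public class StaleReadDemo {
    final Map<String, Integer> backing = new HashMap<>();
    final LinkedHashMap<String, Integer> cache = new LinkedHashMap<>();
    final int maxCacheEntries;

    StaleReadDemo(int maxCacheEntries) { this.maxCacheEntries = maxCacheEntries; }

    void put(String k, Integer v) { cache.put(k, v); maybeEvict(); }

    // delete() writes a null tombstone through the cache, which can evict
    // (and flush) an unrelated dirty entry -- the interaction described above
    void delete(String k) { cache.put(k, null); maybeEvict(); }

    void maybeEvict() {
        while (cache.size() > maxCacheEntries) {
            Iterator<Map.Entry<String, Integer>> it = cache.entrySet().iterator();
            Map.Entry<String, Integer> eldest = it.next();
            if (eldest.getValue() != null) backing.put(eldest.getKey(), eldest.getValue());
            else backing.remove(eldest.getKey());
            it.remove(); // flushed entry leaves the cache
        }
    }

    Integer get(String k) { return cache.containsKey(k) ? cache.get(k) : backing.get(k); }

    // Returns true if iterating a snapshot while deleting keys surfaces a
    // stale value, i.e. the iterator value differs from a fresh get().
    static boolean reproducesStaleRead() {
        StaleReadDemo store = new StaleReadDemo(3);
        for (String k : new String[]{"a", "b", "c", "d"}) store.backing.put(k, 1); // old values on disk
        for (String k : new String[]{"a", "b", "c", "d"}) store.put(k, 2);         // fresh dirty values

        // all(): snapshot the backing store and the current key universe
        Map<String, Integer> backingSnapshot = new HashMap<>(store.backing);
        TreeSet<String> keys = new TreeSet<>(backingSnapshot.keySet());
        keys.addAll(store.cache.keySet());

        for (String k : keys) {
            Integer iterValue = store.cache.containsKey(k) ? store.cache.get(k) : backingSnapshot.get(k);
            Integer freshValue = store.get(k);
            if (iterValue != null && !iterValue.equals(freshValue)) {
                return true; // iterator saw the pre-update value: stale read
            }
            store.delete(k); // deletion during iteration triggers eviction + flush
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println("stale read reproduced: " + reproducesStaleRead());
    }
}
```

In this toy model the mismatch is deterministic; in Kafka Streams it depends on cache pressure, which is why the reporter sees it only intermittently.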
[jira] [Updated] (KAFKA-15302) Stale value returned when using store.all() with key deletion in punctuation function.
[ https://issues.apache.org/jira/browse/KAFKA-15302?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jinyong Choi updated KAFKA-15302:
--
Summary: Stale value returned when using store.all() with key deletion in punctuation function. (was: Stale value returned when using store.all() in punctuation function.)
[jira] [Commented] (KAFKA-15302) Stale value returned when using store.all() in punctuation function.
[ https://issues.apache.org/jira/browse/KAFKA-15302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17820083#comment-17820083 ] Jinyong Choi commented on KAFKA-15302:
--

[~cervinka] Hi. It seems like the patch for this issue will take a while, so I will try updating the documentation instead.

And yes, but this issue pertains to the interaction of caching and the store. Even if we switch to an in-memory store, the same situation can arise whenever the caching functionality is used. Conversely, if we opt not to use caching, we lose one of its key features, record compaction. I think this needs to be taken into consideration.

I've applied some modifications to NamedCache based on a few of the approaches mentioned earlier and am currently using them for performance. Thank you for your attention to this issue.
[jira] [Commented] (KAFKA-15302) Stale value returned when using store.all() in punctuation function.
[ https://issues.apache.org/jira/browse/KAFKA-15302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17756245#comment-17756245 ] Jinyong Choi commented on KAFKA-15302:
--

I got it.
[jira] [Comment Edited] (KAFKA-15302) Stale value returned when using store.all() in punctuation function.
[ https://issues.apache.org/jira/browse/KAFKA-15302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17753490#comment-17753490 ] Jinyong Choi edited comment on KAFKA-15302 at 8/12/23 4:22 PM:
---

[~mjsax] [~guozhang] I had a new idea.

The essence of this issue is that when iterating through store.all(), the delete() operation causes the fresh value in the cache to be evicted. As we begin iterating through store.all(), let's assume it initially returns (Key: 1, Value: 2).

{code:java}
(max caches: 3)
         Cache                  Store.all() (Snapshot)   Store
LRU Head (Key: 1, Value: 2)
         (Key: 2, Value: 2)     (Key: 2, Value: 1)
         (Key: 3, Value: 2)     (Key: 3, Value: 1)       (Key: 3, Value: 1)
         (Key: 4, Value: 2)     (Key: 4, Value: 1)       (Key: 4, Value: 1)
LRU Tail
{code}

Following the previous test code, when store.delete() is performed to delete Key 1, the following changes occur.

{code:java}
         Cache                  Store.all() (Snapshot)   Store
LRU Head (Key: 1, Value: null)  (Key: 1, Value: 2)
         (Key: 2, Value: 2)     (Key: 2, Value: 1)
         (Key: 3, Value: 2)     (Key: 3, Value: 1)       (Key: 3, Value: 1)
         (Key: 4, Value: 2)     (Key: 4, Value: 1)       (Key: 4, Value: 1)
LRU Tail
{code}

Inside the delete() function, evict() is executed, which checks the cache size and, per the LRU policy, removes the tail entry as shown below (Key 4 is dropped from the cache).

{code:java}
         Cache                  Store.all() (Snapshot)   Store
LRU Head (Key: 1, Value: null)  (Key: 1, Value: 2)
         (Key: 2, Value: 2)     (Key: 2, Value: 1)
         (Key: 3, Value: 2)     (Key: 3, Value: 1)       (Key: 3, Value: 1)
                                (Key: 4, Value: 1)       (Key: 4, Value: 1)
LRU Tail
{code}

Through the process above, when next() eventually reaches Key 4's position, the cache no longer holds that key. As a result, the value 1 from the snapshot store is returned, causing the error situation. Therefore, here are my suggestions. (It does not seem safe to rely on the cache while delete() is executed concurrently with iteration.)

1.
What if we add the deleted item to the tail of the LRU cache, rather than the head, when performing delete()?

2. Alternatively, a refinement of the first suggestion: during RocksDB iteration, add the item to the tail only when delete() is executed. I think this fix is good because it has fewer side effects (memory, flush).

Could you please provide your opinion on this idea? (1 or 2, or something else?) If the idea sounds good, once you confirm, I'll proceed to prepare the modified streams code for a PR.
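Suggestion 1 above can be sketched in a standalone model (again with invented names, not the actual NamedCache code): if delete() places its tombstone at the eviction end of the LRU order instead of the head, the tombstone itself becomes the next eviction victim, and the fresh values of keys not yet traversed stay in the cache.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;

// Hypothetical LRU model where delete() inserts at the eviction end
// (the "tail" in the comment's diagrams) instead of the head.
public class TailInsertDeleteDemo {
    final Map<String, Integer> backing = new HashMap<>();
    final Map<String, Integer> cache = new HashMap<>();
    final Deque<String> order = new ArrayDeque<>(); // first element = next eviction victim
    final int maxCacheEntries;

    TailInsertDeleteDemo(int max) { this.maxCacheEntries = max; }

    void put(String k, Integer v) {
        if (cache.containsKey(k)) order.remove(k);
        cache.put(k, v);
        order.addLast(k);   // normal writes go to the protected end
        maybeEvict();
    }

    void delete(String k) {
        if (cache.containsKey(k)) order.remove(k);
        cache.put(k, null);
        order.addFirst(k);  // tombstone goes to the eviction end
        maybeEvict();       // evicts the tombstone itself, not a fresh entry
    }

    void maybeEvict() {
        while (cache.size() > maxCacheEntries) {
            String victim = order.removeFirst();
            Integer v = cache.remove(victim);
            if (v != null) backing.put(victim, v); else backing.remove(victim);
        }
    }

    Integer get(String k) { return cache.containsKey(k) ? cache.get(k) : backing.get(k); }

    // Same scenario as the bug report: old values in backing, fresh values
    // dirty in the cache, keys deleted one by one during snapshot iteration.
    static boolean staleReadObserved() {
        TailInsertDeleteDemo store = new TailInsertDeleteDemo(3);
        for (String k : new String[]{"a", "b", "c", "d"}) store.backing.put(k, 1);
        for (String k : new String[]{"a", "b", "c", "d"}) store.put(k, 2);
        Map<String, Integer> snapshot = new HashMap<>(store.backing);
        TreeSet<String> keys = new TreeSet<>(snapshot.keySet());
        keys.addAll(store.cache.keySet());
        boolean stale = false;
        for (String k : keys) {
            Integer iterValue = store.cache.containsKey(k) ? store.cache.get(k) : snapshot.get(k);
            if (iterValue != null && !iterValue.equals(store.get(k))) stale = true;
            store.delete(k);
        }
        return stale;
    }

    public static void main(String[] args) {
        System.out.println("stale read observed: " + staleReadObserved());
    }
}
```

With tombstones inserted at the eviction end, each delete() evicts its own tombstone (or another tombstone) first, so no fresh dirty value is flushed away mid-iteration and the check passes.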
[jira] [Commented] (KAFKA-15302) Stale value returned when using store.all() in punctuation function.
[ https://issues.apache.org/jira/browse/KAFKA-15302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17753490#comment-17753490 ] Jinyong Choi commented on KAFKA-15302:
--

[~mjsax] [~guozhang] I had a new idea.

The essence of this issue is that when iterating through store.all(), the delete() operation causes the fresh value in the cache to be evicted. As we begin iterating through store.all(), let's assume it initially returns (Key: 1, Value: 2).

{code:java}
(max caches: 3)
         Cache                  Store (Snapshot)         Store
LRU Head (Key: 1, Value: 2)
         (Key: 2, Value: 2)     (Key: 2, Value: 1)
         (Key: 3, Value: 2)     (Key: 3, Value: 1)       (Key: 3, Value: 1)
         (Key: 4, Value: 2)     (Key: 4, Value: 1)       (Key: 4, Value: 1)
LRU Tail
{code}

Following the previous test code, when store.delete() is performed to delete Key 1, the following changes occur.

{code:java}
         Cache                  Store (Snapshot)         Store (Fresh)
LRU Head (Key: 1, Value: null)  (Key: 1, Value: 2)
         (Key: 2, Value: 2)     (Key: 2, Value: 1)
         (Key: 3, Value: 2)     (Key: 3, Value: 1)       (Key: 3, Value: 1)
         (Key: 4, Value: 2)     (Key: 4, Value: 1)       (Key: 4, Value: 1)
LRU Tail
{code}

Inside the delete() function, evict() is executed, which checks the cache size and, per the LRU policy, removes the tail entry as shown below (Key 4 is dropped from the cache).

{code:java}
         Cache                  Store (Snapshot)         Store (Fresh)
LRU Head (Key: 1, Value: null)  (Key: 1, Value: 2)
         (Key: 2, Value: 2)     (Key: 2, Value: 1)
         (Key: 3, Value: 2)     (Key: 3, Value: 1)       (Key: 3, Value: 1)
                                (Key: 4, Value: 1)       (Key: 4, Value: 1)
LRU Tail
{code}

Through the process above, when next() eventually reaches Key 4's position, the cache no longer holds that key. As a result, the value 1 from the snapshot store is returned, causing the error situation. Therefore, here are my suggestions. (It does not seem safe to rely on the cache while delete() is executed concurrently with iteration.)

1.
What if we add the deleted item to the tail of the LRU cache, rather than the head, when performing delete()?

2. Alternatively, a refinement of the first suggestion: during RocksDB iteration, add the item to the tail only when delete() is executed. I think this fix is good because it has fewer side effects (memory, flush).

Could you please provide your opinion on this idea? (1 or 2, or something else?) If the idea sounds good, once you confirm, I'll proceed to prepare the modified streams code for a PR.
[jira] [Commented] (KAFKA-15302) Stale value returned when using store.all() in punctuation function.
[ https://issues.apache.org/jira/browse/KAFKA-15302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17752451#comment-17752451 ] Jinyong Choi commented on KAFKA-15302:
--

[~mjsax], [~guozhang] If you perform a delete() within a while() loop, it seems that, due to the interaction of maybeEvict() -> flush(), the value of a key that hasn't been traversed yet can be returned as stale data. Therefore, I consider this a bug.

The main changes are as follows: I've modified the code to give the cache a hint about whether to call maybeEvict(), by checking the current state of the RocksDB KeyValueStore, specifically whether it has open iterators (i.e. a snapshot in use). Please refer to the code snippet below for a complete view of the changes. (I haven't modified the test code.)

[https://github.com/apache/kafka/compare/trunk...jinyongchoi:kafka:KAFKA-15302-testing]

{code:java}
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
...
@Override
public boolean isEvictionInvocationViable() {
    return openIterators.isEmpty();
}

streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
...
private void putInternal(final Bytes key, final byte[] value) {
    context.cache().put(
        cacheName,
        key,
        new LRUCacheEntry(
            value,
            context.headers(),
            true,
            context.offset(),
            context.timestamp(),
            context.partition(),
            context.topic()),
        wrapped().isEvictionInvocationViable());
    StoreQueryUtils.updatePosition(position, context);
}

streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
...
public void put(final String namespace, final Bytes key, final LRUCacheEntry value, final boolean isEvictionViable) {
    numPuts++;
    final NamedCache cache = getOrCreateCache(namespace);
    synchronized (cache) {
        final long oldSize = cache.sizeInBytes();
        cache.put(key, value);
        sizeInBytes.getAndAdd(cache.sizeInBytes() - oldSize);
        if (isEvictionViable) {
            maybeEvict(namespace, cache);
        }
    }
}
{code}

After the modifications, the following thoughts arise:

1. It seems unnecessary to perform delete operations during traversal for SessionStore or TimestampedKeyValueStore, but this aspect needs documentation.
2. It functions as expected, but the code doesn't seem very clean.
3. Since flush() is suppressed during the while() loop, many keys accumulate in the cache. However, as their values are null, there doesn't appear to be a significant memory burden. Still, caution is warranted.
4. Because flush() is inhibited during the while() loop, a subsequent flush operation, as shown below, took about 10 seconds. While cases requiring the processing of 5,000,000 items at once are unlikely, this aspect also demands attention.

{code:java}
21:26:17.509 [...-StreamThread-1] INFO o.a.k.s.state.internals.NamedCache -- Named Cache flush start dirtyKeys.size():500 entries:500
21:26:26.874 [...-StreamThread-1] INFO o.a.k.s.state.internals.NamedCache -- Named Cache flush end dirtyKeys.size():0 entries:500
{code}

5. If it takes time to correct this in the right direction, it might be a good idea to document it in advance to aid developers' understanding.

I'm not coming up with any better ideas. If it takes time to make the correct modifications, I agree that we should update the documentation first.
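The eviction-hint approach above can likewise be modeled standalone (invented names; not the actual patch): while an iterator is open, put()/delete() skip maybeEvict(), so no dirty entry is flushed out from under the iterator. The trade-off noted in points 3 and 4, that dirty entries accumulate until the iterator closes, shows up in the model as unbounded cache growth during iteration.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeSet;

// Hypothetical model: eviction is suppressed while an iterator is open,
// mirroring the isEvictionInvocationViable() hint sketched in the comment.
public class EvictionHintDemo {
    final Map<String, Integer> backing = new HashMap<>();
    final LinkedHashMap<String, Integer> cache = new LinkedHashMap<>();
    final int maxCacheEntries;
    int openIterators = 0;

    EvictionHintDemo(int max) { this.maxCacheEntries = max; }

    boolean evictionViable() { return openIterators == 0; }

    void put(String k, Integer v) { cache.put(k, v); if (evictionViable()) maybeEvict(); }
    void delete(String k) { cache.put(k, null); if (evictionViable()) maybeEvict(); }

    void maybeEvict() {
        while (cache.size() > maxCacheEntries) {
            Iterator<Map.Entry<String, Integer>> it = cache.entrySet().iterator();
            Map.Entry<String, Integer> eldest = it.next();
            if (eldest.getValue() != null) backing.put(eldest.getKey(), eldest.getValue());
            else backing.remove(eldest.getKey());
            it.remove();
        }
    }

    Integer get(String k) { return cache.containsKey(k) ? cache.get(k) : backing.get(k); }

    static boolean staleReadObserved() {
        EvictionHintDemo store = new EvictionHintDemo(3);
        for (String k : new String[]{"a", "b", "c", "d"}) store.backing.put(k, 1);
        for (String k : new String[]{"a", "b", "c", "d"}) store.put(k, 2);
        store.openIterators++; // all() opens a snapshot iterator
        Map<String, Integer> snapshot = new HashMap<>(store.backing);
        TreeSet<String> keys = new TreeSet<>(snapshot.keySet());
        keys.addAll(store.cache.keySet());
        boolean stale = false;
        for (String k : keys) {
            Integer iterValue = store.cache.containsKey(k) ? store.cache.get(k) : snapshot.get(k);
            if (iterValue != null && !iterValue.equals(store.get(k))) stale = true;
            store.delete(k); // no eviction happens while the iterator is open
        }
        store.openIterators--; // close() re-enables eviction
        store.maybeEvict();    // deferred flush happens here, all at once
        return stale;
    }

    public static void main(String[] args) {
        System.out.println("stale read observed: " + staleReadObserved());
    }
}
```

The single deferred maybeEvict() after close() is the model's analogue of the 10-second flush measured in point 4: all suppressed work is paid at once.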
[jira] [Commented] (KAFKA-15302) Stale value returned when using store.all() in punctuation function.
[ https://issues.apache.org/jira/browse/KAFKA-15302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17751422#comment-17751422 ]

Jinyong Choi commented on KAFKA-15302:
--------------------------------------

[~guozhang] Hi. Sure, that is my test code. As you mentioned, {{this.kvStore.delete(entry.key);}} is called in a context where put() and evict() end up being invoked.

[https://github.com/jinyongchoi/kafka-streams-multi-runner/blob/main/worker/src/main/java/Worker.java]

{code:java}
static class WordCountProcessor implements Processor<String, Integer, String, Integer> {

    private KeyValueStore<String, Integer> kvStore;
    private ProcessorContext<String, Integer> context;
    final String instanceId = System.getenv("INSTANCE_ID");

    @Override
    public void init(final ProcessorContext<String, Integer> context) {
        this.context = context;
        kvStore = context.getStateStore("Counts");
        this.context.schedule(Duration.ofSeconds(60), PunctuationType.WALL_CLOCK_TIME, this::forwardAll);
    }

    @Override
    public void process(final Record<String, Integer> record) {
        final Integer recordValue = record.value();
        final Integer oldInt = kvStore.get(record.key());
        final int old = Objects.requireNonNullElse(oldInt, recordValue - 1);
        kvStore.put(record.key(), old + 1);
    }

    private void forwardAll(final long timestamp) {
        System.err.println("forwardAll Start");
        System.out.println("forwardAll Start");

        KeyValueIterator<String, Integer> kvList = this.kvStore.all();
        while (kvList.hasNext()) {
            KeyValue<String, Integer> entry = kvList.next();
            final Record<String, Integer> msg = new Record<>(entry.key, entry.value, context.currentSystemTimeMs());
            final Integer storeValue = this.kvStore.get(entry.key);

            if (entry.value != storeValue) {
                System.err.println("[" + instanceId + "]" + "!!! BROKEN !!! Key: " + entry.key
                        + " Expected in stored(Cache or Store) value: " + storeValue
                        + " but KeyValueIterator value: " + entry.value);
                System.out.println("[" + instanceId + "]" + "!!! BROKEN !!! Key: " + entry.key
                        + " Expected in stored(Cache or Store) value: " + storeValue
                        + " but KeyValueIterator value: " + entry.value);
                throw new RuntimeException("Broken!");
            }

            this.context.forward(msg);

            System.out.println("delete key start: " + entry.key);
            // The delete() method call triggers the maybeEvict() function.
            this.kvStore.delete(entry.key);
            System.out.println("delete key end  : " + entry.key);
        }
        kvList.close();

        System.err.println("forwardAll end");
        System.out.println("forwardAll end");
    }

    @Override
    public void close() {
        // Close any resources managed by this processor.
        // Note: do not close any StateStores, as these are managed by the library.
    }
}
{code}
[jira] [Commented] (KAFKA-15302) Stale value returned when using store.all() in punctuation function.
[ https://issues.apache.org/jira/browse/KAFKA-15302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17751323#comment-17751323 ]

Jinyong Choi commented on KAFKA-15302:
--------------------------------------

Hi Matthias J. Sax, reading your comment helped me be more specific about this bug.

For instance, when using this.context.forward(msg) to forward a message, the key of the forwarded message may also be deleted from the store to optimize storage efficiency. So, if we call store.delete(key), the delete() function of CachingKeyValueStore invokes the getInternal() and putInternal() functions, and those calls can lead to the execution of maybeEvict(). Deleting by putting a null value is a valid approach to removing items from RocksDB, so at that level the observed behavior is normal.

I am therefore currently writing code to suppress the maybeEvict() operation. When the test is complete, I will share the results. Let me know if you have any additional comments!

{code:java}
// CachingKeyValueStore
@Override
public byte[] delete(final Bytes key) {
    Objects.requireNonNull(key, "key cannot be null");
    validateStoreOpen();
    lock.writeLock().lock();
    try {
        validateStoreOpen();
        return deleteInternal(key);
    } finally {
        lock.writeLock().unlock();
    }
}

private byte[] deleteInternal(final Bytes key) {
    final byte[] v = getInternal(key);
    putInternal(key, null);
    return v;
}

private void putInternal(final Bytes key, final byte[] value) {
    context.cache().put(
        cacheName,
        key,
        new LRUCacheEntry(
            value,
            context.headers(),
            true,
            context.offset(),
            context.timestamp(),
            context.partition(),
            context.topic()));
    StoreQueryUtils.updatePosition(position, context);
}

// ThreadCache
public void put(final String namespace, final Bytes key, final LRUCacheEntry value, final boolean needToEvict) {
    numPuts++;

    final NamedCache cache = getOrCreateCache(namespace);
    synchronized (cache) {
        final long oldSize = cache.sizeInBytes();
        cache.put(key, value);
        sizeInBytes.getAndAdd(cache.sizeInBytes() - oldSize);
        maybeEvict(namespace, cache);
    }
}
{code}
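The chain described in the comment above — delete() putting a null, which drives maybeEvict() and can flush the dirty cache under a live iterator — can be reproduced with a toy model. This is a hedged, self-contained sketch (plain HashMaps standing in for RocksDB and the dirty cache; none of these class or method names are Kafka's): the store side of the merged iterator works off a snapshot, so once the cache is flushed mid-loop, a later key falls back to the stale snapshot while get() sees the flushed value.

```java
import java.util.HashMap;
import java.util.Map;

// Toy reproduction of the stale-read mechanism (illustrative only).
public class StaleReadDemo {
    static final Map<String, Integer> store = new HashMap<>(); // RocksDB stand-in
    static final Map<String, Integer> cache = new HashMap<>(); // dirty cache stand-in

    // get(): consult the cache first, then the live store.
    public static Integer get(String k) {
        return cache.containsKey(k) ? cache.get(k) : store.get(k);
    }

    static void flush() { // eviction: push dirty entries down, empty the cache
        store.putAll(cache);
        cache.clear();
    }

    // Returns {valueSeenByIterator, valueSeenByGet} for a key visited
    // after a mid-iteration flush.
    public static int[] run() {
        store.put("a", 1); store.put("b", 1); // value 1 already persisted
        cache.put("a", 2); cache.put("b", 2); // updated to 2, still dirty

        // all(): the store side of the merged iterator reads a snapshot
        // taken now, i.e. before any flush.
        Map<String, Integer> storeSnapshot = new HashMap<>(store);

        // Visiting "a": cache hit, correct value 2. Then a delete() inside
        // the loop triggers maybeEvict(), flushing the whole dirty cache.
        flush();

        // Visiting "b": the cache is empty now, so the iterator falls back
        // to the pre-flush snapshot (stale 1) while get() sees the live 2.
        return new int[] { storeSnapshot.get("b"), get("b") };
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println("iterator value: " + r[0] + ", get() value: " + r[1]);
    }
}
```

Running main prints `iterator value: 1, get() value: 2` — the same mismatch the '!!! BROKEN !!!' check in the test code detects.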
[jira] [Updated] (KAFKA-15302) Stale value returned when using store.all() in punctuation function.
[ https://issues.apache.org/jira/browse/KAFKA-15302?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jinyong Choi updated KAFKA-15302:
---------------------------------
    Description:

When using the store.all() function within the punctuation function of this.context.schedule, the previous value is returned. In other words, even though the value has been updated from 1 to 2, it doesn't return 2; instead, it returns 1.

In the provided test code, you can see the output '!!! BROKEN !!!'. While this doesn't occur 100% of the time, the added logs make it evident that the cache is flushed during the while loop that follows the all() call. As a result, the named cache holds a null value, causing a value to be returned from RocksDB; the value observed after the .get() call then differs from the expected value. This is possibly due to RocksDB's consistent-read functionality, although the exact cause is not certain.

Of course, if you perform {{store.flush()}} before {{all()}}, there won't be any errors.

* test code (forked from balajirrao and modified for this)
[https://github.com/jinyongchoi/kafka-streams-multi-runner/|https://github.com/jinyongchoi/kafka-streams-multi-runner/tree/main]

{code:java}
private void forwardAll(final long timestamp) {
    // System.err.println("forwardAll Start");

    KeyValueIterator<String, Integer> kvList = this.kvStore.all();
    while (kvList.hasNext()) {
        KeyValue<String, Integer> entry = kvList.next();
        final Record<String, Integer> msg = new Record<>(entry.key, entry.value, context.currentSystemTimeMs());
        final Integer storeValue = this.kvStore.get(entry.key);

        if (entry.value != storeValue) {
            System.err.println("[" + instanceId + "]" + "!!! BROKEN !!! Key: " + entry.key
                    + " Expected in stored(Cache or Store) value: " + storeValue
                    + " but KeyValueIterator value: " + entry.value);
            throw new RuntimeException("Broken!");
        }

        this.context.forward(msg);
    }
    kvList.close();
}
{code}

* log file (add log in stream source)

{code:java}
# console log
sbt clean "worker/assembly"; sbt "worker/assembly"; sbt "coordinator / run 1"
[info] welcome to sbt 1.8.2 (Ubuntu Java 11.0.20)
...
[info] running Coordinator 1
appid: 95108c48-7c69-4eeb-adbd-9d091bd84933
[0] starting instance +1
forwardAll Start
[0]!!! BROKEN !!! Key: 636398 Expected in stored(Cache or Store) value: 2 but KeyValueIterator value: 1

# log file
...
01:05:00.382 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO o.a.k.s.state.internals.NamedCache -- Named cache 0_0-Counts stats on flush: #hits=5628524, #misses=5636397, #overwrites=636397, #flushes=401
01:05:00.388 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO o.a.k.s.state.internals.NamedCache -- Named Cache flush dirtyKeys.size():7873 entries:7873
01:05:00.434 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO o.a.k.s.p.i.ProcessorStateManager -- stream-thread [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] stream-task [0_0] Flushed cache or buffer Counts
...
01:05:00.587 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO o.a.k.s.s.i.CachingKeyValueStore -- KeyValueIterator all()
01:05:00.588 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO o.a.k.s.state.internals.RocksDBStore -- RocksDB KeyValueIterator all
01:05:00.590 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO o.a.k.s.state.internals.ThreadCache -- stream-thread [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] MemoryLRUCacheBytesIterator cache all()
01:05:00.591 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO o.a.k.s.state.internals.NamedCache -- NamedCache allKeys() size():325771
01:05:00.637 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO o.a.k.s.state.internals.NamedCache -- NamedCache keySetIterator() TreeSet size():325771
...
01:05:07.052 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO o.a.k.s.state.internals.NamedCache -- 0_0-Counts evict() isDirty() eldest.size():103
01:05:07.052 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO o.a.k.s.state.internals.NamedCache -- Named cache 0_0-Counts stats on flush: #hits=5636398, #misses=6233857, #overwrites=639857, #flushes=402
01:05:07.053 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO o.a.k.s.state.internals.NamedCache -- Named Cache flush dirtyKeys.size():3459 entries:3460 <= NamedCache.flush()
...
ThreadCache set nextEntry is null key:
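The {{store.flush()}}-before-{{all()}} workaround mentioned in the description can be checked with the same kind of toy model (plain HashMaps as stand-ins; these class and method names are not Kafka's): flushing first means the snapshot the iterator reads already contains the latest values, so a mid-iteration eviction can no longer expose a stale one.

```java
import java.util.HashMap;
import java.util.Map;

// Toy check of the flush-before-all() workaround (illustrative only).
public class FlushBeforeAllDemo {
    static final Map<String, Integer> store = new HashMap<>(); // RocksDB stand-in
    static final Map<String, Integer> cache = new HashMap<>(); // dirty cache stand-in

    static Integer get(String k) {
        return cache.containsKey(k) ? cache.get(k) : store.get(k);
    }

    static void flush() { // push dirty entries down, empty the cache
        store.putAll(cache);
        cache.clear();
    }

    // Models a key visited AFTER a mid-iteration eviction: the cache is
    // empty at that point, so the iterator falls back to the snapshot.
    public static boolean iteratorMatchesGet(boolean flushFirst) {
        store.clear(); cache.clear();
        store.put("a", 1); // value 1 persisted
        cache.put("a", 2); // updated to 2, dirty

        if (flushFirst) {
            flush(); // the workaround: snapshot below now sees the latest value
        }
        Map<String, Integer> snapshot = new HashMap<>(store); // all()

        flush(); // the mid-iteration eviction (the failure mode)
        return snapshot.get("a").equals(get("a"));
    }

    public static void main(String[] args) {
        System.out.println("without flush: " + iteratorMatchesGet(false));
        System.out.println("with flush:    " + iteratorMatchesGet(true));
    }
}
```

Running main prints `false` for the unflushed case and `true` with the workaround, mirroring the report that flushing before all() avoids the error.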
[jira] [Created] (KAFKA-15302) Stale value returned when using store.all() in punctuation function.
Jinyong Choi created KAFKA-15302:
------------------------------------

             Summary: Stale value returned when using store.all() in punctuation function.
                 Key: KAFKA-15302
                 URL: https://issues.apache.org/jira/browse/KAFKA-15302
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 3.5.1
            Reporter: Jinyong Choi

When using the store.all() function within the punctuation function of this.context.schedule, the previous value is returned. In other words, even though the value has been updated from 1 to 2, it doesn't return 2; instead, it returns 1.

In the provided test code, you can see the output '!!! BROKEN !!!'. While this doesn't occur 100% of the time, the added logs make it evident that the cache is flushed during the while loop that follows the all() call. As a result, the named cache holds a null value, causing a value to be returned from RocksDB; the value observed after the .get() call then differs from the expected value. This is possibly due to RocksDB's consistent-read functionality, although the exact cause is not certain.

Of course, if you perform {{store.flush()}} before {{all()}}, there won't be any errors.

* test code (forked from balajirrao and modified for this)
[https://github.com/jinyongchoi/kafka-streams-multi-runner/|https://github.com/jinyongchoi/kafka-streams-multi-runner/tree/main]

{code:java}
private void forwardAll(final long timestamp) {
    // System.err.println("forwardAll Start");

    KeyValueIterator<String, Integer> kvList = this.kvStore.all();
    while (kvList.hasNext()) {
        KeyValue<String, Integer> entry = kvList.next();
        final Record<String, Integer> msg = new Record<>(entry.key, entry.value, context.currentSystemTimeMs());
        final Integer storeValue = this.kvStore.get(entry.key);

        if (entry.value != storeValue) {
            System.err.println("[" + instanceId + "]" + "!!! BROKEN !!! Key: " + entry.key
                    + " Expected in stored(Cache or Store) value: " + storeValue
                    + " but KeyValueIterator value: " + entry.value);
            throw new RuntimeException("Broken!");
        }

        this.context.forward(msg);
    }
    kvList.close();
}
{code}

* log file (add log in stream source)

{code:java}
# console log
sbt clean "worker/assembly"; sbt "worker/assembly"; sbt "coordinator / run 1"
[info] welcome to sbt 1.8.2 (Ubuntu Java 11.0.20)
...
[info] running Coordinator 1
appid: 95108c48-7c69-4eeb-adbd-9d091bd84933
[0] starting instance +1
forwardAll Start
[0]!!! BROKEN !!! Key: 636398 Expected in stored(Cache or Store) value: 2 but KeyValueIterator value: 1

# log file
...
01:05:00.382 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO o.a.k.s.state.internals.NamedCache -- Named cache 0_0-Counts stats on flush: #hits=5628524, #misses=5636397, #overwrites=636397, #flushes=401
01:05:00.388 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO o.a.k.s.state.internals.NamedCache -- Named Cache flush dirtyKeys.size():7873 entries:7873
01:05:00.434 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO o.a.k.s.p.i.ProcessorStateManager -- stream-thread [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] stream-task [0_0] Flushed cache or buffer Counts
...
01:05:00.587 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO o.a.k.s.s.i.CachingKeyValueStore -- KeyValueIterator all()
01:05:00.588 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO o.a.k.s.state.internals.RocksDBStore -- RocksDB KeyValueIterator all
01:05:00.590 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO o.a.k.s.state.internals.ThreadCache -- stream-thread [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] MemoryLRUCacheBytesIterator cache all()
01:05:00.591 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO o.a.k.s.state.internals.NamedCache -- NamedCache allKeys() size():325771
01:05:00.637 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO o.a.k.s.state.internals.NamedCache -- NamedCache keySetIterator() TreeSet size():325771
...
01:05:07.052 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO o.a.k.s.state.internals.NamedCache -- ===0_0-Counts evict() isDirty() eldest.size():103
01:05:07.052 [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] INFO o.a.k.s.state.internals.NamedCache -- Named cache 0_0-Counts stats on flush: #hits=5636398, #misses=6233857, #overwrites=639857, #flushes=402
01:05:07.053 [95108c48-7c69-4eeb-adbd-