anishshri-db commented on code in PR #40981:
URL: https://github.com/apache/spark/pull/40981#discussion_r1179867179
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -247,14 +253,7 @@ class RocksDB(
}
def prefixScan(prefix: Array[Byte]): Iterator[ByteArrayPair] = {
- val threadId = Thread.currentThread().getId
- val iter = prefixScanReuseIter.computeIfAbsent(threadId, tid => {
Review Comment:
Yeah, so the previous model assumed that the iterator could reflect changes to
the writeBatch for the given thread. That is no longer true with this change,
so the following would fail if we were to use a cached iterator:
- Load version 0
- Add kv pairs (c, 1), (c, 2), (c, 3)
- Commit version 0
- Load version 1
- Call prefix scan, which populates the per-thread iterator map
- Add kv pairs (c, 4), (c, 5), (c, 6)
- Read the key-values. If we rely only on the old iterator cached in the map,
  we read only (c, 1), (c, 2), (c, 3).

With the old writeBatch in place, the cached iterator also covered the
writeBatch, which is why we could read all the kv pairs correctly.
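A minimal sketch of the failure above, assuming a hypothetical in-memory store rather than Spark's `RocksDB` class or the RocksDB Java API. Iterators see a point-in-time snapshot (modeled with an immutable `TreeMap`), `CachingScanner` caches that snapshot per thread id in the spirit of the removed `computeIfAbsent` path, and `FreshScanner` opens a new view on every `prefixScan` call. `SnapshotStore`, `CachingScanner`, `FreshScanner` and the `"c/<n>"` key encoding are all illustrative names, not Spark code.

```scala
import scala.collection.concurrent.TrieMap
import scala.collection.immutable.TreeMap

object PrefixScanCacheDemo {
  // Hypothetical store: each view is an immutable point-in-time snapshot,
  // so writes made after a view is taken are not visible through it.
  final class SnapshotStore {
    private var data = TreeMap.empty[String, Int]
    def put(key: String, value: Int): Unit = data = data.updated(key, value)
    def snapshot(): TreeMap[String, Int] = data
  }

  // Problematic variant: reuses one snapshot per thread id across calls.
  final class CachingScanner(store: SnapshotStore) {
    private val cache = TrieMap.empty[Long, TreeMap[String, Int]]
    def prefixScan(prefix: String): Iterator[(String, Int)] = {
      val tid = Thread.currentThread().getId
      val snap = cache.getOrElseUpdate(tid, store.snapshot())
      snap.iterator.filter { case (k, _) => k.startsWith(prefix) }
    }
  }

  // Variant without caching: takes a fresh snapshot on every call.
  final class FreshScanner(store: SnapshotStore) {
    def prefixScan(prefix: String): Iterator[(String, Int)] =
      store.snapshot().iterator.filter { case (k, _) => k.startsWith(prefix) }
  }

  def main(args: Array[String]): Unit = {
    val store = new SnapshotStore
    Seq(1, 2, 3).foreach(i => store.put(s"c/$i", i))
    val cached = new CachingScanner(store)
    val fresh = new FreshScanner(store)
    cached.prefixScan("c/").size // first call populates the per-thread cache
    Seq(4, 5, 6).foreach(i => store.put(s"c/$i", i))
    println(cached.prefixScan("c/").size) // 3: the stale cached snapshot
    println(fresh.prefixScan("c/").size)  // 6: sees the later writes
  }
}
```

The point the sketch tries to capture: a cached per-thread view is only safe if it observes later writes, which, per the comment above, is no longer the case with this change.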
Sample failure:
```
[info] Set((("c", 1), 1), (("c", 2), 2), (("c", 3), 3)) did not equal
Set((("c", 4), 4), (("c", 3), 3), (("c", 1), 1), (("c", 5), 5), (("c", 2), 2),
(("c", 6), 6)) (StateStoreSuite.scala:917)
[info] Analysis:
[info] Set(missingInLeft: [(("c", 4), 4), (("c", 5), 5), (("c", 6), 6)])
```