anishshri-db commented on code in PR #40981:
URL: https://github.com/apache/spark/pull/40981#discussion_r1179867179


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -247,14 +253,7 @@ class RocksDB(
   }
 
   def prefixScan(prefix: Array[Byte]): Iterator[ByteArrayPair] = {
-    val threadId = Thread.currentThread().getId
-    val iter = prefixScanReuseIter.computeIfAbsent(threadId, tid => {

Review Comment:
   Yea so the previous model assumed that the iterator could reflect changes to 
writeBatch for the given thread. That is not true with the change any more.
   
   So basically the following would fail, if we were to use cached iterator.
   - Load version 0
   - Add kv pairs (c, 1) (c, 2) (c, 3)
   - commit 0
   - Load version 1
   - Call prefix scan and load map
   - Add kv pairs (c, 4) (c, 5) (c, 6)
   - Read key-values and now if we rely only on the old iterator loaded in the 
map, we read (c, 1), (c, 2) (c, 3). With the old writeBatch in place, the 
iterator was valid for the writeBatch as well which is why we could read all 
the kv pairs correctly. With the new change, we wont read the newly added 
values written to the DB.
   
   Sample failure:
   ```
   [info]   Set((("c", 1), 1), (("c", 2), 2), (("c", 3), 3)) did not equal 
Set((("c", 4), 4), (("c", 3), 3), (("c", 1), 1), (("c", 5), 5), (("c", 2), 2), 
(("c", 6), 6)) (StateStoreSuite.scala:917)
   [info]   Analysis:
   [info]   Set(missingInLeft: [(("c", 4), 4), (("c", 5), 5), (("c", 6), 6)])
   ```



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -247,14 +253,7 @@ class RocksDB(
   }
 
   def prefixScan(prefix: Array[Byte]): Iterator[ByteArrayPair] = {
-    val threadId = Thread.currentThread().getId
-    val iter = prefixScanReuseIter.computeIfAbsent(threadId, tid => {

Review Comment:
   Yea so the previous model assumed that the iterator could reflect changes to 
writeBatch for the given thread. That is not true with the change any more.
   
   So basically the following would fail, if we were to use cached iterator.
   - Load version 0
   - Add kv pairs (c, 1) (c, 2) (c, 3)
   - commit 0
   - Load version 1
   - Call prefix scan and load map
   - Add kv pairs (c, 4) (c, 5) (c, 6)
   - Read key-values and now if we rely only on the old iterator loaded in the 
map, we read (c, 1), (c, 2) (c, 3). With the old writeBatch in place, the 
iterator was valid for the writeBatch as well which is why we could read all 
the kv pairs correctly. With the new change, we wont read the newly added kv 
pairs written to the DB.
   
   Sample failure:
   ```
   [info]   Set((("c", 1), 1), (("c", 2), 2), (("c", 3), 3)) did not equal 
Set((("c", 4), 4), (("c", 3), 3), (("c", 1), 1), (("c", 5), 5), (("c", 2), 2), 
(("c", 6), 6)) (StateStoreSuite.scala:917)
   [info]   Analysis:
   [info]   Set(missingInLeft: [(("c", 4), 4), (("c", 5), 5), (("c", 6), 6)])
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to