sahnib commented on code in PR #44837:
URL: https://github.com/apache/spark/pull/44837#discussion_r1463548230


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -162,6 +162,8 @@ class RocksDB(
         val metadata = fileManager.loadCheckpointFromDfs(latestSnapshotVersion, workingDir)
         loadedVersion = latestSnapshotVersion
 
+        // reset last snapshot version
+        lastSnapshotVersion = 0L

Review Comment:
   We need to reset `lastSnapshotVersion` on load to discard any snapshots that 
were taken for versions higher than the loaded version. This is necessary because 
the newly downloaded SST files might not be compatible with those existing 
snapshots. Consider the scenario below: 
   
   1. Executor 1 commits v1 and v2. RocksDB snapshots are taken, but only 
changelog files are committed. 
   2. Executor 1 performs maintenance and uploads checkpoint snapshot for v2 on 
DFS. 
   3. Executor 1 performs commits for v3 and v4. RocksDB snapshots are taken, 
but only changelog files are committed. 
   4. Executor 2 starts from v0, commits v1 and v2, performs maintenance and 
overwrites checkpoint snapshot for v2. 
   5. Executor 1 reloads v2. At this point, it downloads the files uploaded by 
Executor 2 in (4). **However, note that `lastSnapshotVersion` is still 4**.
   6. Executor 1 commits v3 and v4 again. Changelog files are committed, but no 
RocksDB snapshots are taken because **`lastSnapshotVersion >= newVersion`**.
   7. Executor 1 uploads the snapshot for v4 to DFS as part of maintenance. The 
snapshot that gets uploaded is the stale one taken in (3), while the metadata 
points to the current state from (6). The two are not compatible. 
   
   The test case `time travel 2 (with changelog checkpointing)` covers this 
scenario. 
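
   To make the sequence above easier to follow, here is a minimal toy model of 
it. It is a sketch under stated assumptions, not Spark's `RocksDB` class: 
`ToyStore`, `lineage`, `pendingSnapshot`, `maintenance` and 
`uploadsStaleSnapshot` are hypothetical names, and the snapshot guard is 
reduced to the `lastSnapshotVersion >= newVersion` condition quoted in (6). 

   ```scala
   // Toy model of the scenario above. NOT Spark's RocksDB class: every name
   // here is hypothetical, and the snapshot guard is simplified to the
   // condition quoted in step (6).
   object SnapshotResetSketch {

     final class ToyStore(resetOnLoad: Boolean) {
       private var loadedVersion = 0L
       private var lastSnapshotVersion = 0L
       // Tag for whichever executor's DFS files the local state is built on.
       private var lineage = "none"
       // Local snapshot awaiting upload: (version, lineage it was taken on).
       private var pendingSnapshot: Option[(Long, String)] = None

       def currentLineage: String = lineage

       def load(version: Long, dfsLineage: String): Unit = {
         loadedVersion = version
         lineage = dfsLineage
         if (resetOnLoad) lastSnapshotVersion = 0L // the reset under review
       }

       def commit(): Unit = {
         val newVersion = loadedVersion + 1
         // No snapshot is taken when lastSnapshotVersion >= newVersion.
         if (newVersion > lastSnapshotVersion) {
           pendingSnapshot = Some((newVersion, lineage))
           lastSnapshotVersion = newVersion
         }
         loadedVersion = newVersion
       }

       // Maintenance hands over the pending snapshot for upload and clears it.
       def maintenance(): Option[(Long, String)] = {
         val snapshot = pendingSnapshot
         pendingSnapshot = None
         snapshot
       }
     }

     // Replays steps (1) to (7) and reports whether the snapshot uploaded in
     // (7) was taken on a different lineage than the currently loaded state.
     def uploadsStaleSnapshot(resetOnLoad: Boolean): Boolean = {
       val store = new ToyStore(resetOnLoad)
       store.load(0L, "executor-1")
       store.commit(); store.commit() // (1) v1, v2
       store.maintenance()            // (2) snapshot for v2 goes to DFS
       store.commit(); store.commit() // (3) v3, v4
       store.load(2L, "executor-2")   // (4) + (5) reload v2 written by executor 2
       store.commit(); store.commit() // (6) v3, v4 again
       store.maintenance().exists {   // (7)
         case (_, snapshotLineage) => snapshotLineage != store.currentLineage
       }
     }

     def main(args: Array[String]): Unit = {
       println(s"without reset: stale upload = ${uploadsStaleSnapshot(resetOnLoad = false)}")
       println(s"with reset:    stale upload = ${uploadsStaleSnapshot(resetOnLoad = true)}")
     }
   }
   ```

   In this model, `uploadsStaleSnapshot(resetOnLoad = false)` is `true` because 
the snapshot from (3) survives the reload, while 
`uploadsStaleSnapshot(resetOnLoad = true)` is `false` because the commits in (6) 
take fresh snapshots on the reloaded state. 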



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.


