anishshri-db commented on code in PR #40981:
URL: https://github.com/apache/spark/pull/40981#discussion_r1180883584


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -348,9 +340,8 @@ class RocksDB(
    * Drop uncommitted changes, and roll back to previous version.
    */
   def rollback(): Unit = {
-    closePrefixScanIterators()
-    resetWriteBatch()
     numKeysOnWritingVersion = numKeysOnLoadedVersion
+    loadedVersion = -1L

Review Comment:
   I don't think we need to do anything explicitly here. The call to load the 
RocksDB version from DFS should already ensure that the appropriate cleanup is 
done.
   
   ```
       val metadata = if (version == 0) {
         if (localDir.exists) Utils.deleteRecursively(localDir)
         localDir.mkdirs()
         RocksDBCheckpointMetadata(Seq.empty, 0)
       } else {
         // Delete all non-immutable files in local dir, and unzip new ones from DFS commit file
         listRocksDBFiles(localDir)._2.foreach(_.delete())
   ```
   
   If the instance is removed, we would eventually call `close`, which would 
delete the base root dir itself.
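   
   To sketch the idea (a simplified toy model, not the actual `RocksDB.scala` implementation; `cleanupCount` is a hypothetical stand-in for the file cleanup that `load` performs): invalidating `loadedVersion` on rollback is enough, because the next `load()` sees a version mismatch and is forced to re-fetch and clean up.
   
   ```scala
   // Hypothetical sketch of the load/rollback interaction discussed above.
   // The real RocksDB state store deletes stale local files and unzips the
   // DFS commit file on reload; here that is modeled as a counter bump.
   object RollbackSketch {
     var loadedVersion: Long = -1L
     var cleanupCount: Int = 0
   
     // Models load(version): reloads (and cleans up) only when the requested
     // version differs from the one currently loaded.
     def load(version: Long): Unit = {
       if (loadedVersion != version) {
         cleanupCount += 1      // stands in for deleting stale local files
         loadedVersion = version
       }
     }
   
     // Models the reviewed rollback(): resetting loadedVersion to -1L forces
     // the next load() to do a full reload, so no explicit cleanup is needed.
     def rollback(): Unit = {
       loadedVersion = -1L
     }
   
     def main(args: Array[String]): Unit = {
       load(5L)
       load(5L)                 // no-op: version already loaded
       assert(cleanupCount == 1)
       rollback()
       load(5L)                 // forced reload after rollback
       assert(cleanupCount == 2)
       println(s"cleanups = $cleanupCount")
     }
   }
   ```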
   
   cc - @HeartSaVioR - in case there is any condition I may be missing here



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

