anishshri-db commented on code in PR #40981:
URL: https://github.com/apache/spark/pull/40981#discussion_r1180883584
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -348,9 +340,8 @@ class RocksDB(
* Drop uncommitted changes, and roll back to previous version.
*/
def rollback(): Unit = {
- closePrefixScanIterators()
- resetWriteBatch()
numKeysOnWritingVersion = numKeysOnLoadedVersion
+ loadedVersion = -1L
Review Comment:
I don't think we need to do anything explicitly here. Basically the call to
load the rocksdb version from dfs should ensure that the appropriate cleanup is
done.
```
val metadata = if (version == 0) {
if (localDir.exists) Utils.deleteRecursively(localDir)
localDir.mkdirs()
RocksDBCheckpointMetadata(Seq.empty, 0)
} else {
// Delete all non-immutable files in local dir, and unzip new ones
from DFS commit file
listRocksDBFiles(localDir)._2.foreach(_.delete())
```
If the instance is removed, we would eventually call `close` which would
delete the base root dir itself.
cc - @HeartSaVioR - in case there is any condition I may be missing here
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]