HeartSaVioR commented on code in PR #39897:
URL: https://github.com/apache/spark/pull/39897#discussion_r1100978171


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala:
##########
@@ -269,29 +306,46 @@ class RocksDBFileManager(
         s"$numVersionsToRetain versions")
 
     // Resolve RocksDB files for all the versions and find the max version 
each file is used
-    val fileToMaxUsedVersion = new mutable.HashMap[RocksDBImmutableFile, Long]
+    val fileToMaxUsedVersion = new mutable.HashMap[String, Long]
     sortedVersions.foreach { version =>
       val files = Option(versionToRocksDBFiles.get(version)).getOrElse {
         val newResolvedFiles = getImmutableFilesFromVersionZip(version)
         versionToRocksDBFiles.put(version, newResolvedFiles)
         newResolvedFiles
       }
-      files.foreach(f => fileToMaxUsedVersion(f) = version)
+      files.foreach(f => fileToMaxUsedVersion(f.dfsFileName) =
+        math.max(version, fileToMaxUsedVersion.getOrElse(f.dfsFileName, 
version)))
     }
 
     // Best effort attempt to delete SST files that were last used in 
to-be-deleted versions
     val filesToDelete = fileToMaxUsedVersion.filter { case (_, v) => 
versionsToDelete.contains(v) }
+
+    val sstDir = new Path(dfsRootDir, 
RocksDBImmutableFile.SST_FILES_DFS_SUBDIR)
+    val logDir = new Path(dfsRootDir, 
RocksDBImmutableFile.LOG_FILES_DFS_SUBDIR)
+    val allSstFiles = if (fm.exists(sstDir)) fm.list(sstDir).toSeq else 
Seq.empty
+    val allLogFiles = if (fm.exists(logDir)) fm.list(logDir).toSeq else 
Seq.empty
+    filesToDelete ++= findOrphanFiles(fileToMaxUsedVersion.keys.toSeq, 
allSstFiles ++ allLogFiles)
+      .map(_ -> -1L)
     logInfo(s"Deleting ${filesToDelete.size} files not used in versions >= 
$minVersionToRetain")
     var failedToDelete = 0
-    filesToDelete.foreach { case (file, maxUsedVersion) =>
+    filesToDelete.foreach { case (dfsFileName, maxUsedVersion) =>
       try {
-        val dfsFile = dfsFilePath(file.dfsFileName)
+        val dfsFile = dfsFilePath(dfsFileName)
         fm.delete(dfsFile)
-        logDebug(s"Deleted file $file that was last used in version 
$maxUsedVersion")
+        if (maxUsedVersion == -1) {
+          logDebug(s"Deleted orphan file $dfsFileName")
+        } else {
+          logDebug(s"Deleted file $dfsFileName that was last used in version 
$maxUsedVersion")
+        }
+        logDebug(s"Deleted file $dfsFileName that was last used in version 
$maxUsedVersion")

Review Comment:
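   nit: remove this trailing `logDebug`? The new `if/else` just above already logs each deletion (orphan or last-used version), so this line logs every file twice, and for orphans it prints a misleading "last used in version -1".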



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to