chaoqin-li1123 commented on code in PR #41099:
URL: https://github.com/apache/spark/pull/41099#discussion_r1199516278


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala:
##########
@@ -280,34 +342,34 @@ class RocksDBFileManager(
     val path = new Path(dfsRootDir)
 
     // All versions present in DFS, sorted
-    val sortedVersions = fm.list(path, onlyZipFiles)
+    val sortedSnapshotVersions = fm.list(path, onlyZipFiles)
       .map(_.getPath.getName.stripSuffix(".zip"))
       .map(_.toLong)
       .sorted
 
     // Return if no versions generated yet
-    if (sortedVersions.isEmpty) return
+    if (sortedSnapshotVersions.isEmpty) return
 
     // Find the versions to delete
-    val maxVersionPresent = sortedVersions.last
-    val minVersionPresent = sortedVersions.head
-    val minVersionToRetain =
-      math.max(minVersionPresent, maxVersionPresent - numVersionsToRetain + 1)
-    val versionsToDelete = sortedVersions.takeWhile(_ < minVersionToRetain).toSet[Long]
+    val maxSnapshotVersionPresent = sortedSnapshotVersions.last
+    val minSnapshotVersionPresent = sortedSnapshotVersions.head
+
+    // In order to reconstruct numVersionsToRetain version, retain the latest snapshot
+    // that satisfies (version <= maxSnapshotVersionPresent - numVersionsToRetain + 1)
+    val minVersionToRetain = sortedSnapshotVersions

Review Comment:
   I changed it to use 0 as the default value, so all versions are retained if we don't find any version out of range.
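The retention rule from the inline comment, combined with the 0 default described in the reply, can be sketched as follows. This is a hypothetical illustration, not the code in PR #41099: the diff is cut off at `val minVersionToRetain = sortedSnapshotVersions`, so the selection logic here is an assumption. `SnapshotRetentionSketch` and its two helpers are invented names, and the sketch assumes that any version after a retained snapshot can be rebuilt by replaying the later changes on top of that snapshot.

```scala
// Hypothetical sketch, not the actual RocksDBFileManager code. The PR diff is
// truncated, so the rule below is inferred from its inline comment and the
// reviewer reply ("use 0 as default value").
object SnapshotRetentionSketch {

  /** Latest snapshot version <= maxSnapshotVersionPresent - numVersionsToRetain + 1,
    * or 0 if no snapshot is that old (0 means every snapshot is retained). */
  def minVersionToRetain(sortedSnapshotVersions: Seq[Long], numVersionsToRetain: Int): Long = {
    if (sortedSnapshotVersions.isEmpty) 0L
    else {
      val maxSnapshotVersionPresent = sortedSnapshotVersions.last
      val threshold = maxSnapshotVersionPresent - numVersionsToRetain + 1
      // Input is sorted ascending, so takeWhile collects every snapshot <= threshold;
      // the last of them is the newest snapshot that still covers the oldest retained version.
      sortedSnapshotVersions.takeWhile(_ <= threshold).lastOption.getOrElse(0L)
    }
  }

  /** Snapshots strictly older than the retention floor are candidates for deletion. */
  def versionsToDelete(sortedSnapshotVersions: Seq[Long], minVersionToRetain: Long): Set[Long] =
    sortedSnapshotVersions.takeWhile(_ < minVersionToRetain).toSet
}
```

For example, with snapshots `Seq(1L, 5L, 10L, 15L)` and `numVersionsToRetain = 3`, the threshold is 13, so snapshot 10 becomes the floor and only snapshots 1 and 5 are deleted. With `Seq(5L, 10L)` and `numVersionsToRetain = 100`, no snapshot is at or below the threshold (-89), the floor falls back to 0, and nothing is deleted. That is the "retain all versions" behavior described above.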



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
