sahnib commented on code in PR #45092:
URL: https://github.com/apache/spark/pull/45092#discussion_r1489848282


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##########
@@ -1863,6 +1864,91 @@ class RocksDBSuite extends 
AlsoTestWithChangelogCheckpointingEnabled with Shared
     }
   }
 
+  test("ensure local files deleted on filesystem" +
+    " are cleaned from dfs file mapping") {
+    def getSSTFiles(dir: File): Set[File] = {
+      val sstFiles = new mutable.HashSet[File]()
+      dir.listFiles().foreach { f =>
+        if (f.isDirectory) {
+          sstFiles ++= getSSTFiles(f)
+        } else {
+          if (f.getName.endsWith(".sst")) {
+            sstFiles.add(f)
+          }
+        }
+      }
+      sstFiles.toSet
+    }
+
+    def filterAndDeleteSSTFiles(dir: File, filesToKeep: Set[File]): Unit = {
+      dir.listFiles().foreach { f =>
+        if (f.isDirectory) {
+          filterAndDeleteSSTFiles(f, filesToKeep)
+        } else {
+          if (!filesToKeep.contains(f) && f.getName.endsWith(".sst")) {
+            logInfo(s"deleting ${f.getAbsolutePath} from local directory")
+            f.delete()
+          }
+        }
+      }
+    }
+
+    withTempDir { dir =>
+      withTempDir { localDir =>
+        val sqlConf = new SQLConf()
+        val dbConf = RocksDBConf(StateStoreConf(sqlConf))
+        logInfo(s"config set to ${dbConf.compactOnCommit}")
+        val hadoopConf = new Configuration()
+        val remoteDir = dir.getCanonicalPath
+        withDB(remoteDir = remoteDir,
+          conf = dbConf,
+          hadoopConf = hadoopConf,
+          localDir = localDir) { db =>
+          db.load(0)
+          db.put("a", "1")
+          db.put("b", "1")
+          db.commit()
+          // upload snapshots (with changelog checkpointing)

Review Comment:
   The maintenance operation creates a snapshot only if changelog checkpointing
is enabled. However, I agree that the comment is confusing, because the test
runs both with and without changelog checkpointing. Further, the state store
also creates snapshots on commit based on minDeltasForSnapshots (when changelog
checkpointing is enabled). I have removed the comment, as I think it causes more
confusion than it resolves.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to