sahnib commented on code in PR #45092:
URL: https://github.com/apache/spark/pull/45092#discussion_r1489848282
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##########
@@ -1863,6 +1864,91 @@ class RocksDBSuite extends
AlsoTestWithChangelogCheckpointingEnabled with Shared
}
}
+ test("ensure local files deleted on filesystem" +
+ " are cleaned from dfs file mapping") {
+ def getSSTFiles(dir: File): Set[File] = {
+ val sstFiles = new mutable.HashSet[File]()
+ dir.listFiles().foreach { f =>
+ if (f.isDirectory) {
+ sstFiles ++= getSSTFiles(f)
+ } else {
+ if (f.getName.endsWith(".sst")) {
+ sstFiles.add(f)
+ }
+ }
+ }
+ sstFiles.toSet
+ }
+
+ def filterAndDeleteSSTFiles(dir: File, filesToKeep: Set[File]): Unit = {
+ dir.listFiles().foreach { f =>
+ if (f.isDirectory) {
+ filterAndDeleteSSTFiles(f, filesToKeep)
+ } else {
+ if (!filesToKeep.contains(f) && f.getName.endsWith(".sst")) {
+ logInfo(s"deleting ${f.getAbsolutePath} from local directory")
+ f.delete()
+ }
+ }
+ }
+ }
+
+ withTempDir { dir =>
+ withTempDir { localDir =>
+ val sqlConf = new SQLConf()
+ val dbConf = RocksDBConf(StateStoreConf(sqlConf))
+ logInfo(s"config set to ${dbConf.compactOnCommit}")
+ val hadoopConf = new Configuration()
+ val remoteDir = dir.getCanonicalPath
+ withDB(remoteDir = remoteDir,
+ conf = dbConf,
+ hadoopConf = hadoopConf,
+ localDir = localDir) { db =>
+ db.load(0)
+ db.put("a", "1")
+ db.put("b", "1")
+ db.commit()
+ // upload snapshots (with changelog checkpointing)
Review Comment:
The maintenance operation creates a snapshot only if changelog checkpointing
is enabled. However, I agree that the comment is confusing, because the
test runs both with and without changelog checkpointing. Further, the state store
also creates snapshots on commit based on minDeltasForSnapshots (with changelog
checkpointing enabled). I have removed the comment, as I think it causes more
confusion than it resolves.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]