HeartSaVioR commented on code in PR #45092:
URL: https://github.com/apache/spark/pull/45092#discussion_r1488831105
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##########
@@ -1863,6 +1864,91 @@ class RocksDBSuite extends
AlsoTestWithChangelogCheckpointingEnabled with Shared
}
}
+ test("ensure local files deleted on filesystem" +
+ " are cleaned from dfs file mapping") {
+ def getSSTFiles(dir: File): Set[File] = {
+ val sstFiles = new mutable.HashSet[File]()
+ dir.listFiles().foreach { f =>
+ if (f.isDirectory) {
+ sstFiles ++= getSSTFiles(f)
+ } else {
+ if (f.getName.endsWith(".sst")) {
+ sstFiles.add(f)
+ }
+ }
+ }
+ sstFiles.toSet
+ }
+
+ def filterAndDeleteSSTFiles(dir: File, filesToKeep: Set[File]): Unit = {
+ dir.listFiles().foreach { f =>
+ if (f.isDirectory) {
+ filterAndDeleteSSTFiles(f, filesToKeep)
+ } else {
+ if (!filesToKeep.contains(f) && f.getName.endsWith(".sst")) {
+ logInfo(s"deleting ${f.getAbsolutePath} from local directory")
+ f.delete()
+ }
+ }
+ }
+ }
+
+ withTempDir { dir =>
+ withTempDir { localDir =>
+ val sqlConf = new SQLConf()
+ val dbConf = RocksDBConf(StateStoreConf(sqlConf))
+ logInfo(s"config set to ${dbConf.compactOnCommit}")
+ val hadoopConf = new Configuration()
+ val remoteDir = dir.getCanonicalPath
+ withDB(remoteDir = remoteDir,
+ conf = dbConf,
+ hadoopConf = hadoopConf,
+ localDir = localDir) { db =>
+ db.load(0)
+ db.put("a", "1")
+ db.put("b", "1")
+ db.commit()
+ // upload snapshots (with changelog checkpointing)
Review Comment:
or just remove the mention of `with changelog checkpointing`.
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##########
@@ -1863,6 +1864,91 @@ class RocksDBSuite extends
AlsoTestWithChangelogCheckpointingEnabled with Shared
}
}
+ test("ensure local files deleted on filesystem" +
+ " are cleaned from dfs file mapping") {
+ def getSSTFiles(dir: File): Set[File] = {
+ val sstFiles = new mutable.HashSet[File]()
+ dir.listFiles().foreach { f =>
+ if (f.isDirectory) {
+ sstFiles ++= getSSTFiles(f)
+ } else {
+ if (f.getName.endsWith(".sst")) {
+ sstFiles.add(f)
+ }
+ }
+ }
+ sstFiles.toSet
+ }
+
+ def filterAndDeleteSSTFiles(dir: File, filesToKeep: Set[File]): Unit = {
+ dir.listFiles().foreach { f =>
+ if (f.isDirectory) {
+ filterAndDeleteSSTFiles(f, filesToKeep)
+ } else {
+ if (!filesToKeep.contains(f) && f.getName.endsWith(".sst")) {
+ logInfo(s"deleting ${f.getAbsolutePath} from local directory")
+ f.delete()
+ }
+ }
+ }
+ }
+
+ withTempDir { dir =>
+ withTempDir { localDir =>
+ val sqlConf = new SQLConf()
+ val dbConf = RocksDBConf(StateStoreConf(sqlConf))
+ logInfo(s"config set to ${dbConf.compactOnCommit}")
+ val hadoopConf = new Configuration()
+ val remoteDir = dir.getCanonicalPath
+ withDB(remoteDir = remoteDir,
+ conf = dbConf,
+ hadoopConf = hadoopConf,
+ localDir = localDir) { db =>
+ db.load(0)
+ db.put("a", "1")
+ db.put("b", "1")
+ db.commit()
+ // upload snapshots (with changelog checkpointing)
Review Comment:
nit: with/without — I expect this test to run with changelog
checkpointing both on and off.
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##########
@@ -1863,6 +1864,91 @@ class RocksDBSuite extends
AlsoTestWithChangelogCheckpointingEnabled with Shared
}
}
+ test("ensure local files deleted on filesystem" +
+ " are cleaned from dfs file mapping") {
+ def getSSTFiles(dir: File): Set[File] = {
+ val sstFiles = new mutable.HashSet[File]()
+ dir.listFiles().foreach { f =>
+ if (f.isDirectory) {
+ sstFiles ++= getSSTFiles(f)
+ } else {
+ if (f.getName.endsWith(".sst")) {
+ sstFiles.add(f)
+ }
+ }
+ }
+ sstFiles.toSet
+ }
+
+ def filterAndDeleteSSTFiles(dir: File, filesToKeep: Set[File]): Unit = {
+ dir.listFiles().foreach { f =>
+ if (f.isDirectory) {
+ filterAndDeleteSSTFiles(f, filesToKeep)
+ } else {
+ if (!filesToKeep.contains(f) && f.getName.endsWith(".sst")) {
+ logInfo(s"deleting ${f.getAbsolutePath} from local directory")
+ f.delete()
+ }
+ }
+ }
+ }
+
+ withTempDir { dir =>
+ withTempDir { localDir =>
+ val sqlConf = new SQLConf()
+ val dbConf = RocksDBConf(StateStoreConf(sqlConf))
+ logInfo(s"config set to ${dbConf.compactOnCommit}")
+ val hadoopConf = new Configuration()
+ val remoteDir = dir.getCanonicalPath
+ withDB(remoteDir = remoteDir,
+ conf = dbConf,
+ hadoopConf = hadoopConf,
+ localDir = localDir) { db =>
+ db.load(0)
+ db.put("a", "1")
+ db.put("b", "1")
+ db.commit()
+ // upload snapshots (with changelog checkpointing)
+ db.doMaintenance()
+
+ // find all SST files written in version 1
+ val sstFiles = getSSTFiles(localDir)
+
+ // make more commits, this would generate more SST files and write
+ // them to remoteDir
+ for (version <- 1 to 10) {
+ db.load(version)
+ db.put("c", "1")
+ db.put("d", "1")
+ db.commit()
+ // upload snapshots (with changelog checkpointing)
+ db.doMaintenance()
+ }
+
+            // clean the SST files committed after version 1 from the local
+            // filesystem. This is similar to a process like compaction,
+            // where multiple L0 SST files can be merged into a single L1 file
+ filterAndDeleteSSTFiles(localDir, sstFiles)
+
+            // reload 2, and overwrite commit for version 3; this should not
+            // reuse any locally deleted files, as they should be removed from
+            // the mapping
+ db.load(2)
+ db.put("e", "1")
+ db.put("f", "1")
+ db.commit()
+ // upload snapshots (with changelog checkpointing)
Review Comment:
ditto
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##########
@@ -1863,6 +1864,91 @@ class RocksDBSuite extends
AlsoTestWithChangelogCheckpointingEnabled with Shared
}
}
+ test("ensure local files deleted on filesystem" +
+ " are cleaned from dfs file mapping") {
+ def getSSTFiles(dir: File): Set[File] = {
+ val sstFiles = new mutable.HashSet[File]()
+ dir.listFiles().foreach { f =>
+ if (f.isDirectory) {
+ sstFiles ++= getSSTFiles(f)
+ } else {
+ if (f.getName.endsWith(".sst")) {
+ sstFiles.add(f)
+ }
+ }
+ }
+ sstFiles.toSet
+ }
+
+ def filterAndDeleteSSTFiles(dir: File, filesToKeep: Set[File]): Unit = {
+ dir.listFiles().foreach { f =>
+ if (f.isDirectory) {
+ filterAndDeleteSSTFiles(f, filesToKeep)
+ } else {
+ if (!filesToKeep.contains(f) && f.getName.endsWith(".sst")) {
+ logInfo(s"deleting ${f.getAbsolutePath} from local directory")
+ f.delete()
+ }
+ }
+ }
+ }
+
+ withTempDir { dir =>
+ withTempDir { localDir =>
+ val sqlConf = new SQLConf()
+ val dbConf = RocksDBConf(StateStoreConf(sqlConf))
+ logInfo(s"config set to ${dbConf.compactOnCommit}")
+ val hadoopConf = new Configuration()
+ val remoteDir = dir.getCanonicalPath
+ withDB(remoteDir = remoteDir,
+ conf = dbConf,
+ hadoopConf = hadoopConf,
+ localDir = localDir) { db =>
+ db.load(0)
+ db.put("a", "1")
+ db.put("b", "1")
+ db.commit()
+ // upload snapshots (with changelog checkpointing)
+ db.doMaintenance()
+
+ // find all SST files written in version 1
+ val sstFiles = getSSTFiles(localDir)
+
+ // make more commits, this would generate more SST files and write
+ // them to remoteDir
+ for (version <- 1 to 10) {
+ db.load(version)
+ db.put("c", "1")
+ db.put("d", "1")
+ db.commit()
+ // upload snapshots (with changelog checkpointing)
Review Comment:
ditto
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]