xuanyuanking commented on a change in pull request #32767:
URL: https://github.com/apache/spark/pull/32767#discussion_r644694974



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
##########
@@ -20,12 +20,125 @@ package org.apache.spark.sql.execution.streaming.state
 import java.io._
 import java.nio.charset.Charset
 
+import scala.language.implicitConversions
+
 import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark._
+import org.apache.spark.sql.catalyst.util.quietly
+import org.apache.spark.sql.execution.streaming.CreateAtomicTestManager
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.Utils
 
 class RocksDBSuite extends SparkFunSuite {
 
+  test("RocksDBFileManager: upload only new immutable files") {
+    withTempDir { dir =>
+      val dfsRootDir = dir.getAbsolutePath
+      val verificationDir = Utils.createTempDir().getAbsolutePath // local dir to load checkpoints
+      val fileManager = new RocksDBFileManager(
+        dfsRootDir, Utils.createTempDir(), new Configuration)
+      val sstDir = s"$dfsRootDir/SSTs"
+      def numRemoteSSTFiles: Int = listFiles(sstDir).length
+      val logDir = s"$dfsRootDir/logs"
+      def numRemoteLogFiles: Int = listFiles(logDir).length
+
+      // Verify behavior before any saved checkpoints
+      assert(fileManager.getLatestVersion() === 0)
+
+      // Try to load incorrect versions
+      intercept[FileNotFoundException] {
+        fileManager.loadCheckpointFromDfs(1, Utils.createTempDir())
+      }
+
+      // Save a version of checkpoint files
+      val cpFiles1 = Seq(
+        "sst-file1.sst" -> 10,
+        "sst-file2.sst" -> 20,
+        "other-file1" -> 100,
+        "other-file2" -> 200,
+        "archive/00001.log" -> 1000,
+        "archive/00002.log" -> 2000
+      )
+      saveCheckpointFiles(fileManager, cpFiles1, version = 1, numKeys = 101)
+      assert(fileManager.getLatestVersion() === 1)
+      assert(numRemoteSSTFiles == 2) // 2 sst files copied
+      assert(numRemoteLogFiles == 2) // 2 log files copied
+
+      // Load back the checkpoint files into another local dir with existing files and verify
+      generateFiles(verificationDir, Seq(
+        "sst-file1.sst" -> 11, // files with same exist but different sizes, 
should get overwritten
+        "other-file1" -> 101,
+        "archive/00001.log" -> 1001,
+        "random-sst-file.sst" -> 100, // unnecessary files, should get deleted
+        "random-other-file" -> 9,
+        "00005.log" -> 101,
+        "archive/00007.log" -> 101
+      ))
+      loadAndVerifyCheckpointFiles(fileManager, verificationDir, version = 1, cpFiles1, 101)

Review comment:
       Per the earlier discussion at https://github.com/apache/spark/pull/32582#discussion_r642036286
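
       For context, the diff relies on a few suite-level helpers (`generateFiles`, `listFiles`, `saveCheckpointFiles`, `loadAndVerifyCheckpointFiles`) defined elsewhere in `RocksDBSuite`. Below is a minimal sketch of what they could look like, assuming that only file names and sizes matter to the assertions and that `RocksDBFileManager` exposes a `saveCheckpointToDfs` counterpart to the `loadCheckpointFromDfs` call visible in the diff; the signatures are illustrative, not the actual ones in the PR.

       ```scala
       import java.io.{File, FileOutputStream}

       import org.apache.spark.util.Utils

       // Creates files of the given sizes under `dir`, making parent dirs as
       // needed (e.g. for "archive/00001.log"). Contents are filler bytes;
       // only name and size matter to the test assertions.
       def generateFiles(dir: String, filesToLengths: Seq[(String, Int)]): Unit = {
         filesToLengths.foreach { case (name, length) =>
           val file = new File(dir, name)
           file.getParentFile.mkdirs()
           val out = new FileOutputStream(file)
           try out.write(Array.fill[Byte](length)('a')) finally out.close()
         }
       }

       // Lists the files in a directory, returning an empty sequence if the
       // dir does not exist yet (SSTs/ and logs/ are created on first upload).
       def listFiles(dir: String): Seq[File] = {
         val d = new File(dir)
         if (!d.exists()) Seq.empty else d.listFiles().toSeq
       }

       // Generates a local checkpoint dir with the given files and uploads it.
       // `saveCheckpointToDfs` is an assumed API, inferred from the
       // `loadCheckpointFromDfs` call in the diff above.
       def saveCheckpointFiles(
           fileManager: RocksDBFileManager,
           files: Seq[(String, Int)],
           version: Int,
           numKeys: Int): Unit = {
         val checkpointDir = Utils.createTempDir()
         generateFiles(checkpointDir.getAbsolutePath, files)
         fileManager.saveCheckpointToDfs(checkpointDir, version, numKeys)
       }

       // Loads `version` into `verificationDir` and checks that exactly the
       // expected files (and nothing else) are present with expected sizes.
       // Assumes the returned metadata exposes `numKeys`.
       def loadAndVerifyCheckpointFiles(
           fileManager: RocksDBFileManager,
           verificationDir: String,
           version: Int,
           expectedFiles: Seq[(String, Int)],
           expectedNumKeys: Int): Unit = {
         val metadata = fileManager.loadCheckpointFromDfs(version, new File(verificationDir))
         val actual =
           listFiles(verificationDir).filter(_.isFile).map(f => f.getName -> f.length.toInt) ++
           listFiles(s"$verificationDir/archive").map(f => s"archive/${f.getName}" -> f.length.toInt)
         assert(actual.toSet === expectedFiles.toSet)
         assert(metadata.numKeys === expectedNumKeys)
       }
       ```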



