xuanyuanking commented on a change in pull request #32582:
URL: https://github.com/apache/spark/pull/32582#discussion_r645672037



##########
File path: 
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
##########
@@ -20,12 +20,88 @@ package org.apache.spark.sql.execution.streaming.state
 import java.io._
 import java.nio.charset.Charset
 
+import scala.language.implicitConversions
+
 import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark._
+import org.apache.spark.sql.catalyst.util.quietly
+import org.apache.spark.sql.execution.streaming.CreateAtomicTestManager
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.Utils
 
 class RocksDBSuite extends SparkFunSuite {
 
+  test("RocksDBFileManager: upload only new immutable files") {
+    withTempDir { dir =>
+      val dfsRootDir = dir.getAbsolutePath
+      val fileManager = new RocksDBFileManager(
+        dfsRootDir, Utils.createTempDir(), new Configuration)
+      val sstDir = s"$dfsRootDir/SSTs"
+      def numRemoteSSTFiles: Int = listFiles(sstDir).length
+      val logDir = s"$dfsRootDir/logs"
+      def numRemoteLogFiles: Int = listFiles(logDir).length
+
+      // Save a version of checkpoint files
+      val cpFiles1 = Seq(
+        "sst-file1.sst" -> 10,
+        "sst-file2.sst" -> 20,
+        "other-file1" -> 100,
+        "other-file2" -> 200,
+        "archive/00001.log" -> 1000,
+        "archive/00002.log" -> 2000
+      )
+      saveCheckpointFiles(fileManager, cpFiles1, version = 1, numKeys = 101)
+      assert(numRemoteSSTFiles == 2) // 2 sst files copied
+      assert(numRemoteLogFiles == 2) // 2 log files copied
+
+      // Save SAME version again with different checkpoint files and verify
+      val cpFiles1_ = Seq(
+        "sst-file1.sst" -> 10, // same SST file as before, should not get 
copied
+        "sst-file2.sst" -> 25, // new SST file with same name as before, but 
different length
+        "sst-file3.sst" -> 30, // new SST file
+        "other-file1" -> 100, // same non-SST file as before, should not get 
copied
+        "other-file2" -> 210, // new non-SST file with same name as before, 
but different length
+        "other-file3" -> 300, // new non-SST file
+        "archive/00001.log" -> 1000, // same log file as before, should not 
get copied
+        "archive/00002.log" -> 2500, // new log file with same name as before, 
but different length
+        "archive/00003.log" -> 3000 // new log file
+      )
+      saveCheckpointFiles(fileManager, cpFiles1_, version = 1, numKeys = 1001)
+      assert(numRemoteSSTFiles === 4, "shouldn't copy same files again") // 2 
old + 2 new SST files
+      assert(numRemoteLogFiles === 4, "shouldn't copy same files again") // 2 
old + 2 new log files
+
+      // Save another version and verify
+      val cpFiles2 = Seq(
+        "sst-file4.sst" -> 40,
+        "other-file4" -> 400,
+        "archive/00004.log" -> 4000
+      )
+      saveCheckpointFiles(fileManager, cpFiles2, version = 2, numKeys = 1501)
+      assert(numRemoteSSTFiles === 5) // 1 new file over earlier 4 files
+      assert(numRemoteLogFiles === 5) // 1 new file over earlier 4 files
+    }
+  }
+
+  test("RocksDBFileManager: error writing [version].zip cancels the output 
stream") {
+    quietly {
+      val hadoopConf = new Configuration()
+      hadoopConf.set(
+        SQLConf.STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key,
+        classOf[CreateAtomicTestManager].getName)
+      val dfsRootDir = Utils.createTempDir().getAbsolutePath
+      val fileManager = new RocksDBFileManager(dfsRootDir, 
Utils.createTempDir(), hadoopConf)
+      val cpFiles = Seq("sst-file1.sst" -> 10, "sst-file2.sst" -> 20, 
"other-file1" -> 100)
+      CreateAtomicTestManager.shouldFailInCreateAtomic = true
+      CreateAtomicTestManager.cancelCalledInCreateAtomic = false

Review comment:
       That's right. I just wanted to set it to `false` explicitly here since 
we have an `assert(cancelCalledInCreateAtomic)` at the end. Let me delete this one.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to