xuanyuanking commented on a change in pull request #32582:
URL: https://github.com/apache/spark/pull/32582#discussion_r645681881



##########
File path: 
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
##########
@@ -20,12 +20,88 @@ package org.apache.spark.sql.execution.streaming.state
 import java.io._
 import java.nio.charset.Charset
 
+import scala.language.implicitConversions
+
 import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark._
+import org.apache.spark.sql.catalyst.util.quietly
+import org.apache.spark.sql.execution.streaming.CreateAtomicTestManager
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.Utils
 
 class RocksDBSuite extends SparkFunSuite {
 
+  test("RocksDBFileManager: upload only new immutable files") {
+    withTempDir { dir =>
+      val dfsRootDir = dir.getAbsolutePath
+      val fileManager = new RocksDBFileManager(
+        dfsRootDir, Utils.createTempDir(), new Configuration)
+      val sstDir = s"$dfsRootDir/SSTs"
+      def numRemoteSSTFiles: Int = listFiles(sstDir).length
+      val logDir = s"$dfsRootDir/logs"
+      def numRemoteLogFiles: Int = listFiles(logDir).length
+
+      // Save a version of checkpoint files
+      val cpFiles1 = Seq(
+        "sst-file1.sst" -> 10,
+        "sst-file2.sst" -> 20,
+        "other-file1" -> 100,
+        "other-file2" -> 200,
+        "archive/00001.log" -> 1000,
+        "archive/00002.log" -> 2000
+      )
+      saveCheckpointFiles(fileManager, cpFiles1, version = 1, numKeys = 101)
+      assert(numRemoteSSTFiles == 2) // 2 sst files copied
+      assert(numRemoteLogFiles == 2) // 2 log files copied
+
+      // Save SAME version again with different checkpoint files and verify
+      val cpFiles1_ = Seq(
+        "sst-file1.sst" -> 10, // same SST file as before, should not get 
copied
+        "sst-file2.sst" -> 25, // new SST file with same name as before, but 
different length
+        "sst-file3.sst" -> 30, // new SST file
+        "other-file1" -> 100, // same non-SST file as before, should not get 
copied
+        "other-file2" -> 210, // new non-SST file with same name as before, 
but different length
+        "other-file3" -> 300, // new non-SST file
+        "archive/00001.log" -> 1000, // same log file as before, should not 
get copied
+        "archive/00002.log" -> 2500, // new log file with same name as before, 
but different length
+        "archive/00003.log" -> 3000 // new log file
+      )
+      saveCheckpointFiles(fileManager, cpFiles1_, version = 1, numKeys = 1001)
+      assert(numRemoteSSTFiles === 4, "shouldn't copy same files again") // 2 
old + 2 new SST files
+      assert(numRemoteLogFiles === 4, "shouldn't copy same files again") // 2 
old + 2 new log files
+
+      // Save another version and verify
+      val cpFiles2 = Seq(
+        "sst-file4.sst" -> 40,
+        "other-file4" -> 400,
+        "archive/00004.log" -> 4000
+      )
+      saveCheckpointFiles(fileManager, cpFiles2, version = 2, numKeys = 1501)
+      assert(numRemoteSSTFiles === 5) // 1 new file over earlier 4 files
+      assert(numRemoteLogFiles === 5) // 1 new file over earlier 4 files
+    }
+  }
+
+  test("RocksDBFileManager: error writing [version].zip cancels the output 
stream") {
+    quietly {
+      val hadoopConf = new Configuration()
+      hadoopConf.set(
+        SQLConf.STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key,
+        classOf[CreateAtomicTestManager].getName)
+      val dfsRootDir = Utils.createTempDir().getAbsolutePath
+      val fileManager = new RocksDBFileManager(dfsRootDir, 
Utils.createTempDir(), hadoopConf)
+      val cpFiles = Seq("sst-file1.sst" -> 10, "sst-file2.sst" -> 20, 
"other-file1" -> 100)
+      CreateAtomicTestManager.shouldFailInCreateAtomic = true
+      CreateAtomicTestManager.cancelCalledInCreateAtomic = false
+      intercept[IOException] {
+        saveCheckpointFiles(fileManager, cpFiles, version = 1, numKeys = 101)
+      }

Review comment:
       This failure is intentional. 
   ```
   [info]   java.io.IOException: Copy failed intentionally
   [info]   at 
org.apache.spark.sql.execution.streaming.CreateAtomicTestManager$$anon$3.close(CheckpointFileManagerSuite.scala:169)
   ```
   
   This test aims to verify the behavior that `cancel` should be called no 
matter what error happens during `close`. See the comment and code here: 
https://github.com/apache/spark/pull/32582/files#diff-e3d3914d0398d61fdd299b1f8d3e869ec6a86e97606677c724969e421c9bf44eR222-R227




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to