viirya commented on a change in pull request #32582:
URL: https://github.com/apache/spark/pull/32582#discussion_r645325161



##########
File path: 
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
##########
@@ -20,12 +20,88 @@ package org.apache.spark.sql.execution.streaming.state
 import java.io._
 import java.nio.charset.Charset
 
+import scala.language.implicitConversions
+
 import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark._
+import org.apache.spark.sql.catalyst.util.quietly
+import org.apache.spark.sql.execution.streaming.CreateAtomicTestManager
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.Utils
 
 class RocksDBSuite extends SparkFunSuite {
 
+  test("RocksDBFileManager: upload only new immutable files") {
+    withTempDir { dir =>
+      val dfsRootDir = dir.getAbsolutePath
+      val fileManager = new RocksDBFileManager(
+        dfsRootDir, Utils.createTempDir(), new Configuration)
+      val sstDir = s"$dfsRootDir/SSTs"
+      def numRemoteSSTFiles: Int = listFiles(sstDir).length
+      val logDir = s"$dfsRootDir/logs"
+      def numRemoteLogFiles: Int = listFiles(logDir).length
+
+      // Save a version of checkpoint files
+      val cpFiles1 = Seq(
+        "sst-file1.sst" -> 10,
+        "sst-file2.sst" -> 20,
+        "other-file1" -> 100,
+        "other-file2" -> 200,
+        "archive/00001.log" -> 1000,
+        "archive/00002.log" -> 2000
+      )
+      saveCheckpointFiles(fileManager, cpFiles1, version = 1, numKeys = 101)
+      assert(numRemoteSSTFiles == 2) // 2 sst files copied
+      assert(numRemoteLogFiles == 2) // 2 log files copied
+
+      // Save SAME version again with different checkpoint files and verify
+      val cpFiles1_ = Seq(
+        "sst-file1.sst" -> 10, // same SST file as before, should not get 
copied
+        "sst-file2.sst" -> 25, // new SST file with same name as before, but 
different length
+        "sst-file3.sst" -> 30, // new SST file
+        "other-file1" -> 100, // same non-SST file as before, should not get 
copied
+        "other-file2" -> 210, // new non-SST file with same name as before, 
but different length
+        "other-file3" -> 300, // new non-SST file
+        "archive/00001.log" -> 1000, // same log file as before, should not 
get copied
+        "archive/00002.log" -> 2500, // new log file with same name as before, 
but different length
+        "archive/00003.log" -> 3000 // new log file
+      )
+      saveCheckpointFiles(fileManager, cpFiles1_, version = 1, numKeys = 1001)

Review comment:
       `version = 1`? In practice, will we ever save checkpoint files for the 
same version, i.e., for the same micro-batch?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to