xuanyuanking commented on a change in pull request #32582:
URL: https://github.com/apache/spark/pull/32582#discussion_r645672037
##########
File path:
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
##########
@@ -20,12 +20,88 @@ package org.apache.spark.sql.execution.streaming.state
import java.io._
import java.nio.charset.Charset
+import scala.language.implicitConversions
+
import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
import org.apache.spark._
+import org.apache.spark.sql.catalyst.util.quietly
+import org.apache.spark.sql.execution.streaming.CreateAtomicTestManager
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.Utils
class RocksDBSuite extends SparkFunSuite {
+ test("RocksDBFileManager: upload only new immutable files") {
+ withTempDir { dir =>
+ val dfsRootDir = dir.getAbsolutePath
+ val fileManager = new RocksDBFileManager(
+ dfsRootDir, Utils.createTempDir(), new Configuration)
+ val sstDir = s"$dfsRootDir/SSTs"
+ def numRemoteSSTFiles: Int = listFiles(sstDir).length
+ val logDir = s"$dfsRootDir/logs"
+ def numRemoteLogFiles: Int = listFiles(logDir).length
+
+ // Save a version of checkpoint files
+ val cpFiles1 = Seq(
+ "sst-file1.sst" -> 10,
+ "sst-file2.sst" -> 20,
+ "other-file1" -> 100,
+ "other-file2" -> 200,
+ "archive/00001.log" -> 1000,
+ "archive/00002.log" -> 2000
+ )
+ saveCheckpointFiles(fileManager, cpFiles1, version = 1, numKeys = 101)
+ assert(numRemoteSSTFiles == 2) // 2 sst files copied
+ assert(numRemoteLogFiles == 2) // 2 log files copied
+
+ // Save SAME version again with different checkpoint files and verify
+ val cpFiles1_ = Seq(
+ "sst-file1.sst" -> 10, // same SST file as before, should not get
copied
+ "sst-file2.sst" -> 25, // new SST file with same name as before, but
different length
+ "sst-file3.sst" -> 30, // new SST file
+ "other-file1" -> 100, // same non-SST file as before, should not get
copied
+ "other-file2" -> 210, // new non-SST file with same name as before,
but different length
+ "other-file3" -> 300, // new non-SST file
+ "archive/00001.log" -> 1000, // same log file as before, should not
get copied
+ "archive/00002.log" -> 2500, // new log file with same name as before,
but different length
+ "archive/00003.log" -> 3000 // new log file
+ )
+ saveCheckpointFiles(fileManager, cpFiles1_, version = 1, numKeys = 1001)
+ assert(numRemoteSSTFiles === 4, "shouldn't copy same files again") // 2
old + 2 new SST files
+ assert(numRemoteLogFiles === 4, "shouldn't copy same files again") // 2
old + 2 new log files
+
+ // Save another version and verify
+ val cpFiles2 = Seq(
+ "sst-file4.sst" -> 40,
+ "other-file4" -> 400,
+ "archive/00004.log" -> 4000
+ )
+ saveCheckpointFiles(fileManager, cpFiles2, version = 2, numKeys = 1501)
+ assert(numRemoteSSTFiles === 5) // 1 new file over earlier 4 files
+ assert(numRemoteLogFiles === 5) // 1 new file over earlier 4 files
+ }
+ }
+
+ test("RocksDBFileManager: error writing [version].zip cancels the output
stream") {
+ quietly {
+ val hadoopConf = new Configuration()
+ hadoopConf.set(
+ SQLConf.STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key,
+ classOf[CreateAtomicTestManager].getName)
+ val dfsRootDir = Utils.createTempDir().getAbsolutePath
+ val fileManager = new RocksDBFileManager(dfsRootDir,
Utils.createTempDir(), hadoopConf)
+ val cpFiles = Seq("sst-file1.sst" -> 10, "sst-file2.sst" -> 20,
"other-file1" -> 100)
+ CreateAtomicTestManager.shouldFailInCreateAtomic = true
+ CreateAtomicTestManager.cancelCalledInCreateAtomic = false
Review comment:
That's right. I just wanted to set it to `false` explicitly here since we have an `assert(cancelCalledInCreateAtomic)` at the end. Let me delete this one. Done in bdd9e8e.
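
For readers following along, here is a minimal sketch of how the tail of this test presumably uses those flags, assuming ScalaTest's `intercept` and an `IOException` from the failing atomic stream; the exact exception type, `numKeys` value, and remaining assertions in the PR may differ:

```scala
// Hypothetical continuation of the test body above (not the PR's exact code).
// java.io._ is already imported at the top of the suite, so IOException is in scope.
// With CreateAtomicTestManager.shouldFailInCreateAtomic = true, the checkpoint
// file manager fails while writing [version].zip, so the save should throw and
// the partially written output stream should be cancelled.
intercept[IOException] {
  saveCheckpointFiles(fileManager, cpFiles, version = 1, numKeys = 101)
}
assert(CreateAtomicTestManager.cancelCalledInCreateAtomic)
```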