chaoqin-li1123 commented on code in PR #46942:
URL: https://github.com/apache/spark/pull/46942#discussion_r1645204346


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##########
@@ -874,6 +877,74 @@ class RocksDBSuite extends 
AlsoTestWithChangelogCheckpointingEnabled with Shared
     )
   }
 
+  testWithChangelogCheckpointingEnabled("RocksDBFileManager: deepCopy") {
+    withTempDir { dir =>
+      val dfsRootDir = dir.getAbsolutePath
+      val originalFileManager = new RocksDBFileManager(
+        dfsRootDir, Utils.createTempDir(), new Configuration)
+      val copiedFileManager = originalFileManager.deepCopy()
+
+      // Save a version of checkpoint files
+      val cpFiles = Seq(
+        "001.sst" -> 10,
+        "002.sst" -> 10,
+        "003.sst" -> 10
+      )
+      saveCheckpointFiles(originalFileManager, cpFiles, 1, 101)
+
+      // Ensure checkpoint metrics are different
+      assert(originalFileManager.latestSaveCheckpointMetrics.filesCopied == 3L)
+      assert(originalFileManager.latestSaveCheckpointMetrics.bytesCopied == 
30L)
+      assert(copiedFileManager.latestSaveCheckpointMetrics.filesCopied == 0L)
+      assert(copiedFileManager.latestSaveCheckpointMetrics.bytesCopied == 0L)
+
+      // Checkpoint the same files
+      saveCheckpointFiles(originalFileManager, cpFiles, 2, 101)
+      saveCheckpointFiles(copiedFileManager, cpFiles, 2, 101)
+
+      // Original file manager should skip since files already uploaded
+      assert(originalFileManager.latestSaveCheckpointMetrics.filesCopied == 0L)
+      assert(originalFileManager.latestSaveCheckpointMetrics.bytesCopied == 0L)
+
+      // Copied file manager should not skip since these are new files
+      assert(copiedFileManager.latestSaveCheckpointMetrics.filesCopied == 3L)
+      assert(copiedFileManager.latestSaveCheckpointMetrics.bytesCopied == 30L)
+    }
+  }
+
+  testWithChangelogCheckpointingEnabled("RocksDBFileManager: eliminate lock 
contention") {

Review Comment:
   Can we be more specific and name it "background snapshot upload doesn't 
acquire rocksdb instance lock"?



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##########
@@ -874,6 +877,74 @@ class RocksDBSuite extends 
AlsoTestWithChangelogCheckpointingEnabled with Shared
     )
   }
 
+  testWithChangelogCheckpointingEnabled("RocksDBFileManager: deepCopy") {
+    withTempDir { dir =>
+      val dfsRootDir = dir.getAbsolutePath
+      val originalFileManager = new RocksDBFileManager(
+        dfsRootDir, Utils.createTempDir(), new Configuration)
+      val copiedFileManager = originalFileManager.deepCopy()
+
+      // Save a version of checkpoint files
+      val cpFiles = Seq(
+        "001.sst" -> 10,
+        "002.sst" -> 10,
+        "003.sst" -> 10
+      )
+      saveCheckpointFiles(originalFileManager, cpFiles, 1, 101)
+
+      // Ensure checkpoint metrics are different
+      assert(originalFileManager.latestSaveCheckpointMetrics.filesCopied == 3L)
+      assert(originalFileManager.latestSaveCheckpointMetrics.bytesCopied == 
30L)
+      assert(copiedFileManager.latestSaveCheckpointMetrics.filesCopied == 0L)
+      assert(copiedFileManager.latestSaveCheckpointMetrics.bytesCopied == 0L)
+
+      // Checkpoint the same files
+      saveCheckpointFiles(originalFileManager, cpFiles, 2, 101)
+      saveCheckpointFiles(copiedFileManager, cpFiles, 2, 101)
+
+      // Original file manager should skip since files already uploaded
+      assert(originalFileManager.latestSaveCheckpointMetrics.filesCopied == 0L)
+      assert(originalFileManager.latestSaveCheckpointMetrics.bytesCopied == 0L)
+
+      // Copied file manager should not skip since these are new files
+      assert(copiedFileManager.latestSaveCheckpointMetrics.filesCopied == 3L)
+      assert(copiedFileManager.latestSaveCheckpointMetrics.bytesCopied == 30L)
+    }
+  }
+
+  testWithChangelogCheckpointingEnabled("RocksDBFileManager: eliminate lock 
contention") {
+    // Create a custom ExecutionContext
+    implicit val ec: ExecutionContext = ExecutionContext
+      .fromExecutor(Executors.newSingleThreadExecutor())
+
+    val remoteDir = Utils.createTempDir().toString
+    val conf = dbConf.copy(lockAcquireTimeoutMs = 10000, minDeltasForSnapshot 
= 0)
+    new File(remoteDir).delete() // to make sure that the directory gets 
created
+
+    withDB(remoteDir, conf = conf) { db =>
+      db.load(0)
+      db.put("0", "0")
+      db.commit()
+
+      // Acquire lock
+      db.load(1)
+      db.put("1", "1")
+
+      // Run doMaintenance in another thread
+      val maintenanceFuture = Future {
+        db.doMaintenance()
+      }
+
+      val timeout = 5.seconds
+
+      // Ensure that maintenance task runs without being blocked by task thread
+      ThreadUtils.awaitResult(maintenanceFuture, timeout)
+

Review Comment:
   Can we check the checkpoint directory contents here and verify that the
snapshot is indeed uploaded?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to