micheal-o commented on code in PR #47875:
URL: https://github.com/apache/spark/pull/47875#discussion_r1803891930


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##########
@@ -2201,6 +2246,97 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
     }
   }
 
+  testWithChangelogCheckpointingEnabled("load the same version of pending snapshot uploading") {
+    // The test was accidentally fixed by SPARK-48931 (https://github.com/apache/spark/pull/47393)
+    val remoteDir = Utils.createTempDir().toString
+    val conf = dbConf.copy(minDeltasForSnapshot = 2, compactOnCommit = false)
+    new File(remoteDir).delete() // to make sure that the directory gets created
+    withDB(remoteDir, conf = conf) { db =>
+      db.load(0)
+      db.put("foo", "bar")
+      db.commit()
+
+      db.load(1)
+      db.put("foo", "bar")
+      db.commit()
+      db.doMaintenance()
+
+      db.load(1)
+      db.put("foo", "bar")
+      db.commit()
+
+      db.load(2)
+      db.put("foo", "bar")
+      db.commit()
+
+      db.load(2)
+      db.put("foo", "bar")
+      db.commit()
+
+      db.doMaintenance()
+
+      db.load(2)
+      db.put("foo", "bar")
+      db.commit()
+    }
+  }
+
+  for (random_seed <- 1 to 16) {
+    testWithChangelogCheckpointingEnabled(s"randomized snapshotting $random_seed") {
+      // This unit test simulates the case where batches can be reloaded and maintenance
+      // tasks can be delayed. After each batch, we randomly decide whether to move on to
+      // the next batch and whether the maintenance task is executed.
+      // The test was accidentally fixed by SPARK-48931 (https://github.com/apache/spark/pull/47393)
+      val remoteDir = Utils.createTempDir().toString
+      val conf = dbConf.copy(minDeltasForSnapshot = 3, compactOnCommit = false)
+      new File(remoteDir).delete() // to make sure that the directory gets created
+      withDB(remoteDir, conf = conf) { db =>
+        val random = new Random(random_seed)
+        var curVer: Int = 0
+        for (i <- 1 to 100) {
+          db.load(curVer)
+          db.put("foo", "bar")
+          db.commit()
+          // With a one-in-five chance, the maintenance task is executed. This simulates
+          // the case where a snapshot isn't uploaded immediately, or is delayed long
+          // enough that the next snapshot is already ready. We create a snapshot every
+          // 3 batches, so a 1/5 chance makes longer maintenance delays more likely.
+          if (random.nextInt(5) == 0) {
+            db.doMaintenance()
+          }
+          // With a one-in-two chance, we move to the next version; otherwise we keep
+          // the same version.
+          if (random.nextInt(2) == 0) {
+            curVer = curVer + 1
+          }
+        }
+      }
+    }
+  }
+
+  testWithChangelogCheckpointingEnabled(s"simulate ForEachBatch") {

Review Comment:
   I tested it without that change and it still passed, so that change didn't fix this test. I've removed that comment, but I still think this test is worth keeping.



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##########
@@ -1779,37 +1821,40 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
     "validate successful RocksDB load when metadata file is not overwritten") {
     val fmClass = "org.apache.spark.sql.execution.streaming.state." +
       "NoOverwriteFileSystemBasedCheckpointFileManager"
-    withTempDir { dir =>
-      val conf = dbConf.copy(minDeltasForSnapshot = 0) // create snapshot every commit
-      val hadoopConf = new Configuration()
-      hadoopConf.set(STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key, fmClass)
-
-      val remoteDir = dir.getCanonicalPath
-      withDB(remoteDir, conf = conf, hadoopConf = hadoopConf) { db =>
-        db.load(0)
-        db.put("a", "1")
-        db.commit()
+    Seq(Some(fmClass), None).foreach { fm =>
+      withTempDir { dir =>
+        val conf = dbConf.copy(minDeltasForSnapshot = 0) // create snapshot every commit
+        val hadoopConf = new Configuration()
+        if (fm.isDefined) {

Review Comment:
   done
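   
   For reference, the hunk above is truncated right after the if (fm.isDefined) check. A minimal sketch of how that branch plausibly continues, assuming it reuses the checkpoint-file-manager setup from the removed lines (the branch body is an inference, not quoted from the PR):
   
       // Sketch only: run the same test body with and without the custom
       // checkpoint file manager; the None case exercises the default manager.
       Seq(Some(fmClass), None).foreach { fm =>
         withTempDir { dir =>
           val conf = dbConf.copy(minDeltasForSnapshot = 0) // create snapshot every commit
           val hadoopConf = new Configuration()
           if (fm.isDefined) {
             // Only override the default checkpoint file manager when a class is given.
             hadoopConf.set(STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key, fm.get)
           }
           // ... the remainder of the test runs identically for both branches.
         }
       }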


