HeartSaVioR commented on code in PR #47875:
URL: https://github.com/apache/spark/pull/47875#discussion_r1804034742


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##########
@@ -2201,6 +2246,97 @@ class RocksDBSuite extends 
AlsoTestWithChangelogCheckpointingEnabled with Shared
     }
   }
 
+  testWithChangelogCheckpointingEnabled("load the same version of pending 
snapshot uploading") {
+    // The test was accidentally fixed by SPARK-48931 
(https://github.com/apache/spark/pull/47393)
+    val remoteDir = Utils.createTempDir().toString
+    val conf = dbConf.copy(minDeltasForSnapshot = 2, compactOnCommit = false)
+    new File(remoteDir).delete() // to make sure that the directory gets 
created
+    withDB(remoteDir, conf = conf) { db =>
+      db.load(0)
+      db.put("foo", "bar")
+      db.commit()
+
+      db.load(1)
+      db.put("foo", "bar")
+      db.commit()
+      db.doMaintenance()
+
+      db.load(1)
+      db.put("foo", "bar")
+      db.commit()
+
+      db.load(2)
+      db.put("foo", "bar")
+      db.commit()
+
+      db.load(2)
+      db.put("foo", "bar")
+      db.commit()
+
+      db.doMaintenance()
+
+      db.load(2)
+      db.put("foo", "bar")
+      db.commit()
+    }
+  }
+
+  for (random_seed <- 1 to 16) {
+    testWithChangelogCheckpointingEnabled(s"randomized snapshotting 
$random_seed") {
+      // The unit test simulates the case where batches can be reloaded and 
maintenance tasks
+      // can be delayed. After each batch, we randomly decide whether we would 
move onto the
+      // next batch, and whetehr maintenance task is executed.
+      // The test was accidentally fixed by SPARK-48931 
(https://github.com/apache/spark/pull/47393)
+      val remoteDir = Utils.createTempDir().toString
+      val conf = dbConf.copy(minDeltasForSnapshot = 3, compactOnCommit = false)
+      new File(remoteDir).delete() // to make sure that the directory gets 
created
+      withDB(remoteDir, conf = conf) { db =>
+        val random = new Random(random_seed)
+        var curVer: Int = 0
+        for (i <- 1 to 100) {
+          db.load(curVer)
+          db.put("foo", "bar")
+          db.commit()
+          // For a one in five chance, maintenance task is executed. The 
chance is created to
+          // simulate the case where snapshot isn't immediately uploaded, and 
even delayed
+          // so that the next snapshot is ready. We create a snapshot in every 
3 batches, so
+          // with 1/5 chance, it is more likely to create longer maintenance 
delay.
+          if (random.nextInt(5) == 0) {
+            db.doMaintenance()
+          }
+          // For half the chance, we move to the next version, and half the 
chance we keep the
+          // same version
+          if (random.nextInt(2) == 0) {
+            curVer = curVer + 1
+          }
+        }
+      }
+    }
+  }
+
+  testWithChangelogCheckpointingEnabled(s"simulate ForEachBatch") {

Review Comment:
   I meant that, from my understanding, this is mostly the same as the test @siying
added with a random seed, except that here we pick a specific seed. If we have to
keep this as a separate test, we should clarify why seed = 66 should be used
consistently. Did we see any test failure with seed = 66?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to