HeartSaVioR commented on code in PR #47875:
URL: https://github.com/apache/spark/pull/47875#discussion_r1804034742
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##########
@@ -2201,6 +2246,97 @@ class RocksDBSuite extends
AlsoTestWithChangelogCheckpointingEnabled with Shared
}
}
+ testWithChangelogCheckpointingEnabled("load the same version of pending
snapshot uploading") {
+ // The test was accidentally fixed by SPARK-48931
(https://github.com/apache/spark/pull/47393)
+ val remoteDir = Utils.createTempDir().toString
+ val conf = dbConf.copy(minDeltasForSnapshot = 2, compactOnCommit = false)
+ new File(remoteDir).delete() // to make sure that the directory gets
created
+ withDB(remoteDir, conf = conf) { db =>
+ db.load(0)
+ db.put("foo", "bar")
+ db.commit()
+
+ db.load(1)
+ db.put("foo", "bar")
+ db.commit()
+ db.doMaintenance()
+
+ db.load(1)
+ db.put("foo", "bar")
+ db.commit()
+
+ db.load(2)
+ db.put("foo", "bar")
+ db.commit()
+
+ db.load(2)
+ db.put("foo", "bar")
+ db.commit()
+
+ db.doMaintenance()
+
+ db.load(2)
+ db.put("foo", "bar")
+ db.commit()
+ }
+ }
+
+ for (random_seed <- 1 to 16) {
+ testWithChangelogCheckpointingEnabled(s"randomized snapshotting
$random_seed") {
+ // The unit test simulates the case where batches can be reloaded and
maintenance tasks
+ // can be delayed. After each batch, we randomly decide whether we would
move onto the
+ // next batch, and whether the maintenance task is executed.
+ // The test was accidentally fixed by SPARK-48931
(https://github.com/apache/spark/pull/47393)
+ val remoteDir = Utils.createTempDir().toString
+ val conf = dbConf.copy(minDeltasForSnapshot = 3, compactOnCommit = false)
+ new File(remoteDir).delete() // to make sure that the directory gets
created
+ withDB(remoteDir, conf = conf) { db =>
+ val random = new Random(random_seed)
+ var curVer: Int = 0
+ for (i <- 1 to 100) {
+ db.load(curVer)
+ db.put("foo", "bar")
+ db.commit()
+ // For a one in five chance, maintenance task is executed. The
chance is created to
+ // simulate the case where snapshot isn't immediately uploaded, and
even delayed
+ // so that the next snapshot is ready. We create a snapshot in every
3 batches, so
+ // with 1/5 chance, it is more likely to create longer maintenance
delay.
+ if (random.nextInt(5) == 0) {
+ db.doMaintenance()
+ }
+ // For half the chance, we move to the next version, and half the
chance we keep the
+ // same version
+ if (random.nextInt(2) == 0) {
+ curVer = curVer + 1
+ }
+ }
+ }
+ }
+ }
+
+ testWithChangelogCheckpointingEnabled(s"simulate ForEachBatch") {
Review Comment:
I meant that, from my understanding, this is mostly the same as the test
@siying added with random seeds, except that here we pick one specific seed. If
we have to keep this as a separate test, we should clarify why seed = 66 should
be used consistently. Did we see any test failure with seed = 66?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]