HeartSaVioR commented on code in PR #47850:
URL: https://github.com/apache/spark/pull/47850#discussion_r1730566094
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##########
@@ -1736,6 +1737,97 @@ class RocksDBSuite extends
AlsoTestWithChangelogCheckpointingEnabled with Shared
}
}
+ testWithChangelogCheckpointingEnabled("load the same version of pending
snapshot uploading") {
+ // The failure this test reproduces was accidentally fixed by SPARK-48931
+ // (https://github.com/apache/spark/pull/47393)
+ val remoteDir = Utils.createTempDir().toString
+ val conf = dbConf.copy(minDeltasForSnapshot = 2, compactOnCommit = false)
+ new File(remoteDir).delete() // to make sure that the directory gets
created
+ withDB(remoteDir, conf = conf) { db =>
+ db.load(0)
+ db.put("foo", "bar")
+ db.commit()
+
+ db.load(1)
+ db.put("foo", "bar")
+ db.commit()
+ db.doMaintenance()
+
+ db.load(1)
+ db.put("foo", "bar")
+ db.commit()
+
+ db.load(2)
+ db.put("foo", "bar")
+ db.commit()
+
+ db.load(2)
+ db.put("foo", "bar")
+ db.commit()
+
+ db.doMaintenance()
+
+ db.load(2)
+ db.put("foo", "bar")
+ db.commit()
+ }
+ }
+
+ for (random_seed <- 1 to 16) {
+ testWithChangelogCheckpointingEnabled(s"randomized snapshotting
$random_seed") {
+ // The unit test simulates the case where batches can be reloaded and maintenance tasks
+ // can be delayed. After each batch, we randomly decide whether we move on to the
+ // next batch, and whether the maintenance task is executed.
+ // The failure this test reproduces was accidentally fixed by SPARK-48931
+ // (https://github.com/apache/spark/pull/47393)
+ val remoteDir = Utils.createTempDir().toString
+ val conf = dbConf.copy(minDeltasForSnapshot = 3, compactOnCommit = false)
+ new File(remoteDir).delete() // to make sure that the directory gets
created
+ withDB(remoteDir, conf = conf) { db =>
+ val random = new Random(random_seed)
+ var curVer: Int = 0
+ for (i <- 1 to 100) {
+ db.load(curVer)
+ db.put("foo", "bar")
+ db.commit()
+ // With a one-in-five chance, the maintenance task is executed. This simulates the case
+ // where a snapshot isn't uploaded immediately, and is even delayed until the next
+ // snapshot is ready. We create a snapshot every 3 batches, so with a 1/5 chance,
+ // longer maintenance delays are likely.
+ if (random.nextInt(5) == 0) {
+ db.doMaintenance()
+ }
+ // With a one-in-two chance, we move to the next version; otherwise we keep the
+ // same version.
+ if (random.nextInt(2) == 0) {
+ curVer = curVer + 1
+ }
+ }
+ }
+ }
+ }
+
+ testWithChangelogCheckpointingEnabled(s"simulate ForEachBatch") {
+ // With foreachBatch, batches are often executed twice. We simulate this case.
+ // The failure this test reproduces was accidentally fixed by SPARK-48931
+ // (https://github.com/apache/spark/pull/47393)
+ val remoteDir = Utils.createTempDir().toString
+ val conf = dbConf.copy(minDeltasForSnapshot = 3, compactOnCommit = false)
+ new File(remoteDir).delete() // to make sure that the directory gets
created
+ withDB(remoteDir, conf = conf) { db =>
+ val random = new Random(seed = 66)
+ var curVer: Int = 0
+ for (i <- 1 to 100) {
+ db.load(curVer)
+ db.put("foo", "bar")
+ db.commit()
+ if (random.nextInt(5) == 0) {
Review Comment:
Does this test need to be randomized? If we know how and why it could fail, I'd
rather see that scenario tested directly. (Ideally, we could construct an e2e test
that fails without the fix.) Randomized testing already seems to be covered by the
test above.
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##########
@@ -1736,6 +1737,84 @@ class RocksDBSuite extends
AlsoTestWithChangelogCheckpointingEnabled with Shared
}
}
+ testWithChangelogCheckpointingEnabled("load the same version of pending
snapshot uploading") {
Review Comment:
+1 Shall we describe the scenario and the expected behavior?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]