HeartSaVioR commented on code in PR #47850:
URL: https://github.com/apache/spark/pull/47850#discussion_r1735709388
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##########
@@ -1736,6 +1737,127 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
}
}
+ testWithChangelogCheckpointingEnabled("reloading the same version") {
+ // Keep executing the same batch for two or more times. Some queries with ForEachBatch
+ // will cause this behavior.
+ // The test was accidentally fixed by SPARK-48586 (https://github.com/apache/spark/pull/47130)
+ val remoteDir = Utils.createTempDir().toString
+ val conf = dbConf.copy(minDeltasForSnapshot = 2, compactOnCommit = false)
+ new File(remoteDir).delete() // to make sure that the directory gets created
+ withDB(remoteDir, conf = conf) { db =>
+ // load the same version of pending snapshot uploading
+ // This is possible because after committing version x, we can continue to x+1, and replay
+ // x+1. The replay will load a checkpoint by version x. At this moment, the snapshot
+ // uploading may not be finished.
+ // Previously this generated a problem: new files generated by reloading are added to
+ // local -> cloud file map and the information is used to skip some file uploading, which is
+ // wrong because these files aren't a part of the RocksDB checkpoint.
+ // This bug was accidentally fixed by SPARK-48931 (https://github.com/apache/spark/pull/47393)
+ db.load(0)
Review Comment:
Sorry to ask for more effort, but shall we leave a "walkthrough" code
comment for future readers? I think it'd be much easier to follow than
reading the scenario described above and trying to reason it through on
their own. Let's make sure the test is understandable for most readers.
Please refer to the test suites for stateful operators where we track the
watermark value (and state rows for complicated cases) - we put a code comment
per microbatch as a walkthrough.
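To illustrate the style being requested, here is a minimal, self-contained sketch of per-microbatch walkthrough comments. `FakeDb` is a hypothetical stand-in stub, not the real RocksDB state store test handle; only the commenting pattern is the point.

```scala
// Illustrative only: FakeDb is a stub standing in for the RocksDB test
// handle, so the walkthrough-comment style below is runnable on its own.
class FakeDb {
  private var version = 0L
  def load(v: Long): Unit = { version = v }
  def put(k: String, v: String): Unit = ()
  // commit() produces the next version and returns it
  def commit(): Long = { version += 1; version }
}

object WalkthroughStyle {
  def main(args: Array[String]): Unit = {
    val db = new FakeDb
    // Batch 0: start from version 0, write one row, commit -> version 1.
    db.load(0)
    db.put("foo", "bar")
    assert(db.commit() == 1)
    // Batch 1 replayed: reload version 1 (the snapshot upload for the
    // previous commit may still be in flight), write, commit -> version 2.
    db.load(1)
    db.put("foo", "bar")
    assert(db.commit() == 2)
  }
}
```

The idea is that each load/put/commit group carries a one-line comment saying which microbatch it represents and what version it starts from and produces.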
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##########
@@ -1736,6 +1737,127 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
}
}
+ testWithChangelogCheckpointingEnabled("reloading the same version") {
+ // Keep executing the same batch for two or more times. Some queries with ForEachBatch
+ // will cause this behavior.
+ // The test was accidentally fixed by SPARK-48586 (https://github.com/apache/spark/pull/47130)
+ val remoteDir = Utils.createTempDir().toString
+ val conf = dbConf.copy(minDeltasForSnapshot = 2, compactOnCommit = false)
+ new File(remoteDir).delete() // to make sure that the directory gets created
+ withDB(remoteDir, conf = conf) { db =>
+ // load the same version of pending snapshot uploading
+ // This is possible because after committing version x, we can continue to x+1, and replay
+ // x+1. The replay will load a checkpoint by version x. At this moment, the snapshot
+ // uploading may not be finished.
+ // Previously this generated a problem: new files generated by reloading are added to
+ // local -> cloud file map and the information is used to skip some file uploading, which is
+ // wrong because these files aren't a part of the RocksDB checkpoint.
+ // This bug was accidentally fixed by SPARK-48931 (https://github.com/apache/spark/pull/47393)
+ db.load(0)
+ db.put("foo", "bar")
+ db.commit()
+
+ db.load(1)
+ db.put("foo", "bar")
+ db.commit()
+ db.doMaintenance()
+
+ db.load(1)
+ db.put("foo", "bar")
+ db.commit()
+
+ db.load(2)
+ db.put("foo", "bar")
+ db.commit()
+
+ db.load(2)
+ db.put("foo", "bar")
+ db.commit()
+
+ db.doMaintenance()
+
+ db.load(2)
+ db.put("foo", "bar")
+ db.commit()
+
+ // Test the maintenance thread is delayed even after the next snapshot is created.
+ // There will be two outstanding snapshots.
+ for (i <- 3 to 6) {
+ db.load(i)
+ db.put("foo", "bar")
+ db.commit()
+
+ db.load(i)
+ db.put("foo", "bar")
+ db.commit()
+ }
+ db.doMaintenance()
+
+ // Test the maintenance is called after each batch. This tests a common case where
+ // maintenance tasks finish quickly.
+ for (i <- 7 to 10) {
+ for (j <- 0 to 1) {
+ db.load(i)
+ db.put("foo", "bar")
+ db.commit()
+ db.doMaintenance()
+ }
+ }
+ }
+ }
+
+ for (random_seed <- 1 to 8) {
Review Comment:
nit: use camelCase for consistency
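As an aside for readers of this thread, the replay scenario described in the quoted test comment can be modeled without any Spark dependencies. This is a hedged sketch: `ReplaySketch`, `load`, and `commit` are illustrative names, not the state store API.

```scala
import scala.collection.mutable

// Minimal model of versioned checkpoints: committing on top of version v
// produces checkpoint v + 1, and a replayed batch loads the same version
// again and recommits.
object ReplaySketch {
  // committed checkpoints: version -> contents; version 0 is empty
  private val committed = mutable.Map[Long, Map[String, String]](0L -> Map.empty)

  // Loading version v restores the checkpoint for v. In the real store, the
  // snapshot upload for the previous commit may still be pending at this point.
  def load(v: Long): Map[String, String] = committed(v)

  // Committing on top of version v produces checkpoint v + 1.
  def commit(v: Long, data: Map[String, String]): Unit =
    committed(v + 1) = data

  def main(args: Array[String]): Unit = {
    // Batch 0: load version 0, write, commit -> checkpoint 1.
    commit(0, load(0) + ("foo" -> "bar"))
    // Replay of batch 0 (e.g. under foreachBatch): load version 0 again and
    // recommit. In the real store this re-creates local files while the
    // upload of checkpoint 1 may still be in flight; those fresh files must
    // not be mistaken for files already belonging to checkpoint 1.
    commit(0, load(0) + ("foo" -> "bar"))
    assert(load(1) == Map("foo" -> "bar"))
  }
}
```

The bug described in the quoted comment was that the second commit's local files leaked into the local-to-cloud file map used to skip uploads, even though they were not part of the uploaded checkpoint.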
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##########
@@ -1736,6 +1737,127 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
}
}
+ testWithChangelogCheckpointingEnabled("reloading the same version") {
+ // Keep executing the same batch for two or more times. Some queries with ForEachBatch
+ // will cause this behavior.
+ // The test was accidentally fixed by SPARK-48586 (https://github.com/apache/spark/pull/47130)
+ val remoteDir = Utils.createTempDir().toString
+ val conf = dbConf.copy(minDeltasForSnapshot = 2, compactOnCommit = false)
+ new File(remoteDir).delete() // to make sure that the directory gets created
+ withDB(remoteDir, conf = conf) { db =>
+ // load the same version of pending snapshot uploading
+ // This is possible because after committing version x, we can continue to x+1, and replay
+ // x+1. The replay will load a checkpoint by version x. At this moment, the snapshot
+ // uploading may not be finished.
+ // Previously this generated a problem: new files generated by reloading are added to
+ // local -> cloud file map and the information is used to skip some file uploading, which is
+ // wrong because these files aren't a part of the RocksDB checkpoint.
+ // This bug was accidentally fixed by SPARK-48931 (https://github.com/apache/spark/pull/47393)
Review Comment:
nit: The ticket info is not updated here
```
// The test was accidentally fixed by SPARK-48586 (https://github.com/apache/spark/pull/47130)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]