siying commented on code in PR #47850:
URL: https://github.com/apache/spark/pull/47850#discussion_r1735134600
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##########
@@ -1736,6 +1737,127 @@ class RocksDBSuite extends
AlsoTestWithChangelogCheckpointingEnabled with Shared
}
}
+ testWithChangelogCheckpointingEnabled("reloading the same version") {
+ // Keep executing the same batch for two or more times. Some queries with
ForEachBatch
+ // will cause this behavior.
+ // The test was accidentally fixed by SPARK-48931
(https://github.com/apache/spark/pull/47393)
+ val remoteDir = Utils.createTempDir().toString
+ val conf = dbConf.copy(minDeltasForSnapshot = 2, compactOnCommit = false)
+ new File(remoteDir).delete() // to make sure that the directory gets
created
+ withDB(remoteDir, conf = conf) { db =>
+ // load the same version of pending snapshot uploading
Review Comment:
Sorry, the ticket I gave was the wrong one. I have updated it, and it should
now work.
The commit cannot be easily reverted now, but I applied the test to the
parent hash of the commit (b5a55e46e9c126f73287ff3a8290828a9cd484a0) and
confirmed that it would fail:
```
[info] - reloading the same version (with changelog checkpointing) ***
FAILED *** (292 milliseconds)
[info] org.rocksdb.RocksDBException: Mismatch in unique ID on table file
8. Expected: {12521394303566436904,8218421418606057953} Actual:
{12521394303566436907,7844327260652356763} The file
/home/siying.dong/spark2/target/tmp/spark-3438851b-7e84-4630-a96d-22b595c58b1b/workingDir-a30c2e57-73c7-49fa-b96b-5b6d8b863383/MANIFEST-000005
may be corrupted.
[info] at org.rocksdb.RocksDB.open(Native Method)
[info] at org.rocksdb.RocksDB.open(RocksDB.java:325)
[info] at
org.apache.spark.sql.execution.streaming.state.RocksDB.openDB(RocksDB.scala:901)
[info] at
org.apache.spark.sql.execution.streaming.state.RocksDB.load(RocksDB.scala:194)
[info] at
org.apache.spark.sql.execution.streaming.state.RocksDBSuite.$anonfun$new$267(RocksDBSuite.scala:2052)
[info] at
org.apache.spark.sql.execution.streaming.state.RocksDBSuite.$anonfun$new$267$adapted(RocksDBSuite.scala:2020)
[info] at
org.apache.spark.sql.execution.streaming.state.RocksDBSuite.withDB(RocksDBSuite.scala:2411)
[info] at
org.apache.spark.sql.execution.streaming.state.RocksDBSuite.$anonfun$new$266(RocksDBSuite.scala:2020)
[info] at
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
[info] at
org.apache.spark.sql.catalyst.SQLConfHelper.withSQLConf(SQLConfHelper.scala:56)
[info] at
org.apache.spark.sql.catalyst.SQLConfHelper.withSQLConf$(SQLConfHelper.scala:38)
[info] at
org.apache.spark.sql.execution.streaming.state.RocksDBSuite.org$apache$spark$sql$test$SQLTestUtilsBase$$super$withSQLConf(RocksDBSuite.scala:165)
[info] at
org.apache.spark.sql.test.SQLTestUtilsBase.withSQLConf(SQLTestUtils.scala:248)
[info] at
org.apache.spark.sql.test.SQLTestUtilsBase.withSQLConf$(SQLTestUtils.scala:246)
[info] at
org.apache.spark.sql.execution.streaming.state.RocksDBSuite.withSQLConf(RocksDBSuite.scala:165)
[info] at
org.apache.spark.sql.execution.streaming.state.AlsoTestWithChangelogCheckpointingEnabled.$anonfun$testWithChangelogCheckpointingEnabled$1(RocksDBSuite.scala:107)
[info] at
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
[info] at
org.scalatest.enablers.Timed$$anon$1.timeoutAfter(Timed.scala:127)
[info] at
org.scalatest.concurrent.TimeLimits$.failAfterImpl(TimeLimits.scala:282)
[info] at
org.scalatest.concurrent.TimeLimits.failAfter(TimeLimits.scala:231)
[info] at
org.scalatest.concurrent.TimeLimits.failAfter$(TimeLimits.scala:230)
[info] at org.apache.spark.SparkFunSuite.failAfter(SparkFunSuite.scala:69)
[info] at
org.apache.spark.SparkFunSuite.$anonfun$test$2(SparkFunSuite.scala:155)
[info] at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
[info] at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
[info] at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info] at org.scalatest.Transformer.apply(Transformer.scala:22)
[info] at org.scalatest.Transformer.apply(Transformer.scala:20)
[info] at
org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226)
[info] at
org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:227)
[info] at
org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:224)
[info] at
org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:236)
[info] at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
[info] at
org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:236)
[info] at
org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:218)
[info] at
org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:69)
[info] at
org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234)
[info] at
org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227)
[info] at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:69)
[info] at
org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:269)
[info] at
org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
[info] at scala.collection.immutable.List.foreach(List.scala:334)
[info] at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
[info] at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
[info] at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
[info] at
org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:269)
[info] at
org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:268)
[info] at
org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1564)
[info] at org.scalatest.Suite.run(Suite.scala:1114)
[info] at org.scalatest.Suite.run$(Suite.scala:1096)
[info] at
org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1564)
[info] at
org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:273)
[info] at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
[info] at
org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:273)
[info] at
org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:272)
[info] at
org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:69)
[info] at
org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
[info] at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
[info] at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
[info] at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:69)
[info] at
org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:321)
[info] at
org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:517)
[info] at sbt.ForkMain$Run.lambda$runTest$1(ForkMain.java:414)
[info] at
java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
[info] at
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
[info] at
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
[info] at java.base/java.lang.Thread.run(Thread.java:840)
```
and after applying the commit 40ad8290e997c91a081cf9537a04ee4499048ce2, the
test would pass.
I can explain why it fails in person. But it has nothing to do with a data
race (I hope I didn't say that in the comments). It's related to the sequence of:
1. create snapshot for version n
2. reload another version n from cloud
3. upload snapshot n (overwriting the existing one)
and this uploading is problematic.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]