sarutak opened a new pull request, #53814: URL: https://github.com/apache/spark/pull/53814
### What changes were proposed in this pull request?

This PR fixes a flaky test, `SPARK-33587: isFatalError`, in `ExecutorSuite`. Recently, this test has been failing occasionally:
https://github.com/apache/spark/actions/runs/20850209017/job/59903273573#step:10:4885

```
[info] - SPARK-33587: isFatalError *** FAILED *** (30 milliseconds)
[info]   false did not equal true (ExecutorSuite.scala:516)
[info]   org.scalatest.exceptions.TestFailedException:
[info]   at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
[info]   at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
[info]   at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
[info]   at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295)
[info]   at org.apache.spark.executor.ExecutorSuite.testThrowable$1(ExecutorSuite.scala:516)
[info]   at org.apache.spark.executor.ExecutorSuite.$anonfun$new$38(ExecutorSuite.scala:528)
[info]   at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:256)
[info]   at org.apache.spark.executor.ExecutorSuite.$anonfun$new$33(ExecutorSuite.scala:527)
[info]   at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
[info]   at org.scalatest.enablers.Timed$$anon$1.timeoutAfter(Timed.scala:127)
[info]   at org.scalatest.concurrent.TimeLimits$.failAfterImpl(TimeLimits.scala:282)
[info]   at org.scalatest.concurrent.TimeLimits.failAfter(TimeLimits.scala:231)
[info]   at org.scalatest.concurrent.TimeLimits.failAfter$(TimeLimits.scala:230)
[info]   at org.apache.spark.SparkFunSuite.failAfter(SparkFunSuite.scala:68)
[info]   at org.apache.spark.SparkFunSuite.$anonfun$test$2(SparkFunSuite.scala:154)
[info]   at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226)
[info]   at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:226)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:224)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:236)
[info]   at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:236)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:218)
[info]   at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:68)
[info]   at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234)
[info]   at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227)
[info]   at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:68)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:269)
[info]   at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
[info]   at scala.collection.immutable.List.foreach(List.scala:323)
[info]   at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
[info]   at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
[info]   at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:269)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:268)
[info]   at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1564)
[info]   at org.scalatest.Suite.run(Suite.scala:1114)
[info]   at org.scalatest.Suite.run$(Suite.scala:1096)
[info]   at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1564)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:273)
[info]   at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:273)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:272)
[info]   at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:68)
[info]   at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
[info]   at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
[info]   at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
[info]   at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:68)
[info]   at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:321)
[info]   at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:517)
[info]   at sbt.ForkMain$Run.lambda$runTest$1(ForkMain.java:414)
[info]   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
[info]   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
[info]   at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
[info]   at java.base/java.lang.Thread.run(Thread.java:840)
```

This issue happens as follows:

1. [testThrowable](https://github.com/apache/spark/blob/a03bedb6c1281c5263a42bfd20608d2ee005ab05/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala#L529) is invoked with `InterruptedException`.
2. `InterruptedException` is passed to [errorInGuavaCache](https://github.com/apache/spark/blob/a03bedb6c1281c5263a42bfd20608d2ee005ab05/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala#L511).
3. [Cache#get](https://github.com/apache/spark/blob/a03bedb6c1281c5263a42bfd20608d2ee005ab05/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala#L498) is invoked. The actual class is [LocalCache.LocalLoadingCache](https://github.com/google/guava/blob/f06690fa3e874f65515e8fd338a74d636e2c792f/guava/src/com/google/common/cache/CacheBuilder.java#L1032).
4. The flow reaches [this line](https://github.com/google/guava/blob/f06690fa3e874f65515e8fd338a74d636e2c792f/guava/src/com/google/common/cache/LocalCache.java#L3570) and the interrupted status is set on the current thread.
5. Control returns from `errorInGuavaCache`, and in the next round of the loop, [errorInThreadPool](https://github.com/apache/spark/blob/a03bedb6c1281c5263a42bfd20608d2ee005ab05/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala#L517) is invoked.
6. Through [f.get()](https://github.com/apache/spark/blob/a03bedb6c1281c5263a42bfd20608d2ee005ab05/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala#L485), [FutureTask#awaitDone](https://github.com/openjdk/jdk/blob/dfacda488bfbe2e11e8d607a6d08527710286982/src/java.base/share/classes/java/util/concurrent/FutureTask.java#L393) is invoked.
7. If the future task is already done, the flow reaches [this line](https://github.com/openjdk/jdk/blob/dfacda488bfbe2e11e8d607a6d08527710286982/src/java.base/share/classes/java/util/concurrent/FutureTask.java#L410) and the issue doesn't occur; otherwise, because the interrupted status is set, the flow reaches [this line](https://github.com/openjdk/jdk/blob/dfacda488bfbe2e11e8d607a6d08527710286982/src/java.base/share/classes/java/util/concurrent/FutureTask.java#L418) and `InterruptedException` is thrown without a cause.
8. `InterruptedException` is not a fatal error, so if `testThrowable` is invoked with `isFatal = true`, this issue occurs (see the sketch after this list).
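To make the timing dependence concrete, here is a minimal, self-contained sketch of what steps 4-7 amount to. It is not the suite code: the object name, the sleep, and the `OutOfMemoryError` stand-in are made up for illustration. Once the calling thread's interrupt flag is left set, a later `Future#get()` that has to wait throws a bare `InterruptedException` with no cause, so the fatal error thrown inside the task never appears in the cause chain that `isFatalError` inspects.

```scala
import java.util.concurrent.{Callable, Executors}

// Minimal sketch of the interrupt-flag leak (illustrative only, not ExecutorSuite code).
object InterruptLeakDemo {
  def main(args: Array[String]): Unit = {
    // Simulate the side effect of step 4: Guava restores the interrupt status
    // on the calling thread after the cache loader throws InterruptedException.
    Thread.currentThread().interrupt()

    val pool = Executors.newSingleThreadExecutor()
    try {
      val f = pool.submit(new Callable[String] {
        override def call(): String = {
          Thread.sleep(100) // keep the task unfinished so get() has to wait
          throw new OutOfMemoryError("pretend fatal error")
        }
      })
      // The caller is already interrupted and the task is not done yet, so
      // FutureTask#awaitDone throws a bare InterruptedException (no cause)
      // instead of the ExecutionException wrapping the fatal error.
      f.get()
    } catch {
      case ie: InterruptedException =>
        println(s"caught $ie, cause = ${ie.getCause}") // cause is null
    } finally {
      pool.shutdown()
    }
  }
}
```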
This issue can be reproduced by changing `ExecutorSuite.scala` as follows.

```
diff --git a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
index dd9884bffb2..42a656cc88b 100644
--- a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
@@ -482,6 +482,8 @@ class ExecutorSuite extends SparkFunSuite
       val f = taskPool.submit(new java.util.concurrent.Callable[String] {
         override def call(): String = throw e
       })
+      // Time consuming task which doesn't clear the interrupted status.
+      Array.fill(10000)(Math.random())
       f.get()
     } finally {
       taskPool.shutdown()
@@ -514,7 +516,12 @@ class ExecutorSuite extends SparkFunSuite
         depthToCheck) == (depthToCheck >= 2 && isFatal))
       // `e`'s depth is 3 so `depthToCheck` needs to be at least 3 to detect fatal errors.
       assert(isFatalError(
-        errorInThreadPool(errorInGuavaCache(e)),
+        errorInThreadPool({
+          if (isFatal) {
+            Thread.sleep(100)
+          }
+          errorInGuavaCache(e)
+        }),
```

To fix the issue, this PR clears the interrupted status after `cache.get` in `errorInGuavaCache`.

### Why are the changes needed?

For test stability.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Confirmed that the test passes even when `ExecutorSuite.scala` is changed as described above.

### Was this patch authored or co-authored using generative AI tooling?

No.
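For reference, below is a sketch of the kind of change described in the first section: clear the interrupt status that Guava restores after `cache.get` throws. The helper only approximates the shape of `errorInGuavaCache` in `ExecutorSuite.scala` (its body, signature, and error handling are reconstructed for illustration, and the actual diff in this PR may differ); the relevant part is the `Thread.interrupted()` call in the `finally` block.

```scala
import com.google.common.cache.{CacheBuilder, CacheLoader}

// Approximate shape of the test helper; only the `finally` block is the point here.
object ErrorInGuavaCacheFixSketch {
  def errorInGuavaCache(e: => Throwable): Throwable = {
    val cache = CacheBuilder.newBuilder()
      .build(new CacheLoader[String, String] {
        override def load(key: String): String = throw e
      })
    try {
      cache.get("test")
      new AssertionError("cache.get should have thrown") // unreachable in practice
    } catch {
      case t: Throwable => t // the wrapped error whose cause chain isFatalError inspects
    } finally {
      // Guava sets the interrupt status back on the calling thread when the
      // loader throws InterruptedException; clear it here so it cannot leak
      // into the next FutureTask#get() call in the test loop.
      Thread.interrupted()
    }
  }
}
```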
