sarutak opened a new pull request, #53814:
URL: https://github.com/apache/spark/pull/53814

   ### What changes were proposed in this pull request?
   This PR fixes a flaky test `SPARK-33587: isFatalError` in `ExecutorSuite`.
   Recently, this test has been failing occasionally.
   
https://github.com/apache/spark/actions/runs/20850209017/job/59903273573#step:10:4885
   
   ```
   [info] - SPARK-33587: isFatalError *** FAILED *** (30 milliseconds)
   [info]   false did not equal true (ExecutorSuite.scala:516)
   [info]   org.scalatest.exceptions.TestFailedException:
   [info]   at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
   [info]   at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
   [info]   at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
   [info]   at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295)
   [info]   at org.apache.spark.executor.ExecutorSuite.testThrowable$1(ExecutorSuite.scala:516)
   [info]   at org.apache.spark.executor.ExecutorSuite.$anonfun$new$38(ExecutorSuite.scala:528)
   [info]   at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:256)
   [info]   at org.apache.spark.executor.ExecutorSuite.$anonfun$new$33(ExecutorSuite.scala:527)
   [info]   at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
   [info]   at org.scalatest.enablers.Timed$$anon$1.timeoutAfter(Timed.scala:127)
   [info]   at org.scalatest.concurrent.TimeLimits$.failAfterImpl(TimeLimits.scala:282)
   [info]   at org.scalatest.concurrent.TimeLimits.failAfter(TimeLimits.scala:231)
   [info]   at org.scalatest.concurrent.TimeLimits.failAfter$(TimeLimits.scala:230)
   [info]   at org.apache.spark.SparkFunSuite.failAfter(SparkFunSuite.scala:68)
   [info]   at org.apache.spark.SparkFunSuite.$anonfun$test$2(SparkFunSuite.scala:154)
   [info]   at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
   [info]   at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
   [info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
   [info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
   [info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226)
   [info]   at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:226)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:224)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:236)
   [info]   at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:236)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:218)
   [info]   at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:68)
   [info]   at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234)
   [info]   at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227)
   [info]   at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:68)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:269)
   [info]   at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
   [info]   at scala.collection.immutable.List.foreach(List.scala:323)
   [info]   at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
   [info]   at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
   [info]   at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:269)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:268)
   [info]   at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1564)
   [info]   at org.scalatest.Suite.run(Suite.scala:1114)
   [info]   at org.scalatest.Suite.run$(Suite.scala:1096)
   [info]   at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1564)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:273)
   [info]   at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:273)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:272)
   [info]   at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:68)
   [info]   at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
   [info]   at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
   [info]   at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
   [info]   at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:68)
   [info]   at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:321)
   [info]   at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:517)
   [info]   at sbt.ForkMain$Run.lambda$runTest$1(ForkMain.java:414)
   [info]   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
   [info]   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
   [info]   at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
   [info]   at java.base/java.lang.Thread.run(Thread.java:840)
   ```
   
   This issue happens as follows.
   
   1. [testThrowable](https://github.com/apache/spark/blob/a03bedb6c1281c5263a42bfd20608d2ee005ab05/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala#L529) is invoked with `InterruptedException`.
   2. The `InterruptedException` is passed to [errorInGuavaCache](https://github.com/apache/spark/blob/a03bedb6c1281c5263a42bfd20608d2ee005ab05/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala#L511).
   3. [Cache#get](https://github.com/apache/spark/blob/a03bedb6c1281c5263a42bfd20608d2ee005ab05/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala#L498) is invoked. The actual cache class is [LocalCache.LocalLoadingCache](https://github.com/google/guava/blob/f06690fa3e874f65515e8fd338a74d636e2c792f/guava/src/com/google/common/cache/CacheBuilder.java#L1032).
   4. The flow reaches [this line](https://github.com/google/guava/blob/f06690fa3e874f65515e8fd338a74d636e2c792f/guava/src/com/google/common/cache/LocalCache.java#L3570), which sets the interrupted status on the current thread.
   5. After `errorInGuavaCache` returns, the next round of the loop invokes [errorInThreadPool](https://github.com/apache/spark/blob/a03bedb6c1281c5263a42bfd20608d2ee005ab05/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala#L517).
   6. Through [f.get()](https://github.com/apache/spark/blob/a03bedb6c1281c5263a42bfd20608d2ee005ab05/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala#L485), [FutureTask#awaitDone](https://github.com/openjdk/jdk/blob/dfacda488bfbe2e11e8d607a6d08527710286982/src/java.base/share/classes/java/util/concurrent/FutureTask.java#L393) is invoked.
   7. If the future task is already done, the flow reaches [this line](https://github.com/openjdk/jdk/blob/dfacda488bfbe2e11e8d607a6d08527710286982/src/java.base/share/classes/java/util/concurrent/FutureTask.java#L410) and the issue does not occur. Otherwise, because the interrupted status is still set, the flow reaches [this line](https://github.com/openjdk/jdk/blob/dfacda488bfbe2e11e8d607a6d08527710286982/src/java.base/share/classes/java/util/concurrent/FutureTask.java#L418) and an `InterruptedException` is thrown without any cause (see the sketch after this list).
   8. `InterruptedException` is not a fatal error, so if `testThrowable` is invoked with `isFatal = true`, the assertion fails.
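
   To make the interaction concrete, here is a minimal, self-contained sketch of the same sequence. It is not code from this PR or from `ExecutorSuite`; the object, key, and variable names are illustrative only. A Guava loader throws `InterruptedException`, Guava restores the thread's interrupted status, and a later `Future#get()` on the same thread, whose task has not finished yet, throws a bare `InterruptedException`.
   ```
   import java.util.concurrent.{Callable, ExecutionException, Executors}
   import com.google.common.cache.{CacheBuilder, CacheLoader}

   object InterruptedFlagRepro {
     def main(args: Array[String]): Unit = {
       // A loader that throws InterruptedException, like that case in testThrowable.
       val cache = CacheBuilder.newBuilder().build(new CacheLoader[String, String] {
         override def load(key: String): String = throw new InterruptedException()
       })
       try {
         cache.get("key")
       } catch {
         case _: ExecutionException =>
           // Guava wrapped the loader's exception and restored the interrupted
           // status of the calling thread (step 4 above).
           println(s"interrupted flag set: ${Thread.currentThread().isInterrupted()}")
       }

       // With the flag still set, FutureTask#get on a task that is not yet done
       // enters awaitDone and throws a bare InterruptedException (step 7 above).
       val pool = Executors.newSingleThreadExecutor()
       try {
         val f = pool.submit(new Callable[String] {
           override def call(): String = { Thread.sleep(50); "done" }
         })
         f.get()
       } catch {
         case ie: InterruptedException =>
           println(s"InterruptedException with cause = ${ie.getCause}") // cause is null
       } finally {
         pool.shutdown()
       }
     }
   }
   ```
   Whether the bare `InterruptedException` shows up therefore depends on whether the submitted task has already completed when `get()` is called, which is exactly why the test only fails occasionally.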
   
   This issue can be reproduced by modifying `ExecutorSuite.scala` as follows.
   ```
   diff --git a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
   index dd9884bffb2..42a656cc88b 100644
   --- a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
   +++ b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
   @@ -482,6 +482,8 @@ class ExecutorSuite extends SparkFunSuite
              val f = taskPool.submit(new java.util.concurrent.Callable[String] {
                override def call(): String = throw e
              })
   +          // Time consuming task which doesn't clear the interrupted status.
   +          Array.fill(10000)(Math.random())
              f.get()
            } finally {
              taskPool.shutdown()
   @@ -514,7 +516,12 @@ class ExecutorSuite extends SparkFunSuite
            depthToCheck) == (depthToCheck >= 2 && isFatal))
          // `e`'s depth is 3 so `depthToCheck` needs to be at least 3 to detect fatal errors.
          assert(isFatalError(
   -        errorInThreadPool(errorInGuavaCache(e)),
   +        errorInThreadPool({
   +          if (isFatal) {
   +            Thread.sleep(100)
   +          }
   +          errorInGuavaCache(e)
   +        }),
   ```
   
   To fix the issue, this PR clears the interrupted status after `cache.get` in `errorInGuavaCache`.
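
   As an illustration only, here is a minimal sketch of the shape of such a helper after the fix, assuming it intercepts the wrapped error from `cache.get` and then clears the flag; the helper name and details are hypothetical and may differ from the actual patch.
   ```
   import com.google.common.cache.{CacheBuilder, CacheLoader}

   // Hypothetical helper mirroring the described fix; not the exact patch.
   def errorInGuavaCacheSketch(e: Throwable): Throwable = {
     val cache = CacheBuilder.newBuilder().build(new CacheLoader[String, String] {
       override def load(key: String): String = throw e
     })
     val wrapped = try {
       cache.get("key")
       new IllegalStateException("cache.get was expected to throw")
     } catch {
       case t: Throwable => t
     }
     // Guava may have restored the interrupted status of the current thread
     // (e.g. when e is an InterruptedException). Thread.interrupted() reads and
     // clears it, so later FutureTask#get calls on this thread are unaffected.
     Thread.interrupted()
     wrapped
   }
   ```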
   
   ### Why are the changes needed?
   For test stability.
   
   ### Does this PR introduce _any_ user-facing change?
   No.
   
   ### How was this patch tested?
   Confirmed that the test passes even when `ExecutorSuite.scala` is modified as described above.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   No.

