Ngone51 opened a new pull request #28312:
URL: https://github.com/apache/spark/pull/28312
### What changes were proposed in this pull request?
Rewrite the periodic check logic of `abortableRpcFuture` to make sure
that a barrier task always returns either the desired messages or the expected
exception.
This PR also simplifies the code around `AbortableRpcFuture` a bit.
### Why are the changes needed?
Currently, the periodic check of `abortableRpcFuture` works as follows:
```scala
...
var messages: Array[String] = null
while (!abortableRpcFuture.toFuture.isCompleted) {
  messages = ThreadUtils.awaitResult(abortableRpcFuture.toFuture, 1.second)
  ...
}
return messages
```
It's possible that `abortableRpcFuture` completes before the next invocation of
`messages = ...`, in which case the loop exits without `messages` ever being
assigned. The task may then return null messages or finish successfully when it
should throw an exception (e.g. a `SparkException` from `BarrierCoordinator`).
Here is a flaky test failure caused by this bug:
```
[info] BarrierTaskContextSuite:
[info] - share messages with allGather() call *** FAILED *** (18 seconds, 705 milliseconds)
[info] org.apache.spark.SparkException: Job aborted due to stage failure: Could not recover from a failed barrier ResultStage. Most recent failure reason: Stage failed because barrier task ResultTask(0, 2) finished unsuccessfully.
[info] java.lang.NullPointerException
[info] at scala.collection.mutable.ArrayOps$ofRef$.length$extension(ArrayOps.scala:204)
[info] at scala.collection.mutable.ArrayOps$ofRef.length(ArrayOps.scala:204)
[info] at scala.collection.IndexedSeqOptimized.toList(IndexedSeqOptimized.scala:285)
[info] at scala.collection.IndexedSeqOptimized.toList$(IndexedSeqOptimized.scala:284)
[info] at scala.collection.mutable.ArrayOps$ofRef.toList(ArrayOps.scala:198)
[info] at org.apache.spark.scheduler.BarrierTaskContextSuite.$anonfun$new$4(BarrierTaskContextSuite.scala:68)
...
```
The test failure can be reproduced by changing the line `messages = ...`
to the following:
```scala
messages = ThreadUtils.awaitResult(abortableRpcFuture.toFuture, 10.micros)
Thread.sleep(5000)
```
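For illustration, here is a minimal sketch of one race-free way to write such a polling loop. It is not necessarily what this PR implements. It uses only the standard `scala.concurrent` API rather than Spark's `ThreadUtils`, and the names `PollingSketch`, `pollUntilDone` and `onTick` are hypothetical. The key point is that the result is always read from the completed future itself, so it can never be left unassigned:

```scala
import scala.annotation.tailrec
import scala.concurrent.{Await, Future, Promise, TimeoutException}
import scala.concurrent.duration._

object PollingSketch {
  // Hypothetical helper (not a Spark API): wait for `future` in short slices so
  // that `onTick` (e.g. a task-killed check) can run between attempts.
  @tailrec
  def pollUntilDone[T](future: Future[T], slice: FiniteDuration)(onTick: () => Unit): T =
    future.value match {
      // Completed: return the value, or rethrow the failure the future holds.
      case Some(result) => result.get
      case None =>
        // Await.ready only waits; it never rethrows the future's own failure,
        // so a TimeoutException here always means "this slice elapsed".
        try Await.ready(future, slice)
        catch { case _: TimeoutException => onTick() }
        pollUntilDone(future, slice)(onTick)
    }
}
```

Because the loop re-reads `future.value` after every wait, a future that completes between two waits is still observed, whether it succeeded or failed.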
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Tested manually and updated some unit tests.