Ngone51 opened a new pull request #28312:
URL: https://github.com/apache/spark/pull/28312


   ### What changes were proposed in this pull request?
   
   Rewrite the periodic check logic of `abortableRpcFuture` so that a 
barrier task always returns either the desired messages or the expected 
exception.
   
   This PR also simplifies the code around `AbortableRpcFuture` a bit.
   
   ### Why are the changes needed?
   
   Currently, the periodic check on `abortableRpcFuture` is implemented as 
follows:
   
   ```scala
   ...
   var messages: Array[String] = null
   
   while (!abortableRpcFuture.toFuture.isCompleted) {
      messages = ThreadUtils.awaitResult(abortableRpcFuture.toFuture, 1.second)
      ...
   }
   return messages
   ```
   It's possible that `abortableRpcFuture` completes before the next invocation 
of `messages = ...`. In this case the loop condition becomes false and the 
result is never read, so the task may return null messages or finish 
successfully when it should throw an exception (e.g. `SparkException` from 
`BarrierCoordinator`).
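
The race can be illustrated outside Spark with a minimal sketch. It is not the code in this PR: `PollingSketch`, `pollBuggy` and `pollSafe` are hypothetical names, and `scala.concurrent.Await` stands in for `ThreadUtils.awaitResult`. An already-completed future plays the role of a future that completes between a timed-out await and the next loop check.

```scala
import scala.annotation.tailrec
import scala.concurrent.{Await, Future, TimeoutException}
import scala.concurrent.duration._

object PollingSketch {
  // Buggy shape: the result is read only inside the loop body. If the future
  // is already complete when the condition is evaluated, the body is skipped
  // and `messages` stays null, even when the future failed.
  def pollBuggy(future: Future[Array[String]]): Array[String] = {
    var messages: Array[String] = null
    while (!future.isCompleted) {
      try {
        messages = Await.result(future, 1.second)
      } catch {
        case _: TimeoutException => // not finished yet, check again
      }
    }
    messages
  }

  // One possible safer shape: every exit path goes through Await.result, so
  // the caller gets either the value or the future's exception, never null.
  @tailrec
  def pollSafe(
      future: Future[Array[String]],
      interval: FiniteDuration = 1.second): Array[String] = {
    val attempt =
      try Some(Await.result(future, interval))
      catch { case _: TimeoutException => None } // periodic check hook goes here
    attempt match {
      case Some(messages) => messages
      case None => pollSafe(future, interval)
    }
  }
}
```

With a failed future, `pollBuggy` returns null as if the task had succeeded, while `pollSafe` rethrows the failure. The periodic work elided as `...` in the original loop would go in the timeout branch.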
   
   Here is a flaky test caused by this bug:
   
   ```
   [info] BarrierTaskContextSuite:
   [info] - share messages with allGather() call *** FAILED *** (18 seconds, 705 milliseconds)
   [info]   org.apache.spark.SparkException: Job aborted due to stage failure: Could not recover from a failed barrier ResultStage. Most recent failure reason: Stage failed because barrier task ResultTask(0, 2) finished unsuccessfully.
   [info] java.lang.NullPointerException
   [info]       at scala.collection.mutable.ArrayOps$ofRef$.length$extension(ArrayOps.scala:204)
   [info]       at scala.collection.mutable.ArrayOps$ofRef.length(ArrayOps.scala:204)
   [info]       at scala.collection.IndexedSeqOptimized.toList(IndexedSeqOptimized.scala:285)
   [info]       at scala.collection.IndexedSeqOptimized.toList$(IndexedSeqOptimized.scala:284)
   [info]       at scala.collection.mutable.ArrayOps$ofRef.toList(ArrayOps.scala:198)
   [info]       at org.apache.spark.scheduler.BarrierTaskContextSuite.$anonfun$new$4(BarrierTaskContextSuite.scala:68)
   ...
   ```
   
   The test exception can be reproduced by changing the line `messages = ...` 
to the following:
   
   ```scala
   messages = ThreadUtils.awaitResult(abortableRpcFuture.toFuture, 10.micros)
   Thread.sleep(5000)
   ```
   
   ### Does this PR introduce any user-facing change?
   
   No.
   
   ### How was this patch tested?
   
   Tested manually and updated some unit tests.


