cloud-fan commented on a change in pull request #23638: [SPARK-26713][CORE] Interrupt pipe IO threads in PipedRDD when task is finished
URL: https://github.com/apache/spark/pull/23638#discussion_r251869084
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
 ##########
 @@ -141,7 +141,14 @@ final class ShuffleBlockFetcherIterator(
 
   /**
    * Whether the iterator is still active. If isZombie is true, the callback interface will no
-   * longer place fetched blocks into [[results]].
+   * longer place fetched blocks into [[results]] and the iterator is marked as fully consumed.
+   *
+   * When the iterator is inactive, [[hasNext]] and [[next]] calls will honor that as there are
+   * cases the iterator is still being consumed. For example, ShuffledRDD + PipedRDD if the
+   * subprocess command is failed. The task will be marked as failed, then the iterator will be
+   * cleaned up at task completion, the [[next]] call (called in the stdin writer thread of
+   * PipedRDD if not exited yet) may hang at [[results.take]]. The defensive check in [[hasNext]]
+   * and [[next]] reduces the possibility of such race conditions.
 
 Review comment:
   OK, I get it. What we need to guarantee is that the iterator never hangs, even after the task has finished and cleanup has been called.
   
   `ShuffleBlockFetcherIterator` does have a bug: two threads can consume the `results` queue at the same time, which can cause `next` to hang.
   
   I don't think the fix here is correct. Right after the `while (!isZombie && result == null)` check passes, the other thread may set `isZombie` to `true` and drain the `results` queue, and we may still hang here. The change does make this race condition less likely, but the problem is still there.
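   
   A minimal sketch of that interleaving, replayed step by step on a single thread so it is deterministic. The object name, the `String` element type, and the use of `poll` with a timeout in place of the blocking `take` are assumptions made for illustration; this is not the actual `ShuffleBlockFetcherIterator` code.

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

object CheckThenTakeRace {
  // Hypothetical stand-ins for the iterator's state; not the real Spark fields.
  @volatile var isZombie = false
  val results = new LinkedBlockingQueue[String]()

  /** Replays the problematic interleaving and reports whether the consumer found nothing. */
  def wouldHang(): Boolean = {
    isZombie = false
    results.clear()
    results.put("block-0")

    // Step 1 (consumer thread): the guard passes because isZombie is still false.
    val guardPassed = !isZombie

    // Step 2 (cleanup thread): marks the iterator as a zombie and drains the queue.
    isZombie = true
    results.clear()

    // Step 3 (consumer thread): continues past the guard it already evaluated.
    // poll with a timeout stands in for results.take(), which would block indefinitely.
    val fetched =
      if (guardPassed) results.poll(100, TimeUnit.MILLISECONDS) else "skipped"
    fetched == null
  }
}
```

   Here `wouldHang()` returns `true`: the guard was evaluated before cleanup ran, so the consumer still reaches the blocking call on an empty queue.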
   
   Can we revert it and come up with a better fix? I think we should do the heavy work in `hasNext` and put the result in a temporary variable, so that `next` just returns that temporary variable.
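   
   One possible shape for that idea, as a rough sketch rather than a proposed patch. The class name `BufferingIterator`, the `expected` count, and the `cleanup` method are hypothetical. The bounded `poll`, which lets the loop re-check `isZombie` instead of blocking indefinitely, is an extra assumption of this sketch and goes beyond the suggestion above.

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

// Hypothetical class; names and the polling interval are assumptions for illustration.
class BufferingIterator[T <: AnyRef](results: LinkedBlockingQueue[T], expected: Int)
    extends Iterator[T] {
  @volatile private var isZombie = false
  private var buffered: Option[T] = None // the "temporary variable"
  private var consumed = 0

  /** Called at task completion; afterwards hasNext is false once the buffer is empty. */
  def cleanup(): Unit = {
    isZombie = true
    results.clear()
  }

  override def hasNext: Boolean = {
    // All waiting happens here. A bounded poll lets the loop re-check isZombie.
    while (buffered.isEmpty && !isZombie && consumed < expected) {
      buffered = Option(results.poll(50, TimeUnit.MILLISECONDS))
    }
    buffered.isDefined
  }

  override def next(): T = {
    if (!hasNext) throw new NoSuchElementException("iterator is exhausted or cleaned up")
    val value = buffered.get
    buffered = None
    consumed += 1
    value
  }
}
```

   With this layout `next` never touches the queue itself, so the only place a consumer can wait is the loop in `hasNext`, and that loop exits once `cleanup` has set `isZombie`.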

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
