GitHub user juanrh opened a pull request:

    https://github.com/apache/spark/pull/19583

    [WIP][SPARK-22339] [CORE] [NETWORK-SHUFFLE]

    ## What changes were proposed in this pull request?
    When a task finishes with error due to a fetch error, then DAGScheduler 
unregisters the shuffle blocks hosted by the serving executor (or even all the 
executors in the failing host, with external shuffle and 
spark.files.fetchFailure.unRegisterOutputOnHost enabled) in the shuffle block 
directory stored by MapOutputTracker, that then increments its epoch as a 
result. This event is only signaled to the other executors when a new task with 
a new epoch starts in each executor. This means that other executors reading 
from the failed executors will retry fetching shuffle blocks from them, even 
though the driver already knows those executors are lost and those blocks are 
now unavailable at those locations. This impacts job runtime, specially for 
long shuffles and executor failures at the end of a stage, when the only 
pending tasks are shuffle reads.
    This could be improved by pushing the epoch update to the executors without 
having to wait for a new task. In the attached patch I sketch a possible 
solution that sends the updated epoch from the driver to the executors by 
piggybacking on the executor heartbeat response. ShuffleBlockFetcherIterator, 
RetryingBlockFetcher and BlockFetchingListener are modified so blocks locations 
are checked on each fetch retry. This doesn't introduce additional traffic, as 
MapOutputTrackerWorker.mapStatuses is shared by all tasks running on the same 
Executor, and the lookup of the new shuffle blocks directory was going to 
happen anyway when the new epoch is detected during the start of the next task.
    I would like to know the opinion of the community on this approach.
    
    ## How was this patch tested?
    Unit tests, tests in Yarn cluster

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/juanrh/spark hortala-push_epoch_update

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/19583.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #19583
    
----
commit 1381edd9ba5d6b776c91fec14f25382225e80cb4
Author: Juan Rodriguez Hortala <[email protected]>
Date:   2017-10-21T01:56:29Z

    draft proposal

commit 7f538df6f3bffcb76a7dbc262cf75d410b635bdd
Author: Juan Rodriguez Hortala <[email protected]>
Date:   2017-10-21T02:06:43Z

    adapt code to change in MapOutputTracker

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to