GitHub user juanrh opened a pull request:
https://github.com/apache/spark/pull/19583
[WIP][SPARK-22339] [CORE] [NETWORK-SHUFFLE]
## What changes were proposed in this pull request?
When a task fails due to a fetch error, the DAGScheduler unregisters the shuffle blocks hosted by the serving executor (or even all the executors on the failing host, when the external shuffle service is used and spark.files.fetchFailure.unRegisterOutputOnHost is enabled) from the shuffle block directory stored by MapOutputTracker, which then increments its epoch as a result. This event is only signaled to the other executors when a task carrying the new epoch starts on each executor. In the meantime, executors reading from the failed executors keep retrying shuffle block fetches against them, even though the driver already knows those executors are lost and those blocks are no longer available at those locations. This hurts job runtime, especially for long shuffles and for executor failures near the end of a stage, when the only pending tasks are shuffle reads.
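To make the problem concrete, here is a minimal model (illustrative classes, not Spark's actual MapOutputTracker API) of the epoch mechanism described above: the driver-side tracker bumps an epoch when shuffle outputs are unregistered, and a worker-side tracker only drops its cached map statuses once it observes the newer epoch, which today happens at the start of the next task.

```scala
object EpochModel {
  // Driver-side: increments its epoch whenever shuffle outputs are unregistered.
  class MasterTracker {
    private var epoch = 0L
    def incrementEpoch(): Unit = synchronized { epoch += 1 }
    def currentEpoch: Long = synchronized { epoch }
  }

  // Executor-side: caches shuffle block locations, shared by all tasks on
  // the executor, and clears the cache when it sees a newer epoch.
  class WorkerTracker {
    private var epoch = 0L
    val mapStatuses = scala.collection.mutable.Map[Int, Seq[String]]()
    def updateEpoch(newEpoch: Long): Unit = synchronized {
      if (newEpoch > epoch) {
        epoch = newEpoch
        mapStatuses.clear()  // stale locations dropped; next lookup refetches
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val master = new MasterTracker
    val worker = new WorkerTracker
    worker.mapStatuses(0) = Seq("executor-1")  // cached shuffle location
    master.incrementEpoch()                    // e.g. after a fetch failure
    // Today this call only happens when a new task starts on the executor;
    // until then the worker keeps fetching from the lost location.
    worker.updateEpoch(master.currentEpoch)
    println(worker.mapStatuses.isEmpty)        // prints "true"
  }
}
```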
This could be improved by pushing the epoch update to the executors without waiting for a new task. The attached patch sketches a possible solution that sends the updated epoch from the driver to the executors by piggybacking on the executor heartbeat response. ShuffleBlockFetcherIterator, RetryingBlockFetcher and BlockFetchingListener are modified so that block locations are re-checked on each fetch retry. This doesn't introduce additional traffic, because MapOutputTrackerWorker.mapStatuses is shared by all tasks running on the same executor, and the lookup of the new shuffle block directory would have happened anyway when the new epoch is detected at the start of the next task.
I would like to hear the community's opinion on this approach.
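A hypothetical sketch of the piggybacking idea (the names below are illustrative, not Spark's exact RPC types): the driver attaches its current epoch to every heartbeat reply, and the executor feeds that epoch into its shared location cache, so stale shuffle locations are dropped as soon as the next heartbeat completes rather than at the next task start.

```scala
object HeartbeatEpochSketch {
  case class Heartbeat(executorId: String)
  // The epoch field is the proposed addition to the heartbeat reply.
  case class HeartbeatResponse(reregister: Boolean, epoch: Long)

  class Driver {
    var epoch = 0L  // bumped when shuffle outputs are unregistered
    def receiveHeartbeat(hb: Heartbeat): HeartbeatResponse =
      HeartbeatResponse(reregister = false, epoch = epoch)
  }

  class Executor(driver: Driver) {
    private var knownEpoch = 0L
    // Stands in for MapOutputTrackerWorker.mapStatuses, shared by all tasks.
    val cachedLocations = scala.collection.mutable.Map[Int, String]()
    def heartbeat(): Unit = {
      val resp = driver.receiveHeartbeat(Heartbeat("exec-1"))
      if (resp.epoch > knownEpoch) {  // driver knows some outputs are lost
        knownEpoch = resp.epoch
        cachedLocations.clear()       // force a fresh location lookup on retry
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val driver = new Driver
    val executor = new Executor(driver)
    executor.cachedLocations(0) = "lost-executor"
    driver.epoch = 1L     // a fetch failure unregistered the outputs
    executor.heartbeat()  // epoch arrives with the heartbeat response
    println(executor.cachedLocations.isEmpty)  // prints "true"
  }
}
```

The point of reusing the heartbeat channel is that no new message type or extra round trip is needed; the epoch rides along on traffic that is already flowing.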
## How was this patch tested?
Unit tests, and tests on a YARN cluster.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/juanrh/spark hortala-push_epoch_update
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/19583.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #19583
----
commit 1381edd9ba5d6b776c91fec14f25382225e80cb4
Author: Juan Rodriguez Hortala <[email protected]>
Date: 2017-10-21T01:56:29Z
draft proposal
commit 7f538df6f3bffcb76a7dbc262cf75d410b635bdd
Author: Juan Rodriguez Hortala <[email protected]>
Date: 2017-10-21T02:06:43Z
adapt code to change in MapOutputTracker
----