GitHub user umehrot2 opened a pull request:

    https://github.com/apache/spark/pull/17445

    [SPARK-20115] [CORE] Fix DAGScheduler to recompute all the lost shuffle 
blocks when external shuffle service is unavailable

    ## What changes were proposed in this pull request?
Spark's DAGScheduler currently does not recompute all the lost shuffle 
blocks on a host when a FetchFailed exception occurs while fetching shuffle 
blocks from an executor with the external shuffle service enabled. Instead, it 
only recomputes the lost shuffle blocks produced by the executor for which the 
FetchFailed exception occurred. This works fine for the internal shuffle 
scenario, where executors serve their own shuffle blocks, so only that 
executor's blocks should be considered lost. However, when the external shuffle 
service is being used, a FetchFailed exception means that the external shuffle 
service running on that host has become unavailable, which is sufficient to 
conclude that all the shuffle blocks managed by the shuffle service on that 
host are lost. Recomputing only the shuffle blocks associated with the 
particular executor for which the FetchFailed exception occurred is therefore 
not sufficient: since multiple executors can run on that host, we need to 
recompute all the shuffle blocks managed by that service.
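    The bookkeeping gap can be modeled with a tiny registry (hypothetical 
names, a simplified sketch rather than Spark's actual `MapOutputTracker` API): 
invalidating outputs per executor leaves stale entries for the other executors 
on the same host, whereas invalidating per host does not.

```java
import java.util.*;

// Simplified model of shuffle-output bookkeeping (hypothetical; not Spark's API).
public class ShuffleOutputs {
    // mapId -> {host, executorId}
    private final Map<Integer, String[]> outputs = new HashMap<>();

    public void register(int mapId, String host, String execId) {
        outputs.put(mapId, new String[] { host, execId });
    }

    // Current behavior: only the failing executor's outputs are invalidated.
    public void removeOutputsOnExecutor(String execId) {
        outputs.values().removeIf(loc -> loc[1].equals(execId));
    }

    // Proposed behavior: with the external shuffle service, the whole host is lost.
    public void removeOutputsOnHost(String host) {
        outputs.values().removeIf(loc -> loc[0].equals(host));
    }

    public Set<Integer> liveOutputs() {
        return outputs.keySet();
    }
}
```

    In this model, after a fetch failure on hostA caused by the shuffle 
service going down, `removeOutputsOnExecutor("exec1")` still leaves exec2's 
output registered at hostA, so reducers keep fetching from a dead location; 
`removeOutputsOnHost("hostA")` clears both.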
    
    Because not all the shuffle blocks (for all the executors on the host) are 
recomputed, subsequent attempts of the reduce stage fail as well: the newly 
scheduled tasks keep trying to fetch the non-recomputed shuffle blocks from 
their old location and keep throwing further FetchFailed exceptions. This 
ultimately causes the job to fail after the reduce stage has been retried 4 
times.
    
    The following changes are proposed to address the above issue:
    1. On a FetchFailed exception when the external shuffle service is in use, 
mark all the shuffle outputs on that host as failed (due to the failure of the 
external shuffle service).
    2. As a result, recompute all the lost shuffle blocks on the host, instead 
of only those of a single executor.
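    The decision in step 1 can be sketched as a small policy function 
(hypothetical helper, not the actual DAGScheduler code): given the host and 
executor of a fetch failure, it returns which executors' map outputs should be 
unregistered.

```java
import java.util.*;

// Sketch of the proposed fetch-failure handling (hypothetical names).
public class FetchFailurePolicy {
    // Returns the executor IDs whose map outputs should be unregistered
    // after a FetchFailed from (failedHost, failedExecId).
    public static Set<String> executorsToInvalidate(
            String failedHost,
            String failedExecId,
            Map<String, Set<String>> executorsByHost,
            boolean externalShuffleService) {
        if (externalShuffleService) {
            // The shuffle service on the host served blocks for every executor
            // running there, so all of that host's outputs must be treated as lost.
            return executorsByHost.getOrDefault(failedHost, Collections.emptySet());
        }
        // Internal shuffle: only the failing executor's own blocks are lost.
        return Collections.singleton(failedExecId);
    }
}
```

    With two executors on hostA, a fetch failure from exec1 invalidates both 
exec1 and exec2 when the external shuffle service is enabled, but only exec1 
otherwise.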
    
    ## How was this patch tested?
    1. Added a unit test for the change in functionality.
    2. Tested on a cluster running Spark on YARN (with the external shuffle 
service enabled), by performing the following steps:
    - Start a word count job, and wait for the map stage to complete
    - During the reduce stage, stop the external shuffle service on one host
    - Wait for a FetchFailed exception to occur while fetching shuffle blocks 
from that host
    - Check that in the reattempt of the map stage, Spark recomputes all the 
lost shuffle blocks for the host on which the shuffle service was stopped
    - The job completes successfully, since the next attempt of the reduce 
stage finds all the shuffle blocks


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/umehrot2/spark spark-dagscheduler-fix

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/17445.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #17445
    
----
commit 6642de3bd7215cc311c4d51284ffe1c50387cdfc
Author: Udit Mehrotra <[email protected]>
Date:   2017-02-14T23:27:15Z

    Allow Spark to recompute all the shuffle blocks on a host, if external 
shuffle service is unavailable on that host
    
    cr https://cr.amazon.com/r/6822886/

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
