[ 
https://issues.apache.org/jira/browse/SPARK-17610?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tao Wang closed SPARK-17610.
----------------------------
    Resolution: Not A Problem

> The failed stage caused by FetchFailed may never be resubmitted
> ---------------------------------------------------------------
>
>                 Key: SPARK-17610
>                 URL: https://issues.apache.org/jira/browse/SPARK-17610
>             Project: Spark
>          Issue Type: Bug
>          Components: Scheduler
>    Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.6.0, 1.6.1, 1.6.2, 2.0.0
>            Reporter: Tao Wang
>            Priority: Critical
>
> We have a problem in our environment, in which the failed stage has not been 
> resubmitted ever. Because it is caused by FetchFailed exception, I took a 
> look at the corresponsive code segment and found some issues:
> In DAGScheduler.handleTaskCompletion, it first check if the `failedStages` is 
> empty, and do two steps when the answer is true:
> 1. send `ResubmitFailedStages` to evnetProcessLoop 
> 2. add failed stages into `failedStages`
> in `eventProcessLoop`, it first take all elements in `failedStages` to 
> resubmit them, then clear the set.
> If the event happens like below, there might be some problem:
> assume t1 < t2 < t3
> at t1, failed stage 1 was handled, the ResubmitFailedStages was send to 
> eventProcessLoop
> at t2, eventProcessLoop handle the ResubmitFailedStages and clear the empty 
> `failedStages`
> at t3, failed stage 1 was added into `failedStages`
> now failed stage 1 has not been resubmitted for now.
> after anytime at t3, the `failedStages` will never be empty even if we have 
> new failed stages caused by FetchFailed coming in, because the `failedStages` 
> containing failed stage 1 is not empty.
> The codes is below: 
> {code}
> } else if (failedStages.isEmpty) {
>             // Don't schedule an event to resubmit failed stages if failed 
> isn't empty, because
>             // in that case the event will already have been scheduled.
>             // TODO: Cancel running tasks in the stage
>             logInfo(s"Resubmitting $mapStage (${mapStage.name}) and " +
>               s"$failedStage (${failedStage.name}) due to fetch failure")
>             messageScheduler.schedule(new Runnable {
>               override def run(): Unit = 
> eventProcessLoop.post(ResubmitFailedStages)
>             }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS)
>           }
>           failedStages += failedStage
>           failedStages += mapStage
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to