spark git commit: [SPARK-17644][CORE] Do not add failedStages when abortStage for fetch failure
Repository: spark Updated Branches: refs/heads/branch-2.0 4d73d5cd8 -> 4c694e452 [SPARK-17644][CORE] Do not add failedStages when abortStage for fetch failure | Time|Thread 1 , Job1 | Thread 2 , Job2 | |:-:|:-:|:-:| | 1 | abort stage due to FetchFailed | | | 2 | failedStages += failedStage || | 3 | | task failed due to FetchFailed | | 4 | | can not post ResubmitFailedStages because failedStages is not empty | Then job2 of thread2 never resubmit the failed stage and hang. We should not add the failedStages when abortStage for fetch failure added unit test Author: w00228970 Author: wangfei Closes #15213 from scwf/dag-resubmit. (cherry picked from commit 46d1203bf2d01b219c4efc7e0e77a844c0c664da) Signed-off-by: Shixiong Zhu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4c694e45 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4c694e45 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4c694e45 Branch: refs/heads/branch-2.0 Commit: 4c694e452278e46231720e778a80c586b9e565f1 Parents: 4d73d5c Author: w00228970 Authored: Wed Sep 28 12:02:59 2016 -0700 Committer: Shixiong Zhu Committed: Wed Sep 28 12:08:56 2016 -0700 -- .../apache/spark/scheduler/DAGScheduler.scala | 24 .../spark/scheduler/DAGSchedulerSuite.scala | 58 +++- 2 files changed, 70 insertions(+), 12 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/4c694e45/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 399d671..e7e2ff1 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1277,18 +1277,20 @@ class DAGScheduler( s"has failed the maximum allowable number of " + s"times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}.\n
" + s"Most recent failure reason: ${failureMessage}", None) - } else if (failedStages.isEmpty) { -// Don't schedule an event to resubmit failed stages if failed isn't empty, because -// in that case the event will already have been scheduled. -// TODO: Cancel running tasks in the stage -logInfo(s"Resubmitting $mapStage (${mapStage.name}) and " + - s"$failedStage (${failedStage.name}) due to fetch failure") -messageScheduler.schedule(new Runnable { - override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages) -}, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS) + } else { +if (failedStages.isEmpty) { + // Don't schedule an event to resubmit failed stages if failed isn't empty, because + // in that case the event will already have been scheduled. + // TODO: Cancel running tasks in the stage + logInfo(s"Resubmitting $mapStage (${mapStage.name}) and " + +s"$failedStage (${failedStage.name}) due to fetch failure") + messageScheduler.schedule(new Runnable { +override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages) + }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS) +} +failedStages += failedStage +failedStages += mapStage } - failedStages += failedStage - failedStages += mapStage // Mark the map whose fetch failed as broken in the map stage if (mapId != -1) { mapStage.removeOutputLoc(mapId, bmAddress) http://git-wip-us.apache.org/repos/asf/spark/blob/4c694e45/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index f69e8f2..5c35302 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.scheduler import java.util.Properties +import java.util.concurrent.atomic.AtomicBoolean import scala.annotation.meta.param 
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map} @@ -31,7 +32,7 @@ import org.apache.spark._ import
spark git commit: [SPARK-17644][CORE] Do not add failedStages when abortStage for fetch failure
Repository: spark Updated Branches: refs/heads/master 219003775 -> 46d1203bf [SPARK-17644][CORE] Do not add failedStages when abortStage for fetch failure ## What changes were proposed in this pull request? | Time|Thread 1 , Job1 | Thread 2 , Job2 | |:-:|:-:|:-:| | 1 | abort stage due to FetchFailed | | | 2 | failedStages += failedStage || | 3 | | task failed due to FetchFailed | | 4 | | can not post ResubmitFailedStages because failedStages is not empty | Then job2 of thread2 never resubmit the failed stage and hang. We should not add the failedStages when abortStage for fetch failure ## How was this patch tested? added unit test Author: w00228970 Author: wangfei Closes #15213 from scwf/dag-resubmit. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/46d1203b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/46d1203b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/46d1203b Branch: refs/heads/master Commit: 46d1203bf2d01b219c4efc7e0e77a844c0c664da Parents: 2190037 Author: w00228970 Authored: Wed Sep 28 12:02:59 2016 -0700 Committer: Shixiong Zhu Committed: Wed Sep 28 12:02:59 2016 -0700 -- .../apache/spark/scheduler/DAGScheduler.scala | 24 .../spark/scheduler/DAGSchedulerSuite.scala | 58 +++- 2 files changed, 70 insertions(+), 12 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/46d1203b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 5ea0b48..f251740 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1263,18 +1263,20 @@ class DAGScheduler( s"has failed the maximum allowable number of " + s"times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}.\n
" + s"Most recent failure reason: ${failureMessage}", None) - } else if (failedStages.isEmpty) { -// Don't schedule an event to resubmit failed stages if failed isn't empty, because -// in that case the event will already have been scheduled. -// TODO: Cancel running tasks in the stage -logInfo(s"Resubmitting $mapStage (${mapStage.name}) and " + - s"$failedStage (${failedStage.name}) due to fetch failure") -messageScheduler.schedule(new Runnable { - override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages) -}, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS) + } else { +if (failedStages.isEmpty) { + // Don't schedule an event to resubmit failed stages if failed isn't empty, because + // in that case the event will already have been scheduled. + // TODO: Cancel running tasks in the stage + logInfo(s"Resubmitting $mapStage (${mapStage.name}) and " + +s"$failedStage (${failedStage.name}) due to fetch failure") + messageScheduler.schedule(new Runnable { +override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages) + }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS) +} +failedStages += failedStage +failedStages += mapStage } - failedStages += failedStage - failedStages += mapStage // Mark the map whose fetch failed as broken in the map stage if (mapId != -1) { mapStage.removeOutputLoc(mapId, bmAddress) http://git-wip-us.apache.org/repos/asf/spark/blob/46d1203b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 6787b30..bec95d1 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.scheduler import java.util.Properties +import java.util.concurrent.atomic.AtomicBoolean import scala.annotation.meta.param 
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map} @@ -31,7 +32,7 @@ import org.apache.spark._ import org.apache.spark.broadcast.BroadcastManager import