spark git commit: [SPARK-17644][CORE] Do not add failedStages when abortStage for fetch failure

2016-09-28 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 4d73d5cd8 -> 4c694e452


[SPARK-17644][CORE] Do not add failedStages when abortStage for fetch failure

| Time|Thread 1 ,  Job1  | Thread 2 ,  Job2  |
|:-:|:-:|:-:|
| 1 | abort stage due to FetchFailed |  |
| 2 | failedStages += failedStage ||
| 3 |  |  task failed due to  FetchFailed |
| 4 |  |  cannot post ResubmitFailedStages because failedStages is not empty |

Then job2 of thread2 never resubmits the failed stage and hangs.

We should not add the stage to failedStages when aborting a stage for a fetch failure.

added unit test

Author: w00228970 
Author: wangfei 

Closes #15213 from scwf/dag-resubmit.

(cherry picked from commit 46d1203bf2d01b219c4efc7e0e77a844c0c664da)
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4c694e45
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4c694e45
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4c694e45

Branch: refs/heads/branch-2.0
Commit: 4c694e452278e46231720e778a80c586b9e565f1
Parents: 4d73d5c
Author: w00228970 
Authored: Wed Sep 28 12:02:59 2016 -0700
Committer: Shixiong Zhu 
Committed: Wed Sep 28 12:08:56 2016 -0700

--
 .../apache/spark/scheduler/DAGScheduler.scala   | 24 
 .../spark/scheduler/DAGSchedulerSuite.scala | 58 +++-
 2 files changed, 70 insertions(+), 12 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/4c694e45/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
--
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 399d671..e7e2ff1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1277,18 +1277,20 @@ class DAGScheduler(
   s"has failed the maximum allowable number of " +
   s"times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}. " +
   s"Most recent failure reason: ${failureMessage}", None)
-  } else if (failedStages.isEmpty) {
-// Don't schedule an event to resubmit failed stages if failed 
isn't empty, because
-// in that case the event will already have been scheduled.
-// TODO: Cancel running tasks in the stage
-logInfo(s"Resubmitting $mapStage (${mapStage.name}) and " +
-  s"$failedStage (${failedStage.name}) due to fetch failure")
-messageScheduler.schedule(new Runnable {
-  override def run(): Unit = 
eventProcessLoop.post(ResubmitFailedStages)
-}, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS)
+  } else {
+if (failedStages.isEmpty) {
+  // Don't schedule an event to resubmit failed stages if failed 
isn't empty, because
+  // in that case the event will already have been scheduled.
+  // TODO: Cancel running tasks in the stage
+  logInfo(s"Resubmitting $mapStage (${mapStage.name}) and " +
+s"$failedStage (${failedStage.name}) due to fetch failure")
+  messageScheduler.schedule(new Runnable {
+override def run(): Unit = 
eventProcessLoop.post(ResubmitFailedStages)
+  }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS)
+}
+failedStages += failedStage
+failedStages += mapStage
   }
-  failedStages += failedStage
-  failedStages += mapStage
   // Mark the map whose fetch failed as broken in the map stage
   if (mapId != -1) {
 mapStage.removeOutputLoc(mapId, bmAddress)

http://git-wip-us.apache.org/repos/asf/spark/blob/4c694e45/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
--
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index f69e8f2..5c35302 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.scheduler
 
 import java.util.Properties
+import java.util.concurrent.atomic.AtomicBoolean
 
 import scala.annotation.meta.param
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}
@@ -31,7 +32,7 @@ import org.apache.spark._
 import 

spark git commit: [SPARK-17644][CORE] Do not add failedStages when abortStage for fetch failure

2016-09-28 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master 219003775 -> 46d1203bf


[SPARK-17644][CORE] Do not add failedStages when abortStage for fetch failure

## What changes were proposed in this pull request?
| Time|Thread 1 ,  Job1  | Thread 2 ,  Job2  |
|:-:|:-:|:-:|
| 1 | abort stage due to FetchFailed |  |
| 2 | failedStages += failedStage ||
| 3 |  |  task failed due to  FetchFailed |
| 4 |  |  cannot post ResubmitFailedStages because failedStages is not empty |

Then job2 of thread2 never resubmits the failed stage and hangs.

We should not add the stage to failedStages when aborting a stage for a fetch failure.

## How was this patch tested?

added unit test

Author: w00228970 
Author: wangfei 

Closes #15213 from scwf/dag-resubmit.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/46d1203b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/46d1203b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/46d1203b

Branch: refs/heads/master
Commit: 46d1203bf2d01b219c4efc7e0e77a844c0c664da
Parents: 2190037
Author: w00228970 
Authored: Wed Sep 28 12:02:59 2016 -0700
Committer: Shixiong Zhu 
Committed: Wed Sep 28 12:02:59 2016 -0700

--
 .../apache/spark/scheduler/DAGScheduler.scala   | 24 
 .../spark/scheduler/DAGSchedulerSuite.scala | 58 +++-
 2 files changed, 70 insertions(+), 12 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/46d1203b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
--
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 5ea0b48..f251740 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1263,18 +1263,20 @@ class DAGScheduler(
   s"has failed the maximum allowable number of " +
   s"times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}. " +
   s"Most recent failure reason: ${failureMessage}", None)
-  } else if (failedStages.isEmpty) {
-// Don't schedule an event to resubmit failed stages if failed 
isn't empty, because
-// in that case the event will already have been scheduled.
-// TODO: Cancel running tasks in the stage
-logInfo(s"Resubmitting $mapStage (${mapStage.name}) and " +
-  s"$failedStage (${failedStage.name}) due to fetch failure")
-messageScheduler.schedule(new Runnable {
-  override def run(): Unit = 
eventProcessLoop.post(ResubmitFailedStages)
-}, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS)
+  } else {
+if (failedStages.isEmpty) {
+  // Don't schedule an event to resubmit failed stages if failed 
isn't empty, because
+  // in that case the event will already have been scheduled.
+  // TODO: Cancel running tasks in the stage
+  logInfo(s"Resubmitting $mapStage (${mapStage.name}) and " +
+s"$failedStage (${failedStage.name}) due to fetch failure")
+  messageScheduler.schedule(new Runnable {
+override def run(): Unit = 
eventProcessLoop.post(ResubmitFailedStages)
+  }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS)
+}
+failedStages += failedStage
+failedStages += mapStage
   }
-  failedStages += failedStage
-  failedStages += mapStage
   // Mark the map whose fetch failed as broken in the map stage
   if (mapId != -1) {
 mapStage.removeOutputLoc(mapId, bmAddress)

http://git-wip-us.apache.org/repos/asf/spark/blob/46d1203b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
--
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 6787b30..bec95d1 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.scheduler
 
 import java.util.Properties
+import java.util.concurrent.atomic.AtomicBoolean
 
 import scala.annotation.meta.param
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}
@@ -31,7 +32,7 @@ import org.apache.spark._
 import org.apache.spark.broadcast.BroadcastManager
 import