Repository: spark
Updated Branches:
  refs/heads/master b8788b3e7 -> 5059255d9


[SPARK-25161][CORE] Fix several bugs in failure handling of barrier execution mode

## What changes were proposed in this pull request?

Fix several bugs in failure handling of barrier execution mode:
* Mark the TaskSet for a barrier stage as zombie when a task attempt fails;
* Multiple barrier task failures from a single barrier stage should not trigger multiple stage retries;
* A barrier task failure from a previous failed stage attempt should not trigger a stage retry;
* Fail the job when a task from a barrier ResultStage fails;
* RDD.isBarrier() should not rely on `ShuffleDependency`s (a brief usage sketch of barrier mode follows below).
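
For context, here is a minimal, illustrative sketch (not part of this patch) of the barrier execution mode API that these failure-handling paths serve; `RDD.barrier()`, `RDDBarrier.mapPartitions` and `BarrierTaskContext` are the public entry points, while the app name, data and master in the snippet are made-up example values:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.BarrierTaskContext

object BarrierSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("barrier-sketch").setMaster("local[4]"))
    val rdd = sc.parallelize(1 to 8, numSlices = 4)
    // All tasks of a barrier stage are launched together; with the fixes in this patch, a single
    // task failure fails the whole stage attempt (and the whole job if it is a ResultStage)
    // instead of retrying just that one task.
    val doubled = rdd.barrier().mapPartitions { iter =>
      val ctx = BarrierTaskContext.get()
      ctx.barrier() // global synchronization point across all tasks in the stage
      iter.map(_ * 2)
    }.collect()
    println(doubled.mkString(", "))
    sc.stop()
  }
}
```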

## How was this patch tested?

Added corresponding test cases in `DAGSchedulerSuite` and 
`TaskSchedulerImplSuite`.

Closes #22158 from jiangxb1987/failure.

Authored-by: Xingbo Jiang <xingbo.ji...@databricks.com>
Signed-off-by: Xiangrui Meng <m...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5059255d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5059255d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5059255d

Branch: refs/heads/master
Commit: 5059255d91fc7a9810e013eba39e12d30291dd08
Parents: b8788b3
Author: Xingbo Jiang <xingbo.ji...@databricks.com>
Authored: Tue Aug 21 08:25:02 2018 -0700
Committer: Xiangrui Meng <m...@databricks.com>
Committed: Tue Aug 21 08:25:02 2018 -0700

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/rdd/RDD.scala   |   3 +-
 .../apache/spark/scheduler/DAGScheduler.scala   | 125 +++++++++++--------
 .../apache/spark/scheduler/TaskSetManager.scala |   4 +
 .../spark/scheduler/DAGSchedulerSuite.scala     | 106 ++++++++++++++++
 .../scheduler/TaskSchedulerImplSuite.scala      |  18 +++
 5 files changed, 200 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5059255d/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index cbc1143..374b846 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1863,7 +1863,8 @@ abstract class RDD[T: ClassTag](
 
   // From performance concern, cache the value to avoid repeatedly compute `isBarrier()` on a long
   // RDD chain.
-  @transient protected lazy val isBarrier_ : Boolean = dependencies.exists(_.rdd.isBarrier())
+  @transient protected lazy val isBarrier_ : Boolean =
+    dependencies.filter(!_.isInstanceOf[ShuffleDependency[_, _, _]]).exists(_.rdd.isBarrier())
 }
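
The filter above keeps the barrier flag from leaking across a shuffle boundary: stages are split at `ShuffleDependency`s, so whether an RDD is barrier should depend only on its narrow-dependency ancestors within the same stage. A rough illustration of the intended behaviour (the RDD chain is hypothetical, and since `isBarrier()` is `private[spark]` such a check would have to live in a suite under `org.apache.spark`):

```scala
// Hypothetical chain, for illustration only.
val barrierRdd = sc.parallelize(1 to 4, 2).barrier().mapPartitions(iter => iter)
assert(barrierRdd.isBarrier())      // propagated through the narrow mapPartitions dependency

val nextStageRdd = barrierRdd.repartition(2)
assert(!nextStageRdd.isBarrier())   // a ShuffleDependency starts a new, non-barrier stage
```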
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/5059255d/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 2b0ca13..6787250 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1478,9 +1478,11 @@ private[spark] class DAGScheduler(
                 mapOutputTracker.unregisterAllMapOutput(failedMapStage.shuffleDep.shuffleId)

               case failedResultStage: ResultStage =>
-                // Mark all the partitions of the result stage to be not finished, to ensure retry
-                // all the tasks on resubmitted stage attempt.
-                failedResultStage.activeJob.map(_.resetAllPartitions())
+                // Abort the failed result stage since we may have committed output for some
+                // partitions.
+                val reason = "Could not recover from a failed barrier ResultStage. Most recent " +
+                  s"failure reason: $failureMessage"
+                abortStage(failedResultStage, reason, None)
             }
           }
 
@@ -1553,62 +1555,75 @@ private[spark] class DAGScheduler(
 
         // Always fail the current stage and retry all the tasks when a barrier task fail.
         val failedStage = stageIdToStage(task.stageId)
-        logInfo(s"Marking $failedStage (${failedStage.name}) as failed due to a barrier task " +
-          "failed.")
-        val message = s"Stage failed because barrier task $task finished unsuccessfully.\n" +
-          failure.toErrorString
-        try {
-          // killAllTaskAttempts will fail if a SchedulerBackend does not implement killTask.
-          val reason = s"Task $task from barrier stage $failedStage (${failedStage.name}) failed."
-          taskScheduler.killAllTaskAttempts(stageId, interruptThread = false, reason)
-        } catch {
-          case e: UnsupportedOperationException =>
-            // Cannot continue with barrier stage if failed to cancel zombie barrier tasks.
-            // TODO SPARK-24877 leave the zombie tasks and ignore their completion events.
-            logWarning(s"Could not kill all tasks for stage $stageId", e)
-            abortStage(failedStage, "Could not kill zombie barrier tasks for stage " +
-              s"$failedStage (${failedStage.name})", Some(e))
-        }
-        markStageAsFinished(failedStage, Some(message))
+        if (failedStage.latestInfo.attemptNumber != task.stageAttemptId) {
+          logInfo(s"Ignoring task failure from $task as it's from $failedStage attempt" +
+            s" ${task.stageAttemptId} and there is a more recent attempt for that stage " +
+            s"(attempt ${failedStage.latestInfo.attemptNumber}) running")
+        } else {
+          logInfo(s"Marking $failedStage (${failedStage.name}) as failed due to a barrier task " +
+            "failed.")
+          val message = s"Stage failed because barrier task $task finished unsuccessfully.\n" +
+            failure.toErrorString
+          try {
+            // killAllTaskAttempts will fail if a SchedulerBackend does not implement killTask.
+            val reason = s"Task $task from barrier stage $failedStage (${failedStage.name}) " +
+              "failed."
+            taskScheduler.killAllTaskAttempts(stageId, interruptThread = false, reason)
+          } catch {
+            case e: UnsupportedOperationException =>
+              // Cannot continue with barrier stage if failed to cancel zombie barrier tasks.
+              // TODO SPARK-24877 leave the zombie tasks and ignore their completion events.
+              logWarning(s"Could not kill all tasks for stage $stageId", e)
+              abortStage(failedStage, "Could not kill zombie barrier tasks for stage " +
+                s"$failedStage (${failedStage.name})", Some(e))
+          }
+          markStageAsFinished(failedStage, Some(message))

-        failedStage.failedAttemptIds.add(task.stageAttemptId)
-        // TODO Refactor the failure handling logic to combine similar code with that of
-        // FetchFailed.
-        val shouldAbortStage =
-          failedStage.failedAttemptIds.size >= maxConsecutiveStageAttempts ||
-            disallowStageRetryForTest
+          failedStage.failedAttemptIds.add(task.stageAttemptId)
+          // TODO Refactor the failure handling logic to combine similar code with that of
+          // FetchFailed.
+          val shouldAbortStage =
+            failedStage.failedAttemptIds.size >= maxConsecutiveStageAttempts ||
+              disallowStageRetryForTest

-        if (shouldAbortStage) {
-          val abortMessage = if (disallowStageRetryForTest) {
-            "Barrier stage will not retry stage due to testing config. Most recent failure " +
-              s"reason: $message"
+          if (shouldAbortStage) {
+            val abortMessage = if (disallowStageRetryForTest) {
+              "Barrier stage will not retry stage due to testing config. Most recent failure " +
+                s"reason: $message"
+            } else {
+              s"""$failedStage (${failedStage.name})
+                 |has failed the maximum allowable number of
+                 |times: $maxConsecutiveStageAttempts.
+                 |Most recent failure reason: $message
+               """.stripMargin.replaceAll("\n", " ")
+            }
+            abortStage(failedStage, abortMessage, None)
           } else {
-            s"""$failedStage (${failedStage.name})
-               |has failed the maximum allowable number of
-               |times: $maxConsecutiveStageAttempts.
-               |Most recent failure reason: $message""".stripMargin.replaceAll("\n", " ")
-          }
-          abortStage(failedStage, abortMessage, None)
-        } else {
-          failedStage match {
-            case failedMapStage: ShuffleMapStage =>
-              // Mark all the map as broken in the map stage, to ensure retry all the tasks on
-              // resubmitted stage attempt.
-              mapOutputTracker.unregisterAllMapOutput(failedMapStage.shuffleDep.shuffleId)
-
-            case failedResultStage: ResultStage =>
-              // Mark all the partitions of the result stage to be not finished, to ensure retry
-              // all the tasks on resubmitted stage attempt.
-              failedResultStage.activeJob.map(_.resetAllPartitions())
-          }
+            failedStage match {
+              case failedMapStage: ShuffleMapStage =>
+                // Mark all the map as broken in the map stage, to ensure retry all the tasks on
+                // resubmitted stage attempt.
+                mapOutputTracker.unregisterAllMapOutput(failedMapStage.shuffleDep.shuffleId)

-          // update failedStages and make sure a ResubmitFailedStages event is enqueued
-          failedStages += failedStage
-          logInfo(s"Resubmitting $failedStage (${failedStage.name}) due to barrier stage " +
-            "failure.")
-          messageScheduler.schedule(new Runnable {
-            override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages)
-          }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS)
+              case failedResultStage: ResultStage =>
+                // Abort the failed result stage since we may have committed output for some
+                // partitions.
+                val reason = "Could not recover from a failed barrier ResultStage. Most recent " +
+                  s"failure reason: $message"
+                abortStage(failedResultStage, reason, None)
+            }
+            // In case multiple task failures triggered for a single stage attempt, ensure we only
+            // resubmit the failed stage once.
+            val noResubmitEnqueued = !failedStages.contains(failedStage)
+            failedStages += failedStage
+            if (noResubmitEnqueued) {
+              logInfo(s"Resubmitting $failedStage (${failedStage.name}) due to barrier stage " +
+                "failure.")
+              messageScheduler.schedule(new Runnable {
+                override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages)
+              }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS)
+            }
+          }
         }
 
       case Resubmitted =>
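
The `noResubmitEnqueued` check above is what keeps several barrier task failures from the same stage attempt from enqueuing several ResubmitFailedStages events. A standalone sketch of that dedup pattern (names are illustrative, not Spark's):

```scala
import scala.collection.mutable

// Illustrative only: record every failed stage, but schedule at most one resubmit
// event per stage no matter how many of its barrier tasks report failure.
val failedStages = mutable.HashSet[Int]()

def onBarrierTaskFailed(stageId: Int)(resubmit: Int => Unit): Unit = {
  val alreadyEnqueued = failedStages.contains(stageId)
  failedStages += stageId
  if (!alreadyEnqueued) {
    resubmit(stageId) // enqueue exactly one ResubmitFailedStages-style event
  }
}
```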

http://git-wip-us.apache.org/repos/asf/spark/blob/5059255d/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 8b77641..d5e85a1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -893,6 +893,10 @@ private[spark] class TaskSetManager(
         None
     }
 
+    if (tasks(index).isBarrier) {
+      isZombie = true
+    }
+
     sched.dagScheduler.taskEnded(tasks(index), reason, null, accumUpdates, info)
 
     if (!isZombie && reason.countTowardsTaskFailures) {

http://git-wip-us.apache.org/repos/asf/spark/blob/5059255d/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 6eeddbb..56ba23c 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -1119,6 +1119,33 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     assertDataStructuresEmpty()
   }
 
+  test("Fail the job if a barrier ResultTask failed") {
+    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2))
+    val shuffleId = shuffleDep.shuffleId
+    val reduceRdd = new MyRDD(sc, 2, List(shuffleDep), tracker = mapOutputTracker)
+      .barrier()
+      .mapPartitions(iter => iter)
+    submit(reduceRdd, Array(0, 1))
+
+    // Complete the map stage.
+    complete(taskSets(0), Seq(
+      (Success, makeMapStatus("hostA", 2)),
+      (Success, makeMapStatus("hostA", 2))))
+    assert(mapOutputTracker.findMissingPartitions(shuffleId) === Some(Seq.empty))
+
+    // The first ResultTask fails
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(0),
+      TaskKilled("test"),
+      null))
+
+    // Assert the stage has been cancelled.
+    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+    assert(failure.getMessage.startsWith("Job aborted due to stage failure: Could not recover " +
+      "from a failed barrier ResultStage."))
+  }
+
   /**
   * This tests the case where another FetchFailed comes in while the map stage is getting
    * re-run.
@@ -2521,6 +2548,85 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     }
   }
 
+  test("Barrier task failures from the same stage attempt don't trigger 
multiple stage retries") {
+    val shuffleMapRdd = new MyRDD(sc, 2, Nil).barrier().mapPartitions(iter => 
iter)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new 
HashPartitioner(2))
+    val shuffleId = shuffleDep.shuffleId
+    val reduceRdd = new MyRDD(sc, 2, List(shuffleDep), tracker = 
mapOutputTracker)
+    submit(reduceRdd, Array(0, 1))
+
+    val mapStageId = 0
+    def countSubmittedMapStageAttempts(): Int = {
+      sparkListener.submittedStageInfos.count(_.stageId == mapStageId)
+    }
+
+    // The map stage should have been submitted.
+    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+    assert(countSubmittedMapStageAttempts() === 1)
+
+    // The first map task fails with TaskKilled.
+    runEvent(makeCompletionEvent(
+      taskSets(0).tasks(0),
+      TaskKilled("test"),
+      null))
+    assert(sparkListener.failedStages === Seq(0))
+
+    // The second map task fails with TaskKilled.
+    runEvent(makeCompletionEvent(
+      taskSets(0).tasks(1),
+      TaskKilled("test"),
+      null))
+
+    // Trigger resubmission of the failed map stage.
+    runEvent(ResubmitFailedStages)
+    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+
+    // Another attempt for the map stage should have been submitted, resulting in 2 total attempts.
+    assert(countSubmittedMapStageAttempts() === 2)
+  }
+
+  test("Barrier task failures from a previous stage attempt don't trigger 
stage retry") {
+    val shuffleMapRdd = new MyRDD(sc, 2, Nil).barrier().mapPartitions(iter => 
iter)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new 
HashPartitioner(2))
+    val shuffleId = shuffleDep.shuffleId
+    val reduceRdd = new MyRDD(sc, 2, List(shuffleDep), tracker = 
mapOutputTracker)
+    submit(reduceRdd, Array(0, 1))
+
+    val mapStageId = 0
+    def countSubmittedMapStageAttempts(): Int = {
+      sparkListener.submittedStageInfos.count(_.stageId == mapStageId)
+    }
+
+    // The map stage should have been submitted.
+    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+    assert(countSubmittedMapStageAttempts() === 1)
+
+    // The first map task fails with TaskKilled.
+    runEvent(makeCompletionEvent(
+      taskSets(0).tasks(0),
+      TaskKilled("test"),
+      null))
+    assert(sparkListener.failedStages === Seq(0))
+
+    // Trigger resubmission of the failed map stage.
+    runEvent(ResubmitFailedStages)
+    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+
+    // Another attempt for the map stage should have been submitted, resulting in 2 total attempts.
+    assert(countSubmittedMapStageAttempts() === 2)
+
+    // The second map task fails with TaskKilled.
+    runEvent(makeCompletionEvent(
+      taskSets(0).tasks(1),
+      TaskKilled("test"),
+      null))
+
+    // The second map task failure doesn't trigger stage retry.
+    runEvent(ResubmitFailedStages)
+    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+    assert(countSubmittedMapStageAttempts() === 2)
+  }
+
   /**
   * Assert that the supplied TaskSet has exactly the given hosts as its preferred locations.
    * Note that this checks only the host and not the executor ID.

http://git-wip-us.apache.org/repos/asf/spark/blob/5059255d/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index ca9bf08..7a457a0 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -1118,4 +1118,22 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     assert(!tsm.isZombie)
     assert(taskScheduler.taskSetManagerForAttempt(0, 0).isDefined)
   }
+
+  test("mark taskset for a barrier stage as zombie in case a task fails") {
+    val taskScheduler = setupScheduler()
+
+    val attempt = FakeTask.createBarrierTaskSet(3)
+    taskScheduler.submitTasks(attempt)
+
+    val tsm = taskScheduler.taskSetManagerForAttempt(0, 0).get
+    val offers = (0 until 3).map{ idx =>
+      WorkerOffer(s"exec-$idx", s"host-$idx", 1, Some(s"192.168.0.101:4962$idx"))
+    }
+    taskScheduler.resourceOffers(offers)
+    assert(tsm.runningTasks === 3)
+
+    // Fail a task from the stage attempt.
+    tsm.handleFailedTask(tsm.taskAttempts.head.head.taskId, TaskState.FAILED, TaskKilled("test"))
+    assert(tsm.isZombie)
+  }
 }

