Repository: spark Updated Branches: refs/heads/master b8788b3e7 -> 5059255d9
[SPARK-25161][CORE] Fix several bugs in failure handling of barrier execution mode ## What changes were proposed in this pull request? Fix several bugs in failure handling of barrier execution mode: * Mark TaskSet for a barrier stage as zombie when a task attempt fails; * Multiple barrier task failures from a single barrier stage should not trigger multiple stage retries; * Barrier task failure from a previous failed stage attempt should not trigger stage retry; * Fail the job when a task from a barrier ResultStage fails; * RDD.isBarrier() should not rely on `ShuffleDependency`s. ## How was this patch tested? Added corresponding test cases in `DAGSchedulerSuite` and `TaskSchedulerImplSuite`. Closes #22158 from jiangxb1987/failure. Authored-by: Xingbo Jiang <xingbo.ji...@databricks.com> Signed-off-by: Xiangrui Meng <m...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5059255d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5059255d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5059255d Branch: refs/heads/master Commit: 5059255d91fc7a9810e013eba39e12d30291dd08 Parents: b8788b3 Author: Xingbo Jiang <xingbo.ji...@databricks.com> Authored: Tue Aug 21 08:25:02 2018 -0700 Committer: Xiangrui Meng <m...@databricks.com> Committed: Tue Aug 21 08:25:02 2018 -0700 ---------------------------------------------------------------------- .../main/scala/org/apache/spark/rdd/RDD.scala | 3 +- .../apache/spark/scheduler/DAGScheduler.scala | 125 +++++++++++-------- .../apache/spark/scheduler/TaskSetManager.scala | 4 + .../spark/scheduler/DAGSchedulerSuite.scala | 106 ++++++++++++++++ .../scheduler/TaskSchedulerImplSuite.scala | 18 +++ 5 files changed, 200 insertions(+), 56 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/5059255d/core/src/main/scala/org/apache/spark/rdd/RDD.scala 
---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index cbc1143..374b846 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -1863,7 +1863,8 @@ abstract class RDD[T: ClassTag]( // From performance concern, cache the value to avoid repeatedly compute `isBarrier()` on a long // RDD chain. - @transient protected lazy val isBarrier_ : Boolean = dependencies.exists(_.rdd.isBarrier()) + @transient protected lazy val isBarrier_ : Boolean = + dependencies.filter(!_.isInstanceOf[ShuffleDependency[_, _, _]]).exists(_.rdd.isBarrier()) } http://git-wip-us.apache.org/repos/asf/spark/blob/5059255d/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 2b0ca13..6787250 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1478,9 +1478,11 @@ private[spark] class DAGScheduler( mapOutputTracker.unregisterAllMapOutput(failedMapStage.shuffleDep.shuffleId) case failedResultStage: ResultStage => - // Mark all the partitions of the result stage to be not finished, to ensure retry - // all the tasks on resubmitted stage attempt. - failedResultStage.activeJob.map(_.resetAllPartitions()) + // Abort the failed result stage since we may have committed output for some + // partitions. + val reason = "Could not recover from a failed barrier ResultStage. 
Most recent " + + s"failure reason: $failureMessage" + abortStage(failedResultStage, reason, None) } } @@ -1553,62 +1555,75 @@ private[spark] class DAGScheduler( // Always fail the current stage and retry all the tasks when a barrier task fail. val failedStage = stageIdToStage(task.stageId) - logInfo(s"Marking $failedStage (${failedStage.name}) as failed due to a barrier task " + - "failed.") - val message = s"Stage failed because barrier task $task finished unsuccessfully.\n" + - failure.toErrorString - try { - // killAllTaskAttempts will fail if a SchedulerBackend does not implement killTask. - val reason = s"Task $task from barrier stage $failedStage (${failedStage.name}) failed." - taskScheduler.killAllTaskAttempts(stageId, interruptThread = false, reason) - } catch { - case e: UnsupportedOperationException => - // Cannot continue with barrier stage if failed to cancel zombie barrier tasks. - // TODO SPARK-24877 leave the zombie tasks and ignore their completion events. - logWarning(s"Could not kill all tasks for stage $stageId", e) - abortStage(failedStage, "Could not kill zombie barrier tasks for stage " + - s"$failedStage (${failedStage.name})", Some(e)) - } - markStageAsFinished(failedStage, Some(message)) + if (failedStage.latestInfo.attemptNumber != task.stageAttemptId) { + logInfo(s"Ignoring task failure from $task as it's from $failedStage attempt" + + s" ${task.stageAttemptId} and there is a more recent attempt for that stage " + + s"(attempt ${failedStage.latestInfo.attemptNumber}) running") + } else { + logInfo(s"Marking $failedStage (${failedStage.name}) as failed due to a barrier task " + + "failed.") + val message = s"Stage failed because barrier task $task finished unsuccessfully.\n" + + failure.toErrorString + try { + // killAllTaskAttempts will fail if a SchedulerBackend does not implement killTask. + val reason = s"Task $task from barrier stage $failedStage (${failedStage.name}) " + + "failed." 
+ taskScheduler.killAllTaskAttempts(stageId, interruptThread = false, reason) + } catch { + case e: UnsupportedOperationException => + // Cannot continue with barrier stage if failed to cancel zombie barrier tasks. + // TODO SPARK-24877 leave the zombie tasks and ignore their completion events. + logWarning(s"Could not kill all tasks for stage $stageId", e) + abortStage(failedStage, "Could not kill zombie barrier tasks for stage " + + s"$failedStage (${failedStage.name})", Some(e)) + } + markStageAsFinished(failedStage, Some(message)) - failedStage.failedAttemptIds.add(task.stageAttemptId) - // TODO Refactor the failure handling logic to combine similar code with that of - // FetchFailed. - val shouldAbortStage = - failedStage.failedAttemptIds.size >= maxConsecutiveStageAttempts || - disallowStageRetryForTest + failedStage.failedAttemptIds.add(task.stageAttemptId) + // TODO Refactor the failure handling logic to combine similar code with that of + // FetchFailed. + val shouldAbortStage = + failedStage.failedAttemptIds.size >= maxConsecutiveStageAttempts || + disallowStageRetryForTest - if (shouldAbortStage) { - val abortMessage = if (disallowStageRetryForTest) { - "Barrier stage will not retry stage due to testing config. Most recent failure " + - s"reason: $message" + if (shouldAbortStage) { + val abortMessage = if (disallowStageRetryForTest) { + "Barrier stage will not retry stage due to testing config. Most recent failure " + + s"reason: $message" + } else { + s"""$failedStage (${failedStage.name}) + |has failed the maximum allowable number of + |times: $maxConsecutiveStageAttempts. + |Most recent failure reason: $message + """.stripMargin.replaceAll("\n", " ") + } + abortStage(failedStage, abortMessage, None) } else { - s"""$failedStage (${failedStage.name}) - |has failed the maximum allowable number of - |times: $maxConsecutiveStageAttempts. 
- |Most recent failure reason: $message""".stripMargin.replaceAll("\n", " ") - } - abortStage(failedStage, abortMessage, None) - } else { - failedStage match { - case failedMapStage: ShuffleMapStage => - // Mark all the map as broken in the map stage, to ensure retry all the tasks on - // resubmitted stage attempt. - mapOutputTracker.unregisterAllMapOutput(failedMapStage.shuffleDep.shuffleId) - - case failedResultStage: ResultStage => - // Mark all the partitions of the result stage to be not finished, to ensure retry - // all the tasks on resubmitted stage attempt. - failedResultStage.activeJob.map(_.resetAllPartitions()) - } + failedStage match { + case failedMapStage: ShuffleMapStage => + // Mark all the map as broken in the map stage, to ensure retry all the tasks on + // resubmitted stage attempt. + mapOutputTracker.unregisterAllMapOutput(failedMapStage.shuffleDep.shuffleId) - // update failedStages and make sure a ResubmitFailedStages event is enqueued - failedStages += failedStage - logInfo(s"Resubmitting $failedStage (${failedStage.name}) due to barrier stage " + - "failure.") - messageScheduler.schedule(new Runnable { - override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages) - }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS) + case failedResultStage: ResultStage => + // Abort the failed result stage since we may have committed output for some + // partitions. + val reason = "Could not recover from a failed barrier ResultStage. Most recent " + + s"failure reason: $message" + abortStage(failedResultStage, reason, None) + } + // In case multiple task failures triggered for a single stage attempt, ensure we only + // resubmit the failed stage once. 
+ val noResubmitEnqueued = !failedStages.contains(failedStage) + failedStages += failedStage + if (noResubmitEnqueued) { + logInfo(s"Resubmitting $failedStage (${failedStage.name}) due to barrier stage " + + "failure.") + messageScheduler.schedule(new Runnable { + override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages) + }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS) + } + } } case Resubmitted => http://git-wip-us.apache.org/repos/asf/spark/blob/5059255d/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 8b77641..d5e85a1 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -893,6 +893,10 @@ private[spark] class TaskSetManager( None } + if (tasks(index).isBarrier) { + isZombie = true + } + sched.dagScheduler.taskEnded(tasks(index), reason, null, accumUpdates, info) if (!isZombie && reason.countTowardsTaskFailures) { http://git-wip-us.apache.org/repos/asf/spark/blob/5059255d/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 6eeddbb..56ba23c 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -1119,6 +1119,33 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi assertDataStructuresEmpty() } + test("Fail the job if a barrier ResultTask failed") { + val shuffleMapRdd = new MyRDD(sc, 2, Nil) + val shuffleDep = 
new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2)) + val shuffleId = shuffleDep.shuffleId + val reduceRdd = new MyRDD(sc, 2, List(shuffleDep), tracker = mapOutputTracker) + .barrier() + .mapPartitions(iter => iter) + submit(reduceRdd, Array(0, 1)) + + // Complete the map stage. + complete(taskSets(0), Seq( + (Success, makeMapStatus("hostA", 2)), + (Success, makeMapStatus("hostA", 2)))) + assert(mapOutputTracker.findMissingPartitions(shuffleId) === Some(Seq.empty)) + + // The first ResultTask fails + runEvent(makeCompletionEvent( + taskSets(1).tasks(0), + TaskKilled("test"), + null)) + + // Assert the stage has been cancelled. + sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) + assert(failure.getMessage.startsWith("Job aborted due to stage failure: Could not recover " + + "from a failed barrier ResultStage.")) + } + /** * This tests the case where another FetchFailed comes in while the map stage is getting * re-run. @@ -2521,6 +2548,85 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi } } + test("Barrier task failures from the same stage attempt don't trigger multiple stage retries") { + val shuffleMapRdd = new MyRDD(sc, 2, Nil).barrier().mapPartitions(iter => iter) + val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2)) + val shuffleId = shuffleDep.shuffleId + val reduceRdd = new MyRDD(sc, 2, List(shuffleDep), tracker = mapOutputTracker) + submit(reduceRdd, Array(0, 1)) + + val mapStageId = 0 + def countSubmittedMapStageAttempts(): Int = { + sparkListener.submittedStageInfos.count(_.stageId == mapStageId) + } + + // The map stage should have been submitted. + sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) + assert(countSubmittedMapStageAttempts() === 1) + + // The first map task fails with TaskKilled. + runEvent(makeCompletionEvent( + taskSets(0).tasks(0), + TaskKilled("test"), + null)) + assert(sparkListener.failedStages === Seq(0)) + + // The second map task fails with TaskKilled. 
+ runEvent(makeCompletionEvent( + taskSets(0).tasks(1), + TaskKilled("test"), + null)) + + // Trigger resubmission of the failed map stage. + runEvent(ResubmitFailedStages) + sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) + + // Another attempt for the map stage should have been submitted, resulting in 2 total attempts. + assert(countSubmittedMapStageAttempts() === 2) + } + + test("Barrier task failures from a previous stage attempt don't trigger stage retry") { + val shuffleMapRdd = new MyRDD(sc, 2, Nil).barrier().mapPartitions(iter => iter) + val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2)) + val shuffleId = shuffleDep.shuffleId + val reduceRdd = new MyRDD(sc, 2, List(shuffleDep), tracker = mapOutputTracker) + submit(reduceRdd, Array(0, 1)) + + val mapStageId = 0 + def countSubmittedMapStageAttempts(): Int = { + sparkListener.submittedStageInfos.count(_.stageId == mapStageId) + } + + // The map stage should have been submitted. + sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) + assert(countSubmittedMapStageAttempts() === 1) + + // The first map task fails with TaskKilled. + runEvent(makeCompletionEvent( + taskSets(0).tasks(0), + TaskKilled("test"), + null)) + assert(sparkListener.failedStages === Seq(0)) + + // Trigger resubmission of the failed map stage. + runEvent(ResubmitFailedStages) + sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) + + // Another attempt for the map stage should have been submitted, resulting in 2 total attempts. + assert(countSubmittedMapStageAttempts() === 2) + + // The second map task fails with TaskKilled. + runEvent(makeCompletionEvent( + taskSets(0).tasks(1), + TaskKilled("test"), + null)) + + // The second map task failure doesn't trigger stage retry. + runEvent(ResubmitFailedStages) + sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) + assert(countSubmittedMapStageAttempts() === 2) + } + /** * Assert that the supplied TaskSet has exactly the given hosts as its preferred locations. 
* Note that this checks only the host and not the executor ID. http://git-wip-us.apache.org/repos/asf/spark/blob/5059255d/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index ca9bf08..7a457a0 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -1118,4 +1118,22 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B assert(!tsm.isZombie) assert(taskScheduler.taskSetManagerForAttempt(0, 0).isDefined) } + + test("mark taskset for a barrier stage as zombie in case a task fails") { + val taskScheduler = setupScheduler() + + val attempt = FakeTask.createBarrierTaskSet(3) + taskScheduler.submitTasks(attempt) + + val tsm = taskScheduler.taskSetManagerForAttempt(0, 0).get + val offers = (0 until 3).map{ idx => + WorkerOffer(s"exec-$idx", s"host-$idx", 1, Some(s"192.168.0.101:4962$idx")) + } + taskScheduler.resourceOffers(offers) + assert(tsm.runningTasks === 3) + + // Fail a task from the stage attempt. + tsm.handleFailedTask(tsm.taskAttempts.head.head.taskId, TaskState.FAILED, TaskKilled("test")) + assert(tsm.isZombie) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org