This is an automated email from the ASF dual-hosted git repository.

irashid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 38263f6  [SPARK-27630][CORE] Properly handle task end events from 
completed stages
38263f6 is described below

commit 38263f6d153944b6f2f0248d9284861fc82532d6
Author: sychen <syc...@ctrip.com>
AuthorDate: Tue Jun 25 14:30:13 2019 -0500

    [SPARK-27630][CORE] Properly handle task end events from completed stages
    
    ## What changes were proposed in this pull request?
    Track tasks separately for each stage attempt (instead of tracking by 
stage), and do NOT reset the numRunningTasks to 0 on StageCompleted.
    
    In the case of stage retry, the `taskEnd` event from the zombie stage 
sometimes makes the number of `totalRunningTasks` negative, which will causes 
the job to get stuck.
    Similar problem also exists with `stageIdToTaskIndices` & 
`stageIdToSpeculativeTaskIndices`.
    If it is a failed `taskEnd` event of the zombie stage, this will cause 
`stageIdToTaskIndices` or `stageIdToSpeculativeTaskIndices` to remove the task 
index of the active stage, and the number of `totalPendingTasks` will increase 
unexpectedly.
    ## How was this patch tested?
    unit test properly handle task end events from completed stages
    
    Closes #24497 from cxzl25/fix_stuck_job_follow_up.
    
    Authored-by: sychen <syc...@ctrip.com>
    Signed-off-by: Imran Rashid <iras...@cloudera.com>
---
 .../apache/spark/ExecutorAllocationManager.scala   | 113 ++++++++++++---------
 .../org/apache/spark/scheduler/DAGScheduler.scala  |   2 +-
 .../org/apache/spark/scheduler/SparkListener.scala |   5 +-
 .../spark/ExecutorAllocationManagerSuite.scala     |  33 +++++-
 project/MimaExcludes.scala                         |   6 ++
 5 files changed, 104 insertions(+), 55 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala 
b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index bb95fea..bceb26c 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -491,6 +491,10 @@ private[spark] class ExecutorAllocationManager(
     numExecutorsToAdd = 1
   }
 
+  private case class StageAttempt(stageId: Int, stageAttemptId: Int) {
+    override def toString: String = s"Stage $stageId (Attempt $stageAttemptId)"
+  }
+
   /**
    * A listener that notifies the given allocation manager of when to add and 
remove executors.
    *
@@ -499,29 +503,32 @@ private[spark] class ExecutorAllocationManager(
    */
   private[spark] class ExecutorAllocationListener extends SparkListener {
 
-    private val stageIdToNumTasks = new mutable.HashMap[Int, Int]
-    // Number of running tasks per stage including speculative tasks.
+    private val stageAttemptToNumTasks = new mutable.HashMap[StageAttempt, Int]
+    // Number of running tasks per stageAttempt including speculative tasks.
     // Should be 0 when no stages are active.
-    private val stageIdToNumRunningTask = new mutable.HashMap[Int, Int]
-    private val stageIdToTaskIndices = new mutable.HashMap[Int, 
mutable.HashSet[Int]]
-    // Number of speculative tasks to be scheduled in each stage
-    private val stageIdToNumSpeculativeTasks = new mutable.HashMap[Int, Int]
-    // The speculative tasks started in each stage
-    private val stageIdToSpeculativeTaskIndices = new mutable.HashMap[Int, 
mutable.HashSet[Int]]
-
-    // stageId to tuple (the number of task with locality preferences, a map 
where each pair is a
-    // node and the number of tasks that would like to be scheduled on that 
node) map,
-    // maintain the executor placement hints for each stage Id used by 
resource framework to better
-    // place the executors.
-    private val stageIdToExecutorPlacementHints = new mutable.HashMap[Int, 
(Int, Map[String, Int])]
+    private val stageAttemptToNumRunningTask = new 
mutable.HashMap[StageAttempt, Int]
+    private val stageAttemptToTaskIndices = new mutable.HashMap[StageAttempt, 
mutable.HashSet[Int]]
+    // Number of speculative tasks to be scheduled in each stageAttempt
+    private val stageAttemptToNumSpeculativeTasks = new 
mutable.HashMap[StageAttempt, Int]
+    // The speculative tasks started in each stageAttempt
+    private val stageAttemptToSpeculativeTaskIndices =
+      new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]
+
+    // stageAttempt to tuple (the number of task with locality preferences, a 
map where each pair
+    // is a node and the number of tasks that would like to be scheduled on 
that node) map,
+    // maintain the executor placement hints for each stageAttempt used by 
resource framework
+    // to better place the executors.
+    private val stageAttemptToExecutorPlacementHints =
+      new mutable.HashMap[StageAttempt, (Int, Map[String, Int])]
 
     override def onStageSubmitted(stageSubmitted: 
SparkListenerStageSubmitted): Unit = {
       initializing = false
       val stageId = stageSubmitted.stageInfo.stageId
+      val stageAttemptId = stageSubmitted.stageInfo.attemptNumber()
+      val stageAttempt = StageAttempt(stageId, stageAttemptId)
       val numTasks = stageSubmitted.stageInfo.numTasks
       allocationManager.synchronized {
-        stageIdToNumTasks(stageId) = numTasks
-        stageIdToNumRunningTask(stageId) = 0
+        stageAttemptToNumTasks(stageAttempt) = numTasks
         allocationManager.onSchedulerBacklogged()
 
         // Compute the number of tasks requested by the stage on each host
@@ -536,7 +543,7 @@ private[spark] class ExecutorAllocationManager(
             }
           }
         }
-        stageIdToExecutorPlacementHints.put(stageId,
+        stageAttemptToExecutorPlacementHints.put(stageAttempt,
           (numTasksPending, hostToLocalTaskCountPerStage.toMap))
 
         // Update the executor placement hints
@@ -546,20 +553,24 @@ private[spark] class ExecutorAllocationManager(
 
     override def onStageCompleted(stageCompleted: 
SparkListenerStageCompleted): Unit = {
       val stageId = stageCompleted.stageInfo.stageId
+      val stageAttemptId = stageCompleted.stageInfo.attemptNumber()
+      val stageAttempt = StageAttempt(stageId, stageAttemptId)
       allocationManager.synchronized {
-        stageIdToNumTasks -= stageId
-        stageIdToNumRunningTask -= stageId
-        stageIdToNumSpeculativeTasks -= stageId
-        stageIdToTaskIndices -= stageId
-        stageIdToSpeculativeTaskIndices -= stageId
-        stageIdToExecutorPlacementHints -= stageId
+        // do NOT remove stageAttempt from stageAttemptToNumRunningTasks,
+        // because the attempt may still have running tasks,
+        // even after another attempt for the stage is submitted.
+        stageAttemptToNumTasks -= stageAttempt
+        stageAttemptToNumSpeculativeTasks -= stageAttempt
+        stageAttemptToTaskIndices -= stageAttempt
+        stageAttemptToSpeculativeTaskIndices -= stageAttempt
+        stageAttemptToExecutorPlacementHints -= stageAttempt
 
         // Update the executor placement hints
         updateExecutorPlacementHints()
 
         // If this is the last stage with pending tasks, mark the scheduler 
queue as empty
         // This is needed in case the stage is aborted for any reason
-        if (stageIdToNumTasks.isEmpty && stageIdToNumSpeculativeTasks.isEmpty) 
{
+        if (stageAttemptToNumTasks.isEmpty && 
stageAttemptToNumSpeculativeTasks.isEmpty) {
           allocationManager.onSchedulerQueueEmpty()
         }
       }
@@ -567,19 +578,19 @@ private[spark] class ExecutorAllocationManager(
 
     override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
       val stageId = taskStart.stageId
+      val stageAttemptId = taskStart.stageAttemptId
+      val stageAttempt = StageAttempt(stageId, stageAttemptId)
       val taskIndex = taskStart.taskInfo.index
-
       allocationManager.synchronized {
-        if (stageIdToNumRunningTask.contains(stageId)) {
-          stageIdToNumRunningTask(stageId) += 1
-        }
-
+        stageAttemptToNumRunningTask(stageAttempt) =
+          stageAttemptToNumRunningTask.getOrElse(stageAttempt, 0) + 1
         // If this is the last pending task, mark the scheduler queue as empty
         if (taskStart.taskInfo.speculative) {
-          stageIdToSpeculativeTaskIndices.getOrElseUpdate(stageId, new 
mutable.HashSet[Int]) +=
-            taskIndex
+          stageAttemptToSpeculativeTaskIndices.getOrElseUpdate(stageAttempt,
+            new mutable.HashSet[Int]) += taskIndex
         } else {
-          stageIdToTaskIndices.getOrElseUpdate(stageId, new 
mutable.HashSet[Int]) += taskIndex
+          stageAttemptToTaskIndices.getOrElseUpdate(stageAttempt,
+            new mutable.HashSet[Int]) += taskIndex
         }
         if (totalPendingTasks() == 0) {
           allocationManager.onSchedulerQueueEmpty()
@@ -588,13 +599,17 @@ private[spark] class ExecutorAllocationManager(
     }
 
     override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
-      val taskIndex = taskEnd.taskInfo.index
       val stageId = taskEnd.stageId
+      val stageAttemptId = taskEnd.stageAttemptId
+      val stageAttempt = StageAttempt(stageId, stageAttemptId)
+      val taskIndex = taskEnd.taskInfo.index
       allocationManager.synchronized {
-        if (stageIdToNumRunningTask.contains(stageId)) {
-          stageIdToNumRunningTask(stageId) -= 1
+        if (stageAttemptToNumRunningTask.contains(stageAttempt)) {
+          stageAttemptToNumRunningTask(stageAttempt) -= 1
+          if (stageAttemptToNumRunningTask(stageAttempt) == 0) {
+            stageAttemptToNumRunningTask -= stageAttempt
+          }
         }
-
         // If the task failed, we expect it to be resubmitted later. To ensure 
we have
         // enough resources to run the resubmitted task, we need to mark the 
scheduler
         // as backlogged again if it's not already marked as such (SPARK-8366)
@@ -603,9 +618,9 @@ private[spark] class ExecutorAllocationManager(
             allocationManager.onSchedulerBacklogged()
           }
           if (taskEnd.taskInfo.speculative) {
-            stageIdToSpeculativeTaskIndices.get(stageId).foreach 
{_.remove(taskIndex)}
+            stageAttemptToSpeculativeTaskIndices.get(stageAttempt).foreach 
{_.remove(taskIndex)}
           } else {
-            stageIdToTaskIndices.get(stageId).foreach {_.remove(taskIndex)}
+            stageAttemptToTaskIndices.get(stageAttempt).foreach 
{_.remove(taskIndex)}
           }
         }
       }
@@ -613,11 +628,12 @@ private[spark] class ExecutorAllocationManager(
 
     override def onSpeculativeTaskSubmitted(speculativeTask: 
SparkListenerSpeculativeTaskSubmitted)
       : Unit = {
-       val stageId = speculativeTask.stageId
-
+      val stageId = speculativeTask.stageId
+      val stageAttemptId = speculativeTask.stageAttemptId
+      val stageAttempt = StageAttempt(stageId, stageAttemptId)
       allocationManager.synchronized {
-        stageIdToNumSpeculativeTasks(stageId) =
-          stageIdToNumSpeculativeTasks.getOrElse(stageId, 0) + 1
+        stageAttemptToNumSpeculativeTasks(stageAttempt) =
+          stageAttemptToNumSpeculativeTasks.getOrElse(stageAttempt, 0) + 1
         allocationManager.onSchedulerBacklogged()
       }
     }
@@ -629,14 +645,14 @@ private[spark] class ExecutorAllocationManager(
      * Note: This is not thread-safe without the caller owning the 
`allocationManager` lock.
      */
     def pendingTasks(): Int = {
-      stageIdToNumTasks.map { case (stageId, numTasks) =>
-        numTasks - stageIdToTaskIndices.get(stageId).map(_.size).getOrElse(0)
+      stageAttemptToNumTasks.map { case (stageAttempt, numTasks) =>
+        numTasks - 
stageAttemptToTaskIndices.get(stageAttempt).map(_.size).getOrElse(0)
       }.sum
     }
 
     def pendingSpeculativeTasks(): Int = {
-      stageIdToNumSpeculativeTasks.map { case (stageId, numTasks) =>
-        numTasks - 
stageIdToSpeculativeTaskIndices.get(stageId).map(_.size).getOrElse(0)
+      stageAttemptToNumSpeculativeTasks.map { case (stageAttempt, numTasks) =>
+        numTasks - 
stageAttemptToSpeculativeTaskIndices.get(stageAttempt).map(_.size).getOrElse(0)
       }.sum
     }
 
@@ -646,9 +662,10 @@ private[spark] class ExecutorAllocationManager(
 
     /**
      * The number of tasks currently running across all stages.
+     * Include running-but-zombie stage attempts
      */
     def totalRunningTasks(): Int = {
-      stageIdToNumRunningTask.values.sum
+      stageAttemptToNumRunningTask.values.sum
     }
 
     /**
@@ -662,7 +679,7 @@ private[spark] class ExecutorAllocationManager(
     def updateExecutorPlacementHints(): Unit = {
       var localityAwareTasks = 0
       val localityToCount = new mutable.HashMap[String, Int]()
-      stageIdToExecutorPlacementHints.values.foreach { case (numTasksPending, 
localities) =>
+      stageAttemptToExecutorPlacementHints.values.foreach { case 
(numTasksPending, localities) =>
         localityAwareTasks += numTasksPending
         localities.foreach { case (hostname, count) =>
           val updatedCount = localityToCount.getOrElse(hostname, 0) + count
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index de57807..5072e61 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -933,7 +933,7 @@ private[spark] class DAGScheduler(
   }
 
   private[scheduler] def handleSpeculativeTaskSubmitted(task: Task[_]): Unit = 
{
-    listenerBus.post(SparkListenerSpeculativeTaskSubmitted(task.stageId))
+    listenerBus.post(SparkListenerSpeculativeTaskSubmitted(task.stageId, 
task.stageAttemptId))
   }
 
   private[scheduler] def handleTaskSetFailed(
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala 
b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index 1acfd90..666ce3d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -52,7 +52,10 @@ case class SparkListenerTaskStart(stageId: Int, 
stageAttemptId: Int, taskInfo: T
 case class SparkListenerTaskGettingResult(taskInfo: TaskInfo) extends 
SparkListenerEvent
 
 @DeveloperApi
-case class SparkListenerSpeculativeTaskSubmitted(stageId: Int) extends 
SparkListenerEvent
+case class SparkListenerSpeculativeTaskSubmitted(
+    stageId: Int,
+    stageAttemptId: Int = 0)
+  extends SparkListenerEvent
 
 @DeveloperApi
 case class SparkListenerTaskEnd(
diff --git 
a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index 2b75f2e..3ba33e3 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -254,14 +254,19 @@ class ExecutorAllocationManagerSuite extends 
SparkFunSuite {
     assert(numExecutorsToAdd(manager) === 1)
 
     // Verify that running a speculative task doesn't affect the target
-    post(SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, "executor-2", 
true)))
+    post(SparkListenerTaskStart(1, 0, createTaskInfo(1, 0, "executor-2", 
true)))
     assert(numExecutorsTarget(manager) === 5)
     assert(addExecutors(manager) === 0)
     assert(numExecutorsToAdd(manager) === 1)
   }
 
-  test("ignore task end events from completed stages") {
+  test("properly handle task end events from completed stages") {
     val manager = createManager(createConf(0, 10, 0))
+
+    // We simulate having a stage fail, but with tasks still running.  Then 
another attempt for
+    // that stage is started, and we get task completions from the first stage 
attempt.  Make sure
+    // the value of `totalTasksRunning` is consistent as tasks finish from 
both attempts (we count
+    // all running tasks, from the zombie & non-zombie attempts)
     val stage = createStageInfo(0, 5)
     post(SparkListenerStageSubmitted(stage))
     val taskInfo1 = createTaskInfo(0, 0, "executor-1")
@@ -269,10 +274,27 @@ class ExecutorAllocationManagerSuite extends 
SparkFunSuite {
     post(SparkListenerTaskStart(0, 0, taskInfo1))
     post(SparkListenerTaskStart(0, 0, taskInfo2))
 
+    // The tasks in the zombie attempt haven't completed yet, so we still 
count them
     post(SparkListenerStageCompleted(stage))
 
+    // There are still two tasks that belong to the zombie stage running.
+    assert(totalRunningTasks(manager) === 2)
+
+    // submit another attempt for the stage.  We count completions from the 
first zombie attempt
+    val stageAttempt1 = createStageInfo(stage.stageId, 5, attemptId = 1)
+    post(SparkListenerStageSubmitted(stageAttempt1))
     post(SparkListenerTaskEnd(0, 0, null, Success, taskInfo1, null))
-    post(SparkListenerTaskEnd(2, 0, null, Success, taskInfo2, null))
+    assert(totalRunningTasks(manager) === 1)
+    val attemptTaskInfo1 = createTaskInfo(3, 0, "executor-1")
+    val attemptTaskInfo2 = createTaskInfo(4, 1, "executor-1")
+    post(SparkListenerTaskStart(0, 1, attemptTaskInfo1))
+    post(SparkListenerTaskStart(0, 1, attemptTaskInfo2))
+    assert(totalRunningTasks(manager) === 3)
+    post(SparkListenerTaskEnd(0, 1, null, Success, attemptTaskInfo1, null))
+    assert(totalRunningTasks(manager) === 2)
+    post(SparkListenerTaskEnd(0, 0, null, Success, taskInfo2, null))
+    assert(totalRunningTasks(manager) === 1)
+    post(SparkListenerTaskEnd(0, 1, null, Success, attemptTaskInfo2, null))
     assert(totalRunningTasks(manager) === 0)
   }
 
@@ -1033,9 +1055,10 @@ private object ExecutorAllocationManagerSuite extends 
PrivateMethodTester {
   private def createStageInfo(
       stageId: Int,
       numTasks: Int,
-      taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty
+      taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty,
+      attemptId: Int = 0
     ): StageInfo = {
-    new StageInfo(stageId, 0, "name", numTasks, Seq.empty, Seq.empty, "no 
details",
+    new StageInfo(stageId, attemptId, "name", numTasks, Seq.empty, Seq.empty, 
"no details",
       taskLocalityPreferences = taskLocalityPreferences)
   }
 
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 38ec5a0..cb3b803 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -79,6 +79,12 @@ object MimaExcludes {
     
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerApplicationStart.this"),
     
ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.scheduler.SparkListenerApplicationStart$"),
     
+    // [SPARK-27630][CORE] Properly handle task end events from completed 
stages
+    
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerSpeculativeTaskSubmitted.apply"),
+    
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerSpeculativeTaskSubmitted.copy"),
+    
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerSpeculativeTaskSubmitted.this"),
+    
ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.scheduler.SparkListenerSpeculativeTaskSubmitted$"),
+    
     // [SPARK-26632][Core] Separate Thread Configurations of Driver and 
Executor
     
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.network.netty.SparkTransportConf.fromSparkConf"),
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to