linzebing commented on a change in pull request #27223: [SPARK-30511][CORE] Spark marks intentionally killed speculative tasks as pending leads to holding idle executors
URL: https://github.com/apache/spark/pull/27223#discussion_r368204585
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
 ##########
 @@ -614,18 +614,24 @@ private[spark] class ExecutorAllocationManager(
             stageAttemptToNumRunningTask -= stageAttempt
           }
         }
-        // If the task failed, we expect it to be resubmitted later. To ensure we have
-        // enough resources to run the resubmitted task, we need to mark the scheduler
-        // as backlogged again if it's not already marked as such (SPARK-8366)
-        if (taskEnd.reason != Success) {
-          if (totalPendingTasks() == 0) {
-            allocationManager.onSchedulerBacklogged()
-          }
-          if (taskEnd.taskInfo.speculative) {
-            stageAttemptToSpeculativeTaskIndices.get(stageAttempt).foreach {_.remove(taskIndex)}
-          } else {
-            stageAttemptToTaskIndices.get(stageAttempt).foreach {_.remove(taskIndex)}
-          }
+
+        if (taskEnd.taskInfo.speculative) {
+          stageAttemptToSpeculativeTaskIndices.get(stageAttempt).foreach {_.remove{taskIndex}}
+          stageAttemptToNumSpeculativeTasks(stageAttempt) -= 1
+        }
+
+        // If the task failed (not intentionally killed), we expect it to be resubmitted later. To
+        // ensure we have enough resources to run the resubmitted task, we need to mark the
+        // scheduler as backlogged again if it's not already marked as such (SPARK-8366)
+        taskEnd.reason match {
+          case Success | _: TaskKilled =>
 
 Review comment:
   Inside the braces of the old `if (taskEnd.reason != Success)` block, there are two things:
   ```
   if (totalPendingTasks() == 0) {
       allocationManager.onSchedulerBacklogged()
   }
   ```
   This one is straightforward. If a task is intentionally killed, we don't expect it to be resubmitted, so we don't need to mark the scheduler as backlogged.
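   A minimal Scala sketch of this first point, under stated assumptions: the `TaskEndReason` hierarchy below is a simplified stand-in for Spark's (the real `TaskKilled` and `ExceptionFailure` carry more fields), and `BacklogSketch.shouldMarkBacklogged` is a hypothetical helper, not Spark API. It shows that only an unexpected failure, with nothing already pending, should trigger the backlog signal.

   ```scala
   // Simplified, hypothetical stand-ins for Spark's TaskEndReason subclasses.
   sealed trait TaskEndReason
   case object Success extends TaskEndReason
   final case class TaskKilled(reason: String) extends TaskEndReason
   final case class ExceptionFailure(description: String) extends TaskEndReason

   object BacklogSketch {
     // True when the listener should call onSchedulerBacklogged(): the task failed
     // (so it will be resubmitted) and no tasks are currently pending.
     def shouldMarkBacklogged(reason: TaskEndReason, totalPendingTasks: Int): Boolean =
       reason match {
         // Finished, or killed on purpose: nothing will be resubmitted.
         case Success | _: TaskKilled => false
         // Any other end reason is treated as a failure that will be retried.
         case _ => totalPendingTasks == 0
       }
   }
   ```

   For example, `shouldMarkBacklogged(TaskKilled("another attempt succeeded"), 0)` is `false`, while an `ExceptionFailure` with no pending tasks yields `true`.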
   ```
   if (!taskEnd.taskInfo.speculative) {
       stageAttemptToTaskIndices.get(stageAttempt).foreach {_.remove(taskIndex)}
   }
   ```
   If a non-speculative task is intentionally killed, it means its speculative copy has succeeded, and no further attempt for this task index will be submitted. The task index is therefore complete, and we shouldn't remove it from `stageAttemptToTaskIndices`; otherwise, the index would be counted as a pending non-speculative task.
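   A minimal Scala sketch of this second point. It assumes, as a simplification, that pending non-speculative tasks for a stage attempt are counted as the stage's task count minus the number of indices recorded in `stageAttemptToTaskIndices`. `StageBookkeeping`, `Outcome`, and its cases are hypothetical names for illustration; `onTaskEndOld` models the pre-patch rule that dropped the index on any non-success outcome.

   ```scala
   import scala.collection.mutable

   // Hypothetical end states for a task attempt.
   sealed trait Outcome
   case object Succeeded extends Outcome
   case object Killed extends Outcome // intentionally killed, e.g. its speculative copy won
   case object Failed extends Outcome // will be resubmitted by the scheduler

   // Hypothetical model of one stage attempt's non-speculative bookkeeping.
   final class StageBookkeeping(numTasks: Int) {
     // Indices of non-speculative tasks that have been launched
     // (plays the role of stageAttemptToTaskIndices for one stage attempt).
     private val taskIndices = mutable.HashSet.empty[Int]

     def onTaskStart(index: Int): Unit = taskIndices += index

     // Patched rule: only a failed original task frees its index for resubmission.
     def onTaskEnd(index: Int, speculative: Boolean, outcome: Outcome): Unit =
       if (!speculative && outcome == Failed) taskIndices -= index

     // Pre-patch rule: any non-success outcome, including Killed, freed the index.
     def onTaskEndOld(index: Int, speculative: Boolean, outcome: Outcome): Unit =
       if (!speculative && outcome != Succeeded) taskIndices -= index

     // Assumed counting rule: tasks whose index has not been recorded are pending.
     def pendingTasks: Int = numTasks - taskIndices.size
   }
   ```

   In a two-task stage where index 0 succeeds and the original attempt for index 1 is killed because its speculative copy won, `onTaskEnd` leaves `pendingTasks` at 0, while `onTaskEndOld` reports 1: a pending task that will never run, which keeps the allocation manager holding executors.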
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 