wineternity commented on code in PR #38702:
URL: https://github.com/apache/spark/pull/38702#discussion_r1034721151


##########
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala:
##########
@@ -645,8 +645,11 @@ private[spark] class AppStatusListener(
   }
 
   override def onTaskEnd(event: SparkListenerTaskEnd): Unit = {
-    // TODO: can this really happen?
-    if (event.taskInfo == null) {
+    // TODO: can taskInfo null really happen?
+    // For resubmitted tasks caused by ExecutorLost, the SparkListenerTaskEnd is useless and
+    // will make activeTask in stage to be negative, this will cause stage not be removed in
+    // liveStages, and finally cause executor not removed in deadExecutors
+    if (event.taskInfo == null || event.reason == Resubmitted) {

Review Comment:
   > to help understand the code more, do you know when/where the scheduler handles resubmitted tasks?
   1. First, in `TaskSetManager.scala`, we handle the executor loss in `executorLost`:
   ```scala
     override def executorLost(execId: String, host: String, reason: ExecutorLossReason): Unit = {
       // Re-enqueue any tasks that ran on the failed executor if this is a shuffle map stage,
       // and we are not using an external shuffle server which could serve the shuffle outputs.
       // The reason is the next stage wouldn't be able to fetch the data from this dead executor
       // so we would need to rerun these tasks on other executors.
       if (isShuffleMapTasks && !env.blockManager.externalShuffleServiceEnabled && !isZombie) {
         for ((tid, info) <- taskInfos if info.executorId == execId) {
           val index = info.index
           // We may have a running task whose partition has been marked as successful,
           // this partition has another task completed in another stage attempt.
           // We treat it as a running task and will call handleFailedTask later.
           if (successful(index) && !info.running && !killedByOtherAttempt.contains(tid)) {
             successful(index) = false
             copiesRunning(index) -= 1
             tasksSuccessful -= 1
             addPendingTask(index)
             // Tell the DAGScheduler that this task was resubmitted so that it doesn't think our
             // stage finishes when a total of tasks.size tasks finish.
             sched.dagScheduler.taskEnded(
               tasks(index), Resubmitted, null, Seq.empty, Array.empty, info)
           }
         }
       }
   ```
   2. In `addPendingTask`, the previously successful task is added back to the pending queue to be re-executed.
   3. `sched.dagScheduler.taskEnded` is called to notify the DAGScheduler, which posts a `CompletionEvent` to the eventProcessLoop.
   4. `handleTaskCompletion` in DAGScheduler.scala handles this event. Since the reason `Resubmitted` is a kind of failure, it calls `handleResubmittedFailure`, which adds the task's partitionId back to the ShuffleMapStage's pendingPartitions. Thus the stage will wait for this resubmitted task to finish again.
   ```scala
     private def handleResubmittedFailure(task: Task[_], stage: Stage): Unit = {
       logInfo(s"Resubmitted $task, so marking it as still running.")
       stage match {
         case sms: ShuffleMapStage =>
           sms.pendingPartitions += task.partitionId

         case _ =>
           throw SparkCoreErrors.sendResubmittedTaskStatusForShuffleMapStagesOnlyError()
       }
     }
   ```
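   The effect on the listener side can be illustrated with a tiny counter sketch (hypothetical names, not the real `AppStatusListener` classes): the same task produces two task-end events, one normal `Success` end and one later `Resubmitted` event from `executorLost`, so a stage's active-task count goes negative unless `Resubmitted` events are skipped, which is exactly what the guard in the diff above does.

   ```scala
   // Minimal sketch, assuming a single per-stage activeTasks counter like the
   // one AppStatusListener keeps for live stages. Names are illustrative only.
   object ActiveTaskSketch {
     sealed trait TaskEndReason
     case object Success extends TaskEndReason
     case object Resubmitted extends TaskEndReason

     final class LiveStage {
       var activeTasks: Int = 0
       def onTaskStart(): Unit = activeTasks += 1
       // Without the guard, a Resubmitted end decrements the counter for a task
       // that was already counted as finished, driving activeTasks negative.
       def onTaskEnd(reason: TaskEndReason, skipResubmitted: Boolean): Unit =
         if (!(skipResubmitted && reason == Resubmitted)) activeTasks -= 1
     }

     def main(args: Array[String]): Unit = {
       val buggy = new LiveStage
       buggy.onTaskStart()
       buggy.onTaskEnd(Success, skipResubmitted = false)      // normal completion
       buggy.onTaskEnd(Resubmitted, skipResubmitted = false)  // executor lost later
       println(buggy.activeTasks)  // negative: stage never looks finished

       val fixed = new LiveStage
       fixed.onTaskStart()
       fixed.onTaskEnd(Success, skipResubmitted = true)
       fixed.onTaskEnd(Resubmitted, skipResubmitted = true)   // ignored by the guard
       println(fixed.activeTasks)  // back to zero, stage can be cleaned up
     }
   }
   ```

   A stage with a nonzero activeTasks count is never removed from `liveStages`, which in turn keeps its dead executors alive in `deadExecutors`, matching the leak described in the diff comment.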
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

