Ngone51 commented on code in PR #43954:
URL: https://github.com/apache/spark/pull/43954#discussion_r1402869071


##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -2179,12 +2164,12 @@ private[spark] class DAGScheduler(
           val message = s"Stage failed because barrier task $task finished unsuccessfully.\n" +
             failure.toErrorString
           try {
-            // killAllTaskAttempts will fail if a SchedulerBackend does not implement killTask.
+            // cancelTasks will fail if a SchedulerBackend does not implement killTask.
             val reason = s"Task $task from barrier stage $failedStage (${failedStage.name}) " +
               "failed."
             val job = jobIdToActiveJob.get(failedStage.firstJobId)
             val shouldInterrupt = job.exists(j => shouldInterruptTaskThread(j))
-            taskScheduler.killAllTaskAttempts(stageId, shouldInterrupt, reason)

Review Comment:
   This is a really good point. `taskSetFailed` will abort the stage and in turn 
fail the whole job, which is not the intended behaviour here. The problem with 
`killAllTaskAttempts` is that it doesn't mark the `TaskSetManager` as a zombie 
after killing all the tasks, so the `TaskSetManager` could still launch new 
tasks (by retry), which is not expected.
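   
   To make the zombie concern concrete, here is a minimal toy model. It is not Spark's actual `TaskSetManager`; `ToyTaskSetManager`, `killOnly` and `killAndMarkZombie` are hypothetical names, and I'm assuming killed tasks simply go back to the pending queue:

```scala
import scala.collection.mutable

// Toy stand-in for a TaskSetManager; NOT Spark's real class.
final class ToyTaskSetManager(numTasks: Int) {
  var isZombie: Boolean = false
  private val pending = mutable.Queue.empty[Int] ++= (0 until numTasks)
  private val running = mutable.Set.empty[Int]

  // A zombie manager never launches anything, even if tasks are pending.
  def resourceOffer(): Option[Int] =
    if (isZombie || pending.isEmpty) None
    else {
      val task = pending.dequeue()
      running += task
      Some(task)
    }

  // Assumption: killed tasks become eligible for retry again.
  def killRunningTasks(): Unit = {
    pending ++= running.toSeq.sorted
    running.clear()
  }
}

object ZombieSketch {
  // Mirrors the problem described above: kill, but leave the manager live.
  def killOnly(tsm: ToyTaskSetManager): Unit = tsm.killRunningTasks()

  // Kill and also mark the manager as a zombie so nothing is relaunched.
  def killAndMarkZombie(tsm: ToyTaskSetManager): Unit = {
    tsm.killRunningTasks()
    tsm.isZombie = true
  }

  def main(args: Array[String]): Unit = {
    val live = new ToyTaskSetManager(2)
    live.resourceOffer()
    killOnly(live)
    println(s"kill only, relaunch possible: ${live.resourceOffer().isDefined}")

    val zombie = new ToyTaskSetManager(2)
    zombie.resourceOffer()
    killAndMarkZombie(zombie)
    println(s"kill + zombie, relaunch possible: ${zombie.resourceOffer().isDefined}")
  }
}
```

   In this sketch the only difference between the two paths is the `isZombie` flag, which is what decides whether the next resource offer can start a retry.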
   
   But I'm also wondering whether we really want to abort the stages in 
`cancelTasks`. `cancelTasks` is currently called only inside 
`cancelRunningIndependentStages`, and `cancelRunningIndependentStages` is 
directly or indirectly called in 3 cases:
   
   * When a job finishes successfully: in this case, we expect all the 
stages in this job to release their computation resources (i.e., kill all the 
tasks via `cleanupStateForJobAndIndependentStages`) immediately. But I don't 
think we expect this "release" action to lead to stage abortion and in turn 
fail the job in the end. It doesn't fail the already succeeded job today 
because the succeeded job has already been cleaned up (it no longer exists in 
the `activeJobs` list) by the time the `taskSetFailed` event arrives.
    
   * When a job is requested to be canceled: this case is essentially the same 
as the above case, except that the job finishes in a different state.
   
   * When a stage aborts: in this case, we expect all the active jobs that 
depend on this stage to be canceled. Thus, we need to call 
`cancelRunningIndependentStages` on each active job. This eventually 
falls back to the first case, as the active job will be cleaned up (via 
`cleanupStateForJobAndIndependentStages`) before the `taskSetFailed` 
event arrives.
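   
   The ordering argument in the three cases can be sketched with a toy event queue. This is not the real `DAGScheduler`; `ToyScheduler`, `ToyJob`, `finishJob` and `processEvents` are hypothetical names, and the single queue stands in for the scheduler's asynchronous event loop:

```scala
import scala.collection.mutable

// Toy model of the ordering only; NOT Spark's real DAGScheduler.
final case class ToyJob(jobId: Int, stageIds: Set[Int])

final class ToyScheduler {
  val activeJobs = mutable.Map.empty[Int, ToyJob]
  val failedJobs = mutable.Set.empty[Int]
  // Pending taskSetFailed events, identified by stage id.
  private val events = mutable.Queue.empty[Int]

  def submit(job: ToyJob): Unit = activeJobs(job.jobId) = job

  // Stand-in for cancelTasks: the failure event is only enqueued here.
  def cancelTasks(stageId: Int): Unit = events.enqueue(stageId)

  // Job end (success or cancel): cancel its stages, then clean up the job
  // before any queued event gets processed.
  def finishJob(jobId: Int): Unit = activeJobs.get(jobId).foreach { job =>
    job.stageIds.foreach(cancelTasks)
    activeJobs.remove(jobId)
  }

  // Handling taskSetFailed: fail every job that is still active and uses the stage.
  def processEvents(): Unit = while (events.nonEmpty) {
    val stageId = events.dequeue()
    val hit = activeJobs.values.filter(_.stageIds.contains(stageId)).map(_.jobId).toList
    hit.foreach { id => activeJobs.remove(id); failedJobs += id }
  }
}

object OrderingSketch {
  def main(args: Array[String]): Unit = {
    val done = new ToyScheduler
    done.submit(ToyJob(1, Set(10)))
    done.finishJob(1)      // job is cleaned up first
    done.processEvents()   // event finds no active job to fail
    println(s"finished job failed afterwards: ${done.failedJobs.contains(1)}")

    val barrier = new ToyScheduler
    barrier.submit(ToyJob(2, Set(20)))
    barrier.cancelTasks(20) // job is still active, as in the barrier retry path
    barrier.processEvents()
    println(s"active job failed: ${barrier.failedJobs.contains(2)}")
  }
}
```

   The second half of `main` is the situation from the diff above: when the job is still active, the same event fails it, which is why reusing `cancelTasks` for the barrier stage would change behaviour.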
   
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

