Ngone51 commented on code in PR #43954:
URL: https://github.com/apache/spark/pull/43954#discussion_r1402869071
##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -2179,12 +2164,12 @@ private[spark] class DAGScheduler(
val message = s"Stage failed because barrier task $task finished unsuccessfully.\n" +
failure.toErrorString
try {
- // killAllTaskAttempts will fail if a SchedulerBackend does not implement killTask.
+ // cancelTasks will fail if a SchedulerBackend does not implement killTask.
val reason = s"Task $task from barrier stage $failedStage (${failedStage.name}) " +
"failed."
val job = jobIdToActiveJob.get(failedStage.firstJobId)
val shouldInterrupt = job.exists(j => shouldInterruptTaskThread(j))
- taskScheduler.killAllTaskAttempts(stageId, shouldInterrupt, reason)
Review Comment:
This is a really good point. `taskSetFailed` will abort the stage and in turn
fail the whole job, which is not the intended behaviour here. The problem with
`killAllTaskAttempts` is that it doesn't mark the `TaskSetManager` as a zombie
after killing all the tasks, so the `TaskSetManager` could still launch new
tasks (via retry), which is not expected.
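To make the difference concrete, here is a minimal toy model of the zombie flag. It is only a sketch: `ToyTaskSetManager`, `resourceOffer()` with no arguments, and `killAll` are hypothetical names for illustration, not Spark's actual `TaskSetManager` API, and the assumption that killed tasks go straight back to the pending queue is a simplification.

```scala
import scala.collection.mutable

// Toy model only: hypothetical names, not Spark's real TaskSetManager.
final class ToyTaskSetManager(numTasks: Int) {
  var isZombie: Boolean = false
  private val running = mutable.Set.empty[Int]
  private val pending = mutable.Queue.empty[Int]
  pending ++= (0 until numTasks)

  /** Offer a slot; a task is launched only if the manager is not a zombie. */
  def resourceOffer(): Option[Int] =
    if (isZombie || pending.isEmpty) None
    else {
      val task = pending.dequeue()
      running += task
      Some(task)
    }

  /** Kill all running tasks; killed tasks return to pending (a retry). */
  def killAll(markZombie: Boolean): Unit = {
    pending ++= running.toSeq.sorted
    running.clear()
    if (markZombie) isZombie = true
  }
}

object ZombieSketch {
  def main(args: Array[String]): Unit = {
    // Kill without marking zombie: the next offer relaunches a killed task.
    val retried = new ToyTaskSetManager(2)
    retried.resourceOffer(); retried.resourceOffer()
    retried.killAll(markZombie = false)
    println(retried.resourceOffer().isDefined)

    // Kill and mark zombie: no further launches happen.
    val stopped = new ToyTaskSetManager(2)
    stopped.resourceOffer(); stopped.resourceOffer()
    stopped.killAll(markZombie = true)
    println(stopped.resourceOffer().isDefined)
  }
}
```

In this toy model, the first manager hands out a task again after the kill, while the second one never does, which is the behaviour we want when tearing down the barrier stage's tasks.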
But I'm also wondering whether we really want to abort the stages in
`cancelTasks`. `cancelTasks` is currently called only inside
`cancelRunningIndependentStages`, and `cancelRunningIndependentStages` is
called, directly or indirectly, in 3 cases:
* When a job finishes successfully: in this case, we expect all the
stages in this job to release their computation resources (i.e., kill all the
tasks via `cleanupStateForJobAndIndependentStages`) immediately. But I don't
think we expect this "release" action to abort the stage and in turn
fail the job in the end. It doesn't fail the already succeeded job today
only because the succeeded job has already been cleaned up (it no longer
exists in the `activeJobs` list) by the time the `taskSetFailed` event arrives.
* When a job is requested to be canceled: this case is essentially the same as
the one above, except that the job finishes in a different state.
* When a stage aborts: in this case, we expect all the active jobs that
depend on this stage to be canceled. Thus, we need to call
`cancelRunningIndependentStages` on each active job. This eventually
falls back to the first case, as the active job will be cleaned up (via
`cleanupStateForJobAndIndependentStages`) before the `taskSetFailed`
event arrives.
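The ordering argument in the three cases above can be sketched with a toy event loop. This is only an illustration under stated assumptions: `ToyScheduler`, `finishJob`, `onTaskSetFailed`, and `drainEvents` are hypothetical names, not the real `DAGScheduler` methods, and the model assumes the cleanup is synchronous while the `taskSetFailed` event is delivered later.

```scala
import scala.collection.mutable

// Toy model only: hypothetical names, not Spark's real DAGScheduler.
final class ToyScheduler {
  val activeJobs = mutable.Set.empty[Int]     // ids of jobs still active
  val failedJobs = mutable.Buffer.empty[Int]  // jobs failed by a stage abort
  private val events = mutable.Queue.empty[() => Unit]

  /** Job finished or canceled: clean up first, then the kill posts an event. */
  def finishJob(jobId: Int): Unit = {
    activeJobs -= jobId                           // synchronous cleanup
    events.enqueue(() => onTaskSetFailed(jobId))  // taskSetFailed arrives later
  }

  /** Abort path: only jobs that are still active can be failed. */
  private def onTaskSetFailed(jobId: Int): Unit =
    if (activeJobs.contains(jobId)) {
      activeJobs -= jobId
      failedJobs += jobId
    }

  /** Process all queued events in arrival order. */
  def drainEvents(): Unit =
    while (events.nonEmpty) events.dequeue().apply()
}

object OrderingSketch {
  def main(args: Array[String]): Unit = {
    val scheduler = new ToyScheduler
    scheduler.activeJobs += 1
    scheduler.finishJob(1)
    scheduler.drainEvents()
    // The late event finds no active job, so nothing is failed.
    println(scheduler.failedJobs.isEmpty)
  }
}
```

The job is never failed here only because of the ordering, not because the abort is harmless by design, which is why relying on it in `cancelTasks` feels fragile to me.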
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]