beliefer commented on code in PR #43954:
URL: https://github.com/apache/spark/pull/43954#discussion_r1408972767
##########
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala:
##########
@@ -296,18 +296,31 @@ private[spark] class TaskSchedulerImpl(
new TaskSetManager(this, taskSet, maxTaskFailures, healthTrackerOpt, clock)
}
+ // Kill all the tasks in all the stage attempts of the same stage Id. Note stage attempts won't
+ // be aborted but will be marked as zombie. The stage attempt will be finished and cleaned up
+ // once all the tasks has been finished. The stage attempt could be aborted after the call of
+ // `cancelTasks` if required.
override def cancelTasks(
stageId: Int,
interruptThread: Boolean,
reason: String): Unit = synchronized {
logInfo("Cancelling stage " + stageId)
// Kill all running tasks for the stage.
- killAllTaskAttempts(stageId, interruptThread, reason = "Stage cancelled: " + reason)
- // Cancel all attempts for the stage.
+ logInfo(s"Killing all running tasks in stage $stageId: $reason")
taskSetsByStageIdAndAttempt.get(stageId).foreach { attempts =>
attempts.foreach { case (_, tsm) =>
- tsm.abort("Stage %s cancelled".format(stageId))
- logInfo("Stage %d was cancelled".format(stageId))
+ // There are two possible cases here:
+ // 1. The task set manager has been created and some tasks have been scheduled.
+ // In this case, send a kill signal to the executors to kill the task.
+ // 2. The task set manager has been created but no tasks have been scheduled. In this case,
+ // simply continue.
+ tsm.runningTasksSet.foreach { tid =>
+ taskIdToExecutorId.get(tid).foreach { execId =>
+ backend.killTask(tid, execId, interruptThread, s"Stage cancelled: $reason")
+ }
+ }
+ tsm.suspend()
Review Comment:
I guess you expect the `tsm` to be finished here, but that may not
necessarily happen.
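One possible reading of this concern, sketched as a toy model rather than Spark's actual code: if completion is only checked when a task ends, a task set with no running tasks (case 2 in the diff comment) would stay registered after being suspended. `ToyTaskSet`, `onTaskEnded`, and `CancelSketch` are hypothetical names, and the behaviour of `suspend()` here is an assumption, not what `TaskSetManager.suspend()` necessarily does.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a task set manager; this is NOT Spark's TaskSetManager.
final class ToyTaskSet(val stageId: Int, initialRunning: Set[Long]) {
  val running: mutable.Set[Long] = mutable.Set(initialRunning.toSeq: _*)
  var isZombie: Boolean = false
  var finished: Boolean = false

  // Assumption: suspend only flags the task set and does not finish it.
  def suspend(): Unit = { isZombie = true }

  // Assumption: completion is checked only when a task-end event arrives.
  def onTaskEnded(tid: Long): Unit = {
    running -= tid
    if (isZombie && running.isEmpty) finished = true
  }
}

object CancelSketch {
  // Mirrors the shape of the loop in the diff: kill running tasks, then suspend.
  def cancel(ts: ToyTaskSet, kill: Long => Unit): Unit = {
    ts.running.foreach(kill)
    ts.suspend()
  }

  def main(args: Array[String]): Unit = {
    // Case 1: tasks were scheduled, so kill signals lead to task-end events.
    val withTasks = new ToyTaskSet(1, Set(10L, 11L))
    val killed = mutable.ListBuffer.empty[Long]
    cancel(withTasks, tid => { killed += tid; () })
    killed.toList.foreach(withTasks.onTaskEnded)
    println(s"case 1 finished: ${withTasks.finished}") // prints "case 1 finished: true"

    // Case 2: nothing was scheduled, so no task-end event ever arrives.
    val empty = new ToyTaskSet(2, Set.empty[Long])
    cancel(empty, _ => ())
    println(s"case 2 finished: ${empty.finished}") // prints "case 2 finished: false"
  }
}
```

If `suspend()` itself checks for completion after marking the task set as zombie, case 2 would finish immediately and the concern would not apply.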
##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -1894,24 +1894,8 @@ private[spark] class DAGScheduler(
job.numFinished += 1
// If the whole job has finished, remove it
if (job.numFinished == job.numPartitions) {
- markStageAsFinished(resultStage)
Review Comment:
Why is `markStageAsFinished` no longer needed here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]