beliefer commented on code in PR #43954:
URL: https://github.com/apache/spark/pull/43954#discussion_r1407378513
##########
core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala:
##########
@@ -54,7 +54,7 @@ private[spark] trait TaskScheduler {
// Submit a sequence of tasks to run.
def submitTasks(taskSet: TaskSet): Unit
- // Kill all the tasks in a stage and fail the stage and all the jobs that
depend on the stage.
+ // Kill all the tasks in all the stage attempts of the same stage Id
Review Comment:
Please add comment about `mark all the stage attempts as zombie`.
##########
core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala:
##########
@@ -66,10 +66,6 @@ private[spark] trait TaskScheduler {
*/
def killTaskAttempt(taskId: Long, interruptThread: Boolean, reason: String):
Boolean
- // Kill all the running task attempts in a stage.
- // Throw UnsupportedOperationException if the backend doesn't support kill
tasks.
- def killAllTaskAttempts(stageId: Int, interruptThread: Boolean, reason:
String): Unit
Review Comment:
Does this is a break change?
##########
core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala:
##########
@@ -1671,37 +1671,6 @@ class TaskSchedulerImplSuite extends SparkFunSuite with
LocalSparkContext
assert(taskScheduler.taskSetManagerForAttempt(0, 0).isEmpty)
}
- test("killAllTaskAttempts shall kill all the running tasks and not fail the
stage") {
- val taskScheduler = setupScheduler()
-
- taskScheduler.initialize(new FakeSchedulerBackend {
- override def killTask(
- taskId: Long,
- executorId: String,
- interruptThread: Boolean,
- reason: String): Unit = {
- // Since we only submit one stage attempt, the following call is
sufficient to mark the
- // task as killed.
- taskScheduler.taskSetManagerForAttempt(0,
0).get.runningTasksSet.remove(taskId)
- }
- })
-
- val attempt1 = FakeTask.createTaskSet(10)
- taskScheduler.submitTasks(attempt1)
-
- val workerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", 1),
- new WorkerOffer("executor1", "host1", 1))
- val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
- assert(2 === taskDescriptions.length)
- val tsm = taskScheduler.taskSetManagerForAttempt(0, 0).get
- assert(2 === tsm.runningTasks)
-
- taskScheduler.killAllTaskAttempts(0, false, "test")
Review Comment:
Shall we update the test case with `taskScheduler.cancelTasks`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]