Github user kayousterhout commented on a diff in the pull request:
https://github.com/apache/spark/pull/686#discussion_r14040933
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -1062,10 +1062,15 @@ class DAGScheduler(
           // This is the only job that uses this stage, so fail the stage if it is running.
           val stage = stageIdToStage(stageId)
           if (runningStages.contains(stage)) {
-            taskScheduler.cancelTasks(stageId, shouldInterruptThread)
-            val stageInfo = stageToInfos(stage)
-            stageInfo.stageFailed(failureReason)
-            listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage)))
+            try { // cancelTasks will fail if a SchedulerBackend does not implement killTask
+              taskScheduler.cancelTasks(stageId, shouldInterruptThread)
+              val stageInfo = stageToInfos(stage)
+              stageInfo.stageFailed(failureReason)
+              listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage)))
+            } catch {
+              case e: UnsupportedOperationException =>
+                logInfo(s"Could not cancel tasks for stage $stageId", e)
+            }
--- End diff --
Ah, you're right that it doesn't make sense to add that here, because this code
is called once for each stage. My intention was that if the job has running
stages that don't get cancelled (because the task scheduler doesn't implement
cancelTasks()), then we should not call job.listener.jobFailed() -- do you
think that makes sense? It seems like the way to implement that would be to set
a boolean flag here if the job can't be successfully cancelled, and then call
jobFailed() 0 or 1 times at the end of this function, depending on that flag.
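
Roughly something like the following (just a standalone sketch of the flag
idea -- the stub cancelTasks, the backendCanKill parameter, and the
CancelFlagSketch object are made-up stand-ins, not the real DAGScheduler code):

    // Rough sketch of the flag idea; all names are illustrative stand-ins.
    object CancelFlagSketch {

      // Stand-in for taskScheduler.cancelTasks: throws if the backend
      // does not support killing tasks.
      def cancelTasks(stageId: Int, backendCanKill: Boolean): Unit = {
        if (!backendCanKill) {
          throw new UnsupportedOperationException(s"cannot kill tasks for stage $stageId")
        }
      }

      // Try to cancel every running stage of the job; call jobFailed()
      // 0 or 1 times, and only if every stage was actually cancelled.
      def failJobAndIndependentStages(
          runningStageIds: Seq[Int],
          backendCanKill: Boolean,
          jobFailed: () => Unit): Unit = {
        var ableToCancelStages = true
        for (stageId <- runningStageIds) {
          try {
            cancelTasks(stageId, backendCanKill)
          } catch {
            case e: UnsupportedOperationException =>
              println(s"Could not cancel tasks for stage $stageId: ${e.getMessage}")
              ableToCancelStages = false
          }
        }
        if (ableToCancelStages) {
          jobFailed()
        }
      }

      def main(args: Array[String]): Unit = {
        // Backend can't kill tasks: jobFailed() is never called.
        failJobAndIndependentStages(Seq(1, 2), backendCanKill = false, () => println("jobFailed()"))
        // Backend can kill tasks: jobFailed() is called exactly once.
        failJobAndIndependentStages(Seq(1, 2), backendCanKill = true, () => println("jobFailed()"))
      }
    }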
---