Ngone51 commented on code in PR #49270:
URL: https://github.com/apache/spark/pull/49270#discussion_r1897684898
##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -2937,7 +2938,9 @@ private[spark] class DAGScheduler(
} else {
// This stage is only used by the job, so finish the stage if it is running.
val stage = stageIdToStage(stageId)
- if (runningStages.contains(stage)) {
+ val shouldKill = runningStages.contains(stage) ||
+ (waitingStages.contains(stage) && stage.resubmitInFetchFailed)
Review Comment:
> Only considering failedAttemptIds may result in repeated calls to the stage already completed and failed.
It looks like there could be a case where the stage exists in `failedStages`
but not in `waitingStages`. E.g., on a fetch failure, both the map stage and
the reduce stage can be added to `failedStages`, but the related job could be
canceled before they are resubmitted. So checking only
`waitingStages.contains(stage)` would miss the stages in `failedStages`. And I
don't think we would get repeated calls, since we don't kill tasks for those
failed stages.
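To make the distinction concrete, here is a minimal, self-contained sketch (hypothetical `Stage` class and `shouldKill` helper, not the real `DAGScheduler`) of the kill decision being discussed: tasks are killed for stages that are running, or waiting after a fetch-failure resubmission; stages that sit only in `failedStages` have no running tasks, so there is nothing to kill and no risk of repeated kill calls.

```scala
// Hypothetical stand-in for the scheduler's stage bookkeeping.
case class Stage(id: Int, resubmitInFetchFailed: Boolean = false)

object CancelSketch {
  // Whether to kill a stage's tasks when its only job is cancelled,
  // mirroring the condition added in the diff:
  //   runningStages.contains(stage) ||
  //     (waitingStages.contains(stage) && stage.resubmitInFetchFailed)
  // A stage present only in failedStages returns false: it has no
  // running tasks, so no kill (and no repeated kill) is needed.
  def shouldKill(
      stage: Stage,
      runningStages: Set[Stage],
      waitingStages: Set[Stage]): Boolean =
    runningStages.contains(stage) ||
      (waitingStages.contains(stage) && stage.resubmitInFetchFailed)

  def main(args: Array[String]): Unit = {
    val running     = Stage(1)
    val resubmitted = Stage(2, resubmitInFetchFailed = true)
    val failedOnly  = Stage(3) // in failedStages only, per the scenario above

    val runningStages = Set(running)
    val waitingStages = Set(resubmitted)

    println(shouldKill(running, runningStages, waitingStages))     // true
    println(shouldKill(resubmitted, runningStages, waitingStages)) // true
    println(shouldKill(failedOnly, runningStages, waitingStages))  // false
  }
}
```

This is only an illustration of the predicate, under the assumption described in the comment that stages in `failedStages` awaiting resubmission have no live tasks.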
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]