venkata91 commented on a change in pull request #28287:
URL: https://github.com/apache/spark/pull/28287#discussion_r449798406
##########
File path: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
##########
@@ -789,6 +811,23 @@ private[spark] class ExecutorAllocationManager(
}
}
+ override def onUnschedulableBlacklistTaskSubmitted
+ (blacklistedTask: SparkListenerUnschedulableBlacklistTaskSubmitted): Unit = {
+ val stageId = blacklistedTask.stageId
+ val stageAttemptId = blacklistedTask.stageAttemptId
+ allocationManager.synchronized {
+ (stageId, stageAttemptId) match {
+ case (Some(stageId), Some(stageAttemptId)) =>
+ val stageAttempt = StageAttempt(stageId, stageAttemptId)
+ unschedulableTaskSets.add(stageAttempt)
+ case (None, _) =>
+ // Clear unschedulableTaskSets since at least one task becomes schedulable now
+ unschedulableTaskSets.clear()
Review comment:
https://github.com/apache/spark/blob/263f04db865920d9c10251517b00a1b477b58ff1/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L670
- Based on the previous implementation, we want to defer killing task sets as
long as we have a non-blacklisted executor that can be used to schedule a
task. The problem is that if we can schedule only one of the task sets, the
other task set might expire and the stage would get aborted. I think that is
why the previous implementation cleared the expiry timers, and it is also why
we are clearing `unschedulableTaskSets` here.
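
   To make the scenario concrete, here is a minimal, self-contained Scala sketch of the pattern the linked `TaskSchedulerImpl` code follows. The object and method names and the `Int` task-set ids are made up for illustration; only the idea of clearing every pending expiry timer once any task launches is taken from the linked code:

   ```scala
   import scala.collection.mutable

   object AbortTimerSketch {
     // taskSetId -> deadline after which the stage would be aborted as unschedulable
     private val unschedulableTaskSetToExpiryTime = mutable.HashMap.empty[Int, Long]

     // Record that a task set currently has no non-blacklisted executor to run on.
     def markUnschedulable(taskSetId: Int, timeoutMs: Long): Unit =
       unschedulableTaskSetToExpiryTime(taskSetId) = System.currentTimeMillis() + timeoutMs

     def afterResourceOffers(launchedAnyTask: Boolean): Unit = {
       if (launchedAnyTask) {
         // At least one task was scheduled, so a usable executor exists.
         // Clear the timers for *all* task sets: clearing only the launched
         // task set's timer could let another, temporarily unschedulable
         // task set expire and abort its stage.
         unschedulableTaskSetToExpiryTime.clear()
       }
     }
   }
   ```

   Clearing `unschedulableTaskSets` in `ExecutorAllocationManager` on the `(None, _)` event mirrors the same idea: one schedulable task is evidence that executors are usable, so none of the pending task sets should be allowed to time out.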