Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22288#discussion_r228668756

    --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
    @@ -415,9 +420,54 @@ private[spark] class TaskSchedulerImpl(
                 launchedAnyTask |= launchedTaskAtCurrentMaxLocality
               } while (launchedTaskAtCurrentMaxLocality)
             }
    +
             if (!launchedAnyTask) {
    -          taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
    +          taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors) match {
    +            case Some(taskIndex) => // Returns the taskIndex which was unschedulable
    +
    +              // If the taskSet is unschedulable we try to find an existing idle blacklisted
    +              // executor. If we cannot find one, we abort immediately. Else we kill the idle
    +              // executor and kick off an abortTimer which if it doesn't schedule a task within the
    +              // the timeout will abort the taskSet if we were unable to schedule any task from the
    +              // taskSet.
    +              // Note 1: We keep track of schedulability on a per taskSet basis rather than on a per
    +              // task basis.
    +              // Note 2: The taskSet can still be aborted when there are more than one idle
    +              // blacklisted executors and dynamic allocation is on. This can happen when a killed
    +              // idle executor isn't replaced in time by ExecutorAllocationManager as it relies on
    +              // pending tasks and doesn't kill executors on idle timeouts, resulting in the abort
    +              // timer to expire and abort the taskSet.
    +              executorIdToRunningTaskIds.find(x => !isExecutorBusy(x._1)) match {
    +                case Some ((executorId, _)) =>
    +                  if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) {
    +                    blacklistTrackerOpt.foreach(blt => blt.killBlacklistedIdleExecutor(executorId))
    +
    +                    val timeout = conf.get(config.UNSCHEDULABLE_TASKSET_TIMEOUT) * 1000
    +                    unschedulableTaskSetToExpiryTime(taskSet) = clock.getTimeMillis() + timeout
    +                    logInfo(s"Waiting for $timeout ms for completely " +
    +                      s"blacklisted task to be schedulable again before aborting $taskSet.")
    +                    abortTimer.schedule(
    +                      createUnschedulableTaskSetAbortTimer(taskSet, taskIndex), timeout)
    +                  }
    +                case _ => // Abort Immediately
    +                  logInfo("Cannot schedule any task because of complete blacklisting. No idle" +
    +                    s" executors can be found to kill. Aborting $taskSet." )
    +                  taskSet.abortSinceCompletelyBlacklisted(taskIndex)
    +              }
    +            case _ => // Do nothing if no tasks completely blacklisted.
    +          }
    +        } else {
    +          // We want to defer killing any taskSets as long as we have a non blacklisted executor
    +          // which can be used to schedule a task from any active taskSets. This ensures that the
    +          // job can make progress and if we encounter a flawed taskSet it will eventually either
    +          // fail or abort due to being completely blacklisted.
    --- End diff --

Thanks for pointing this out, but if I'm reading the discussion properly, I don't think you will actually wait indefinitely. Eventually you will either abort immediately or fail due to the max number of task failures. Let me know if I'm missing something from the scenario.

Let's say you have taskset 1 that is blacklisted on all nodes (let's say we have 3). Three cases can happen at this point:
- taskset 2 hasn't started, so it tries to kill an executor and starts the timer
- taskset 2 has started; if it's running on all nodes, we abort immediately because there are no executors to kill
- taskset 2 has started but it's not running on all blacklisted nodes, so we will kill an executor

At this point, let's say we didn't abort, so we killed an executor. Taskset 1 will get a chance to run on the new executor and either work or have a task failure. If it has a task failure and gets blacklisted, we go back into the case above, but the number of task failures gets one closer to the limit. So it seems like eventually you would either abort immediately, if there aren't any executors to kill, or fail with the max number of task attempts.
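
To make the termination argument concrete, here is a rough toy model of it. This is only a sketch, not Spark's scheduler code: `UnschedulableTaskSetModel`, `decide`, `simulate`, and `busyAt` are hypothetical names made up for illustration. It assumes the worst case, where every retry of taskset 1 on a replacement executor fails, and it treats `maxTaskFailures` as standing in for the task failure limit. The abort timer itself is not modeled.

```scala
import scala.annotation.tailrec

// Toy model of the argument above. Hypothetical names; not Spark code.
object UnschedulableTaskSetModel {
  sealed trait Outcome
  case object AbortedImmediately extends Outcome      // no idle executor left to kill
  case object FailedOnMaxTaskFailures extends Outcome // task failure limit reached

  sealed trait Action
  case object AbortNow extends Action
  case object KillIdleExecutorAndStartTimer extends Action

  // Taskset 1 is blacklisted everywhere. If taskset 2 keeps every executor
  // busy there is nothing to kill, so we abort; otherwise we kill an idle one.
  def decide(totalExecutors: Int, busyExecutors: Int): Action =
    if (busyExecutors >= totalExecutors) AbortNow
    else KillIdleExecutorAndStartTimer

  // busyAt(round) is an assumed input: how many executors taskset 2 occupies
  // in a given round. Returns the outcome and the number of executors killed.
  def simulate(
      totalExecutors: Int,
      maxTaskFailures: Int,
      initialFailures: Int,
      busyAt: Int => Int): (Outcome, Int) = {
    @tailrec
    def loop(failures: Int, round: Int): (Outcome, Int) =
      if (failures >= maxTaskFailures) (FailedOnMaxTaskFailures, round)
      else decide(totalExecutors, busyAt(round)) match {
        case AbortNow => (AbortedImmediately, round)
        // Worst case: the retry on the replacement executor fails again,
        // which moves the failure count one step closer to the limit.
        case KillIdleExecutorAndStartTimer => loop(failures + 1, round + 1)
      }
    loop(initialFailures, 0)
  }
}
```

Because `failures` strictly increases on every pass that does not abort, the loop runs at most `maxTaskFailures - initialFailures` times, which is the "you won't wait indefinitely" point. For example, with 3 executors, a limit of 4, 3 failures so far, and taskset 2 never running, the model kills one executor and then fails on the max number of task failures.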