venkata91 commented on a change in pull request #28287:
URL: https://github.com/apache/spark/pull/28287#discussion_r455205236
##########
File path: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
##########
@@ -289,13 +290,27 @@ private[spark] class ExecutorAllocationManager(
s" tasksperexecutor: $tasksPerExecutor")
val maxNeeded = math.ceil(numRunningOrPendingTasks *
executorAllocationRatio /
tasksPerExecutor).toInt
- if (tasksPerExecutor > 1 && maxNeeded == 1 && pendingSpeculative > 0) {
+
+ val maxNeededWithSpeculationLocalityOffset =
+ if (tasksPerExecutor > 1 && maxNeeded == 1 && pendingSpeculative > 0) {
// If we have pending speculative tasks and only need a single executor, allocate one more
// to satisfy the locality requirements of speculation
maxNeeded + 1
} else {
maxNeeded
}
+
+ if (unschedulableTaskSets > 0) {
+ // Request additional executors only if maxNeededWithSpeculationLocalityOffset is less than
Review comment:
Thanks for the suggestion. This is much better.
##########
File path: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
##########
@@ -829,12 +873,28 @@ private[spark] class ExecutorAllocationManager(
numTotalTasks - numRunning
}
+ // Currently TaskSetManager.getCompletelyBlacklistedTaskIfAny only takes the first unschedulable
+ // task found due to blacklisting. This way we only need to keep track of the unschedulable
+ // tasksets which is an indirect way to get the current number of unschedulable tasks.
Review comment:
Thanks for the suggestion!
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]