mridulm commented on code in PR #47949:
URL: https://github.com/apache/spark/pull/47949#discussion_r1885435364
##########
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala:
##########
@@ -308,13 +328,59 @@ private[spark] class ExecutorAllocationManager(
tasksPerExecutor).toInt
val maxNeededWithSpeculationLocalityOffset =
-      if (tasksPerExecutor > 1 && maxNeeded == 1 && pendingSpeculative > 0) {
-        // If we have pending speculative tasks and only need a single executor, allocate one more
-        // to satisfy the locality requirements of speculation
-        maxNeeded + 1
-      } else {
-        maxNeeded
-      }
+      if (pendingSpeculative > 0 && maxNeeded <= numExecutorsTarget &&
+        executorMonitor.executorHostsCount == 1 && scheduler.isInstanceOf[TaskSchedulerImpl]) {
+        val now = clock.nanoTime()
+        if (excludeNodesTriggerTime == NOT_SET) {
+          excludeNodesTriggerTime = now + TimeUnit.MINUTES.toNanos(excludeNodeTriggerTimeoutMin)
+          logDebug(log"Current timestamp ${MDC(TIMESTAMP, now)}, " +
+            log"set excludeNodesTriggerTime to ${MDC(TIMESTAMP, excludeNodesTriggerTime)}")
+          maxNeeded
+        } else if (now < excludeNodesTriggerTime) {
+          maxNeeded
+        } else {
+          if (executorMonitor.hasAdjustMaxNeed) {
+            adjustMaxNeed
+          } else {
+            logDebug(log"${MDC(TIMESTAMP, now)} exceeds" +
+              log" ${MDC(TIMESTAMP, excludeNodesTriggerTime)}, start exclude node!")
+            val node = executorMonitor.getExecutorHostsName(0)
+            // check if current remaining host has attempts of speculative task
+            val speculativeTasksInfo = listener.getPendingSpeculativeTasksInfo()
+            if (scheduler.asInstanceOf[TaskSchedulerImpl].
+              speculativeTasksHasAttemptOnHost(node, speculativeTasksInfo)) {
+              // make sure new maxNeed exceeds numExecutorsTarget and allocate executor
+              adjustMaxNeed = numExecutorsTarget + 1
+              // If hasAdjustMaxNeed, use last adjust value as
+              // maxNeededWithSpeculationLocalityOffset in case of numExecutorsTarget keeps
+              // increasing during maxNumExecutorsNeededPerResourceProfile method called
+              if (scheduler.asInstanceOf[TaskSchedulerImpl].handleExcludeNodes(node)) {
+                logDebug(log"Exclude ${MDC(HOST, node)} for speculative, " +
+                  log"old maxNeeded: ${MDC(COUNT, maxNeeded)}, " +
+                  log"old numExecutorsTarget: ${MDC(COUNT, numExecutorsTarget)}, " +
+                  log"current executors count: ${MDC(COUNT, executorMonitor.executorCount)}")
+                excludeSpeculativeNodes.add(node)
+                executorMonitor.setAdjustMaxNeed(true)
+                adjustMaxNeed
+              } else {
+                logDebug(log"${MDC(HOST, node)} has been excluded for other reason")
+                maxNeeded
+              }
+            } else {
+              logDebug(log"No speculative task found on ${MDC(HOST, node)}")
+              maxNeeded
+            }
+          }
+        }
+      } else if (tasksPerExecutor > 1 && maxNeeded == 1 && pendingSpeculative > 0) {
+        resetExcludeNodesTriggerTime()
+        // If we have pending speculative tasks and only need a single executor, allocate one more
+        // to satisfy the locality requirements of speculation
+        maxNeeded + 1
+      } else {
+        resetExcludeNodesTriggerTime()
+        maxNeeded
+      }
Review Comment:
The approach here appears to be that we increase the number of executors to be allocated and
expect that they might end up landing on a host which can run the speculative task.
For larger clusters this approach could work, but especially for smaller clusters it would
result in wastage.
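
As a rough, back-of-the-envelope illustration of the wastage concern (not part of the PR; it
optimistically assumes the cluster manager places each newly requested executor uniformly at
random across hosts, which nothing actually guarantees):

```scala
// Sketch only: fraction of extra executors requested for speculation that land back on the
// very host we were trying to avoid, under a uniform random placement assumption.
object SpeculationWasteSketch {
  def wastedFraction(numHosts: Int): Double = {
    require(numHosts >= 1)
    1.0 / numHosts
  }

  def main(args: Array[String]): Unit = {
    // On a large cluster the extra executor is almost always useful;
    // on a 2-host cluster roughly half of the extra requests are wasted.
    println(f"100 hosts: ${wastedFraction(100) * 100}%.0f%% wasted")
    println(f"  2 hosts: ${wastedFraction(2) * 100}%.0f%% wasted")
  }
}
```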
##########
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala:
##########
@@ -101,6 +101,7 @@ import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
*/
private[spark] class ExecutorAllocationManager(
client: ExecutorAllocationClient,
+ var scheduler: TaskScheduler,
Review Comment:
In Spark's dynamic resource allocation design, there is loose coupling between the allocation
manager and the scheduler.
Please drop this reference; we have to work around this issue without relying on scheduler
internals.
Instead, we can track the hosts where the speculative task should not run (that is, the host
where the current attempt for the partition is running).
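
A minimal sketch of how such tracking could live entirely inside the allocation manager's own
listener state, without referencing the scheduler. All names below (SpeculationHostTracker,
recordRunningAttempt, etc.) are hypothetical and only illustrate the bookkeeping, not an
existing Spark API:

```scala
import scala.collection.mutable

// Hypothetical sketch: track, per (stage, stage attempt, task index), the host running the
// original attempt. A speculative copy of that task should be placed on some other host,
// so the allocation manager can reason about placement without asking the scheduler.
class SpeculationHostTracker {
  private val originalAttemptHost = mutable.HashMap[(Int, Int, Int), String]()

  // Called from the existing task-start listener callback (assumed to expose the host).
  def recordRunningAttempt(
      stageId: Int, stageAttemptId: Int, taskIndex: Int, host: String): Unit = synchronized {
    originalAttemptHost((stageId, stageAttemptId, taskIndex)) = host
  }

  // Hosts that cannot satisfy the given pending speculative tasks of this stage attempt.
  def hostsToAvoid(
      stageId: Int, stageAttemptId: Int, pendingTaskIndices: Set[Int]): Set[String] =
    synchronized {
      pendingTaskIndices.flatMap(i => originalAttemptHost.get((stageId, stageAttemptId, i)))
    }

  // Drop state when the stage attempt finishes so it cannot leak into later stages.
  def onStageAttemptCompleted(stageId: Int, stageAttemptId: Int): Unit = synchronized {
    originalAttemptHost.filterInPlace { case ((s, a, _), _) => s != stageId || a != stageAttemptId }
  }
}
```

With something like this, the allocation manager could decide to request extra capacity only
when every live executor host is in hostsToAvoid, instead of casting to TaskSchedulerImpl.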
##########
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala:
##########
@@ -182,6 +190,10 @@ private[spark] class ExecutorAllocationManager(
// ResourceProfile id to Host to possible task running on it, used for executor placement.
private var rpIdToHostToLocalTaskCount: Map[Int, Map[String, Int]] = Map.empty
+ private var adjustMaxNeed = minNumExecutors;
+
+ private val excludeSpeculativeNodes = new mutable.HashSet[String]()
Review Comment:
At any given point in time, multiple stages can be running concurrently, and each stage might
need speculation against a different set of executors/hosts: some might be slow for memory
reasons, while others might be slow for IO reasons.
Given this, we cannot assume a given executor host is bad for all stages. Instead, track this
per stage attempt.
(This applies to the other fields in this class as well; the change in design should fix them
all.)
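
For example, the single global set could become per-stage-attempt state along these lines (a
sketch only; the StageAttempt case class and the cleanup hook are assumed to be driven by the
stage lifecycle events the class already listens to):

```scala
import scala.collection.mutable

// Sketch only: per-stage-attempt exclusion state instead of one global set.
// A host that is slow for a memory-bound stage may be perfectly fine for an IO-bound one.
case class StageAttempt(stageId: Int, stageAttemptId: Int)

class PerStageExclusions {
  private val excludedHostsPerStageAttempt =
    new mutable.HashMap[StageAttempt, mutable.HashSet[String]]()

  def excludeHost(attempt: StageAttempt, host: String): Unit = synchronized {
    excludedHostsPerStageAttempt.getOrElseUpdate(attempt, new mutable.HashSet[String]()) += host
  }

  def excludedHosts(attempt: StageAttempt): Set[String] = synchronized {
    excludedHostsPerStageAttempt.get(attempt).map(_.toSet).getOrElse(Set.empty)
  }

  // Called from the stage-completed callback so exclusions never outlive the stage attempt.
  def onStageAttemptCompleted(attempt: StageAttempt): Unit = synchronized {
    excludedHostsPerStageAttempt.remove(attempt)
  }
}
```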
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]