mridulm commented on code in PR #47949:
URL: https://github.com/apache/spark/pull/47949#discussion_r1885435364


##########
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala:
##########
@@ -308,13 +328,59 @@ private[spark] class ExecutorAllocationManager(
       tasksPerExecutor).toInt
 
     val maxNeededWithSpeculationLocalityOffset =
-      if (tasksPerExecutor > 1 && maxNeeded == 1 && pendingSpeculative > 0) {
-      // If we have pending speculative tasks and only need a single executor, allocate one more
-      // to satisfy the locality requirements of speculation
-      maxNeeded + 1
-    } else {
-      maxNeeded
-    }
+      if (pendingSpeculative > 0 && maxNeeded <= numExecutorsTarget &&
+        executorMonitor.executorHostsCount == 1 && scheduler.isInstanceOf[TaskSchedulerImpl]) {
+        val now = clock.nanoTime()
+        if (excludeNodesTriggerTime == NOT_SET) {
+          excludeNodesTriggerTime = now + TimeUnit.MINUTES.toNanos(excludeNodeTriggerTimeoutMin)
+          logDebug(log"Current timestamp ${MDC(TIMESTAMP, now)}, " +
+            log"set excludeNodesTriggerTime to ${MDC(TIMESTAMP, excludeNodesTriggerTime)}")
+          maxNeeded
+        } else if (now < excludeNodesTriggerTime) {
+          maxNeeded
+        } else {
+          if (executorMonitor.hasAdjustMaxNeed) {
+            adjustMaxNeed
+          } else {
+            logDebug(log"${MDC(TIMESTAMP, now)} exceeds" +
+              log" ${MDC(TIMESTAMP, excludeNodesTriggerTime)}, start exclude node!")
+            val node = executorMonitor.getExecutorHostsName(0)
+            // check if current remaining host has attempts of speculative task
+            val speculativeTasksInfo = listener.getPendingSpeculativeTasksInfo()
+            if (scheduler.asInstanceOf[TaskSchedulerImpl].
+              speculativeTasksHasAttemptOnHost(node, speculativeTasksInfo)) {
+              // make sure new maxNeed exceeds numExecutorsTarget and allocate executor
+              adjustMaxNeed = numExecutorsTarget + 1
+              // If hasAdjustMaxNeed, use last adjust value as
+              // maxNeededWithSpeculationLocalityOffset in case of numExecutorsTarget keeps
+              // increasing during maxNumExecutorsNeededPerResourceProfile method called
+              if (scheduler.asInstanceOf[TaskSchedulerImpl].handleExcludeNodes(node)) {
+                logDebug(log"Exclude ${MDC(HOST, node)} for speculative, " +
+                  log"old maxNeeded: ${MDC(COUNT, maxNeeded)}, " +
+                  log"old numExecutorsTarget: ${MDC(COUNT, numExecutorsTarget)}, " +
+                  log"current executors count: ${MDC(COUNT, executorMonitor.executorCount)}")
+                excludeSpeculativeNodes.add(node)
+                executorMonitor.setAdjustMaxNeed(true)
+                adjustMaxNeed
+              } else {
+                logDebug(log"${MDC(HOST, node)} has been excluded for other reason")
+                maxNeeded
+              }
+            } else {
+              logDebug(log"No speculative task found on ${MDC(HOST, node)}")
+              maxNeeded
+            }
+          }
+        }
+      } else if (tasksPerExecutor > 1 && maxNeeded == 1 && pendingSpeculative > 0) {
+        resetExcludeNodesTriggerTime()
+        // If we have pending speculative tasks and only need a single executor, allocate one more
+        // to satisfy the locality requirements of speculation
+        maxNeeded + 1
+      } else {
+        resetExcludeNodesTriggerTime()
+        maxNeeded
+      }

Review Comment:
   The approach here appears to be that we increase the number of executors to be allocated and expect that one of them might land on a host which can run the speculative task.
   For larger clusters this approach could work - but for smaller clusters in particular, it would result in wastage.



##########
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala:
##########
@@ -101,6 +101,7 @@ import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
  */
 private[spark] class ExecutorAllocationManager(
     client: ExecutorAllocationClient,
+    var scheduler: TaskScheduler,

Review Comment:
   In the dynamic resource allocation design in Spark, we keep loose coupling between the allocation manager and the scheduler.
   Please drop this reference - we have to work around this issue without relying on scheduler internals.
   
   Instead, we can track the hosts where the speculative task should not run (that is, the host where the current attempt for the partition is running).
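   
   As a rough illustration of that alternative (a sketch only - the class and method names below are hypothetical, not from this PR): the allocation manager's listener already receives task start/end events, so it can remember the host of the running non-speculative attempt per task index, which is exactly the host a pending speculative copy should avoid, with no access to `TaskSchedulerImpl` internals.

```scala
import scala.collection.mutable

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd, SparkListenerTaskStart}

// Hypothetical sketch: for every running non-speculative task attempt, remember the
// host it runs on, keyed by (stageId, stageAttemptId, taskIndex). A pending speculative
// copy of that task should avoid exactly that host.
class SpeculationHostTracker extends SparkListener {

  private val hostOfRunningAttempt = new mutable.HashMap[(Int, Int, Int), String]()

  override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = synchronized {
    val info = taskStart.taskInfo
    if (!info.speculative) {
      hostOfRunningAttempt((taskStart.stageId, taskStart.stageAttemptId, info.index)) = info.host
    }
  }

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
    val info = taskEnd.taskInfo
    if (!info.speculative) {
      hostOfRunningAttempt.remove((taskEnd.stageId, taskEnd.stageAttemptId, info.index))
    }
  }

  // Host the pending speculative attempt of this task should not land on, if known.
  def hostToAvoid(stageId: Int, stageAttemptId: Int, taskIndex: Int): Option[String] =
    synchronized {
      hostOfRunningAttempt.get((stageId, stageAttemptId, taskIndex))
    }
}
```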



##########
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala:
##########
@@ -182,6 +190,10 @@ private[spark] class ExecutorAllocationManager(
  // ResourceProfile id to Host to possible task running on it, used for executor placement.
  private var rpIdToHostToLocalTaskCount: Map[Int, Map[String, Int]] = Map.empty
 
+  private var adjustMaxNeed = minNumExecutors;
+
+  private val excludeSpeculativeNodes = new mutable.HashSet[String]()

Review Comment:
   At any given point in time, there can be multiple stages running concurrently - and for each stage, a different set of executors/hosts might need to be speculated against: some might be slow for memory reasons, while others might be slow for IO reasons.
   Given this, we cannot assume a given executor host is bad for all stages.
   
   Instead, track this per stage attempt.
   (This applies to other fields in this class as well - the change in design should fix them all.)
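   
   A minimal sketch of what per-stage-attempt tracking could look like (names are illustrative only, not from this PR) - each stage attempt gets its own set of hosts to avoid, and the entry is dropped when the attempt finishes so stale exclusions never leak into other stages:

```scala
import scala.collection.mutable

// Illustrative only: per-(stageId, stageAttemptId) bookkeeping of hosts that pending
// speculative tasks should avoid, instead of one process-wide set.
class SpeculationExclusionsPerStageAttempt {

  private val hostsToAvoid = new mutable.HashMap[(Int, Int), mutable.HashSet[String]]()

  def addHostToAvoid(stageId: Int, stageAttemptId: Int, host: String): Unit = synchronized {
    hostsToAvoid.getOrElseUpdate((stageId, stageAttemptId), mutable.HashSet.empty[String]) += host
  }

  def hostsToAvoidFor(stageId: Int, stageAttemptId: Int): Set[String] = synchronized {
    hostsToAvoid.get((stageId, stageAttemptId)).map(_.toSet).getOrElse(Set.empty)
  }

  // Call when the stage attempt completes (e.g. from onStageCompleted), so exclusions
  // recorded for one stage attempt do not affect any other.
  def stageAttemptDone(stageId: Int, stageAttemptId: Int): Unit = synchronized {
    hostsToAvoid.remove((stageId, stageAttemptId))
  }
}
```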



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

