tgravescs commented on a change in pull request #28287:
URL: https://github.com/apache/spark/pull/28287#discussion_r455153176



##########
File path: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
##########
@@ -789,6 +811,28 @@ private[spark] class ExecutorAllocationManager(
       }
     }
 
+    override def onUnschedulableTaskSetAdded(
+        unschedulableTaskSetAdded: SparkListenerUnschedulableTaskSetAdded): Unit = {
+      val stageId = unschedulableTaskSetAdded.stageId
+      val stageAttemptId = unschedulableTaskSetAdded.stageAttemptId
+      allocationManager.synchronized {
+        val stageAttempt = StageAttempt(stageId, stageAttemptId)
+        unschedulableTaskSets.add(stageAttempt)
+      }
+      allocationManager.onSchedulerBacklogged()
+    }
+
+    override def onUnschedulableTaskSetRemoved(
+        unschedulableTaskSetRemoved: SparkListenerUnschedulableTaskSetRemoved): Unit = {
+      val stageId = unschedulableTaskSetRemoved.stageId
+      val stageAttemptId = unschedulableTaskSetRemoved.stageAttemptId
+      allocationManager.synchronized {
+        // Clear unschedulableTaskSets since at least one task becomes schedulable now
+        val stageAttempt = StageAttempt(stageId, stageAttemptId)

Review comment:
       similar, move stageAttempt outside of synchronized

##########
File path: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
##########
@@ -289,13 +290,27 @@ private[spark] class ExecutorAllocationManager(
       s" tasksperexecutor: $tasksPerExecutor")
    val maxNeeded = math.ceil(numRunningOrPendingTasks * executorAllocationRatio /
       tasksPerExecutor).toInt
-    if (tasksPerExecutor > 1 && maxNeeded == 1 && pendingSpeculative > 0) {
+
+    val maxNeededWithSpeculationLocalityOffset =
+      if (tasksPerExecutor > 1 && maxNeeded == 1 && pendingSpeculative > 0) {
      // If we have pending speculative tasks and only need a single executor, allocate one more
       // to satisfy the locality requirements of speculation
       maxNeeded + 1
     } else {
       maxNeeded
     }
+
+    if (unschedulableTaskSets > 0) {
+      // Request additional executors only if maxNeededWithSpeculationLocalityOffset is less than

Review comment:
       Can we simplify this comment to be something like:
   
   Request additional executors to account for task sets that have tasks that are unschedulable due to blacklisting when the active executor count has already reached the max needed we would normally get.
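   For context, a sketch of how the reworded comment would sit in the surrounding code (the body of the if is elided, since the diff above is truncated):
   
       val maxNeededWithSpeculationLocalityOffset =
         if (tasksPerExecutor > 1 && maxNeeded == 1 && pendingSpeculative > 0) {
           // If we have pending speculative tasks and only need a single executor,
           // allocate one more to satisfy the locality requirements of speculation
           maxNeeded + 1
         } else {
           maxNeeded
         }
   
       if (unschedulableTaskSets > 0) {
         // Request additional executors to account for task sets that have tasks that
         // are unschedulable due to blacklisting when the active executor count has
         // already reached the max needed we would normally get.
         // ... (rest of the change, not shown in this hunk)
       }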

##########
File path: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
##########
@@ -789,6 +811,28 @@ private[spark] class ExecutorAllocationManager(
       }
     }
 
+    override def onUnschedulableTaskSetAdded(
+        unschedulableTaskSetAdded: SparkListenerUnschedulableTaskSetAdded): Unit = {
+      val stageId = unschedulableTaskSetAdded.stageId
+      val stageAttemptId = unschedulableTaskSetAdded.stageAttemptId
+      allocationManager.synchronized {
+        val stageAttempt = StageAttempt(stageId, stageAttemptId)

Review comment:
       setting stageAttempt can be done outside of synchronized
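   For illustration, the hoisted version might look like this (a sketch only, using the names from the diff above; only the mutation of the shared set needs the lock):
   
       // Building the immutable key touches no shared state, so it can happen
       // before taking the lock.
       val stageAttempt = StageAttempt(stageId, stageAttemptId)
       allocationManager.synchronized {
         unschedulableTaskSets.add(stageAttempt)
       }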

##########
File path: core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
##########
@@ -339,6 +349,20 @@ private[spark] trait SparkListenerInterface {
    */
   def onNodeUnblacklisted(nodeUnblacklisted: SparkListenerNodeUnblacklisted): Unit
 
+  /**
+   * Called when a taskset becomes unschedulable due to blacklisting and dynamic allocation
+   * is enabled
+   */
+  def onUnschedulableTaskSetAdded(
+      unschedulableTaskSetAdded: SparkListenerUnschedulableTaskSetAdded): Unit
+
+  /**
+   * Called when an unschedulable taskset becomes schedulable and dynamic allocation
+   * is enabled

Review comment:
       add period

##########
File path: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
##########
@@ -696,7 +717,8 @@ private[spark] class ExecutorAllocationManager(
 
        // If this is the last stage with pending tasks, mark the scheduler queue as empty
         // This is needed in case the stage is aborted for any reason
-        if (stageAttemptToNumTasks.isEmpty && stageAttemptToNumSpeculativeTasks.isEmpty) {
+        if (stageAttemptToNumTasks.isEmpty &&
+          stageAttemptToNumSpeculativeTasks.isEmpty) {

Review comment:
       revert this change as it's unneeded

##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -311,6 +311,26 @@ private[spark] class DAGScheduler(
     eventProcessLoop.post(SpeculativeTaskSubmitted(task))
   }
 
+  /**
+   * Called by the TaskSetManager when a taskset becomes unschedulable due to blacklisting and
+   * dynamic allocation is enabled
+   */
+  def unschedulableTaskSetAdded(
+       stageId: Int,
+       stageAttemptId: Int): Unit = {
+    eventProcessLoop.post(UnschedulableTaskSetAdded(stageId, stageAttemptId))
+  }
+
+  /**
+   * Called by the TaskSetManager when an unschedulable taskset becomes schedulable and dynamic
+   * allocation is enabled

Review comment:
       nit: add period

##########
File path: core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
##########
@@ -339,6 +349,20 @@ private[spark] trait SparkListenerInterface {
    */
   def onNodeUnblacklisted(nodeUnblacklisted: SparkListenerNodeUnblacklisted): Unit
 
+  /**
+   * Called when a taskset becomes unschedulable due to blacklisting and dynamic allocation
+   * is enabled

Review comment:
       nit: add period.

##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -1014,6 +1034,20 @@ private[spark] class DAGScheduler(
    listenerBus.post(SparkListenerSpeculativeTaskSubmitted(task.stageId, task.stageAttemptId))
   }
 
+  private[scheduler] def handleUnschedulableTaskSetAdded(
+      stageId: Int,
+      stageAttemptId: Int): Unit = {
+    listenerBus.post(
+      SparkListenerUnschedulableTaskSetAdded(stageId, stageAttemptId))
+  }
+
+  private[scheduler] def handleUnschedulableTaskSetRemoved(
+    stageId: Int,
+    stageAttemptId: Int): Unit = {
+    listenerBus.post(
+      SparkListenerUnschedulableTaskSetRemoved(stageId, stageAttemptId))

Review comment:
       same here, put on previous line

##########
File path: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
##########
@@ -829,12 +873,28 @@ private[spark] class ExecutorAllocationManager(
       numTotalTasks - numRunning
     }
 
+    // Currently TaskSetManager.getCompletelyBlacklistedTaskIfAny only takes the first unschedulable
+    // task found due to blacklisting. This way we only need to keep track of the unschedulable
+    // tasksets which is an indirect way to get the current number of unschedulable tasks.
+    def pendingUnschedulableTaskSetsPerResourceProfile(rp: Int): Int = {
+      val attempts = resourceProfileIdToStageAttempt.getOrElse(rp, Set.empty).toSeq
+      attempts.filter(attempt => unschedulableTaskSets.contains(attempt)).size
+    }
+
+    def hasPendingUnschedulableTasks: Boolean = {
+      val attemptSets = resourceProfileIdToStageAttempt.values
+      attemptSets.exists { attempts =>
+        attempts.exists(unschedulableTaskSets.contains(_))
+      }
+    }
+
     def hasPendingTasks: Boolean = {
-      hasPendingSpeculativeTasks || hasPendingRegularTasks
+      hasPendingSpeculativeTasks || hasPendingRegularTasks || hasPendingUnschedulableTasks

Review comment:
       this doesn't make sense to me. Did you have a specific bug without this? 
 
   The tasks themselves are counted as regular tasks. 

##########
File path: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
##########
@@ -696,7 +717,8 @@ private[spark] class ExecutorAllocationManager(
 

Review comment:
       I think we should remove the stage attempt from unschedulableTaskSets when the stage completes, in case it fails or something and never sends the UnschedulableTaskSetRemoved event
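   One way that cleanup could look (a sketch only; it assumes the listener's existing onStageCompleted handler and the StageInfo.attemptNumber accessor):
   
       override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
         val stageAttempt = StageAttempt(stageCompleted.stageInfo.stageId,
           stageCompleted.stageInfo.attemptNumber)
         allocationManager.synchronized {
           // Drop any unschedulable marker for this attempt so a stage that aborts
           // without ever sending UnschedulableTaskSetRemoved does not leak an entry.
           unschedulableTaskSets.remove(stageAttempt)
           // ... existing per-stage cleanup ...
         }
       }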

##########
File path: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
##########
@@ -789,6 +811,28 @@ private[spark] class ExecutorAllocationManager(
       }
     }
 
+    override def onUnschedulableTaskSetAdded(
+        unschedulableTaskSetAdded: SparkListenerUnschedulableTaskSetAdded): Unit = {
+      val stageId = unschedulableTaskSetAdded.stageId
+      val stageAttemptId = unschedulableTaskSetAdded.stageAttemptId
+      allocationManager.synchronized {
+        val stageAttempt = StageAttempt(stageId, stageAttemptId)
+        unschedulableTaskSets.add(stageAttempt)
+      }
+      allocationManager.onSchedulerBacklogged()

Review comment:
       this should be moved inside of synchronized
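   Combined with the earlier suggestion to hoist stageAttempt, a sketch of the resulting body:
   
       val stageAttempt = StageAttempt(stageId, stageAttemptId)
       allocationManager.synchronized {
         unschedulableTaskSets.add(stageAttempt)
         // Signal the backlog while still holding the lock, so the set update
         // and the backlog notification are observed together.
         allocationManager.onSchedulerBacklogged()
       }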

##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -311,6 +311,26 @@ private[spark] class DAGScheduler(
     eventProcessLoop.post(SpeculativeTaskSubmitted(task))
   }
 
+  /**
+   * Called by the TaskSetManager when a taskset becomes unschedulable due to blacklisting and
+   * dynamic allocation is enabled

Review comment:
       nit, add period at end

##########
File path: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
##########
@@ -829,12 +873,28 @@ private[spark] class ExecutorAllocationManager(
       numTotalTasks - numRunning
     }
 
+    // Currently TaskSetManager.getCompletelyBlacklistedTaskIfAny only takes the first unschedulable
+    // task found due to blacklisting. This way we only need to keep track of the unschedulable
+    // tasksets which is an indirect way to get the current number of unschedulable tasks.
+    def pendingUnschedulableTaskSetsPerResourceProfile(rp: Int): Int = {
+      val attempts = resourceProfileIdToStageAttempt.getOrElse(rp, Set.empty).toSeq
+      attempts.filter(attempt => unschedulableTaskSets.contains(attempt)).size
+    }
+
+    def hasPendingUnschedulableTasks: Boolean = {
+      val attemptSets = resourceProfileIdToStageAttempt.values
+      attemptSets.exists { attempts =>
+        attempts.exists(unschedulableTaskSets.contains(_))
+      }
+    }
+
     def hasPendingTasks: Boolean = {
-      hasPendingSpeculativeTasks || hasPendingRegularTasks
+      hasPendingSpeculativeTasks || hasPendingRegularTasks || hasPendingUnschedulableTasks
     }
 
     def totalPendingTasksPerResourceProfile(rp: Int): Int = {
-      pendingTasksPerResourceProfile(rp) + pendingSpeculativeTasksPerResourceProfile(rp)
+      pendingTasksPerResourceProfile(rp) +
+        pendingSpeculativeTasksPerResourceProfile(rp)

Review comment:
       revert

##########
File path: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
##########
@@ -829,12 +873,28 @@ private[spark] class ExecutorAllocationManager(
       numTotalTasks - numRunning
     }
 
+    // Currently TaskSetManager.getCompletelyBlacklistedTaskIfAny only takes the first unschedulable
+    // task found due to blacklisting. This way we only need to keep track of the unschedulable
+    // tasksets which is an indirect way to get the current number of unschedulable tasks.

Review comment:
       so we should clarify further. I realize it's an indirect way, but it's not really the number of unschedulable tasks. Perhaps we can just change this to say something a bit more generic like:
   
   Currently we only know when a task set has an unschedulable task, we don't know the exact number, and since the allocation manager isn't tied closely with the scheduler we use the number of task sets that are unschedulable as a heuristic to add more executors.

##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -1014,6 +1034,20 @@ private[spark] class DAGScheduler(
    listenerBus.post(SparkListenerSpeculativeTaskSubmitted(task.stageId, task.stageAttemptId))
   }
 
+  private[scheduler] def handleUnschedulableTaskSetAdded(
+      stageId: Int,
+      stageAttemptId: Int): Unit = {
+    listenerBus.post(
+      SparkListenerUnschedulableTaskSetAdded(stageId, stageAttemptId))

Review comment:
       I think this will fit on previous line
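   For reference, the collapsed form fits well under the 100-character line limit:
   
       private[scheduler] def handleUnschedulableTaskSetAdded(
           stageId: Int,
           stageAttemptId: Int): Unit = {
         listenerBus.post(SparkListenerUnschedulableTaskSetAdded(stageId, stageAttemptId))
       }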




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


