tgravescs commented on a change in pull request #28287:
URL: https://github.com/apache/spark/pull/28287#discussion_r451820344



##########
File path: 
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
##########
@@ -652,18 +652,26 @@ private[spark] class TaskSchedulerImpl(
                 case Some ((executorId, _)) =>

Review comment:
       I think the above comment needs to be updated

##########
File path: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
##########
@@ -281,6 +281,7 @@ private[spark] class ExecutorAllocationManager(
   private def maxNumExecutorsNeededPerResourceProfile(rpId: Int): Int = {
     val pending = listener.totalPendingTasksPerResourceProfile(rpId)
    val pendingSpeculative = listener.pendingSpeculativeTasksPerResourceProfile(rpId)
+    val numUnschedulables = listener.pendingUnschedulableTaskSetsPerResourceProfile(rpId)

Review comment:
       we should clarify that this counts unschedulable task sets, not the number of unschedulable tasks.
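   For example (just a suggestion on the name; same call as in the hunk above), renaming the local would make it obvious:

```scala
// Number of task sets (not tasks) that are currently unschedulable due to blacklisting.
val numUnschedulableTaskSets = listener.pendingUnschedulableTaskSetsPerResourceProfile(rpId)
```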

##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -311,6 +311,16 @@ private[spark] class DAGScheduler(
     eventProcessLoop.post(SpeculativeTaskSubmitted(task))
   }
 
+  /**
+   * Called by the TaskSetManager when there is an unschedulable blacklist task and dynamic
+   * allocation is enabled
+   */
+  def unschedulableBlacklistTaskSubmitted(

Review comment:
       I think we should rename this. Maybe just remove "Submitted", since this isn't really task submission.
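   Following the pattern of speculativeTaskSubmitted just above it, something like this is what I had in mind (a sketch only; the parameter list and the UnschedulableBlacklistTask event name are hypothetical, since the hunk cuts the real signature off):

```scala
/**
 * Called by the TaskSetManager when there is an unschedulable blacklisted task and dynamic
 * allocation is enabled.
 */
def unschedulableBlacklistTask(stageId: Int, stageAttemptId: Int): Unit = {
  // Post to the event loop the same way speculativeTaskSubmitted does.
  eventProcessLoop.post(UnschedulableBlacklistTask(stageId, stageAttemptId))
}
```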

##########
File path: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
##########
@@ -789,6 +810,23 @@ private[spark] class ExecutorAllocationManager(
       }
     }
 
+    override def onUnschedulableTaskSet
+      (unschedulableTaskSet: SparkListenerUnschedulableTaskSet): Unit = {

Review comment:
       nit: fix indentation and style. The "(" should be on the previous line, with the parameters indented 4 spaces.
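   Concretely (body elided, just showing the declaration style):

```scala
override def onUnschedulableTaskSet(
    unschedulableTaskSet: SparkListenerUnschedulableTaskSet): Unit = {
  // ... body unchanged
}
```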

##########
File path: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
##########
@@ -829,12 +867,28 @@ private[spark] class ExecutorAllocationManager(
       numTotalTasks - numRunning
     }
 
+    // Currently TaskSetManager.getCompletelyBlacklistedTaskIfAny only takes the first
+    // unschedulable task due to blacklisting. So keeping track of unschedulableTaskSets
+    // should be enough as we'll always have no more than a task unschedulable at any time.

Review comment:
       I think this comment is confusing, specifically "as we'll always have no more than a task unschedulable at any time". That isn't true: you could have lots of tasks blacklisted and thus unschedulable at any time. That function only returns the first one it finds.
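   Maybe reword it along these lines (just a suggestion):

```scala
// TaskSetManager.getCompletelyBlacklistedTaskIfAny returns at most one unschedulable task
// per task set, even when many tasks are blacklisted, so tracking the task sets themselves
// in unschedulableTaskSets is sufficient.
```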

##########
File path: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
##########
@@ -789,6 +811,23 @@ private[spark] class ExecutorAllocationManager(
       }
     }
 
+    override def onUnschedulableBlacklistTaskSubmitted
+      (blacklistedTask: SparkListenerUnschedulableBlacklistTaskSubmitted): Unit = {
+      val stageId = blacklistedTask.stageId
+      val stageAttemptId = blacklistedTask.stageAttemptId
+      allocationManager.synchronized {
+        (stageId, stageAttemptId) match {
+          case (Some(stageId), Some(stageAttemptId)) =>
+            val stageAttempt = StageAttempt(stageId, stageAttemptId)
+            unschedulableTaskSets.add(stageAttempt)
+          case (None, _) =>
+            // Clear unschedulableTaskSets since at least one task becomes schedulable now
+            unschedulableTaskSets.clear()

Review comment:
       Yeah, I could go either way on this as well. If we only remove the one task set, it could help get more executors faster if lots are blacklisted or you only requested a few to begin with. I think it's a bit out of the norm for us to rely on the parameters here being None as well; traditionally we would have 2 messages, one for it being added and one for it being removed. Perhaps onUnschedulableTaskSetAdded and onUnschedulableTaskSetRemoved.
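   Roughly what I'm picturing (a sketch; the event and hook names are suggestions, and it assumes matching onUnschedulableTaskSetAdded/Removed hooks get added to SparkListener):

```scala
case class SparkListenerUnschedulableTaskSetAdded(
    stageId: Int, stageAttemptId: Int) extends SparkListenerEvent

case class SparkListenerUnschedulableTaskSetRemoved(
    stageId: Int, stageAttemptId: Int) extends SparkListenerEvent

override def onUnschedulableTaskSetAdded(
    event: SparkListenerUnschedulableTaskSetAdded): Unit = allocationManager.synchronized {
  unschedulableTaskSets.add(StageAttempt(event.stageId, event.stageAttemptId))
}

override def onUnschedulableTaskSetRemoved(
    event: SparkListenerUnschedulableTaskSetRemoved): Unit = allocationManager.synchronized {
  unschedulableTaskSets.remove(StageAttempt(event.stageId, event.stageAttemptId))
}
```

   That way neither side has to encode add-vs-remove in Option-typed fields.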

##########
File path: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
##########
@@ -289,13 +290,26 @@ private[spark] class ExecutorAllocationManager(
       s" tasksperexecutor: $tasksPerExecutor")
    val maxNeeded = math.ceil(numRunningOrPendingTasks * executorAllocationRatio /
       tasksPerExecutor).toInt
-    if (tasksPerExecutor > 1 && maxNeeded == 1 && pendingSpeculative > 0) {
+    val maxNeededWithSpeculationLocalityOffset =
+      if (tasksPerExecutor > 1 && maxNeeded == 1 && pendingSpeculative > 0) {
      // If we have pending speculative tasks and only need a single executor, allocate one more
       // to satisfy the locality requirements of speculation
       maxNeeded + 1
     } else {
       maxNeeded
     }
+
+    // Since the maxNeededWithSpeculationLocalityOffset already includes the num executors needed
+    // to run unschedulable tasks, we would only try to add when the
+    // maxNeededWithSpeculationLocalityOffset is lesser than the active executors

Review comment:
       this comment is also confusing to me.  its also above a check for 
numUnschedulables, which has nothing to with the comment.  Can we move it down 
next to the check on line 308 and clarify. I think you mean we don't add any 
tasks for the unschedulables if active executors is less than the number we are 
already asking for.
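   To make it concrete, something shaped like this next to that check (names like numExecutorsActive and the exact condition are my guesses, since I don't have line 308 in front of me):

```scala
// maxNeededWithSpeculationLocalityOffset already accounts for the executors the
// unschedulable task sets need, so only request extras once everything we have
// already asked for is active and it is still not enough.
if (numUnschedulables > 0 && maxNeededWithSpeculationLocalityOffset <= numExecutorsActive) {
  // request additional executors for the unschedulable task sets here
}
```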




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


