bmarcott commented on a change in pull request #26696: [WIP][SPARK-18886][CORE] 
Make locality wait time be the time since a TSM's available slots were fully 
utilized
URL: https://github.com/apache/spark/pull/26696#discussion_r360474081
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/scheduler/Pool.scala
 ##########
 @@ -119,4 +120,72 @@ private[spark] class Pool(
       parent.decreaseRunningTasks(taskNum)
     }
   }
+
+  // Update the number of slots considered available for each TaskSetManager whose ancestor
+  // in the tree is this pool
+  // For FAIR scheduling, slots are distributed among pools based on weights and minshare.
+  //   If a pool requires fewer slots than are available to it, the leftover slots are redistributed
+  //   to the remaining pools using the remaining pools' weights.
+  // For FIFO scheduling, the schedulable queue is iterated over in FIFO order,
+  //   giving each schedulable the remaining slots,
+  //   up to the number of remaining tasks for that schedulable.
+  override def updateAvailableSlots(numSlots: Float): Unit = {
+    schedulingMode match {
+      case SchedulingMode.FAIR =>
+        val queueCopy = new util.LinkedList[Schedulable](schedulableQueue)
+        var shouldRedistribute = true
+        var totalWeights = schedulableQueue.asScala.map(_.weight).sum
+        var totalSlots = numSlots
+        while (totalSlots > 0 && shouldRedistribute) {
+          shouldRedistribute = false
+          var nextWeights = totalWeights
+          var nextSlots = totalSlots
+          val iterator = queueCopy.iterator()
+          while (iterator.hasNext) {
+            val schedulable = iterator.next()
+            val numTasksRemaining = schedulable.getSortedTaskSetQueue
+              .map(tsm => tsm.tasks.length - tsm.tasksSuccessful).sum
+            val allocatedSlots = Math.max(
+              totalSlots * schedulable.weight / totalWeights,
+              schedulable.minShare)
+            if (numTasksRemaining < allocatedSlots) {
 
 Review comment:
   You got me thinking more about this by using the word "exactly". It is the 
combination of TaskSchedulerImpl, Pool (including the FAIR/FIFO scheduling 
algorithms), TaskSetManager (including delay scheduling), and so on that 
determines how resources are assigned. 
   The goal of this approach is to simulate scheduling without delay 
scheduling.
   This helps determine how much resources are being underutilized because of 
delay scheduling.
   
   So far the most recent diff seems to fall short for at least a couple of 
reasons:
   1. Scheduling differs depending on whether 
`TaskSchedulerImpl.resourceOffers` is called one by one with single offers or 
once with all offers in a batch. 
`Schedulable.getSortedTaskSetQueue` is called only once per `resourceOffers` 
call, so a batch call follows the scheduling algorithm only for the first 
task that is scheduled (seems like a bug). 
   2. The approach doesn't exactly follow FAIR ordering, such as the 
minShareRatio and schedulable-name-based ordering found in 
`FairSchedulingAlgorithm`.
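   
   To make point 2 concrete, here is a minimal, self-contained sketch of the 
kind of ordering meant: schedulables below their minShare go first, ties 
among those are broken by minShareRatio, the rest are ordered by running tasks 
per unit of weight, and the name is the final tie-breaker. It is modeled on my 
reading of `FairSchedulingAlgorithm` and is not a copy of it. `Sched`, 
`FairOrderSketch`, and `comesBefore` are hypothetical names used only for 
illustration.

```scala
// Hypothetical stand-in for the Schedulable fields that the ordering reads.
final case class Sched(name: String, weight: Int, minShare: Int, runningTasks: Int)

object FairOrderSketch {
  // Returns true when s1 should be offered resources before s2.
  def comesBefore(s1: Sched, s2: Sched): Boolean = {
    // A schedulable is "needy" while it runs fewer tasks than its minShare.
    val s1Needy = s1.runningTasks < s1.minShare
    val s2Needy = s2.runningTasks < s2.minShare
    // Guard against division by zero when minShare is 0.
    val minShareRatio1 = s1.runningTasks.toDouble / math.max(s1.minShare, 1)
    val minShareRatio2 = s2.runningTasks.toDouble / math.max(s2.minShare, 1)
    val taskToWeightRatio1 = s1.runningTasks.toDouble / s1.weight
    val taskToWeightRatio2 = s2.runningTasks.toDouble / s2.weight

    val cmp =
      if (s1Needy && !s2Needy) -1
      else if (!s1Needy && s2Needy) 1
      else if (s1Needy && s2Needy) java.lang.Double.compare(minShareRatio1, minShareRatio2)
      else java.lang.Double.compare(taskToWeightRatio1, taskToWeightRatio2)

    // Fall back to name ordering so the result is deterministic on ties.
    if (cmp != 0) cmp < 0 else s1.name < s2.name
  }
}
```

   A list of schedulables could then be ordered with 
`pools.sortWith(FairOrderSketch.comesBefore)`. A slot simulation that 
allocates purely by weight and minShare, without a comparison like this, can 
pick a different schedulable next than the scheduler would.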
   
   I have a rough idea for an alternative implementation which does a more 
direct simulation, utilizing the `SchedulingAlgorithm` trait directly. I'll do 
more thinking in the coming days.

