Ngone51 commented on a change in pull request #23377: [SPARK-26439][CORE][WIP] Introduce WorkerOffer reservation mechanism for Barrier TaskSet
URL: https://github.com/apache/spark/pull/23377#discussion_r244903195
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
 ##########
 @@ -382,100 +475,97 @@ private[spark] class TaskSchedulerImpl(
       }
     }.getOrElse(offers)
 
+    val sortedTaskSets = rootPool.getSortedTaskSetQueue
     val shuffledOffers = shuffleOffers(filteredOffers)
+    val availableCpus = excludeReservedCpus(shuffledOffers, sortedTaskSets)
+    val execIdToOfferIndex =
+      shuffledOffers.zipWithIndex.map { case (o, i) => (o.executorId, i)}.toMap
     // Build a list of tasks to assign to each worker.
     val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK))
-    val availableCpus = shuffledOffers.map(o => o.cores).toArray
-    val availableSlots = shuffledOffers.map(o => o.cores / CPUS_PER_TASK).sum
-    val sortedTaskSets = rootPool.getSortedTaskSetQueue
     for (taskSet <- sortedTaskSets) {
       logDebug("parentName: %s, name: %s, runningTasks: %s".format(
         taskSet.parent.name, taskSet.name, taskSet.runningTasks))
       if (newExecAvail) {
         taskSet.executorAdded()
       }
     }
-
+    var launchedAnyBarrierTaskSet = false
     // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
     // of locality levels so that it gets a chance to launch local tasks on all of them.
     // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
     for (taskSet <- sortedTaskSets) {
-      // Skip the barrier taskSet if the available slots are less than the number of pending tasks.
-      if (taskSet.isBarrier && availableSlots < taskSet.numTasks) {
-        // Skip the launch process.
-        // TODO SPARK-24819 If the job requires more slots than available (both busy and free
-        // slots), fail the job on submit.
-        logInfo(s"Skip current round of resource offers for barrier stage ${taskSet.stageId} " +
-          s"because the barrier taskSet requires ${taskSet.numTasks} slots, while the total " +
-          s"number of available slots is $availableSlots.")
 
 Review comment:
   Since we're introducing a reservation mechanism, I think this message may no longer be useful. Instead, I added another message that tells the user when a barrier TaskSet is aborted because of insufficient resources; see lines 614-616:
   
   ```
   ts.abort(
     s"Barrier TaskSet ${ts.taskSet.id} abort due to " +
     s"insufficient resource after waiting $waitTime min.")
   ```
   
   I can revert this if you think the message is still needed; it's not a big problem.
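   To make the intended behaviour concrete, here is a minimal, self-contained sketch. It is hypothetical and not the code in this PR: `Offer`, `ReservationSketch`, `freeSlots`, `shouldAbort` and the `reserved` map are illustrative stand-ins. It assumes that reservation works by subtracting the cores already held for a waiting barrier TaskSet from each offer before normal scheduling, and that the TaskSet is aborted once it has waited past a timeout without enough slots for all of its tasks.

```scala
// Hypothetical sketch, NOT the implementation in PR #23377.
// Offer is a simplified stand-in for WorkerOffer.
final case class Offer(executorId: String, cores: Int)

object ReservationSketch {
  // Free cores per offer (same order as `offers`) after removing the cores
  // already reserved on that executor for a waiting barrier TaskSet.
  // `reserved` maps executorId -> reserved cores (an assumed representation).
  def excludeReservedCpus(offers: Seq[Offer], reserved: Map[String, Int]): Array[Int] =
    offers.map { o =>
      math.max(0, o.cores - reserved.getOrElse(o.executorId, 0))
    }.toArray

  // Number of task slots that the given free cores can hold.
  def freeSlots(availableCpus: Array[Int], cpusPerTask: Int): Int =
    availableCpus.map(_ / cpusPerTask).sum

  // Abort only if the barrier TaskSet still lacks slots for all of its tasks
  // AND it has already waited at least `timeoutMin` minutes.
  def shouldAbort(numTasks: Int, reservedSlots: Int, waitedMs: Long, timeoutMin: Long): Boolean =
    reservedSlots < numTasks && waitedMs >= timeoutMin * 60L * 1000L
}
```

   Taking the reservation map as a plain parameter (rather than deriving it from `sortedTaskSets`, as the real `excludeReservedCpus(shuffledOffers, sortedTaskSets)` call does) keeps the sketch testable in isolation.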
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.