tgravescs commented on a change in pull request #27773: [SPARK-29154][CORE] 
Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#discussion_r389973937
 
 

 ##########
 File path: 
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
 ##########
 @@ -381,12 +388,85 @@ private[spark] class TaskSchedulerImpl(
 
   /**
    * Check whether the resources from the WorkerOffer are enough to run at 
least one task.
+   * Returns None if the resources don't meet the task requirements, otherwise 
returns
+   * the task resource assignments to give to the next task. Note that the 
assignments maybe
+   * be empty if no custom resources are used.
    */
-  private def resourcesMeetTaskRequirements(resources: Map[String, 
Buffer[String]]): Boolean = {
-    val resourcesFree = resources.map(r => r._1 -> r._2.length)
-    val meetsReqs = ResourceUtils.resourcesMeetRequirements(resourcesFree, 
resourcesReqsPerTask)
-    logDebug(s"Resources meet task requirements is: $meetsReqs")
-    meetsReqs
+  private def resourcesMeetTaskRequirements(
+      taskSet: TaskSetManager,
+      availCpus: Int,
+      availWorkerResources: Map[String, Buffer[String]]
+      ): Option[Map[String, ResourceInformation]] = {
+    val rpId = taskSet.taskSet.resourceProfileId
+    val taskCpus = sc.resourceProfileManager.taskCpusForProfileId(rpId)
+    // check if the ResourceProfile has cpus first since that is common case
+    if (availCpus < taskCpus) return None
+
+    val taskSetProf = sc.resourceProfileManager.resourceProfileFromId(rpId)
+    // remove task cpus since we checked already
+    val tsResources = 
taskSetProf.taskResources.filterKeys(!_.equals(ResourceProfile.CPUS))
+    val localTaskReqAssign = HashMap[String, ResourceInformation]()
+    if (tsResources.isEmpty) return Some(localTaskReqAssign.toMap)
+    // we go through all resources here so that we can make sure they match 
and also get what the
+    // assignments are for the next task
+    for ((rName, taskReqs) <- tsResources) {
+      val taskAmount = taskSetProf.getSchedulerTaskResourceAmount(rName)
+      availWorkerResources.get(rName) match {
+        case Some(workerRes) =>
+          val workerAvail = 
availWorkerResources.get(rName).map(_.size).getOrElse(0)
+          if (workerAvail >= taskAmount) {
+            localTaskReqAssign.put(rName, new ResourceInformation(rName,
+              workerRes.take(taskAmount).toArray))
+          } else {
+            return None
+          }
+        case None => return None
+      }
+    }
+    Some(localTaskReqAssign.toMap)
+  }
+
+  // Use the resource that the resourceProfile has as the limiting resource to 
calculate the
+  // total number of slots available based on the current offers.
+  private def calculateAvailableSlots(
 
 Review comment:
   we could. the code would become a  loop (adding up as you went) and not like 
a map.sum().  honestly I just did what was there before and was thinking that 
barrier doesn't support dynamic allocation at this point (unless it got checked 
in without me seeing it?) so the number of executors is more likely to be less 
then what you need, not more.  
   Do you see other cases?
   I guess  when barrier scheduling starts to support dynamic allocation this 
could be more of an issue if you run a large ETL stage and then to smaller 
number with ML. 
   I'm fine either way, if you think its an issue now I will change or we could 
wait til dynamic allocation is supported, what are you thoughts?  (note that I 
am changing that logic as it had a bug that Mridul found in his review) 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to