mridulm commented on a change in pull request #27773: [SPARK-29154][CORE]
Update Spark scheduler for stage level scheduling
URL: https://github.com/apache/spark/pull/27773#discussion_r389543998
##########
File path:
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
##########
@@ -381,12 +388,85 @@ private[spark] class TaskSchedulerImpl(
/**
* Check whether the resources from the WorkerOffer are enough to run at
least one task.
+ * Returns None if the resources don't meet the task requirements, otherwise
returns
+ * the task resource assignments to give to the next task. Note that the
assignments maybe
+ * be empty if no custom resources are used.
*/
- private def resourcesMeetTaskRequirements(resources: Map[String,
Buffer[String]]): Boolean = {
- val resourcesFree = resources.map(r => r._1 -> r._2.length)
- val meetsReqs = ResourceUtils.resourcesMeetRequirements(resourcesFree,
resourcesReqsPerTask)
- logDebug(s"Resources meet task requirements is: $meetsReqs")
- meetsReqs
+ private def resourcesMeetTaskRequirements(
+ taskSet: TaskSetManager,
+ availCpus: Int,
+ availWorkerResources: Map[String, Buffer[String]]
+ ): Option[Map[String, ResourceInformation]] = {
+ val rpId = taskSet.taskSet.resourceProfileId
+ val taskCpus = sc.resourceProfileManager.taskCpusForProfileId(rpId)
+ // check if the ResourceProfile has cpus first since that is common case
+ if (availCpus < taskCpus) return None
+
+ val taskSetProf = sc.resourceProfileManager.resourceProfileFromId(rpId)
+ // remove task cpus since we checked already
+ val tsResources =
taskSetProf.taskResources.filterKeys(!_.equals(ResourceProfile.CPUS))
+ val localTaskReqAssign = HashMap[String, ResourceInformation]()
+ if (tsResources.isEmpty) return Some(localTaskReqAssign.toMap)
+ // we go through all resources here so that we can make sure they match
and also get what the
+ // assignments are for the next task
+ for ((rName, taskReqs) <- tsResources) {
+ val taskAmount = taskSetProf.getSchedulerTaskResourceAmount(rName)
+ availWorkerResources.get(rName) match {
+ case Some(workerRes) =>
+ val workerAvail =
availWorkerResources.get(rName).map(_.size).getOrElse(0)
+ if (workerAvail >= taskAmount) {
+ localTaskReqAssign.put(rName, new ResourceInformation(rName,
+ workerRes.take(taskAmount).toArray))
+ } else {
+ return None
+ }
+ case None => return None
+ }
+ }
+ Some(localTaskReqAssign.toMap)
+ }
+
+ // Use the resource that the resourceProfile has as the limiting resource to
calculate the
+ // total number of slots available based on the current offers.
+ private def calculateAvailableSlots(
+ resourceProfileIds: Array[Int],
+ availableCpus: Array[Int],
+ availableResources: Array[Map[String, Buffer[String]]],
+ rpId: Int): Int = {
+ val resourceProfile = sc.resourceProfileManager.resourceProfileFromId(rpId)
+ val offersForResourceProfile = resourceProfileIds.zipWithIndex.filter {
case (id, _) =>
+ (id == resourceProfile.id)
+ }
+ val coresKnown = resourceProfile.isCoresLimitKnown
+ var limitingResource = resourceProfile.limitingResource(sc.getConf)
+ val taskCpus = sc.resourceProfileManager.taskCpusForProfileId(rpId)
+
+ offersForResourceProfile.map { case (o, index) =>
+ val numTasksPerExecCores = availableCpus(index) / taskCpus
+ // when executor cores config isn't set, we can't calculate the real
limiting resource
+ // and number of tasks per executor ahead of time, so calculate it now.
+ if (!coresKnown) {
+ val numTasksPerExecCustomResource =
resourceProfile.maxTasksPerExecutor(sc.getConf)
+ if (limitingResource.isEmpty ||
+ (limitingResource.nonEmpty && numTasksPerExecCores <
numTasksPerExecCustomResource)) {
+ limitingResource = ResourceProfile.CPUS
Review comment:
I am a bit unclear on this.
Even if the number of tasks based on a custom resource is > the number of tasks
based on available CPU cores, the actual number of tasks that can be assigned
based on that custom resource could be < the number of tasks by core (for
example, 3 of the 4 GPUs are already in use while available cores == 2).
Here, we would incorrectly assume the executor can schedule up to
numTasksPerExecCores tasks while there might not be enough of the custom
resource available, right? (The else block below does the right thing - I am
not sure about the short-circuiting being done here.)
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]