tgravescs commented on a change in pull request #29332:
URL: https://github.com/apache/spark/pull/29332#discussion_r464427498



##########
File path: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
##########
@@ -633,9 +633,16 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
   }
 
   override def maxNumConcurrentTasks(rp: ResourceProfile): Int = synchronized {
-    val cpusPerTask = ResourceProfile.getTaskCpusOrDefaultForProfile(rp, conf)
-    val executorsWithResourceProfile = 
executorDataMap.values.filter(_.resourceProfileId == rp.id)
-    executorsWithResourceProfile.map(_.totalCores / cpusPerTask).sum
+    val (rpIds, cpus, resources) = executorDataMap.values.toArray.map { 
executor =>

Review comment:
       I think this should first filter executorDataMap for active executors.
   
   We are doing a lot of calculation here even when not in a barrier stage. I 
think out in DagScheduler.checkBarrierStageWithNumSlots we should move the call 
to this after the check for rdd.isBarrier (or make it last) so that not everyone 
has to pay for it. This is definitely doing more work than it was before, and we 
want to keep scheduler code as fast as possible.

##########
File path: 
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
##########
@@ -468,51 +468,6 @@ private[spark] class TaskSchedulerImpl(
     Some(localTaskReqAssign.toMap)
   }
 
-  // Use the resource that the resourceProfile has as the limiting resource to 
calculate the
-  // total number of slots available based on the current offers.
-  private def calculateAvailableSlots(
-      resourceProfileIds: Array[Int],
-      availableCpus: Array[Int],
-      availableResources: Array[Map[String, Buffer[String]]],
-      taskSet: TaskSetManager): Int = {
-    val resourceProfile = sc.resourceProfileManager.resourceProfileFromId(
-      taskSet.taskSet.resourceProfileId)
-    val offersForResourceProfile = resourceProfileIds.zipWithIndex.filter { 
case (id, _) =>
-      (id == resourceProfile.id)
-    }
-    val coresKnown = resourceProfile.isCoresLimitKnown
-    var limitingResource = resourceProfile.limitingResource(conf)
-    val taskCpus = 
ResourceProfile.getTaskCpusOrDefaultForProfile(resourceProfile, conf)
-
-    offersForResourceProfile.map { case (o, index) =>
-      val numTasksPerExecCores = availableCpus(index) / taskCpus
-      // if limiting resource is empty then we have no other resources, so it 
has to be CPU
-      if (limitingResource == ResourceProfile.CPUS || 
limitingResource.isEmpty) {
-        numTasksPerExecCores
-      } else {
-        val taskLimit = 
resourceProfile.taskResources.get(limitingResource).map(_.amount)
-          .getOrElse {
-            val errorMsg = "limitingResource returns from ResourceProfile " +
-              s"$resourceProfile doesn't actually contain that task resource!"

Review comment:
       Correct, I believe it was there just as a double check, to make sure 
nothing breaks in the future.

##########
File path: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
##########
@@ -633,9 +633,16 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
   }
 
   override def maxNumConcurrentTasks(rp: ResourceProfile): Int = synchronized {
-    val cpusPerTask = ResourceProfile.getTaskCpusOrDefaultForProfile(rp, conf)
-    val executorsWithResourceProfile = 
executorDataMap.values.filter(_.resourceProfileId == rp.id)
-    executorsWithResourceProfile.map(_.totalCores / cpusPerTask).sum
+    val (rpIds, cpus, resources) = executorDataMap.values.toArray.map { 
executor =>
+      (
+        executor.resourceProfileId,
+        executor.totalCores,
+        executor.resourcesInfo.map { case (name, rInfo) =>
+          (name, rInfo.resourceAddresses.length * rInfo.slotsPerAddress)

Review comment:
       To be clear, here we are trying to calculate the total concurrent tasks 
that could run, even if some of them are busy at the moment, so I think we 
should add some documentation here just to clarify that.
   Also, can we add a function to ExecutorResourceInfo to get this 
(rInfo.resourceAddresses.length * rInfo.slotsPerAddress)?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to