jaceklaskowski commented on code in PR #40730:
URL: https://github.com/apache/spark/pull/40730#discussion_r1172914115


##########
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala:
##########
@@ -401,17 +403,24 @@ private[spark] class TaskSchedulerImpl(
       val host = shuffledOffers(i).host
       val taskSetRpID = taskSet.taskSet.resourceProfileId
 
+      var continueScheduling = true
      // check whether the task can be scheduled to the executor base on resource profile.
-      if (sc.resourceProfileManager
-        .canBeScheduled(taskSetRpID, shuffledOffers(i).resourceProfileId)) {
+      while (sc.resourceProfileManager
+        .canBeScheduled(taskSetRpID, shuffledOffers(i).resourceProfileId) && continueScheduling) {
        val taskResAssignmentsOpt = resourcesMeetTaskRequirements(taskSet, availableCpus(i),
           availableResources(i))
+        // if no task meet requirement, stop scheduling
+        continueScheduling &= taskResAssignmentsOpt.nonEmpty
         taskResAssignmentsOpt.foreach { taskResAssignments =>
           try {
            val prof = sc.resourceProfileManager.resourceProfileFromId(taskSetRpID)
            val taskCpus = ResourceProfile.getTaskCpusOrDefaultForProfile(prof, conf)
             val (taskDescOption, didReject, index) =
              taskSet.resourceOffer(execId, host, maxLocality, taskCpus, taskResAssignments)
+            // if no task meet resource offer, stop scheduling

Review Comment:
   s/meet/meets the
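
   For context, the hunk above turns the one-shot `if` check into a `while` loop so the scheduler keeps assigning tasks to the same offer until no task meets the requirements or the resource offer is rejected. Below is a simplified, self-contained model of that bin-packing loop; all names here (`BinPackingSketch`, `Offer`, `schedule`, `freeCpus`) are illustrative only and are not Spark's real classes or API.

```scala
// A simplified model of the bin-packing loop above (illustrative names only):
// keep packing one task at a time onto the same offer until the CPUs run out
// or there is nothing left to schedule.
object BinPackingSketch {
  final case class Offer(execId: String, var freeCpus: Int)

  def schedule(offer: Offer, taskCpus: Int, pendingTaskIds: Iterator[Int]): Seq[Int] = {
    val assigned = scala.collection.mutable.ArrayBuffer.empty[Int]
    var continueScheduling = true
    while (continueScheduling && offer.freeCpus >= taskCpus) {
      if (pendingTaskIds.hasNext) {
        val taskId = pendingTaskIds.next()
        offer.freeCpus -= taskCpus
        assigned += taskId
      } else {
        // no task meets the requirements any more, stop scheduling on this offer
        continueScheduling = false
      }
    }
    assigned.toSeq
  }

  def main(args: Array[String]): Unit = {
    val offer = Offer("exec-1", freeCpus = 4)
    // packs four 1-CPU tasks onto the 4-CPU executor before giving up
    println(schedule(offer, taskCpus = 1, pendingTaskIds = Iterator(0, 1, 2, 3, 4)))
  }
}
```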



##########
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala:
##########
@@ -118,6 +118,8 @@ private[spark] class TaskSchedulerImpl(
   // CPUs to request per task
   val CPUS_PER_TASK = conf.get(config.CPUS_PER_TASK)
 
+  val useBinPack = conf.get(BIN_PACK_ENABLED)

Review Comment:
   nit: I thought the convention is for predicates to be like `isBinPackEnabled`
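
   Applied to the hunk above, the suggestion would simply rename the flag so it reads as a predicate. A minimal sketch (the config entry `BIN_PACK_ENABLED` and `conf` come from the surrounding `TaskSchedulerImpl`; the rename itself is only the reviewer's suggestion):

```scala
// predicate-style name for the boolean flag, per the review comment
val isBinPackEnabled = conf.get(BIN_PACK_ENABLED)

// call sites then read naturally as a condition
if (isBinPackEnabled) {
  // keep offering tasks to the current executor (bin packing)
}
```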



##########
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##########
@@ -2051,6 +2051,14 @@ package object config {
       .timeConf(TimeUnit.MILLISECONDS)
       .createOptional
 
+  private[spark] val BIN_PACK_ENABLED =
+    ConfigBuilder("spark.scheduler.binPack.enabled")
+      .doc(s"Whether to enable bin packing task scheduling on executors. This 
could help save" +

Review Comment:
   nit: Enables bin packing task scheduling...
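
   With that wording applied, the config entry might read roughly as below. The hunk is truncated, so the rest of the doc string and the builder tail (`.booleanConf.createWithDefault(false)`) are assumptions here, not necessarily what the PR contains:

```scala
  private[spark] val BIN_PACK_ENABLED =
    ConfigBuilder("spark.scheduler.binPack.enabled")
      // suggested wording; the remainder of the doc string is elided here
      .doc("Enables bin packing task scheduling on executors. This could help save ...")
      .booleanConf                // assumed: the hunk does not show the config type
      .createWithDefault(false)   // assumed default
```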



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

