tgravescs commented on a change in pull request #29395:
URL: https://github.com/apache/spark/pull/29395#discussion_r467935481



##########
File path: core/src/test/scala/org/apache/spark/BarrierStageOnSubmittedSuite.scala
##########
@@ -259,4 +262,37 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with LocalSparkContext
     testSubmitJob(sc, rdd,
       message = ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER)
   }
+
+  test("SPARK-32518: CoarseGrainedSchedulerBackend.maxNumConcurrentTasks should " +
+    "consider all kinds of resources for the barrier stage") {
+    withTempDir { dir =>
+      val discoveryScript = createTempScriptWithExpectedOutput(
+        dir, "gpuDiscoveryScript", """{"name": "gpu","addresses":["0"]}""")
+
+      val conf = new SparkConf()
+        .setMaster("local-cluster[1, 2, 1024]")
+        .setAppName("test-cluster")
+        .set(WORKER_GPU_ID.amountConf, "1")
+        .set(WORKER_GPU_ID.discoveryScriptConf, discoveryScript)
+        .set(EXECUTOR_GPU_ID.amountConf, "1")
+        .set(TASK_GPU_ID.amountConf, "1")
+        // disable barrier stage retry to fail the application as soon as possible
+        .set(BARRIER_MAX_CONCURRENT_TASKS_CHECK_MAX_FAILURES, 1)
+        // disable the check to simulate the behavior of Standalone in order to
+        // reproduce the issue.
+        .set(Tests.SKIP_VALIDATE_CORES_TESTING, true)
+      sc = new SparkContext(conf)
+      // setup an executor which will have 2 CPUs and 1 GPU
+      TestUtils.waitUntilExecutorsUp(sc, 1, 60000)
+
+      val exception = intercept[BarrierJobSlotsNumberCheckFailed] {
+        sc.parallelize(Range(1, 10), 2)
+          .barrier()
+          .mapPartitions { iter => iter }
+          .collect()
+      }
+      assert(exception.getMessage.contains("[SPARK-24819]: Barrier execution " +
+        "mode does not allow run a barrier stage that requires more slots"))

Review comment:
       I'm not sure if it's worth it, but it would be nice to print what the limiting resource is. If it's too much change or work to track, we can just skip it.
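For reference, a rough sketch of how the limiting resource could be identified (a standalone illustration only; `limitingResource` and its inputs are assumptions, not code from this PR). Per executor, the slot count is the minimum over all resources of the available amount divided by the per-task requirement, so the resource attaining that minimum is the one worth naming in the error message:

```scala
// Standalone sketch, not the PR's actual code: find which resource bounds
// the number of task slots an executor can offer.
object LimitingResourceSketch {
  // available: resource name -> amount the executor has
  // perTask:   resource name -> amount a single task needs
  def limitingResource(
      available: Map[String, Int],
      perTask: Map[String, Int]): (String, Int) = {
    // slots each resource alone would allow on this executor
    val slotsByResource = perTask.map { case (name, need) =>
      name -> available.getOrElse(name, 0) / need
    }
    // the scarcest resource determines (and names) the slot limit
    slotsByResource.minBy { case (_, slots) => slots }
  }

  def main(args: Array[String]): Unit = {
    // the test setup above: an executor with 2 CPUs and 1 GPU,
    // tasks that each need 1 CPU and 1 GPU
    val (name, slots) = limitingResource(
      Map("cpus" -> 2, "gpu" -> 1),
      Map("cpus" -> 1, "gpu" -> 1))
    println(s"limiting resource: $name ($slots slot(s))")
  }
}
```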

##########
File path: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
##########
@@ -933,6 +941,30 @@ private[spark] object TaskSchedulerImpl {
 
   val SCHEDULER_MODE_PROPERTY = SCHEDULER_MODE.key
 
+  /**
+   * Calculate the max available task slots given the `availableCpus` and
+   * `availableResources` from a collection of executors.
+   *
+   * @param scheduler the TaskSchedulerImpl instance
+   * @param availableCpus an Array of the amount of available cpus from the executors.
+   * @param availableResources an Array of the resources map from the executors. In the
+   *                           resource map, it maps from the resource name to its amount.
+   * @return the number of max task slots
+   */
+  def calculateSlots(

Review comment:
       nit: name it `calculateAvailableSlots` to be consistent with the master branch
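For clarity on what the rename refers to, this is roughly the shape of the computation (a sketch with simplified parameters; the real method takes the `TaskSchedulerImpl` to read `CPUS_PER_TASK` and the per-task resource requirements, as the scaladoc above describes):

```scala
// Sketch only, with assumed simplified parameters: total available slots is
// the sum over executors of the per-executor minimum across cpu slots and
// every other resource's slots.
object AvailableSlotsSketch {
  def calculateAvailableSlots(
      cpusPerTask: Int,
      resourcesPerTask: Map[String, Int],
      availableCpus: Array[Int],
      availableResources: Array[Map[String, Int]]): Int = {
    availableCpus.zip(availableResources).map { case (cpus, resources) =>
      val cpuSlots = cpus / cpusPerTask
      // every required resource bounds this executor's slot count as well
      val resourceSlots = resourcesPerTask.map { case (name, need) =>
        resources.getOrElse(name, 0) / need
      }
      (resourceSlots.toSeq :+ cpuSlots).min
    }.sum
  }
}
```

With the test scenario above (one executor, 2 CPUs, 1 GPU, 1 CPU and 1 GPU per task), this yields 1 slot rather than the 2 that a cpu-only computation would report.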

##########
File path: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
##########
@@ -446,7 +446,15 @@ private[spark] class TaskSchedulerImpl(
     // of locality levels so that it gets a chance to launch local tasks on all of them.
     // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
     for (taskSet <- sortedTaskSets) {
-      val availableSlots = availableCpus.map(c => c / CPUS_PER_TASK).sum
+      val availableSlots = if (taskSet.isBarrier) {

Review comment:
       it would be nice to add a comment (like on the master branch) noting that `availableSlots` is only used with barrier scheduling and is unused otherwise.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
