tgravescs commented on a change in pull request #29395:
URL: https://github.com/apache/spark/pull/29395#discussion_r467935481
##########
File path:
core/src/test/scala/org/apache/spark/BarrierStageOnSubmittedSuite.scala
##########
@@ -259,4 +262,37 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite
with LocalSparkContext
testSubmitJob(sc, rdd,
message =
ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER)
}
+
+ test("SPARK-32518: CoarseGrainedSchedulerBackend.maxNumConcurrentTasks
should " +
+ "consider all kinds of resources for the barrier stage") {
+ withTempDir { dir =>
+ val discoveryScript = createTempScriptWithExpectedOutput(
+ dir, "gpuDiscoveryScript", """{"name": "gpu","addresses":["0"]}""")
+
+ val conf = new SparkConf()
+ .setMaster("local-cluster[1, 2, 1024]")
+ .setAppName("test-cluster")
+ .set(WORKER_GPU_ID.amountConf, "1")
+ .set(WORKER_GPU_ID.discoveryScriptConf, discoveryScript)
+ .set(EXECUTOR_GPU_ID.amountConf, "1")
+ .set(TASK_GPU_ID.amountConf, "1")
+ // disable barrier stage retry to fail the application as soon as
possible
+ .set(BARRIER_MAX_CONCURRENT_TASKS_CHECK_MAX_FAILURES, 1)
+ // disable the check to simulate the behavior of Standalone in order to
+ // reproduce the issue.
+ .set(Tests.SKIP_VALIDATE_CORES_TESTING, true)
+ sc = new SparkContext(conf)
+ // setup an executor which will have 2 CPUs and 1 GPU
+ TestUtils.waitUntilExecutorsUp(sc, 1, 60000)
+
+ val exception = intercept[BarrierJobSlotsNumberCheckFailed] {
+ sc.parallelize(Range(1, 10), 2)
+ .barrier()
+ .mapPartitions { iter => iter }
+ .collect()
+ }
+ assert(exception.getMessage.contains("[SPARK-24819]: Barrier execution "
+
+ "mode does not allow run a barrier stage that requires more slots"))
Review comment:
I'm not sure if it's worth it, but it would be nice to perhaps print what
the limiting resource is. If it's too much change or work to track, we may
just skip it.
##########
File path:
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
##########
@@ -933,6 +941,30 @@ private[spark] object TaskSchedulerImpl {
val SCHEDULER_MODE_PROPERTY = SCHEDULER_MODE.key
+ /**
+ * Calculate the max available task slots given the `availableCpus` and
`availableResources`
+ * from a collection of executors.
+ *
+ * @param scheduler the TaskSchedulerImpl instance
+ * @param availableCpus an Array of the amount of available cpus from the
executors.
+ * @param availableResources an Array of the resources map from the
executors. In the resource
+ * map, it maps from the resource name to its
amount.
+ * @return the number of max task slots
+ */
+ def calculateSlots(
Review comment:
nit: name calculateAvailableSlots to be consistent with master
##########
File path:
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
##########
@@ -446,7 +446,15 @@ private[spark] class TaskSchedulerImpl(
// of locality levels so that it gets a chance to launch local tasks on
all of them.
// NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF,
RACK_LOCAL, ANY
for (taskSet <- sortedTaskSets) {
- val availableSlots = availableCpus.map(c => c / CPUS_PER_TASK).sum
+ val availableSlots = if (taskSet.isBarrier) {
Review comment:
it would be nice to add a comment (like the master branch) noting that this
is only used with barrier scheduling; availableSlots is not used otherwise.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]