tgravescs commented on a change in pull request #24374: [SPARK-27366][CORE] Support GPU Resources in Spark job scheduling
URL: https://github.com/apache/spark/pull/24374#discussion_r289421694
##########
File path: core/src/main/scala/org/apache/spark/SparkContext.scala
##########
@@ -2712,27 +2712,43 @@ object SparkContext extends Logging {
// When running locally, don't try to re-execute tasks on failure.
val MAX_LOCAL_TASK_FAILURES = 1
- // SPARK-26340: Ensure that executor's core num meets at least one task requirement.
- def checkCpusPerTask(
- clusterMode: Boolean,
- maxCoresPerExecutor: Option[Int]): Unit = {
- val cpusPerTask = sc.conf.get(CPUS_PER_TASK)
- if (clusterMode && sc.conf.contains(EXECUTOR_CORES)) {
- if (sc.conf.get(EXECUTOR_CORES) < cpusPerTask) {
- throw new SparkException(s"${CPUS_PER_TASK.key}" +
- s" must be <= ${EXECUTOR_CORES.key} when run on $master.")
- }
- } else if (maxCoresPerExecutor.isDefined) {
- if (maxCoresPerExecutor.get < cpusPerTask) {
- throw new SparkException(s"Only ${maxCoresPerExecutor.get} cores
available per executor" +
- s" when run on $master, and ${CPUS_PER_TASK.key} must be <= it.")
+ // Ensure that the executor's resources satisfy the requirements of one or more tasks.
+ def checkResourcesPerTask(clusterMode: Boolean, executorCores: Option[Int]): Unit = {
+ val taskCores = sc.conf.get(CPUS_PER_TASK)
+ val execCores = if (clusterMode) {
+ executorCores.getOrElse(sc.conf.get(EXECUTOR_CORES))
+ } else {
+ executorCores.get
+ }
+
+ // Number of cores per executor must meet at least one task requirement.
+ if (execCores < taskCores) {
+ throw new SparkException(s"The number of cores per executor
(=$execCores) has to be >= " +
+ s"the task config: ${CPUS_PER_TASK.key} = $taskCores when run on
$master.")
+ }
+
+ // Each executor resource count must be the same multiple of the corresponding task requirement.
+ val numSlots = execCores / taskCores
+ val taskResourcesAndCount = sc.conf.getTaskResourceRequirements()
+ val executorResourcesAndCounts = sc.conf.getAllWithPrefixAndSuffix(
+ SPARK_EXECUTOR_RESOURCE_PREFIX, SPARK_RESOURCE_COUNT_SUFFIX).toMap
+ // There have been checks inside SparkConf to make sure the executor resources were specified
+ // and are large enough if any task resources were specified.
+ taskResourcesAndCount.foreach { case (rName, taskCount) =>
+ val execCount = executorResourcesAndCounts(rName)
+ if (execCount.toInt / taskCount.toInt != numSlots) {
+ throw new SparkException(
+ s"The value of executor resource config: " +
+ s"${SPARK_EXECUTOR_RESOURCE_PREFIX + rName +
SPARK_RESOURCE_COUNT_SUFFIX} " +
+ s"= $execCount has to be $numSlots times the number of the task
config: " +
+ s"${SPARK_TASK_RESOURCE_PREFIX + rName +
SPARK_RESOURCE_COUNT_SUFFIX} = $taskCount")
Review comment:
We perhaps want to expand this message to say something like "... so we don't waste resources".
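
For illustration, here is a minimal standalone sketch of the slot-matching check the diff introduces. The object name, the `checkResources` helper, and the plain-Int resource maps are assumptions made for this example, not the actual SparkContext code or configs:

```scala
object ResourceSlotCheckSketch {
  // Sketch of the per-executor resource slot check, assuming resource
  // amounts are already available as plain Ints keyed by resource name.
  def checkResources(
      execCores: Int,
      taskCores: Int,
      execResources: Map[String, Int],
      taskResources: Map[String, Int]): Unit = {
    require(execCores >= taskCores,
      s"executor cores ($execCores) must be >= task cores ($taskCores)")
    // CPU cores decide how many tasks can run concurrently on one executor.
    val numSlots = execCores / taskCores
    taskResources.foreach { case (rName, taskCount) =>
      val execCount = execResources(rName)
      // If the executor's count of this resource is not exactly numSlots times
      // the per-task count, the extra amount can never be assigned to a task.
      if (execCount / taskCount != numSlots) {
        throw new IllegalArgumentException(
          s"executor $rName count ($execCount) has to be $numSlots times the task " +
            s"$rName count ($taskCount), otherwise some of it is wasted")
      }
    }
  }

  def main(args: Array[String]): Unit = {
    // 4 cores / 2 cores per task = 2 slots, but 3 GPUs / 1 GPU per task = 3 slots:
    // the third GPU could never be used by any task, so the check throws.
    checkResources(execCores = 4, taskCores = 2,
      execResources = Map("gpu" -> 3), taskResources = Map("gpu" -> 1))
  }
}
```

With matching multiples (e.g. 4 GPUs for 2 slots of 2 GPUs each) the check passes; the thrown message is where the "so we don't waste resources" wording suggested above would go.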
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]