tgravescs commented on a change in pull request #24374: [SPARK-27366][CORE]
Support GPU Resources in Spark job scheduling
URL: https://github.com/apache/spark/pull/24374#discussion_r289839256
##########
File path: core/src/main/scala/org/apache/spark/SparkContext.scala
##########
@@ -2707,27 +2707,67 @@ object SparkContext extends Logging {
// When running locally, don't try to re-execute tasks on failure.
val MAX_LOCAL_TASK_FAILURES = 1
- // SPARK-26340: Ensure that executor's core num meets at least one task
requirement.
- def checkCpusPerTask(
- clusterMode: Boolean,
- maxCoresPerExecutor: Option[Int]): Unit = {
- val cpusPerTask = sc.conf.get(CPUS_PER_TASK)
- if (clusterMode && sc.conf.contains(EXECUTOR_CORES)) {
- if (sc.conf.get(EXECUTOR_CORES) < cpusPerTask) {
- throw new SparkException(s"${CPUS_PER_TASK.key}" +
- s" must be <= ${EXECUTOR_CORES.key} when run on $master.")
+ // Ensure that executor's resources satisfies one or more tasks
requirement.
+ def checkResourcesPerTask(clusterMode: Boolean, executorCores:
Option[Int]): Unit = {
+ val taskCores = sc.conf.get(CPUS_PER_TASK)
+ val execCores = if (clusterMode) {
+ executorCores.getOrElse(sc.conf.get(EXECUTOR_CORES))
+ } else {
+ executorCores.get
+ }
+
+ // Number of cores per executor must meet at least one task requirement.
+ if (execCores < taskCores) {
+ throw new SparkException(s"The number of cores per executor
(=$execCores) has to be >= " +
+ s"the task config: ${CPUS_PER_TASK.key} = $taskCores when run on
$master.")
+ }
+
+ // Calculate the max slots each executor can provide based on resources
available on each
+ // executor and resources required by each task.
+ val taskResourcesAndCount = sc.conf.getTaskResourceRequirements()
+ val executorResourcesAndCounts = sc.conf.getAllWithPrefixAndSuffix(
+ SPARK_EXECUTOR_RESOURCE_PREFIX, SPARK_RESOURCE_COUNT_SUFFIX).toMap
+ val numSlots = (taskResourcesAndCount.map { case (rName, taskCount) =>
+ // Make sure the executor resources were specified through config.
+ val execCount = executorResourcesAndCounts.getOrElse(rName,
+ throw new SparkException(
+ s"The executor resource config: " +
+ s"${SPARK_EXECUTOR_RESOURCE_PREFIX + rName +
SPARK_RESOURCE_COUNT_SUFFIX} " +
+ "needs to be specified since a task requirement config: " +
+ s"${SPARK_TASK_RESOURCE_PREFIX + rName +
SPARK_RESOURCE_COUNT_SUFFIX} was specified")
+ )
+ // Make sure the executor resources are large enough to launch at
least one task.
+ if (execCount.toLong < taskCount.toLong) {
+ throw new SparkException(
+ s"The executor resource config: " +
+ s"${SPARK_EXECUTOR_RESOURCE_PREFIX + rName +
SPARK_RESOURCE_COUNT_SUFFIX} " +
+ s"= $execCount has to be >= the task config: " +
+ s"${SPARK_TASK_RESOURCE_PREFIX + rName +
SPARK_RESOURCE_COUNT_SUFFIX} = $taskCount")
}
- } else if (maxCoresPerExecutor.isDefined) {
- if (maxCoresPerExecutor.get < cpusPerTask) {
- throw new SparkException(s"Only ${maxCoresPerExecutor.get} cores
available per executor" +
- s" when run on $master, and ${CPUS_PER_TASK.key} must be <= it.")
+ execCount.toInt / taskCount
+ }.toList ++ Seq(execCores / taskCores)).min
+ // There have been checks inside SparkConf to make sure the executor
resources were specified
+ // and are large enough if any task resources were specified.
+ taskResourcesAndCount.foreach { case (rName, taskCount) =>
+ val execCount = executorResourcesAndCounts(rName)
+ if (execCount.toInt / taskCount.toInt != numSlots) {
+ val message = s"The value of executor resource config: " +
+ s"${SPARK_EXECUTOR_RESOURCE_PREFIX + rName +
SPARK_RESOURCE_COUNT_SUFFIX} " +
+ s"= $execCount is more than that tasks can take: $numSlots * " +
+ s"${SPARK_TASK_RESOURCE_PREFIX + rName +
SPARK_RESOURCE_COUNT_SUFFIX} = $taskCount. " +
+ s"The resources may be wasted."
+ if (Utils.isTesting) {
+ throw new SparkException(message)
+ } else {
+ logWarning(message)
Review comment:
Originally we talked about throwing here to disallow it — I just want to make
sure we intentionally changed our minds here. I'm really OK either way we go, as
there were some people questioning this on the SPIP.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]