tgravescs commented on a change in pull request #24374: [SPARK-27366][CORE] Support GPU Resources in Spark job scheduling
URL: https://github.com/apache/spark/pull/24374#discussion_r289424074
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/SparkContext.scala
 ##########
 @@ -2712,27 +2712,43 @@ object SparkContext extends Logging {
     // When running locally, don't try to re-execute tasks on failure.
     val MAX_LOCAL_TASK_FAILURES = 1
 
-    // SPARK-26340: Ensure that executor's core num meets at least one task requirement.
-    def checkCpusPerTask(
-      clusterMode: Boolean,
-      maxCoresPerExecutor: Option[Int]): Unit = {
-      val cpusPerTask = sc.conf.get(CPUS_PER_TASK)
-      if (clusterMode && sc.conf.contains(EXECUTOR_CORES)) {
-        if (sc.conf.get(EXECUTOR_CORES) < cpusPerTask) {
-          throw new SparkException(s"${CPUS_PER_TASK.key}" +
-            s" must be <= ${EXECUTOR_CORES.key} when run on $master.")
-        }
-      } else if (maxCoresPerExecutor.isDefined) {
-        if (maxCoresPerExecutor.get < cpusPerTask) {
-          throw new SparkException(s"Only ${maxCoresPerExecutor.get} cores available per executor" +
-            s" when run on $master, and ${CPUS_PER_TASK.key} must be <= it.")
+    // Ensure that the executor's resources satisfy the requirements of one or more tasks.
+    def checkResourcesPerTask(clusterMode: Boolean, executorCores: Option[Int]): Unit = {
+      val taskCores = sc.conf.get(CPUS_PER_TASK)
+      val execCores = if (clusterMode) {
+        executorCores.getOrElse(sc.conf.get(EXECUTOR_CORES))
+      } else {
+        executorCores.get
+      }
+
+      // Number of cores per executor must meet at least one task requirement.
+      if (execCores < taskCores) {
+        throw new SparkException(s"The number of cores per executor 
(=$execCores) has to be >= " +
+          s"the task config: ${CPUS_PER_TASK.key} = $taskCores when run on 
$master.")
+      }
+
+      // Resources per executor must be the same multiple of the task requirements as the cores are.
+      val numSlots = execCores / taskCores
+      val taskResourcesAndCount = sc.conf.getTaskResourceRequirements()
+      val executorResourcesAndCounts = sc.conf.getAllWithPrefixAndSuffix(
+        SPARK_EXECUTOR_RESOURCE_PREFIX, SPARK_RESOURCE_COUNT_SUFFIX).toMap
+      // There are already checks inside SparkConf to make sure the executor resources were specified.
 
 Review comment:
   I actually don't think we need the other check in SparkConf once this one is in place. That check makes sure an executor config exists (which you can add below, see my other comment) and that the executor config is >= the task config, but the >= part is already covered by your numSlots check here.
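
   To make the redundancy concrete, here is a minimal standalone sketch of the argument (this is not the actual Spark code: the method shape, parameter names, and messages below are invented for illustration). Once `numSlots` is at least 1, requiring each executor resource to fill `numSlots` task slots already implies that the executor amount is >= the task amount:

   ```scala
   object ResourceCheckSketch {
     // Hypothetical stand-ins for the values checkResourcesPerTask reads from SparkConf.
     def checkResourcesPerTask(
         execCores: Int,
         taskCores: Int,
         executorResources: Map[String, Int], // e.g. Map("gpu" -> 2)
         taskResources: Map[String, Int]): Unit = {
       require(execCores >= taskCores,
         s"cores per executor (=$execCores) must be >= cores per task (=$taskCores)")
       val numSlots = execCores / taskCores // >= 1 because of the check above
       taskResources.foreach { case (rName, taskCount) =>
         val execCount = executorResources.getOrElse(rName,
           throw new IllegalArgumentException(s"no executor config for resource $rName"))
         // Because numSlots >= 1, this check alone implies execCount >= taskCount,
         // which is why a separate ">=" check in SparkConf adds nothing.
         require(execCount >= numSlots * taskCount,
           s"executor resource $rName (=$execCount) cannot fill $numSlots slots of $taskCount each")
       }
     }

     def main(args: Array[String]): Unit = {
       // 4 cores / 2 cores per task = 2 slots; 2 GPUs cover 2 slots of 1 GPU each.
       checkResourcesPerTask(4, 2, Map("gpu" -> 2), Map("gpu" -> 1))
     }
   }
   ```

   Whether the resource/slot comparison should throw or just warn is a separate design question; the point here is only that the >= guarantee falls out of the slot math.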
