tgravescs commented on a change in pull request #24406: [SPARK-27024] Executor 
interface for cluster managers to support GPU and other resources
URL: https://github.com/apache/spark/pull/24406#discussion_r281332067
 
 

 ##########
 File path: 
core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
 ##########
 @@ -71,6 +82,161 @@ private[spark] class CoarseGrainedExecutorBackend(
     }(ThreadUtils.sameThread)
   }
 
+  // Check that the executor resources at startup will satisfy the user 
specified task
+  // requirements (spark.task.resource.*) and that they match the executor 
configs
+  // specified by the user (spark.executor.resource.*) to catch mismatches 
between what
+  // the user requested and what resource manager gave or what the discovery 
script found.
+  private def checkExecResourcesMeetTaskRequirements(
+      taskResourceConfigs: Array[(String, String)],
+      actualExecResources: Map[String, ResourceInformation]): Unit = {
+
+    // get just the resource name to count
+    val taskResourcesAndCounts = taskResourceConfigs.
+      withFilter { case (k, v) => k.endsWith(SPARK_RESOURCE_COUNT_POSTFIX)}.
+      map { case (k, v) => (k.dropRight(SPARK_RESOURCE_COUNT_POSTFIX.size), v)}
+
+    case class ResourceRealCounts(execCount: Long, taskCount: Long)
+
+    // Spark will only base this on the counts and known byte units; if
+    // user is trying to use something else we will have to add a plugin later
+    taskResourcesAndCounts.foreach { case (rName, taskCount) =>
+      if (actualExecResources.contains(rName)) {
+        val execResourceInfo = actualExecResources(rName)
+        val taskUnits = env.conf.getOption(
+          SPARK_TASK_RESOURCE_PREFIX + rName + SPARK_RESOURCE_UNITS_POSTFIX)
+        val userExecUnitsConfigName =
+          SPARK_EXECUTOR_RESOURCE_PREFIX + rName + SPARK_RESOURCE_UNITS_POSTFIX
+        val userExecConfigUnits = env.conf.getOption(userExecUnitsConfigName)
+        val realCounts = if (execResourceInfo.units.nonEmpty) {
+          if (taskUnits.nonEmpty && taskUnits.get.nonEmpty) {
+            if (userExecConfigUnits.isEmpty || 
userExecConfigUnits.get.isEmpty) {
+              throw new SparkException(s"Resource: $rName has units in task 
config " +
+                s"and executor startup config but the user specified executor 
resource " +
+                s"config is missing the units config - see 
${userExecUnitsConfigName}.")
+            }
+            try {
+              val execCountWithUnits =
+                Utils.byteStringAsBytes(execResourceInfo.count.toString + 
execResourceInfo.units)
+              val taskCountWithUnits = Utils.byteStringAsBytes(taskCount + 
taskUnits.get)
+              ResourceRealCounts(execCountWithUnits, taskCountWithUnits)
+            } catch {
+              case e: NumberFormatException =>
+                // Ignore units not of byte types and just use count
+                logWarning(s"Illegal resource unit type, spark only " +
+                  s"supports conversion of byte types, units: 
$execResourceInfo.units, " +
+                  s"ignoring the type and using the raw count.", e)
+                ResourceRealCounts(execResourceInfo.count, taskCount.toLong)
+            }
+          } else {
+            throw new SparkException(
+              s"Resource: $rName has an executor units config: 
${execResourceInfo.units}, but " +
+                s"the task units config is missing.")
+          }
+        } else {
+          if (taskUnits.nonEmpty && taskUnits.get.nonEmpty) {
+            throw new SparkException(
+              s"Resource: $rName has a task units config: ${taskUnits.get}, 
but the executor " +
+              s"units config is missing.")
+          }
+          ResourceRealCounts(execResourceInfo.count, taskCount.toLong)
+        }
+        if (realCounts.execCount < realCounts.taskCount) {
+          throw new SparkException(s"Executor resource: $rName, count: 
${realCounts.execCount} " +
+            s"isn't large enough to meet task requirements of: 
${realCounts.taskCount}")
+        }
+        // also make sure the executor resource count on start matches the
+        // spark.executor.resource configs specified by user
+        val userExecCountConfigName =
+          SPARK_EXECUTOR_RESOURCE_PREFIX + rName + SPARK_RESOURCE_COUNT_POSTFIX
+        val userExecConfigCount = env.conf.getOption(userExecCountConfigName).
+          getOrElse(throw new SparkException(s"Executor resource: $rName not 
specified " +
+            s"via config: $userExecCountConfigName, but required " +
+            s"by the task, please fix your configuration"))
+        val execConfigCountWithUnits = if (userExecConfigUnits.nonEmpty) {
+          val count = try {
 
 Review comment:
   Sure

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to