tgravescs commented on a change in pull request #24406: [SPARK-27024] Executor interface for cluster managers to support GPU and other resources
URL: https://github.com/apache/spark/pull/24406#discussion_r283028101
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
 ##########
 @@ -71,6 +82,99 @@ private[spark] class CoarseGrainedExecutorBackend(
     }(ThreadUtils.sameThread)
   }
 
+  // Check that the executor resources at startup will satisfy the user 
specified task
+  // requirements (spark.task.resource.*) and that they match the executor 
configs
+  // specified by the user (spark.executor.resource.*) to catch mismatches 
between what
+  // the user requested and what resource manager gave or what the discovery 
script found.
+  private def checkExecResourcesMeetTaskRequirements(
+      taskResourceConfigs: Array[(String, String)],
+      actualExecResources: Map[String, ResourceInformation]): Unit = {
+
+    // get just the map of resource name to count
+    val taskResourcesAndCounts = taskResourceConfigs.
+      withFilter { case (k, v) => k.endsWith(SPARK_RESOURCE_COUNT_POSTFIX)}.
+      map { case (k, v) => (k.dropRight(SPARK_RESOURCE_COUNT_POSTFIX.size), v)}
+
+    taskResourcesAndCounts.foreach { case (rName, taskReqCount) =>
+      if (actualExecResources.contains(rName)) {
+        val execResourceInfo = actualExecResources(rName)
+
+        if (execResourceInfo.addresses.size < taskReqCount.toLong) {
+          throw new SparkException(s"Executor resource: $rName with addresses: 
" +
+            s"${execResourceInfo.addresses.mkString(",")} doesn't meet the 
task " +
+            s"requirements of needing $taskReqCount of them")
+        }
+        // also make sure the executor resource count on start matches the
+        // spark.executor.resource configs specified by user
+        val userExecCountConfigName =
+          SPARK_EXECUTOR_RESOURCE_PREFIX + rName + SPARK_RESOURCE_COUNT_POSTFIX
+        val userExecConfigCount = env.conf.getOption(userExecCountConfigName).
+          getOrElse(throw new SparkException(s"Executor resource: $rName not 
specified " +
+            s"via config: $userExecCountConfigName, but required " +
+            s"by the task, please fix your configuration"))
+
+        if (userExecConfigCount.toLong != execResourceInfo.addresses.size) {
 
 Review comment:
   Sure, we can allow that. It might be a bit weird for YARN and k8s, but we can leave it open for now and tighten it later if needed.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to