tgravescs commented on a change in pull request #24406: [SPARK-27024] Executor interface for cluster managers to support GPU and other resources
URL: https://github.com/apache/spark/pull/24406#discussion_r281325657
##########
File path: core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
##########
@@ -71,6 +82,161 @@ private[spark] class CoarseGrainedExecutorBackend(
}(ThreadUtils.sameThread)
}
+ // Check that the executor resources at startup will satisfy the user-specified task
+ // requirements (spark.task.resource.*) and that they match the executor configs
+ // specified by the user (spark.executor.resource.*), to catch mismatches between what
+ // the user requested and what the resource manager provided or the discovery script found.
+ private def checkExecResourcesMeetTaskRequirements(
+ taskResourceConfigs: Array[(String, String)],
+ actualExecResources: Map[String, ResourceInformation]): Unit = {
+
+ // get just the map of resource name to count
+ val taskResourcesAndCounts = taskResourceConfigs.
+ withFilter { case (k, v) => k.endsWith(SPARK_RESOURCE_COUNT_POSTFIX)}.
+ map { case (k, v) => (k.dropRight(SPARK_RESOURCE_COUNT_POSTFIX.size), v)}
+
+ case class ResourceRealCounts(execCount: Long, taskCount: Long)
+
+ // Spark only compares counts and known byte units; if the
+ // user is trying to use something else, we will have to add a plugin later
+ taskResourcesAndCounts.foreach { case (rName, taskCount) =>
+ if (actualExecResources.contains(rName)) {
+ val execResourceInfo = actualExecResources(rName)
+ val taskUnits = env.conf.getOption(
+ SPARK_TASK_RESOURCE_PREFIX + rName + SPARK_RESOURCE_UNITS_POSTFIX)
+ val userExecUnitsConfigName =
+ SPARK_EXECUTOR_RESOURCE_PREFIX + rName + SPARK_RESOURCE_UNITS_POSTFIX
+ val userExecConfigUnits = env.conf.getOption(userExecUnitsConfigName)
+ val realCounts = if (execResourceInfo.units.nonEmpty) {
+ if (taskUnits.nonEmpty && taskUnits.get.nonEmpty) {
+ if (userExecConfigUnits.isEmpty || userExecConfigUnits.get.isEmpty) {
+ throw new SparkException(s"Resource: $rName has units in task config " +
+ s"and executor startup config but the user specified executor resource " +
+ s"config is missing the units config - see ${userExecUnitsConfigName}.")
+ }
+ try {
+ val execCountWithUnits =
Review comment:
I don't think we need to explicitly mention memory here. We can just note in the documentation JIRA that only conversion of byte types is supported; that can be handled as part of that JIRA.
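To make the "byte types only" point concrete, here is a small, self-contained Scala sketch of the kind of check the diff performs. It is not the PR's implementation. The `ExecResource` case class, the `toBytes` and `unmet` helpers, the unit suffixes (`b`, `k`, `m`, `g`, `t` as binary multiples), and the assumption that task config keys arrive with the `spark.task.resource.` prefix already stripped (e.g. `gpu.count`, `mem.units`, mirroring the `dropRight` in the diff) are all hypothetical.

```scala
// Hypothetical sketch (not Spark's code): compare per-task resource requests
// against what an executor reports, converting byte-style units to bytes.
object ResourceCheckSketch {
  // Assumed key postfixes; keys are assumed to already lack "spark.task.resource."
  val CountPostfix = ".count"
  val UnitsPostfix = ".units"

  // Assumed unit suffixes: only byte units are understood, as the review suggests documenting.
  private val byteMultipliers: Map[String, Long] = Map(
    "b" -> 1L,
    "k" -> (1L << 10),
    "m" -> (1L << 20),
    "g" -> (1L << 30),
    "t" -> (1L << 40)
  )

  // Hypothetical stand-in for what the executor discovered for one resource.
  final case class ExecResource(count: Long, units: Option[String])

  /** Converts `count` in `units` to bytes; throws for any non-byte unit. */
  def toBytes(count: Long, units: String): Long =
    byteMultipliers.get(units.trim.toLowerCase) match {
      case Some(mult) => Math.multiplyExact(count, mult)
      case None =>
        throw new IllegalArgumentException(s"Unsupported (non-byte) unit: '$units'")
    }

  /** Extracts resourceName -> count from keys like "gpu.count". */
  def taskCounts(taskConfs: Map[String, String]): Map[String, Long] =
    taskConfs.collect {
      case (k, v) if k.endsWith(CountPostfix) => k.stripSuffix(CountPostfix) -> v.trim.toLong
    }

  /** Returns one message per task resource the executor cannot satisfy. */
  def unmet(taskConfs: Map[String, String],
            execResources: Map[String, ExecResource]): Seq[String] =
    taskCounts(taskConfs).toSeq.sortBy(_._1).flatMap { case (name, taskCount) =>
      execResources.get(name) match {
        case None => Some(s"Executor has no resource named '$name'")
        case Some(execRes) =>
          val taskUnits = taskConfs.get(name + UnitsPostfix)
          // Units are compared only when both sides specify them.
          val realCounts: Either[String, (Long, Long)] = (execRes.units, taskUnits) match {
            case (Some(eu), Some(tu)) => Right((toBytes(execRes.count, eu), toBytes(taskCount, tu)))
            case (None, None)         => Right((execRes.count, taskCount))
            case _ => Left(s"Resource '$name': units set on only one of executor/task")
          }
          realCounts match {
            case Left(err) => Some(err)
            case Right((execReal, taskReal)) if execReal < taskReal =>
              Some(s"Resource '$name': executor has $execReal, task needs $taskReal")
            case Right(_) => None
          }
      }
    }
}
```

For example, a task asking for `mem.count = 512` with `mem.units = m` is satisfied by an executor reporting `ExecResource(1, Some("g"))`, while any unit outside the byte table is rejected rather than guessed at. A real implementation would presumably reuse Spark's existing byte-string parsing (e.g. `JavaUtils.byteStringAsBytes`) instead of a hand-written table.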