tgravescs commented on a change in pull request #24406: [SPARK-27024] Executor
interface for cluster managers to support GPU and other resources
URL: https://github.com/apache/spark/pull/24406#discussion_r283028399
##########
File path:
core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
##########
@@ -71,6 +82,99 @@ private[spark] class CoarseGrainedExecutorBackend(
}(ThreadUtils.sameThread)
}
+ // Check that the executor resources at startup will satisfy the user
specified task
+ // requirements (spark.task.resource.*) and that they match the executor
configs
+ // specified by the user (spark.executor.resource.*) to catch mismatches
between what
+ // the user requested and what resource manager gave or what the discovery
script found.
+ private def checkExecResourcesMeetTaskRequirements(
+ taskResourceConfigs: Array[(String, String)],
+ actualExecResources: Map[String, ResourceInformation]): Unit = {
+
+ // get just the map of resource name to count
+ val taskResourcesAndCounts = taskResourceConfigs.
+ withFilter { case (k, v) => k.endsWith(SPARK_RESOURCE_COUNT_POSTFIX)}.
+ map { case (k, v) => (k.dropRight(SPARK_RESOURCE_COUNT_POSTFIX.size), v)}
+
+ taskResourcesAndCounts.foreach { case (rName, taskReqCount) =>
+ if (actualExecResources.contains(rName)) {
+ val execResourceInfo = actualExecResources(rName)
+
+ if (execResourceInfo.addresses.size < taskReqCount.toLong) {
+ throw new SparkException(s"Executor resource: $rName with addresses:
" +
+ s"${execResourceInfo.addresses.mkString(",")} doesn't meet the
task " +
+ s"requirements of needing $taskReqCount of them")
+ }
+ // also make sure the executor resource count on start matches the
+ // spark.executor.resource configs specified by user
+ val userExecCountConfigName =
+ SPARK_EXECUTOR_RESOURCE_PREFIX + rName + SPARK_RESOURCE_COUNT_POSTFIX
+ val userExecConfigCount = env.conf.getOption(userExecCountConfigName).
+ getOrElse(throw new SparkException(s"Executor resource: $rName not
specified " +
+ s"via config: $userExecCountConfigName, but required " +
+ s"by the task, please fix your configuration"))
+
+ if (userExecConfigCount.toLong != execResourceInfo.addresses.size) {
+ throw new SparkException(s"Executor resource: $rName, with
addresses: " +
+ s"${execResourceInfo.addresses.mkString(",")} " +
+ s"doesn't match what the user requested for executor count:
$userExecConfigCount, " +
+ s"via $userExecCountConfigName")
+ }
+ } else {
+ throw new SparkException(s"Executor resource config missing required
task resource: $rName")
+ }
+ }
+ }
+
+ // visible for testing
+ def parseOrFindResources(resourcesFile: Option[String]): Map[String,
ResourceInformation] = {
+ // only parse the resources if a task requires them
+ val taskResourceConfigs =
env.conf.getAllWithPrefix(SPARK_TASK_RESOURCE_PREFIX)
+ val resourceInfo = if (taskResourceConfigs.nonEmpty) {
+ val execResources = resourcesFile.map { resourceFileStr => {
+ val source = new BufferedInputStream(new
FileInputStream(resourceFileStr))
+ val resourceMap = try {
+ val parsedJson = parse(source).asInstanceOf[JArray].arr
+ parsedJson.map { json =>
+ val name = (json \ "name").extract[String]
+ val addresses = (json \
"addresses").extract[Array[JValue]].map(_.extract[String])
+ new ResourceInformation(name, addresses)
+ }.map(x => (x.name -> x)).toMap
+ } catch {
+ case e @ (_: MappingException | _: MismatchedInputException) =>
+ throw new SparkException(
+ s"Exception parsing the resources in $resourceFileStr", e)
+ } finally {
+ source.close()
+ }
+ resourceMap
+ }}.getOrElse(ResourceDiscoverer.findResources(env.conf, false))
Review comment:
done
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]