kiszk commented on a change in pull request #24406: [SPARK-27024] Executor
interface for cluster managers to support GPU and other resources
URL: https://github.com/apache/spark/pull/24406#discussion_r283589732
##########
File path:
core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
##########
@@ -72,6 +83,97 @@ private[spark] class CoarseGrainedExecutorBackend(
}(ThreadUtils.sameThread)
}
/**
 * Checks that the resources actually discovered on this executor satisfy the
 * user-specified task requirements, and that they are consistent with the
 * resource counts the user configured for the executor. This catches
 * mismatches between what the user requested, what the cluster manager
 * allocated, and what the discovery script found.
 *
 * @param resourceConfigPrefix config-key prefix for executor resource settings
 * @param reqResourcesAndCounts (resource name, required count) pairs derived
 *                              from the task resource configs
 * @param actualResources resources discovered on this executor, keyed by name
 * @throws SparkException if a required resource was not discovered, has fewer
 *                        addresses than required, or its executor count config
 *                        is missing or inconsistent with what was discovered
 */
private def checkResourcesMeetRequirements(
    resourceConfigPrefix: String,
    reqResourcesAndCounts: Array[(String, String)],
    actualResources: Map[String, ResourceInformation]): Unit = {

  reqResourcesAndCounts.foreach { case (rName, reqCount) =>
    // Single lookup instead of contains + apply.
    actualResources.get(rName) match {
      case Some(resourceInfo) =>
        if (resourceInfo.addresses.size < reqCount.toLong) {
          throw new SparkException(s"Resource: $rName with addresses: " +
            s"${resourceInfo.addresses.mkString(",")} doesn't meet the " +
            s"requirements of needing $reqCount of them")
        }
        // Also make sure the resource count discovered on start matches the
        // resource count config specified by the user for the executor.
        val userCountConfigName =
          resourceConfigPrefix + rName + SPARK_RESOURCE_COUNT_POSTFIX
        val userConfigCount = env.conf.getOption(userCountConfigName).
          getOrElse(throw new SparkException(s"Resource: $rName not specified " +
            s"via config: $userCountConfigName, but required, " +
            "please fix your configuration"))

        if (userConfigCount.toLong > resourceInfo.addresses.size) {
          throw new SparkException(s"Resource: $rName, with addresses: " +
            s"${resourceInfo.addresses.mkString(",")} " +
            s"is less than what the user requested for count: $userConfigCount, " +
            s"via $userCountConfigName")
        }
      case None =>
        throw new SparkException(
          s"Executor resource config missing required task resource: $rName")
    }
  }
}
+
+ // visible for testing
+ def parseOrFindResources(resourcesFile: Option[String]): Map[String,
ResourceInformation] = {
+ // only parse the resources if a task requires them
+ val taskResourceConfigs =
env.conf.getAllWithPrefix(SPARK_TASK_RESOURCE_PREFIX)
+ val resourceInfo = if (taskResourceConfigs.nonEmpty) {
+ val execResources = resourcesFile.map { resourceFileStr => {
+ val source = new BufferedInputStream(new
FileInputStream(resourceFileStr))
+ val resourceMap = try {
+ val parsedJson = parse(source).asInstanceOf[JArray].arr
+ parsedJson.map { json =>
+ val name = (json \ "name").extract[String]
+ val addresses = (json \ "addresses").extract[Array[String]]
+ new ResourceInformation(name, addresses)
+ }.map(x => (x.name -> x)).toMap
+ } catch {
+ case e @ (_: MappingException | _: MismatchedInputException) =>
+ throw new SparkException(
+ s"Exception parsing the resources in $resourceFileStr", e)
+ } finally {
+ source.close()
+ }
+ resourceMap
+ }}.getOrElse(ResourceDiscoverer.findResources(env.conf, isDriver =
false))
+
+ if (execResources.isEmpty) {
+ throw new SparkException("User specified resources per task via: " +
+ s"$SPARK_TASK_RESOURCE_PREFIX, but can't find any resources
available on the executor.")
+ }
+ // get just the map of resource name to count
+ val resourcesAndCounts = taskResourceConfigs.
+ withFilter { case (k, v) => k.endsWith(SPARK_RESOURCE_COUNT_POSTFIX)}.
+ map { case (k, v) => (k.dropRight(SPARK_RESOURCE_COUNT_POSTFIX.size),
v)}
+
+ checkResourcesMeetRequirements(SPARK_EXECUTOR_RESOURCE_PREFIX,
resourcesAndCounts,
+ execResources)
+
+
logInfo("===============================================================================")
+ logInfo("Executor $executorId Resources:")
Review comment:
The `s` interpolator prefix is required before the `"`, i.e. `s"Executor $executorId Resources:"`.
Without it, `$executorId` is not interpolated and the literal text `$executorId` is logged instead of the executor's ID.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]