tgravescs commented on a change in pull request #24406: [SPARK-27024] Executor
interface for cluster managers to support GPU and other resources
URL: https://github.com/apache/spark/pull/24406#discussion_r282516119
##########
File path:
core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
##########
@@ -71,6 +82,159 @@ private[spark] class CoarseGrainedExecutorBackend(
}(ThreadUtils.sameThread)
}
+  // we only support converting from units that are byte based, this will
+  // either convert it to bytes or just return the count if the units can't be
+  // parsed as bytes.
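+  // e.g. ("2", "g") parses as "2g" -> 2147483648 bytes, while ("4", "widgets")
+  // fails to parse as bytes and falls back to the raw count of 4.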
+  private def tryConvertUnitsToBytes(count: String, units: String): Long = {
+    try {
+      Utils.byteStringAsBytes(count + units)
+    } catch {
+      case e: NumberFormatException =>
+        // Ignore units not of byte types and just use the count
+        logWarning("Illegal resource unit type, Spark only " +
+          s"supports conversion of byte types, units: $units, " +
+          "ignoring the type and using the raw count.", e)
+        count.toLong
+    }
+  }
+
+  // Check that the executor resources at startup will satisfy the user specified task
+  // requirements (spark.task.resource.*) and that they match the executor configs
+  // specified by the user (spark.executor.resource.*) to catch mismatches between what
+  // the user requested and what the resource manager gave or what the discovery script found.
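+  // e.g. with spark.task.resource.gpu.count=2 the executor must have found at
+  // least 2 GPUs at startup, and spark.executor.resource.gpu.count must match
+  // the discovered count exactly.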
+  private def checkExecResourcesMeetTaskRequirements(
+      taskResourceConfigs: Array[(String, String)],
+      actualExecResources: Map[String, ResourceInformation]): Unit = {
+
+    // get just the map of resource name to count
+    val taskResourcesAndCounts = taskResourceConfigs.
+      withFilter { case (k, v) => k.endsWith(SPARK_RESOURCE_COUNT_POSTFIX) }.
+      map { case (k, v) => (k.dropRight(SPARK_RESOURCE_COUNT_POSTFIX.size), v) }
+
+    case class ResourceRealCounts(execCount: Long, taskCount: Long)
+
+    // Spark will only base it off the counts and known byte units; if the
+    // user is trying to use something else we will have to add a plugin later.
+    taskResourcesAndCounts.foreach { case (rName, taskCount) =>
+      if (actualExecResources.contains(rName)) {
+        val execResourceInfo = actualExecResources(rName)
+        val taskUnits = env.conf.getOption(
+          SPARK_TASK_RESOURCE_PREFIX + rName + SPARK_RESOURCE_UNITS_POSTFIX)
+        val userExecUnitsConfigName =
+          SPARK_EXECUTOR_RESOURCE_PREFIX + rName + SPARK_RESOURCE_UNITS_POSTFIX
+        val userExecConfigUnits = env.conf.getOption(userExecUnitsConfigName)
+        val realCounts = if (execResourceInfo.units.nonEmpty) {
+          if (taskUnits.nonEmpty && taskUnits.get.nonEmpty) {
+            if (userExecConfigUnits.isEmpty || userExecConfigUnits.get.isEmpty) {
+              throw new SparkException(s"Resource: $rName has units in the task config " +
+                s"and the executor startup config, but the user specified executor resource " +
+                s"config is missing the units config - see $userExecUnitsConfigName.")
+            }
+            val execCountWithUnits =
+              tryConvertUnitsToBytes(execResourceInfo.count.toString, execResourceInfo.units)
+            val taskCountWithUnits =
+              tryConvertUnitsToBytes(taskCount, taskUnits.get)
+            ResourceRealCounts(execCountWithUnits, taskCountWithUnits)
+          } else {
+            throw new SparkException(
+              s"Resource: $rName has an executor units config: ${execResourceInfo.units}, " +
+              s"but the task units config is missing.")
+          }
+        } else {
+          if (taskUnits.nonEmpty && taskUnits.get.nonEmpty) {
+            throw new SparkException(
+              s"Resource: $rName has a task units config: ${taskUnits.get}, " +
+              s"but the executor units config is missing.")
+          }
+          ResourceRealCounts(execResourceInfo.count, taskCount.toLong)
+        }
+        if (realCounts.execCount < realCounts.taskCount) {
+          throw new SparkException(s"Executor resource: $rName, count: ${realCounts.execCount} " +
+            s"isn't large enough to meet task requirements of: ${realCounts.taskCount}")
+        }
+        // also make sure the executor resource count on start matches the
+        // spark.executor.resource configs specified by the user
+        val userExecCountConfigName =
+          SPARK_EXECUTOR_RESOURCE_PREFIX + rName + SPARK_RESOURCE_COUNT_POSTFIX
+        val userExecConfigCount = env.conf.getOption(userExecCountConfigName).
+          getOrElse(throw new SparkException(s"Executor resource: $rName not specified " +
+            s"via config: $userExecCountConfigName, but required " +
+            s"by the task, please fix your configuration"))
+        val execConfigCountWithUnits = if (userExecConfigUnits.nonEmpty) {
+          tryConvertUnitsToBytes(userExecConfigCount, userExecConfigUnits.get)
+        } else {
+          userExecConfigCount.toLong
+        }
+        if (execConfigCountWithUnits != realCounts.execCount) {
+          throw new SparkException(s"Executor resource: $rName, count: ${realCounts.execCount} " +
+            s"doesn't match what the user requested for the executor count: " +
+            s"$execConfigCountWithUnits, via $userExecCountConfigName")
+        }
+      } else {
+        throw new SparkException(s"Executor resource config missing required task resource: $rName")
+      }
+    }
+  }
+
+  // visible for testing
+  def parseResources(resourcesFile: Option[String]): Map[String, ResourceInformation] = {
+    // only parse the resources if a task requires them
+    val taskResourceConfigs = env.conf.getAllWithPrefix(SPARK_TASK_RESOURCE_PREFIX)
+    val resourceInfo = if (taskResourceConfigs.nonEmpty) {
+      val execResources = resourcesFile.map { resourceFileStr =>
+        val source = new BufferedInputStream(new FileInputStream(resourceFileStr))
+        val resourceMap = try {
+          val parsedJson = parse(source).asInstanceOf[JArray].arr
+          parsedJson.map(_.extract[ResourceInformation]).map(x => x.name -> x).toMap
+        } catch {
+          case e @ (_: MappingException | _: MismatchedInputException | _: ClassCastException) =>
+            throw new SparkException(
+              s"Exception parsing the resources in $resourceFileStr", e)
+        } finally {
+          source.close()
+        }
+        resourceMap
+      }.getOrElse(ResourceDiscoverer.findResources(env.conf, false))
Review comment:
so there are 2 cases:
1) standalone mode - in this case the worker has to discover the resources and
will pass in what to assign to each executor via the --resourcesFile argument
2) k8s and yarn (maybe mesos) - in this case you are expected to start in an
isolated container, and yarn and k8s don't tell you what you have been
allocated, so the executor has to auto-discover what the container has. That
auto-discovery has to happen before the executor registers with the driver, so
it can report what resources it has; this is the only place to do that. A
minimal sketch of the resources-file path in case 1 follows below.
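To make case 1 concrete, here is a minimal, self-contained sketch of what a
--resourcesFile could contain and how it parses, mirroring the json4s logic in
parseResources above. ResourceInfo is a hypothetical stand-in for the PR's
ResourceInformation (name, units and count appear in the diff; the addresses
field is an assumption), and the exact file schema is whatever the worker
writes:

import org.json4s._
import org.json4s.jackson.JsonMethods._

object ResourcesFileSketch {
  // Hypothetical stand-in for ResourceInformation; addresses is assumed.
  case class ResourceInfo(name: String, units: String, count: Long, addresses: Seq[String])

  def main(args: Array[String]): Unit = {
    implicit val formats: Formats = DefaultFormats

    // What a worker-written resources file might look like in standalone mode:
    // a JSON array with one entry per resource assigned to this executor.
    val resourcesJson =
      """[
        |  {"name": "gpu", "units": "", "count": 2, "addresses": ["0", "1"]},
        |  {"name": "fpga", "units": "", "count": 1, "addresses": ["f0"]}
        |]""".stripMargin

    // Same shape as parseResources: parse the array, extract each entry into
    // the case class, then key the result by resource name.
    val byName = parse(resourcesJson).asInstanceOf[JArray].arr
      .map(_.extract[ResourceInfo])
      .map(r => r.name -> r)
      .toMap

    byName.foreach { case (name, info) =>
      println(s"$name -> count=${info.count}, addresses=${info.addresses.mkString(",")}")
    }
  }
}

Running this prints the two resources with their counts and addresses, which is
the map the executor would then validate against the spark.task.resource.* and
spark.executor.resource.* configs.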