Ngone51 commented on a change in pull request #24856: [SPARK-27823] [CORE] Refactor resource handling code
URL: https://github.com/apache/spark/pull/24856#discussion_r293853139
##########
File path: core/src/main/scala/org/apache/spark/SparkContext.scala
##########
@@ -2723,44 +2689,50 @@ object SparkContext extends Logging {
// Calculate the max slots each executor can provide based on resources
available on each
// executor and resources required by each task.
- val taskResourcesAndCount = sc.conf.getTaskResourceRequirements()
- val executorResourcesAndCounts = sc.conf.getAllWithPrefixAndSuffix(
- SPARK_EXECUTOR_RESOURCE_PREFIX, SPARK_RESOURCE_AMOUNT_SUFFIX).toMap
+ val taskResourceRequirements = parseTaskResourceRequirements(sc.conf)
+ val executorResourcesAndCounts =
+ parseAllResourceRequests(sc.conf, SPARK_EXECUTOR_PREFIX)
+ .map(request => (request.id.resourceName, request.amount)).toMap
var numSlots = execCores / taskCores
var limitingResourceName = "CPU"
- taskResourcesAndCount.foreach { case (rName, taskCount) =>
+
+ taskResourceRequirements.foreach { taskReq =>
// Make sure the executor resources were specified through config.
- val execCount = executorResourcesAndCounts.getOrElse(rName,
- throw new SparkException(
- s"The executor resource config: " +
- s"${SPARK_EXECUTOR_RESOURCE_PREFIX + rName +
SPARK_RESOURCE_AMOUNT_SUFFIX} " +
- "needs to be specified since a task requirement config: " +
- s"${SPARK_TASK_RESOURCE_PREFIX + rName +
SPARK_RESOURCE_AMOUNT_SUFFIX} was specified")
+ val execCount =
executorResourcesAndCounts.getOrElse(taskReq.resourceName,
+ throw new SparkException("The executor resource config: " +
+ ResourceID(SPARK_EXECUTOR_PREFIX, taskReq.resourceName).amountConf
+
+ " needs to be specified since a task requirement config: " +
+ ResourceID(SPARK_TASK_PREFIX, taskReq.resourceName).amountConf +
+ " was specified")
)
// Make sure the executor resources are large enough to launch at
least one task.
- if (execCount.toLong < taskCount.toLong) {
- throw new SparkException(
- s"The executor resource config: " +
- s"${SPARK_EXECUTOR_RESOURCE_PREFIX + rName +
SPARK_RESOURCE_AMOUNT_SUFFIX} " +
- s"= $execCount has to be >= the task config: " +
- s"${SPARK_TASK_RESOURCE_PREFIX + rName +
SPARK_RESOURCE_AMOUNT_SUFFIX} = $taskCount")
+ if (execCount < taskReq.amount) {
+ throw new SparkException("The executor resource config: " +
+ ResourceID(SPARK_EXECUTOR_PREFIX, taskReq.resourceName).amountConf
+
+ s" = $execCount has to be >= the task config: " +
+ ResourceID(SPARK_TASK_PREFIX, taskReq.resourceName).amountConf +
+ s" = ${taskReq.amount}")
}
// Compare and update the max slots each executor can provide.
- val resourceNumSlots = execCount.toInt / taskCount
+ val resourceNumSlots = execCount / taskReq.amount
if (resourceNumSlots < numSlots) {
+ logWarning(s"The configuration of resource: ${taskReq.resourceName}
" +
+ s"(limits tasks to $resourceNumSlots) will result in wasted
resources of resource " +
+ s"${limitingResourceName} (would allow for $numSlots tasks). " +
+ "Please adjust your configuration.")
numSlots = resourceNumSlots
- limitingResourceName = rName
+ limitingResourceName = taskReq.resourceName
}
}
// There have been checks above to make sure the executor resources were
specified and are
// large enough if any task resources were specified.
- taskResourcesAndCount.foreach { case (rName, taskCount) =>
- val execCount = executorResourcesAndCounts(rName)
- if (taskCount.toInt * numSlots < execCount.toInt) {
- val message = s"The configuration of resource: $rName (exec =
${execCount.toInt}, " +
- s"task = ${taskCount}) will result in wasted resources due to
resource " +
- s"${limitingResourceName} limiting the number of runnable tasks
per executor to: " +
- s"${numSlots}. Please adjust your configuration."
+ taskResourceRequirements.foreach { taskReq =>
+ val execCount = executorResourcesAndCounts(taskReq.resourceName)
+ if (taskReq.amount * numSlots < execCount) {
+ val message = s"The configuration of resource:
${taskReq.resourceName} " +
+ s"(exec = ${execCount}, task = ${taskReq.amount}) will result in
wasted " +
+ s"resources due to resource ${limitingResourceName} limiting the
number of " +
+ s"runnable tasks per executor to: ${numSlots}. Please adjust your
configuration."
Review comment:
I'd prefer to log a warning about the limited `numSlots` right above
`taskResourceRequirements.foreach`, but only if `limitingResourceName` has changed
(i.e. it is no longer the initial `"CPU"`). Also, in `message`, please append the
specific amount of the wasted resource, so the user can see the effect of their
configuration directly.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]