HyukjinKwon commented on a change in pull request #27313: [SPARK-29148][CORE]
Add stage level scheduling dynamic allocation and scheduler backend changes
URL: https://github.com/apache/spark/pull/27313#discussion_r380423718
##########
File path: core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
##########
@@ -67,6 +76,138 @@ class ResourceProfile(
taskResources.get(ResourceProfile.CPUS).map(_.amount.toInt)
}
+  private[spark] def getNumSlotsPerAddress(resource: String, sparkConf: SparkConf): Int = {
+    _executorResourceSlotsPerAddr.getOrElse {
+      calculateTasksAndLimitingResource(sparkConf)
+    }
+    _executorResourceSlotsPerAddr.get.getOrElse(resource,
+      throw new SparkException(s"Resource $resource doesn't exist in profile id: $id"))
+  }
+
+  // Maximum tasks you could put on an executor with this profile based on the limiting resource.
+  // If the executor cores config is not present, this value is based on the other resources
+  // available, or 1 if there are no other resources. You need to check isCoresLimitKnown to
+  // calculate the proper value.
+  private[spark] def maxTasksPerExecutor(sparkConf: SparkConf): Int = {
+    _maxTasksPerExecutor.getOrElse {
+      calculateTasksAndLimitingResource(sparkConf)
+      _maxTasksPerExecutor.get
+    }
+  }
+
+  // Returns whether the executor cores config was available to use to calculate the max tasks
+  // per executor and the limiting resource. Some cluster managers (like standalone and
+  // coarse-grained Mesos) don't use the cores config by default, so we can't use it to
+  // calculate slots.
+  private[spark] def isCoresLimitKnown: Boolean = _coresLimitKnown
+
+  // The resource that has the least amount of slots per executor. It's possible that multiple
+  // or all resources result in the same number of slots, in which case this could be any of
+  // them. If the executor cores config is not present, this value is based on the other
+  // resources available, or the empty string if there are no other resources. You need to
+  // check isCoresLimitKnown to calculate the proper value.
+  private[spark] def limitingResource(sparkConf: SparkConf): String = {
+    _limitingResource.getOrElse {
+      calculateTasksAndLimitingResource(sparkConf)
+      _limitingResource.get
+    }
+  }
+
+  // The executor cores config is not set by default for some masters, and its default value
+  // only applies to yarn/k8s.
+  private def shouldCheckExecutorCores(sparkConf: SparkConf): Boolean = {
+    val master = sparkConf.getOption("spark.master")
+    sparkConf.contains(EXECUTOR_CORES) ||
+      (master.isDefined && (master.get.equalsIgnoreCase("yarn") || master.get.startsWith("k8s")))
+  }
+
+  /**
+   * Utility function to calculate the number of tasks you can run on a single executor based
+   * on the task and executor resource requests in the ResourceProfile. This will be based
+   * on the resource that is most restrictive. For instance, if the executor request is for
+   * 4 cpus and 2 gpus and your task request is for 1 cpu and 1 gpu each, the limiting
+   * resource is gpu and the number of tasks you can run on a single executor is 2.
+   * This function also sets the limiting resource, isCoresLimitKnown, and the number of
+   * slots per resource address.
+   */
+  private def calculateTasksAndLimitingResource(sparkConf: SparkConf): Unit = synchronized {
+    val shouldCheckExecCores = shouldCheckExecutorCores(sparkConf)
+    var (taskLimit, limitingResource) = if (shouldCheckExecCores) {
+      val cpusPerTask = taskResources.get(ResourceProfile.CPUS)
+        .map(_.amount).getOrElse(sparkConf.get(CPUS_PER_TASK).toDouble).toInt
+      assert(cpusPerTask > 0, "CPUs per task configuration has to be > 0")
+      val coresPerExecutor = getExecutorCores.getOrElse(sparkConf.get(EXECUTOR_CORES))
+      _coresLimitKnown = true
+      ResourceUtils.validateTaskCpusLargeEnough(coresPerExecutor, cpusPerTask)
+      val tasksBasedOnCores = coresPerExecutor / cpusPerTask
+      // Note that if the cores per executor aren't set properly this calculation could be
+      // off; we default it to just 1 in order to allow checking of the rest of the custom
+      // resources. We set the limit based on the other resources available.
+      (tasksBasedOnCores, ResourceProfile.CPUS)
+    } else {
+      (-1, "")
+    }
+    val numPartsPerResourceMap = new mutable.HashMap[String, Int]
+    numPartsPerResourceMap(ResourceProfile.CORES) = 1
+    val taskResourcesToCheck = new mutable.HashMap[String, TaskResourceRequest]
+    taskResourcesToCheck ++= ResourceProfile.getCustomTaskResources(this)
+    val execResourceToCheck = ResourceProfile.getCustomExecutorResources(this)
+    execResourceToCheck.foreach { case (rName, execReq) =>
+      val taskReq = taskResources.get(rName).map(_.amount).getOrElse(0.0)
+      numPartsPerResourceMap(rName) = 1
+      if (taskReq > 0.0) {
+        if (taskReq > execReq.amount) {
+          throw new SparkException(s"The executor resource: $rName, amount: ${execReq.amount} " +
+            s"needs to be >= the task resource request amount of $taskReq")
+        }
+        val (numPerTask, parts) = ResourceUtils.calculateAmountAndPartsForFraction(taskReq)
+        numPartsPerResourceMap(rName) = parts
+        val numTasks = ((execReq.amount * parts) / numPerTask).toInt
+        if (taskLimit == -1 || numTasks < taskLimit) {
+          if (shouldCheckExecCores) {
+            // TODO - until resource profiles are fully implemented we need to error if cores
+            // isn't the limiting resource because the scheduler code uses that for slots
+            throw new IllegalArgumentException("The number of slots on an executor has to be " +
+              "limited by the number of cores, otherwise you waste resources and " +
+              "dynamic allocation doesn't work properly. Your configuration has " +
+              s"core/task cpu slots = ${taskLimit} and " +
+              s"${execReq.resourceName} = ${numTasks}. " +
+              "Please adjust your configuration so that all resources require the same " +
+              "number of executor slots.")
+          }
+          limitingResource = rName
+          taskLimit = numTasks
+        }
+        taskResourcesToCheck -= rName
+      } else {
+        logWarning(s"The executor resource config for resource: $rName was specified but " +
+          "no corresponding task resource request was specified.")
+      }
+    }
+    if (!shouldCheckExecCores) {
+      // if we can't rely on the executor cores config, log a warning for the user
+      logWarning("Please ensure that the number of slots available on your " +
Review comment:
Hey @tgravescs, I was investigating a warning message that suddenly popped up in my local environment, presumably after this fix. My Spark shell shows this warning consistently:
```
$ ./bin/spark-shell
```
```
...
20/02/18 11:04:56 WARN ResourceProfile: Please ensure that the number of slots available on your executors is limited by the number of cores to task cpus and not another custom resource. If cores is not the limiting resource then dynamic allocation will not work properly!
Do you have any idea?
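If I read the diff right, a plain `./bin/spark-shell` has no `spark.executor.cores` set and its master is `local[*]`, so `shouldCheckExecutorCores` returns false and the `if (!shouldCheckExecCores)` branch logs the warning unconditionally. Here is a minimal standalone sketch of that predicate, with a plain `Map` standing in for `SparkConf` (a simplified reproduction, not the real config plumbing):
```scala
// Simplified stand-in for shouldCheckExecutorCores from the diff above.
object WarnPredicateSketch {
  def shouldCheckExecutorCores(conf: Map[String, String]): Boolean = {
    val master = conf.get("spark.master")
    conf.contains("spark.executor.cores") ||
      master.exists(m => m.equalsIgnoreCase("yarn") || m.startsWith("k8s"))
  }

  def main(args: Array[String]): Unit = {
    // A default local-mode shell: no executor cores config, master is local[*].
    println(shouldCheckExecutorCores(Map("spark.master" -> "local[*]"))) // false -> warning logged
    // YARN with default configs: the predicate holds, so no warning.
    println(shouldCheckExecutorCores(Map("spark.master" -> "yarn")))     // true
  }
}
```
If that reading is correct, every default local-mode shell would hit this warning path even with vanilla settings.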
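Separately, for anyone trying to follow the "most restrictive resource" math described in the scaladoc above, a quick hypothetical sketch of it (the maps and helper here are illustrative stand-ins, not the actual ResourceProfile API):
```scala
// Hypothetical sketch: tasks per executor is the minimum over all requested
// resources of floor(executor amount / task amount).
object SlotMathSketch {
  def maxTasksPerExecutor(
      execResources: Map[String, Double],
      taskResources: Map[String, Double]): (String, Int) = {
    taskResources.collect {
      case (rName, taskAmt) if taskAmt > 0.0 && execResources.contains(rName) =>
        rName -> (execResources(rName) / taskAmt).toInt
    }.minBy(_._2)
  }

  def main(args: Array[String]): Unit = {
    // Executor: 4 cpus and 2 gpus; each task: 1 cpu and 1 gpu.
    val (limiting, slots) = maxTasksPerExecutor(
      Map("cpus" -> 4.0, "gpus" -> 2.0),
      Map("cpus" -> 1.0, "gpus" -> 1.0))
    println(s"limiting resource = $limiting, tasks per executor = $slots")
  }
}
```
Running it prints `limiting resource = gpus, tasks per executor = 2`, matching the scaladoc example.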
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]