HyukjinKwon commented on a change in pull request #27313: [SPARK-29148][CORE] 
Add stage level scheduling dynamic allocation and scheduler backend changes
URL: https://github.com/apache/spark/pull/27313#discussion_r380423718
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
 ##########
 @@ -67,6 +76,138 @@ class ResourceProfile(
     taskResources.get(ResourceProfile.CPUS).map(_.amount.toInt)
   }
 
+  private[spark] def getNumSlotsPerAddress(resource: String, sparkConf: 
SparkConf): Int = {
+    _executorResourceSlotsPerAddr.getOrElse {
+      calculateTasksAndLimitingResource(sparkConf)
+    }
+    _executorResourceSlotsPerAddr.get.getOrElse(resource,
+      throw new SparkException(s"Resource $resource doesn't exist in profile 
id: $id"))
+  }
+
+  // Maximum tasks you could put on an executor with this profile based on the 
limiting resource.
+  // If the executor cores config is not present this value is based on the 
other resources
+  // available or 1 if no other resources. You need to check the 
isCoresLimitKnown to
+  // calculate proper value.
+  private[spark] def maxTasksPerExecutor(sparkConf: SparkConf): Int = {
+    _maxTasksPerExecutor.getOrElse {
+      calculateTasksAndLimitingResource(sparkConf)
+      _maxTasksPerExecutor.get
+    }
+  }
+
+  // Returns whether the executor cores was available to use to calculate the 
max tasks
+  // per executor and limiting resource. Some cluster managers (like 
standalone and coarse
+  // grained mesos) don't use the cores config by default so we can't use it 
to calculate slots.
+  private[spark] def isCoresLimitKnown: Boolean = _coresLimitKnown
+
+  // The resource that has the least amount of slots per executor. Its 
possible multiple or all
+  // resources result in same number of slots and this could be any of those.
+  // If the executor cores config is not present this value is based on the 
other resources
+  // available or empty string if no other resources. You need to check the 
isCoresLimitKnown to
+  // calculate proper value.
+  private[spark] def limitingResource(sparkConf: SparkConf): String = {
+    _limitingResource.getOrElse {
+      calculateTasksAndLimitingResource(sparkConf)
+      _limitingResource.get
+    }
+  }
+
+  // executor cores config is not set for some masters by default and the 
default value
+  // only applies to yarn/k8s
+  private def shouldCheckExecutorCores(sparkConf: SparkConf): Boolean = {
+    val master = sparkConf.getOption("spark.master")
+    sparkConf.contains(EXECUTOR_CORES) ||
+      (master.isDefined && (master.get.equalsIgnoreCase("yarn") || 
master.get.startsWith("k8s")))
+  }
+
+  /**
+   * Utility function to calculate the number of tasks you can run on a single 
Executor based
+   * on the task and executor resource requests in the ResourceProfile. This 
will be based
+   * off the resource that is most restrictive. For instance, if the executor
+   * request is for 4 cpus and 2 gpus and your task request is for 1 cpu and 1 
gpu each, the
+   * limiting resource is gpu and the number of tasks you can run on a single 
executor is 2.
+   * This function also sets the limiting resource, isCoresLimitKnown and 
number of slots per
+   * resource address.
+   */
+  private def calculateTasksAndLimitingResource(sparkConf: SparkConf): Unit = 
synchronized {
+    val shouldCheckExecCores = shouldCheckExecutorCores(sparkConf)
+    var (taskLimit, limitingResource) = if (shouldCheckExecCores) {
+      val cpusPerTask = taskResources.get(ResourceProfile.CPUS)
+        .map(_.amount).getOrElse(sparkConf.get(CPUS_PER_TASK).toDouble).toInt
+      assert(cpusPerTask > 0, "CPUs per task configuration has to be > 0")
+      val coresPerExecutor = 
getExecutorCores.getOrElse(sparkConf.get(EXECUTOR_CORES))
+      _coresLimitKnown = true
+      ResourceUtils.validateTaskCpusLargeEnough(coresPerExecutor, cpusPerTask)
+      val tasksBasedOnCores = coresPerExecutor / cpusPerTask
+      // Note that if the cores per executor aren't set properly this 
calculation could be off,
+      // we default it to just be 1 in order to allow checking of the rest of 
the custom
+      // resources. We set the limit based on the other resources available.
+      (tasksBasedOnCores, ResourceProfile.CPUS)
+    } else {
+      (-1, "")
+    }
+    val numPartsPerResourceMap = new mutable.HashMap[String, Int]
+    numPartsPerResourceMap(ResourceProfile.CORES) = 1
+    val taskResourcesToCheck = new mutable.HashMap[String, TaskResourceRequest]
+    taskResourcesToCheck ++= ResourceProfile.getCustomTaskResources(this)
+    val execResourceToCheck = ResourceProfile.getCustomExecutorResources(this)
+    execResourceToCheck.foreach { case (rName, execReq) =>
+      val taskReq = taskResources.get(rName).map(_.amount).getOrElse(0.0)
+      numPartsPerResourceMap(rName) = 1
+      if (taskReq > 0.0) {
+        if (taskReq > execReq.amount) {
+          throw new SparkException(s"The executor resource: $rName, amount: 
${execReq.amount} " +
+            s"needs to be >= the task resource request amount of $taskReq")
+        }
+        val (numPerTask, parts) = 
ResourceUtils.calculateAmountAndPartsForFraction(taskReq)
+        numPartsPerResourceMap(rName) = parts
+        val numTasks = ((execReq.amount * parts) / numPerTask).toInt
+        if (taskLimit == -1 || numTasks < taskLimit) {
+          if (shouldCheckExecCores) {
+            // TODO - until resource profiles full implemented we need to 
error if cores not
+            // limiting resource because the scheduler code uses that for slots
+            throw new IllegalArgumentException("The number of slots on an 
executor has to be " +
+              "limited by the number of cores, otherwise you waste resources 
and " +
+              "dynamic allocation doesn't work properly. Your configuration 
has " +
+              s"core/task cpu slots = ${taskLimit} and " +
+              s"${execReq.resourceName} = ${numTasks}. " +
+              "Please adjust your configuration so that all resources require 
same number " +
+              "of executor slots.")
+          }
+          limitingResource = rName
+          taskLimit = numTasks
+        }
+        taskResourcesToCheck -= rName
+      } else {
+        logWarning(s"The executor resource config for resource: $rName was 
specified but " +
+          "no corresponding task resource request was specified.")
+      }
+    }
+    if(!shouldCheckExecCores) {
+      // if we can't rely on the executor cores config throw a warning for user
+      logWarning("Please ensure that the number of slots available on your " +
 
 Review comment:
   Hey @tgravescs, I was investigating a warning message that suddenly started 
appearing in my local environment, presumably after this change. My Spark shell 
now shows this warning consistently on my local machine:
   
   ```
   $ ./bin/spark-shell
   ```
   ```
   ...
   20/02/18 11:04:56 WARN ResourceProfile: Please ensure that the number of 
slots available on your executors is 
   limited by the number of cores to task cpus and not another custom resource. 
If cores is not the limiting resource 
   then dynamic allocation will not work properly!
   ```
   
   Do you have any idea what might be causing it?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to