kiszk commented on a change in pull request #27313: [SPARK-29148][CORE] Add stage level scheduling dynamic allocation and scheduler backend changes
URL: https://github.com/apache/spark/pull/27313#discussion_r369712857
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala
 ##########
 @@ -344,6 +345,90 @@ private[spark] object ResourceUtils extends Logging {
     discoverResource(resourceName, script)
   }
 
+  def validateTaskCpusLargeEnough(execCores: Int, taskCpus: Int): Boolean = {
+    // Number of cores per executor must meet at least one task requirement.
+    if (execCores < taskCpus) {
+      throw new SparkException(s"The number of cores per executor (=$execCores) has to be >= " +
+        s"the number of cpus per task = $taskCpus.")
+    }
+    true
+  }
+
+  // the optional executor cores parameter is passed in by the different local modes,
+  // since it is not configured via the config
+  def warnOnWastedResources(
+      rp: ResourceProfile,
+      sparkConf: SparkConf,
+      execCores: Option[Int] = None): Unit = {
+    // There have been checks on the ResourceProfile to make sure the executor resources were
+    // specified and are large enough if any task resources were specified.
+    // Now just do some sanity tests and log warnings when it looks like the user will
+    // waste some resources.
+    val coresKnown = rp.isCoresLimitKnown
+    var limitingResource = rp.limitingResource(sparkConf)
+    var maxTaskPerExec = rp.maxTasksPerExecutor(sparkConf)
+    val taskCpus = rp.getTaskCpus.getOrElse(sparkConf.get(CPUS_PER_TASK))
+    val cores = if (execCores.isDefined) {
+      execCores.get
+    } else if (coresKnown) {
+      rp.getExecutorCores.getOrElse(sparkConf.get(EXECUTOR_CORES))
+    } else {
+      // can't calculate cores limit
+      return
+    }
+    // when the executor cores config isn't set, we can't calculate the real limiting resource
+    // and number of tasks per executor ahead of time, so calculate it now.
+    if (!coresKnown) {
+      val numTasksPerExecCores = cores / taskCpus
+      val numTasksPerExecCustomResource = rp.maxTasksPerExecutor(sparkConf)
+      if (limitingResource.isEmpty ||
+        (limitingResource.nonEmpty && numTasksPerExecCores < numTasksPerExecCustomResource)) {
+        limitingResource = ResourceProfile.CPUS
+        maxTaskPerExec = numTasksPerExecCores
+      }
+    }
+    val taskReq = ResourceProfile.getCustomTaskResources(rp)
+    val execReq = ResourceProfile.getCustomExecutorResources(rp)
+
+    if (limitingResource.nonEmpty && !limitingResource.equals(ResourceProfile.CPUS)) {
+      if ((taskCpus * maxTaskPerExec) < cores) {
+        val resourceNumSlots = Math.floor(cores / taskCpus).toInt
+        val message = s"The configuration of cores (exec = ${cores} " +
+          s"task = ${taskCpus}, runnable tasks = ${resourceNumSlots}) will " +
+          s"result in wasted resources due to resource ${limitingResource} 
limiting the " +
+          s"number of runnable tasks per executor to: ${maxTaskPerExec}. 
Please adjust " +
+          s"your configuration."
+        if (sparkConf.get(RESOURCES_WARNING_TESTING)) {
+          throw new SparkException(message)
+        } else {
+          logWarning(message)
+        }
+      }
+    }
+
+    taskReq.foreach { case (rName, treq) =>
+      val execAmount = execReq(rName).amount
+      val numParts = rp.getNumSlotsPerAddress(rName, sparkConf)
+      // handle fractional
+      val taskAmount = if (numParts > 1) 1 else treq.amount
+      if (maxTaskPerExec < (execAmount * numParts / taskAmount)) {
+        val taskReqStr = s"${taskAmount}/${numParts}"
+        val resourceNumSlots = Math.floor(execAmount * numParts / taskAmount).toInt
+        val message = s"The configuration of resource: ${treq.resourceName} " +
+          s"(exec = ${execAmount}, task = ${taskReqStr}, " +
+          s"runnable tasks = ${resourceNumSlots}) will " +
+          s"result in wasted resources due to resource ${limitingResource} 
limiting the " +
+          s"number of runnable tasks per executor to: ${maxTaskPerExec}. 
Please adjust " +
+          s"your configuration."
 
 Review comment:
  nit: `s` is not required here, since this string literal does not interpolate anything.
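  To illustrate the nit: in Scala the `s` prefix only enables `${...}` interpolation, so a literal with no interpolated values can drop it. A minimal sketch, reusing names from the diff above:

      s"number of runnable tasks per executor to: ${maxTaskPerExec}. Please adjust " +  // `s` needed for ${...}
      "your configuration."                                                             // plain literal, no `s` needed

  For context on the warning this message belongs to, here is a hedged sketch of the arithmetic with hypothetical numbers (not values from the PR): an executor with 16 cores and 1 cpu per task has 16 cpu slots, but requesting 4 GPUs per executor at 1 GPU per task caps the executor at 4 runnable tasks, so the `taskCpus * maxTaskPerExec < cores` condition above holds and the warning fires:

      // Hypothetical values illustrating the cpu-waste check in warnOnWastedResources.
      val cores = 16                                  // executor cores
      val taskCpus = 1                                // cpus per task
      val gpusPerExec = 4                             // custom executor resource amount
      val gpusPerTask = 1                             // custom task resource amount
      val maxTaskPerExec = gpusPerExec / gpusPerTask  // 4: GPUs are the limiting resource
      assert(taskCpus * maxTaskPerExec < cores)       // 1 * 4 < 16, so 12 cpu slots go unused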
