mridulm commented on a change in pull request #30204:
URL: https://github.com/apache/spark/pull/30204#discussion_r521811763



##########
File path: core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
##########
@@ -293,20 +305,32 @@ object ResourceProfile extends Logging {
 
   private def getDefaultExecutorResources(conf: SparkConf): Map[String, ExecutorResourceRequest] = {
     val ereqs = new ExecutorResourceRequests()
-    ereqs.cores(conf.get(EXECUTOR_CORES))
-    ereqs.memory(conf.get(EXECUTOR_MEMORY).toString)
-    conf.get(EXECUTOR_MEMORY_OVERHEAD).map(mem => ereqs.memoryOverhead(mem.toString))
-    conf.get(PYSPARK_EXECUTOR_MEMORY).map(mem => ereqs.pysparkMemory(mem.toString))
-    if (conf.get(MEMORY_OFFHEAP_ENABLED)) {
-      // Explicitly add suffix b as default unit of offHeapMemory is Mib
-      ereqs.offHeapMemory(conf.get(MEMORY_OFFHEAP_SIZE).toString + "b")
-    }
+    val cores = conf.get(EXECUTOR_CORES)
+    ereqs.cores(cores)
+    val memory = conf.get(EXECUTOR_MEMORY)
+    ereqs.memory(memory.toString)
+    val overheadMem = conf.get(EXECUTOR_MEMORY_OVERHEAD)
+    overheadMem.map(mem => ereqs.memoryOverhead(mem.toString))
+    val pysparkMem = conf.get(PYSPARK_EXECUTOR_MEMORY)
+    pysparkMem.map(mem => ereqs.pysparkMemory(mem.toString))
+    val offheapMem = Utils.executorOffHeapMemorySizeAsMb(conf)
+    ereqs.offHeapMemory(offheapMem.toString)
     val execReq = ResourceUtils.parseAllResourceRequests(conf, SPARK_EXECUTOR_PREFIX)
+    val customResources = new mutable.HashMap[String, ExecutorResourceRequest]
     execReq.foreach { req =>
       val name = req.id.resourceName
+      customResources(name) =
+        new ExecutorResourceRequest(
+          req.id.resourceName,
+          req.amount,
+          req.discoveryScript.orElse(""),
+          req.vendor.orElse(""))
       ereqs.resource(name, req.amount, req.discoveryScript.orElse(""),
         req.vendor.orElse(""))
     }

Review comment:
       Instead of creating `customResources` above, will this not work?
   ```
   val customResourceNames = execReq.map(_.id.resourceName).toSet
   val customResources = ereqs.requests.filter(v => customResourceNames.contains(v._1))
   ```
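   For context, a rough sketch (not compiled) of how the tail of `getDefaultExecutorResources` could then look, assuming `ereqs.requests` returns the accumulated `Map[String, ExecutorResourceRequest]`:
   ```
   // Register the parsed requests on ereqs as the PR already does ...
   val execReq = ResourceUtils.parseAllResourceRequests(conf, SPARK_EXECUTOR_PREFIX)
   execReq.foreach { req =>
     ereqs.resource(req.id.resourceName, req.amount, req.discoveryScript.orElse(""),
       req.vendor.orElse(""))
   }
   // ... then derive the custom resource map from ereqs instead of building a
   // separate mutable.HashMap.
   val customResourceNames = execReq.map(_.id.resourceName).toSet
   val customResources = ereqs.requests.filter(v => customResourceNames.contains(v._1))
   ```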

##########
File path: core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
##########
@@ -342,6 +367,110 @@ object ResourceProfile extends Logging {
     rp.getTaskCpus.getOrElse(conf.get(CPUS_PER_TASK))
   }
 
+  /**
+   * Get offHeap memory size from [[ExecutorResourceRequest]];
+   * return 0 if MEMORY_OFFHEAP_ENABLED is false.
+   */
+  private[spark] def executorOffHeapMemorySizeAsMb(sparkConf: SparkConf,
+      execRequest: ExecutorResourceRequest): Long = {
+    Utils.checkOffHeapEnabled(sparkConf, execRequest.amount)
+  }
+
+  private[spark] case class ExecutorResourcesOrDefaults(
+      cores: Int,
+      executorMemoryMiB: Long,
+      memoryOffHeapMiB: Long,
+      pysparkMemoryMiB: Long,
+      memoryOverheadMiB: Long,
+      totalMemMiB: Long,
+      customResources: Map[String, ExecutorResourceRequest])
+
+  private[spark] case class DefaultProfileExecutorResources(
+      cores: Int,
+      executorMemoryMiB: Long,
+      memoryOffHeapMiB: Long,
+      pysparkMemoryMiB: Option[Long],
+      memoryOverheadMiB: Option[Long],
+      customResources: Map[String, ExecutorResourceRequest])
+
+  private[spark] def calculateOverHeadMemory(
+      overHeadMemFromConf: Option[Long],
+      executorMemoryMiB: Long,
+      overheadFactor: Double): Long = {
+    overHeadMemFromConf.getOrElse(math.max((overheadFactor * executorMemoryMiB).toInt,
+        ResourceProfile.MEMORY_OVERHEAD_MIN))
+  }
+
+  /**
+   * Gets the full list of resources to allow a cluster manager to request the appropriate
+   * container. If the resource profile is not the default one we either get the resources
+   * specified in the profile or fall back to the default profile resource size for everything
+   * except for custom resources.
+   */
+  private[spark] def getResourcesForClusterManager(
+      rpId: Int,
+      execResources: Map[String, ExecutorResourceRequest],
+      overheadFactor: Double,
+      conf: SparkConf,
+      isPythonApp: Boolean,
+      resourceMappings: Map[String, String]): ExecutorResourcesOrDefaults = {
+    val defaultResources = getDefaultProfileExecutorResources(conf)
+    if (rpId == DEFAULT_RESOURCE_PROFILE_ID) {
+      val memoryOverheadMiB = ResourceProfile.calculateOverHeadMemory(
+        defaultResources.memoryOverheadMiB, defaultResources.executorMemoryMiB, overheadFactor)
+      val pysparkMemToUseMiB = if (isPythonApp) {
+        defaultResources.pysparkMemoryMiB.getOrElse(0L)
+      } else {
+        0L
+      }
+      val execMem = defaultResources.executorMemoryMiB
+      val offHeap = defaultResources.memoryOffHeapMiB
+      val totalMemMiB = execMem + offHeap + pysparkMemToUseMiB + memoryOverheadMiB
+      val customResources = defaultResources.customResources.map { case (rName, execReq) =>
+        val nameToUse = resourceMappings.get(rName).getOrElse(rName)
+        (nameToUse, execReq)
+      }

Review comment:
       Can we:
   
   1. Move all the `val`s above to before the `if (rpId == DEFAULT_RESOURCE_PROFILE_ID)` and make them `var`s.
   2. Have only `totalMemMiB`, `customResources` and the `ExecutorResourcesOrDefaults` construction inside the `if`.
   3. Remove the duplicate `var` initialization in the `else` block (since it is covered by the change from (1) above).
   
   I am trying to see if we can make this more concise (rough sketch below). Thoughts?
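   Roughly what I have in mind (sketch only, not compiled; it reuses the parameters, helpers and constants this PR already defines):
   ```
   val defaultResources = getDefaultProfileExecutorResources(conf)
   // Start from the default profile values and let a non-default profile override them.
   var cores = defaultResources.cores
   var executorMemoryMiB = defaultResources.executorMemoryMiB
   var memoryOffHeapMiB = defaultResources.memoryOffHeapMiB
   var pysparkMemoryMiB =
     if (isPythonApp) defaultResources.pysparkMemoryMiB.getOrElse(0L) else 0L
   var memoryOverheadMiB = ResourceProfile.calculateOverHeadMemory(
     defaultResources.memoryOverheadMiB, defaultResources.executorMemoryMiB, overheadFactor)

   val customResources = if (rpId == DEFAULT_RESOURCE_PROFILE_ID) {
     defaultResources.customResources.map { case (rName, execReq) =>
       (resourceMappings.getOrElse(rName, rName), execReq)
     }
   } else {
     val custom = new mutable.HashMap[String, ExecutorResourceRequest]
     execResources.foreach { case (rName, execReq) =>
       rName match {
         case ResourceProfile.MEMORY => executorMemoryMiB = execReq.amount
         case ResourceProfile.OVERHEAD_MEM => memoryOverheadMiB = execReq.amount
         case ResourceProfile.PYSPARK_MEM => pysparkMemoryMiB = execReq.amount
         case ResourceProfile.OFFHEAP_MEM =>
           memoryOffHeapMiB = executorOffHeapMemorySizeAsMb(conf, execReq)
         case ResourceProfile.CORES => cores = execReq.amount.toInt
         case other => custom(resourceMappings.getOrElse(other, other)) = execReq
       }
     }
     custom.toMap
   }

   val totalMemMiB = executorMemoryMiB + memoryOffHeapMiB + pysparkMemoryMiB + memoryOverheadMiB
   ExecutorResourcesOrDefaults(cores, executorMemoryMiB, memoryOffHeapMiB,
     pysparkMemoryMiB, memoryOverheadMiB, totalMemMiB, customResources)
   ```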

##########
File path: core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
##########
@@ -256,13 +257,16 @@ object ResourceProfile extends Logging {
   val UNKNOWN_RESOURCE_PROFILE_ID = -1
   val DEFAULT_RESOURCE_PROFILE_ID = 0
 
+  private[spark] val MEMORY_OVERHEAD_MIN = 384L

Review comment:
       nit: `MEMORY_OVERHEAD_MIN` -> `MEMORY_OVERHEAD_MIN_MIB`?

##########
File path: core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
##########
@@ -342,6 +367,110 @@ object ResourceProfile extends Logging {
     rp.getTaskCpus.getOrElse(conf.get(CPUS_PER_TASK))
   }
 
+  /**
+   * Get offHeap memory size from [[ExecutorResourceRequest]];
+   * return 0 if MEMORY_OFFHEAP_ENABLED is false.
+   */
+  private[spark] def executorOffHeapMemorySizeAsMb(sparkConf: SparkConf,
+      execRequest: ExecutorResourceRequest): Long = {
+    Utils.checkOffHeapEnabled(sparkConf, execRequest.amount)
+  }
+
+  private[spark] case class ExecutorResourcesOrDefaults(
+      cores: Int,
+      executorMemoryMiB: Long,
+      memoryOffHeapMiB: Long,
+      pysparkMemoryMiB: Long,
+      memoryOverheadMiB: Long,
+      totalMemMiB: Long,
+      customResources: Map[String, ExecutorResourceRequest])
+
+  private[spark] case class DefaultProfileExecutorResources(

Review comment:
       Can we replace `DefaultProfileExecutorResources` with `ExecutorResourcesOrDefaults`?
   I am not sure what value it adds that using `ExecutorResourcesOrDefaults` can't cover.
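   For example (sketch only), a single class with `Option`s for the fields that may not be explicitly set could serve both call sites; callers that need resolved values would `getOrElse` the defaults:
   ```
   private[spark] case class ExecutorResourcesOrDefaults(
       cores: Int,
       executorMemoryMiB: Long,
       memoryOffHeapMiB: Long,
       pysparkMemoryMiB: Option[Long],
       memoryOverheadMiB: Option[Long],
       totalMemMiB: Option[Long],
       customResources: Map[String, ExecutorResourceRequest])
   ```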

##########
File path: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
##########
@@ -302,48 +273,38 @@ private[yarn] class YarnAllocator(
   }
 
   // if a ResourceProfile hasn't been seen yet, create the corresponding YARN Resource for it
-  private def createYarnResourceForResourceProfile(
-      resourceProfileToTotalExecs: Map[ResourceProfile, Int]): Unit = synchronized {
-    resourceProfileToTotalExecs.foreach { case (rp, num) =>
-      if (!rpIdToYarnResource.contains(rp.id)) {
-        // Start with the application or default settings
-        var heapMem = executorMemory.toLong
-        var offHeapMem = executorOffHeapMemory.toLong
-        var overheadMem = memoryOverhead.toLong
-        var pysparkMem = pysparkWorkerMemory.toLong
-        var cores = defaultExecutorCores
-        val customResources = new mutable.HashMap[String, String]
-        // track the resource profile if not already there
-        getOrUpdateRunningExecutorForRPId(rp.id)
-        logInfo(s"Resource profile ${rp.id} doesn't exist, adding it")
-        val execResources = rp.executorResources
-        execResources.foreach { case (r, execReq) =>
-          r match {
-            case ResourceProfile.MEMORY =>
-              heapMem = execReq.amount
-            case ResourceProfile.OVERHEAD_MEM =>
-              overheadMem = execReq.amount
-            case ResourceProfile.PYSPARK_MEM =>
-              pysparkMem = execReq.amount
-            case ResourceProfile.OFFHEAP_MEM =>
-              offHeapMem = YarnSparkHadoopUtil.executorOffHeapMemorySizeAsMb(sparkConf, execReq)
-            case ResourceProfile.CORES =>
-              cores = execReq.amount.toInt
-            case "gpu" =>
-              customResources(YARN_GPU_RESOURCE_CONFIG) = execReq.amount.toString
-            case "fpga" =>
-              customResources(YARN_FPGA_RESOURCE_CONFIG) = execReq.amount.toString
-            case rName =>
-              customResources(rName) = execReq.amount.toString
-          }
+  private def createYarnResourceForResourceProfile(rp: ResourceProfile): Unit = synchronized {
+    if (!rpIdToYarnResource.contains(rp.id)) {
+      // track the resource profile if not already there
+      getOrUpdateRunningExecutorForRPId(rp.id)
+      logInfo(s"Resource profile ${rp.id} doesn't exist, adding it")
+      val resourcesWithDefaults =
+        ResourceProfile.getResourcesForClusterManager(rp.id, rp.executorResources,
+          MEMORY_OVERHEAD_FACTOR, sparkConf, isPythonApp,
+          ResourceRequestHelper.resourceNameMapping)
+      val customSparkResources =
+        resourcesWithDefaults.customResources.map { case (name, execReq) =>
+          (name, execReq.amount.toString)
         }
-        val totalMem = (heapMem + offHeapMem + overheadMem + pysparkMem).toInt
-        val resource = Resource.newInstance(totalMem, cores)
-        ResourceRequestHelper.setResourceRequests(customResources.toMap, resource)
-        logDebug(s"Created resource capability: $resource")
-        rpIdToYarnResource.putIfAbsent(rp.id, resource)
-        rpIdToResourceProfile(rp.id) = rp
+      // YARN specified resources are only supported with the default profile and we only
+      // pick up Spark custom resources for GPU and FPGA with the default profile
+      // to allow option of them being scheduled off of or not.
+      // We don't currently have a way to specify just YARN custom resources via the
+      // ResourceProfile so we send along all custom resources defined.

Review comment:
       I was a bit unclear on what the comment means.
   Are we saying that only gpu/fpga are allowed to be configured via the default profile, and no other custom resource?
   If I am understanding this correctly, non-default profiles can specify this? Why are we making this restriction?
   To put it differently, if an application has only the default profile and wants to use the 'other' resources, it has to define a resource profile and use it for the entire app to work around this? Or am I misunderstanding something here?
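   For reference, my reading of the configuration story for the default profile from the doc change below, as a sketch (`acme/widget` is a made-up resource name):
   ```
   import org.apache.spark.SparkConf

   val conf = new SparkConf()
     // YARN-only request: the container gets the resource but Spark does not schedule on it.
     .set("spark.yarn.executor.resource.acme/widget.amount", "2")
     // Add the Spark-side config as well if Spark should also schedule tasks against it.
     .set("spark.executor.resource.acme/widget.amount", "2")
   ```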

##########
File path: docs/running-on-yarn.md
##########
@@ -644,6 +644,7 @@ YARN does not tell Spark the addresses of the resources allocated to each contai
 # Stage Level Scheduling Overview
 
 Stage level scheduling is supported on YARN when dynamic allocation is enabled. One thing to note that is YARN specific is that each ResourceProfile requires a different container priority on YARN. The mapping is simply that the ResourceProfile id becomes the priority; on YARN, lower numbers are higher priority. This means that profiles created earlier will have a higher priority in YARN. Normally this won't matter as Spark finishes one stage before starting another one; the only case this might have an effect is in a job server type scenario, so it's something to keep in mind.
+Note there is a difference in the way custom resources are handled between the base default profile and custom ResourceProfiles. To allow the user to request YARN containers with extra resources without Spark scheduling on them, the user can specify resources via the <code>spark.yarn.executor.resource.</code> config. Those configs are only used in the base default profile though and do not get propagated into any other custom ResourceProfiles. This is because there would be no way to remove them if you wanted a stage to not have them. This results in your default profile getting custom resources defined in <code>spark.yarn.executor.resource.</code> plus Spark defined resources of GPU or FPGA. Spark converts GPU and FPGA resources into the YARN built in types <code>yarn.io/gpu</code> and <code>yarn.io/fpga</code>, but does not know the mapping of any other resources. Any other Spark custom resources are not propagated to YARN for the default profile. So if you want Spark to schedule based off a custom resource and have it requested from YARN, you must specify it in both YARN (<code>spark.yarn.{driver/executor}.resource.</code>) and Spark (<code>spark.{driver/executor}.resource.</code>) configs. Leave the Spark config off if you only want YARN containers with the extra resources but Spark not to schedule using them. For custom ResourceProfiles, there is currently no way to specify only YARN resources without Spark scheduling off of them. This means for custom ResourceProfiles we propagate all the resources defined in the ResourceProfile to YARN. We still convert GPU and FPGA to the YARN built in types as well. This requires that the name of any custom resources you specify match what they are defined as in YARN.
 

Review comment:
       I wish YARN supported allocation across multiple queues (so that DL workloads could use a queue with GPUs, and a 'regular' queue for non-DL stages within the same application), rather than tying it to the application level.
   
   Node labels could be used for this; iirc we don't use them for resource profiles, only at the executor/application level, right?
   Are we planning to support specifying this per resource profile later?
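   For reference, the existing node label configs apply application wide rather than per profile, e.g. (sketch; 'gpu-nodes' is just an example label name):
   ```
   import org.apache.spark.SparkConf

   // These existing configs pick nodes for the whole application, not per ResourceProfile.
   val conf = new SparkConf()
     .set("spark.yarn.am.nodeLabelExpression", "gpu-nodes")
     .set("spark.yarn.executor.nodeLabelExpression", "gpu-nodes")
   ```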




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


