mridulm commented on a change in pull request #30204:
URL: https://github.com/apache/spark/pull/30204#discussion_r521811763
##########
File path: core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
##########
@@ -293,20 +305,32 @@ object ResourceProfile extends Logging {
   private def getDefaultExecutorResources(conf: SparkConf): Map[String, ExecutorResourceRequest] = {
     val ereqs = new ExecutorResourceRequests()
-    ereqs.cores(conf.get(EXECUTOR_CORES))
-    ereqs.memory(conf.get(EXECUTOR_MEMORY).toString)
-    conf.get(EXECUTOR_MEMORY_OVERHEAD).map(mem => ereqs.memoryOverhead(mem.toString))
-    conf.get(PYSPARK_EXECUTOR_MEMORY).map(mem => ereqs.pysparkMemory(mem.toString))
-    if (conf.get(MEMORY_OFFHEAP_ENABLED)) {
-      // Explicitly add suffix b as default unit of offHeapMemory is Mib
-      ereqs.offHeapMemory(conf.get(MEMORY_OFFHEAP_SIZE).toString + "b")
-    }
+    val cores = conf.get(EXECUTOR_CORES)
+    ereqs.cores(cores)
+    val memory = conf.get(EXECUTOR_MEMORY)
+    ereqs.memory(memory.toString)
+    val overheadMem = conf.get(EXECUTOR_MEMORY_OVERHEAD)
+    overheadMem.map(mem => ereqs.memoryOverhead(mem.toString))
+    val pysparkMem = conf.get(PYSPARK_EXECUTOR_MEMORY)
+    pysparkMem.map(mem => ereqs.pysparkMemory(mem.toString))
+    val offheapMem = Utils.executorOffHeapMemorySizeAsMb(conf)
+    ereqs.offHeapMemory(offheapMem.toString)
     val execReq = ResourceUtils.parseAllResourceRequests(conf, SPARK_EXECUTOR_PREFIX)
+    val customResources = new mutable.HashMap[String, ExecutorResourceRequest]
     execReq.foreach { req =>
       val name = req.id.resourceName
+      customResources(name) =
+        new ExecutorResourceRequest(
+          req.id.resourceName,
+          req.amount,
+          req.discoveryScript.orElse(""),
+          req.vendor.orElse(""))
       ereqs.resource(name, req.amount, req.discoveryScript.orElse(""),
         req.vendor.orElse(""))
     }
Review comment:
Instead of creating `customResources` above, will this not work ?
```
val customResourceNames = execReq.map(_.id.resourceName).toSet
val customResources = ereqs.requests.filter(v => customResourceNames.contains(v._1))
```
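For context, a minimal sketch of how that could slot into `getDefaultExecutorResources` (assuming `ereqs.requests` exposes the registered requests keyed by resource name, as the diff above suggests):
```
val execReq = ResourceUtils.parseAllResourceRequests(conf, SPARK_EXECUTOR_PREFIX)
execReq.foreach { req =>
  // register every custom resource on the builder first
  ereqs.resource(req.id.resourceName, req.amount,
    req.discoveryScript.orElse(""), req.vendor.orElse(""))
}
// then recover the custom entries from the builder itself instead of a separate mutable map
val customResourceNames = execReq.map(_.id.resourceName).toSet
val customResources = ereqs.requests.filter(v => customResourceNames.contains(v._1))
```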
##########
File path: core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
##########
@@ -342,6 +367,110 @@ object ResourceProfile extends Logging {
     rp.getTaskCpus.getOrElse(conf.get(CPUS_PER_TASK))
   }
+  /**
+   * Get offHeap memory size from [[ExecutorResourceRequest]]
+   * return 0 if MEMORY_OFFHEAP_ENABLED is false.
+   */
+  private[spark] def executorOffHeapMemorySizeAsMb(sparkConf: SparkConf,
+      execRequest: ExecutorResourceRequest): Long = {
+    Utils.checkOffHeapEnabled(sparkConf, execRequest.amount)
+  }
+
+  private[spark] case class ExecutorResourcesOrDefaults(
+      cores: Int,
+      executorMemoryMiB: Long,
+      memoryOffHeapMiB: Long,
+      pysparkMemoryMiB: Long,
+      memoryOverheadMiB: Long,
+      totalMemMiB: Long,
+      customResources: Map[String, ExecutorResourceRequest])
+
+  private[spark] case class DefaultProfileExecutorResources(
+      cores: Int,
+      executorMemoryMiB: Long,
+      memoryOffHeapMiB: Long,
+      pysparkMemoryMiB: Option[Long],
+      memoryOverheadMiB: Option[Long],
+      customResources: Map[String, ExecutorResourceRequest])
+
+  private[spark] def calculateOverHeadMemory(
+      overHeadMemFromConf: Option[Long],
+      executorMemoryMiB: Long,
+      overheadFactor: Double): Long = {
+    overHeadMemFromConf.getOrElse(math.max((overheadFactor * executorMemoryMiB).toInt,
+      ResourceProfile.MEMORY_OVERHEAD_MIN))
+  }
+
+  /**
+   * Gets the full list of resources to allow a cluster manager to request the appropriate
+   * container. If the resource profile is not the default one we either get the resources
+   * specified in the profile or fall back to the default profile resource size for everything
+   * except for custom resources.
+   */
+  private[spark] def getResourcesForClusterManager(
+      rpId: Int,
+      execResources: Map[String, ExecutorResourceRequest],
+      overheadFactor: Double,
+      conf: SparkConf,
+      isPythonApp: Boolean,
+      resourceMappings: Map[String, String]): ExecutorResourcesOrDefaults = {
+    val defaultResources = getDefaultProfileExecutorResources(conf)
+    if (rpId == DEFAULT_RESOURCE_PROFILE_ID) {
+      val memoryOverheadMiB = ResourceProfile.calculateOverHeadMemory(
+        defaultResources.memoryOverheadMiB, defaultResources.executorMemoryMiB, overheadFactor)
+      val pysparkMemToUseMiB = if (isPythonApp) {
+        defaultResources.pysparkMemoryMiB.getOrElse(0L)
+      } else {
+        0L
+      }
+      val execMem = defaultResources.executorMemoryMiB
+      val offHeap = defaultResources.memoryOffHeapMiB
+      val totalMemMiB = execMem + offHeap + pysparkMemToUseMiB + memoryOverheadMiB
+      val customResources = defaultResources.customResources.map { case (rName, execReq) =>
+        val nameToUse = resourceMappings.get(rName).getOrElse(rName)
+        (nameToUse, execReq)
+      }
Review comment:
Can we:
1. Move all the `val`s above to before the `if (rpId == DEFAULT_RESOURCE_PROFILE_ID)` check and make them `var`s.
2. Have only `totalMemMiB`, `customResources` and the `ExecutorResourcesOrDefaults` construction inside the `if`.
3. Remove the corresponding `var` initialization in the `else` block (since it is covered by the change in (1) above).
I am trying to see if we can make this more concise (rough sketch below) ...
Thoughts ?
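A rough sketch of that shape, purely illustrative (the variable names are made up here, and the non-default branch is elided):
```
val defaultResources = getDefaultProfileExecutorResources(conf)
var cores = defaultResources.cores
var execMemMiB = defaultResources.executorMemoryMiB
var offHeapMiB = defaultResources.memoryOffHeapMiB
var pysparkMemMiB = if (isPythonApp) defaultResources.pysparkMemoryMiB.getOrElse(0L) else 0L
var overheadMiB = calculateOverHeadMemory(
  defaultResources.memoryOverheadMiB, execMemMiB, overheadFactor)
var customResources = defaultResources.customResources

if (rpId == DEFAULT_RESOURCE_PROFILE_ID) {
  // default profile: only remap the custom resource names for the cluster manager
  customResources = customResources.map { case (rName, execReq) =>
    (resourceMappings.getOrElse(rName, rName), execReq)
  }
} else {
  // non-default profile: overwrite the vars above from execResources (elided)
}

val totalMemMiB = execMemMiB + offHeapMiB + pysparkMemMiB + overheadMiB
ExecutorResourcesOrDefaults(cores, execMemMiB, offHeapMiB, pysparkMemMiB,
  overheadMiB, totalMemMiB, customResources)
```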
##########
File path: core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
##########
@@ -256,13 +257,16 @@ object ResourceProfile extends Logging {
   val UNKNOWN_RESOURCE_PROFILE_ID = -1
   val DEFAULT_RESOURCE_PROFILE_ID = 0
+  private[spark] val MEMORY_OVERHEAD_MIN = 384L
Review comment:
nit: `MEMORY_OVERHEAD_MIN` -> `MEMORY_OVERHEAD_MIN_MIB` ?
##########
File path: core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
##########
@@ -342,6 +367,110 @@ object ResourceProfile extends Logging {
     rp.getTaskCpus.getOrElse(conf.get(CPUS_PER_TASK))
   }
+  /**
+   * Get offHeap memory size from [[ExecutorResourceRequest]]
+   * return 0 if MEMORY_OFFHEAP_ENABLED is false.
+   */
+  private[spark] def executorOffHeapMemorySizeAsMb(sparkConf: SparkConf,
+      execRequest: ExecutorResourceRequest): Long = {
+    Utils.checkOffHeapEnabled(sparkConf, execRequest.amount)
+  }
+
+  private[spark] case class ExecutorResourcesOrDefaults(
+      cores: Int,
+      executorMemoryMiB: Long,
+      memoryOffHeapMiB: Long,
+      pysparkMemoryMiB: Long,
+      memoryOverheadMiB: Long,
+      totalMemMiB: Long,
+      customResources: Map[String, ExecutorResourceRequest])
+
+  private[spark] case class DefaultProfileExecutorResources(
Review comment:
Can we replace `DefaultProfileExecutorResources` with `ExecutorResourcesOrDefaults` ?
I am not sure what value it adds that using `ExecutorResourcesOrDefaults` can't cover (rough sketch below).
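One possible shape of that unification, purely illustrative (the optional fields and default arguments below are assumptions, not part of the PR):
```
private[spark] case class ExecutorResourcesOrDefaults(
    cores: Int,
    executorMemoryMiB: Long,
    memoryOffHeapMiB: Long,
    pysparkMemoryMiB: Option[Long] = None,
    memoryOverheadMiB: Option[Long] = None,
    totalMemMiB: Long = 0L,
    customResources: Map[String, ExecutorResourceRequest] = Map.empty)
```
The default-profile path would then fill in only what the conf provides, and the cluster-manager path would resolve the options and compute `totalMemMiB`.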
##########
File path: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
##########
@@ -302,48 +273,38 @@ private[yarn] class YarnAllocator(
   }
   // if a ResourceProfile hasn't been seen yet, create the corresponding YARN Resource for it
-  private def createYarnResourceForResourceProfile(
-      resourceProfileToTotalExecs: Map[ResourceProfile, Int]): Unit = synchronized {
-    resourceProfileToTotalExecs.foreach { case (rp, num) =>
-      if (!rpIdToYarnResource.contains(rp.id)) {
-        // Start with the application or default settings
-        var heapMem = executorMemory.toLong
-        var offHeapMem = executorOffHeapMemory.toLong
-        var overheadMem = memoryOverhead.toLong
-        var pysparkMem = pysparkWorkerMemory.toLong
-        var cores = defaultExecutorCores
-        val customResources = new mutable.HashMap[String, String]
-        // track the resource profile if not already there
-        getOrUpdateRunningExecutorForRPId(rp.id)
-        logInfo(s"Resource profile ${rp.id} doesn't exist, adding it")
-        val execResources = rp.executorResources
-        execResources.foreach { case (r, execReq) =>
-          r match {
-            case ResourceProfile.MEMORY =>
-              heapMem = execReq.amount
-            case ResourceProfile.OVERHEAD_MEM =>
-              overheadMem = execReq.amount
-            case ResourceProfile.PYSPARK_MEM =>
-              pysparkMem = execReq.amount
-            case ResourceProfile.OFFHEAP_MEM =>
-              offHeapMem = YarnSparkHadoopUtil.executorOffHeapMemorySizeAsMb(sparkConf, execReq)
-            case ResourceProfile.CORES =>
-              cores = execReq.amount.toInt
-            case "gpu" =>
-              customResources(YARN_GPU_RESOURCE_CONFIG) = execReq.amount.toString
-            case "fpga" =>
-              customResources(YARN_FPGA_RESOURCE_CONFIG) = execReq.amount.toString
-            case rName =>
-              customResources(rName) = execReq.amount.toString
-          }
+  private def createYarnResourceForResourceProfile(rp: ResourceProfile): Unit = synchronized {
+    if (!rpIdToYarnResource.contains(rp.id)) {
+      // track the resource profile if not already there
+      getOrUpdateRunningExecutorForRPId(rp.id)
+      logInfo(s"Resource profile ${rp.id} doesn't exist, adding it")
+      val resourcesWithDefaults =
+        ResourceProfile.getResourcesForClusterManager(rp.id, rp.executorResources,
+          MEMORY_OVERHEAD_FACTOR, sparkConf, isPythonApp,
+          ResourceRequestHelper.resourceNameMapping)
+      val customSparkResources =
+        resourcesWithDefaults.customResources.map { case (name, execReq) =>
+          (name, execReq.amount.toString)
         }
-        val totalMem = (heapMem + offHeapMem + overheadMem + pysparkMem).toInt
-        val resource = Resource.newInstance(totalMem, cores)
-        ResourceRequestHelper.setResourceRequests(customResources.toMap, resource)
-        logDebug(s"Created resource capability: $resource")
-        rpIdToYarnResource.putIfAbsent(rp.id, resource)
-        rpIdToResourceProfile(rp.id) = rp
+      // YARN specified resources are only supported with the default profile and we only
+      // pick up Spark custom resources for GPU and FPGA with the default profile
+      // to allow option of them being scheduled off of or not.
+      // We don't currently have a way to specify just YARN custom resources via the
+      // ResourceProfile so we send along all custom resources defined.
Review comment:
I was a bit unclear on what the comment means.
Are we saying that only gpu/fpga are allowed to be configured via the default profile, and no other custom resource ?
If I am understanding this correctly, non-default profiles can specify these ? Why are we making this restriction ?
To put it differently, if an application has only the default profile and wants to use the 'other' resources, it has to set up a resource profile and use that for the entire app to work around this ? Or am I misunderstanding something here ?
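For reference, the two config namespaces involved here (the custom resource name below is made up for illustration):
```
import org.apache.spark.SparkConf

val conf = new SparkConf()
  // YARN-only request: the container gets the resource, but Spark does not schedule on it
  .set("spark.yarn.executor.resource.acme-accel.amount", "2")
  // Spark resource request: Spark schedules tasks against it (gpu/fpga are also mapped to the YARN built-in types)
  .set("spark.executor.resource.gpu.amount", "1")
```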
##########
File path: docs/running-on-yarn.md
##########
@@ -644,6 +644,7 @@ YARN does not tell Spark the addresses of the resources allocated to each container
# Stage Level Scheduling Overview
Stage level scheduling is supported on YARN when dynamic allocation is enabled. One thing to note that is YARN specific is that each ResourceProfile requires a different container priority on YARN. The mapping is simply that the ResourceProfile id becomes the priority; on YARN, lower numbers are higher priority. This means that profiles created earlier will have a higher priority in YARN. Normally this won't matter as Spark finishes one stage before starting another one; the only case this might have an effect is in a job server type scenario, so it's something to keep in mind.
+Note there is a difference in the way custom resources are handled between the base default profile and custom ResourceProfiles. To allow the user to request YARN containers with extra resources without Spark scheduling on them, the user can specify resources via the <code>spark.yarn.executor.resource.</code> config. Those configs are only used in the base default profile though and do not get propagated into any other custom ResourceProfiles. This is because there would be no way to remove them if you wanted a stage to not have them. This results in your default profile getting the custom resources defined in <code>spark.yarn.executor.resource.</code> plus the Spark-defined GPU or FPGA resources. Spark converts GPU and FPGA resources into the YARN built-in types <code>yarn.io/gpu</code> and <code>yarn.io/fpga</code>, but does not know the mapping of any other resources. Any other Spark custom resources are not propagated to YARN for the default profile. So if you want Spark to schedule based off a custom resource and have it requested from YARN, you must specify it in both YARN (<code>spark.yarn.{driver/executor}.resource.</code>) and Spark (<code>spark.{driver/executor}.resource.</code>) configs. Leave the Spark config off if you only want YARN containers with the extra resources but Spark not to schedule using them. Custom ResourceProfiles, on the other hand, don't currently have a way to specify only YARN resources without Spark scheduling off of them. This means that for custom ResourceProfiles we propagate all the resources defined in the ResourceProfile to YARN. We still convert GPU and FPGA to the YARN built-in types as well. This requires that the name of any custom resources you specify match what they are defined as in YARN.
Review comment:
I wish YARN supported allocation to multiple queues (so that DL workloads could use a queue with GPUs, and a 'regular' queue for non-DL stages within the same application) - and not tie it at the application level.
Node labels could be used for this - iirc we don't use that for resource profiles, but only at the executor/application level, right ?
Are we planning to support specifying this per resource profile later ?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]