tgravescs commented on a change in pull request #30204:
URL: https://github.com/apache/spark/pull/30204#discussion_r523095952
##########
File path: core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
##########
@@ -342,6 +367,110 @@ object ResourceProfile extends Logging {
rp.getTaskCpus.getOrElse(conf.get(CPUS_PER_TASK))
}
/**
 * Gets the off-heap memory size (in MiB) declared in the given
 * [[ExecutorResourceRequest]].
 *
 * @param sparkConf the application configuration, consulted for the
 *                  off-heap memory settings
 * @param execRequest the executor resource request holding the requested
 *                    off-heap amount
 * @return the requested off-heap memory size, or 0 if
 *         MEMORY_OFFHEAP_ENABLED is false (validation is delegated to
 *         `Utils.checkOffHeapEnabled`)
 */
private[spark] def executorOffHeapMemorySizeAsMb(sparkConf: SparkConf,
    execRequest: ExecutorResourceRequest): Long = {
  Utils.checkOffHeapEnabled(sparkConf, execRequest.amount)
}
+
/**
 * Fully resolved executor resource amounts for a cluster manager to request
 * in a container. Every field holds a concrete value — any defaults have
 * already been applied by the time an instance is built.
 */
private[spark] case class ExecutorResourcesOrDefaults(
    cores: Int,
    executorMemoryMiB: Long,
    memoryOffHeapMiB: Long,
    pysparkMemoryMiB: Long,
    memoryOverheadMiB: Long,
    totalMemMiB: Long,
    customResources: Map[String, ExecutorResourceRequest])
+
/**
 * Executor resources taken from the default resource profile. The `Option`
 * fields are `None` when the corresponding setting was not explicitly
 * configured, so callers can fall back to computed defaults (e.g. an
 * overhead derived from a factor of the executor memory).
 */
private[spark] case class DefaultProfileExecutorResources(
    cores: Int,
    executorMemoryMiB: Long,
    memoryOffHeapMiB: Long,
    pysparkMemoryMiB: Option[Long],
    memoryOverheadMiB: Option[Long],
    customResources: Map[String, ExecutorResourceRequest])
+
/**
 * Calculates the executor memory overhead in MiB.
 *
 * @param overHeadMemFromConf explicitly configured overhead, if any; when
 *                            present it wins over the computed value
 * @param executorMemoryMiB executor heap memory in MiB, used as the base
 *                          for the factor-derived overhead
 * @param overheadFactor fraction of executor memory to reserve as overhead
 * @return the configured overhead, or
 *         max(overheadFactor * executorMemoryMiB, MEMORY_OVERHEAD_MIN)
 */
private[spark] def calculateOverHeadMemory(
    overHeadMemFromConf: Option[Long],
    executorMemoryMiB: Long,
    overheadFactor: Double): Long = {
  // Use toLong, not toInt: the result is a Long and truncating the product
  // to Int would silently overflow for very large executor memory sizes.
  overHeadMemFromConf.getOrElse(math.max((overheadFactor * executorMemoryMiB).toLong,
    ResourceProfile.MEMORY_OVERHEAD_MIN))
}
+
+ /**
+ * Gets the full list of resources to allow a cluster manager to request the
appropriate
+ * container. If the resource profile is not the default one we either get
the resources
+ * specified in the profile or fall back to the default profile resource
size for everything
+ * except for custom resources.
+ */
+ private[spark] def getResourcesForClusterManager(
+ rpId: Int,
+ execResources: Map[String, ExecutorResourceRequest],
+ overheadFactor: Double,
+ conf: SparkConf,
+ isPythonApp: Boolean,
+ resourceMappings: Map[String, String]): ExecutorResourcesOrDefaults = {
+ val defaultResources = getDefaultProfileExecutorResources(conf)
+ if (rpId == DEFAULT_RESOURCE_PROFILE_ID) {
+ val memoryOverheadMiB = ResourceProfile.calculateOverHeadMemory(
+ defaultResources.memoryOverheadMiB,
defaultResources.executorMemoryMiB, overheadFactor)
+ val pysparkMemToUseMiB = if (isPythonApp) {
+ defaultResources.pysparkMemoryMiB.getOrElse(0L)
+ } else {
+ 0L
+ }
+ val execMem = defaultResources.executorMemoryMiB
+ val offHeap = defaultResources.memoryOffHeapMiB
+ val totalMemMiB = execMem + offHeap + pysparkMemToUseMiB +
memoryOverheadMiB
+ val customResources = defaultResources.customResources.map { case
(rName, execReq) =>
+ val nameToUse = resourceMappings.get(rName).getOrElse(rName)
+ (nameToUse, execReq)
+ }
Review comment:
Yes, we can definitely commonize some of that.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]