tgravescs commented on a change in pull request #30204:
URL: https://github.com/apache/spark/pull/30204#discussion_r518891725
##########
File path:
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
##########
@@ -50,25 +53,42 @@ private[spark] class BasicExecutorFeatureStep(
kubernetesConf.get(DRIVER_HOST_ADDRESS),
kubernetesConf.sparkConf.getInt(DRIVER_PORT.key, DEFAULT_DRIVER_PORT),
CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
- private val executorMemoryMiB = kubernetesConf.get(EXECUTOR_MEMORY)
- private val executorMemoryString = kubernetesConf.get(
- EXECUTOR_MEMORY.key, EXECUTOR_MEMORY.defaultValueString)
- private val memoryOverheadMiB = kubernetesConf
+ private var executorMemoryMiB = kubernetesConf.get(EXECUTOR_MEMORY)
+
+ private var memoryOverheadMiB = kubernetesConf
.get(EXECUTOR_MEMORY_OVERHEAD)
.getOrElse(math.max(
(kubernetesConf.get(MEMORY_OVERHEAD_FACTOR) * executorMemoryMiB).toInt,
MEMORY_OVERHEAD_MIN_MIB))
- private val executorMemoryWithOverhead = executorMemoryMiB +
memoryOverheadMiB
- private val executorMemoryTotal =
- if (kubernetesConf.get(APP_RESOURCE_TYPE) ==
Some(APP_RESOURCE_TYPE_PYTHON)) {
- executorMemoryWithOverhead +
- kubernetesConf.get(PYSPARK_EXECUTOR_MEMORY).map(_.toInt).getOrElse(0)
- } else {
- executorMemoryWithOverhead
+
+ private var executorCores = kubernetesConf.sparkConf.get(EXECUTOR_CORES)
+
+ private var pysparkMemoryMiB =
+
kubernetesConf.get(PYSPARK_EXECUTOR_MEMORY).map(_.toInt).getOrElse(0).toLong
+
+ private var memoryOffHeapMiB =
Utils.executorOffHeapMemorySizeAsMb(kubernetesConf.sparkConf)
+
+ private val customResources = new mutable.HashSet[ExecutorResourceRequest]
+ resourceProfile.executorResources.foreach { case (resource, execReq) =>
+ resource match {
+ case ResourceProfile.MEMORY =>
+ executorMemoryMiB = execReq.amount
+ case ResourceProfile.OVERHEAD_MEM =>
+ memoryOverheadMiB = execReq.amount
+ case ResourceProfile.PYSPARK_MEM =>
+ pysparkMemoryMiB = execReq.amount
+ case ResourceProfile.OFFHEAP_MEM =>
+ memoryOffHeapMiB = execReq.amount.toInt
+ case ResourceProfile.CORES =>
+ executorCores = execReq.amount.toInt
+ case rName =>
+ customResources += execReq
}
+ }
Review comment:
OK, there is a fair bit of code that can be combined here.
I also discovered an inconsistency in other custom resources between YARN
and K8s because of the way templates work; I'll work on making that consistent.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]