tgravescs commented on a change in pull request #30204:
URL: https://github.com/apache/spark/pull/30204#discussion_r518145975



##########
File path: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
##########
@@ -50,25 +53,42 @@ private[spark] class BasicExecutorFeatureStep(
     kubernetesConf.get(DRIVER_HOST_ADDRESS),
     kubernetesConf.sparkConf.getInt(DRIVER_PORT.key, DEFAULT_DRIVER_PORT),
     CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
-  private val executorMemoryMiB = kubernetesConf.get(EXECUTOR_MEMORY)
-  private val executorMemoryString = kubernetesConf.get(
-    EXECUTOR_MEMORY.key, EXECUTOR_MEMORY.defaultValueString)
 
-  private val memoryOverheadMiB = kubernetesConf
+  private var executorMemoryMiB = kubernetesConf.get(EXECUTOR_MEMORY)
+
+  private var memoryOverheadMiB = kubernetesConf
     .get(EXECUTOR_MEMORY_OVERHEAD)
     .getOrElse(math.max(
       (kubernetesConf.get(MEMORY_OVERHEAD_FACTOR) * executorMemoryMiB).toInt,
       MEMORY_OVERHEAD_MIN_MIB))
-  private val executorMemoryWithOverhead = executorMemoryMiB + 
memoryOverheadMiB
-  private val executorMemoryTotal =
-    if (kubernetesConf.get(APP_RESOURCE_TYPE) == 
Some(APP_RESOURCE_TYPE_PYTHON)) {
-      executorMemoryWithOverhead +
-        kubernetesConf.get(PYSPARK_EXECUTOR_MEMORY).map(_.toInt).getOrElse(0)
-    } else {
-      executorMemoryWithOverhead
+
+  private var executorCores = kubernetesConf.sparkConf.get(EXECUTOR_CORES)
+
+  private var pysparkMemoryMiB =
+    
kubernetesConf.get(PYSPARK_EXECUTOR_MEMORY).map(_.toInt).getOrElse(0).toLong
+
+  private var memoryOffHeapMiB = 
Utils.executorOffHeapMemorySizeAsMb(kubernetesConf.sparkConf)
+
+  private val customResources = new mutable.HashSet[ExecutorResourceRequest]
+  resourceProfile.executorResources.foreach { case (resource, execReq) =>
+    resource match {
+      case ResourceProfile.MEMORY =>
+        executorMemoryMiB = execReq.amount
+      case ResourceProfile.OVERHEAD_MEM =>
+        memoryOverheadMiB = execReq.amount
+      case ResourceProfile.PYSPARK_MEM =>
+        pysparkMemoryMiB = execReq.amount
+      case ResourceProfile.OFFHEAP_MEM =>
+        memoryOffHeapMiB = execReq.amount.toInt
+      case ResourceProfile.CORES =>
+        executorCores = execReq.amount.toInt
+      case rName =>
+        customResources += execReq
     }
+  }

Review comment:
       The GPU/FPGA handling is different in YARN and K8s because YARN only has 
knowledge of those two resources; if you use other ones, you have to do more 
configuration on the YARN side. We do handle GPUs/FPGAs in this code now; see 
the customResources section here. That gets used later on in 
buildExecutorResourcesQuantities. I'll look at it to see if I can share the 
code.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to