LuciferYang commented on a change in pull request #25309: 
[SPARK-28577][YARN]Resource capability requested for each executor add offHeapMemorySize 
URL: https://github.com/apache/spark/pull/25309#discussion_r310190148
 
 

 ##########
 File path: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
 ##########
 @@ -184,4 +184,29 @@ object YarnSparkHadoopUtil {
     ConverterUtils.toContainerId(containerIdString)
   }
 
+  /**
+   * If MEMORY_OFFHEAP_ENABLED is true, we should ensure executorOverheadMemory requested value
+   * is not less than MEMORY_OFFHEAP_SIZE, otherwise the memory resource requested for executor
+   * may be not enough.
+   */
+  def executorMemoryOverheadRequested(sparkConf: SparkConf): Int = {
+    val executorMemory = sparkConf.get(EXECUTOR_MEMORY).toInt
+    val overhead = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse(
+      math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN)).toInt
+    val offHeap = if (sparkConf.get(MEMORY_OFFHEAP_ENABLED)) {
+      val size =
+        sparkConf.getSizeAsMb(MEMORY_OFFHEAP_SIZE.key, MEMORY_OFFHEAP_SIZE.defaultValueString)
+      require(size > 0,
+        s"${MEMORY_OFFHEAP_SIZE.key} must be > 0 when ${MEMORY_OFFHEAP_ENABLED.key} == true")
+      if (size > overhead) {
+        logWarning(s"The value of ${MEMORY_OFFHEAP_SIZE.key}(${size}MB) will be used as " +
+          s"executorMemoryOverhead to request resource to ensure that Executor has enough memory " +
+          s"to use. It is recommended that the configuration value of " +
+          s"${EXECUTOR_MEMORY_OVERHEAD.key} should be no less than ${MEMORY_OFFHEAP_SIZE.key} " +
+          s"when ${MEMORY_OFFHEAP_ENABLED.key} is true.")
+      }
+      size
+    } else 0
+    math.max(overhead, offHeap).toInt
 
 Review comment:
   @tgravescs The Spark configuration doc describes 
`spark.executor.memoryOverhead` as the `Amount of non-heap memory to be 
allocated per executor process in cluster mode`, and adds that `Non-heap memory includes 
off-heap memory (when spark.memory.offHeap.enabled=true) and memory used by 
other executor processes (e.g. python process that goes with a PySpark 
executor) and memory used by other non-executor processes running in the same 
container.`
   
   So in the past, memoryOverhead = offHeapMemory + pysparkWorkerMemory + 
otherMemory. pysparkWorkerMemory is now configured independently, so 
configuring offHeapMemory independently as well would be consistent with that.
   
   However, it would change what users are used to from older versions of 
Spark.
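
   To make the two accounting models in this comment concrete, here is a 
minimal stand-alone sketch. It does not use Spark itself: `OverheadSketch`, 
`ExecutorMemory`, `containerMbFolded` and `containerMbSeparate` are 
hypothetical names, the 0.10 factor and 384 MB floor are assumed to match 
`MEMORY_OVERHEAD_FACTOR` and `MEMORY_OVERHEAD_MIN`, and the container total is 
assumed to be heap + overhead + PySpark worker memory.

```scala
// Hypothetical illustration only; none of these names are Spark APIs.
object OverheadSketch {
  // Assumed to mirror MEMORY_OVERHEAD_FACTOR and MEMORY_OVERHEAD_MIN.
  val OverheadFactor: Double = 0.10
  val OverheadMinMb: Long = 384L

  final case class ExecutorMemory(
      heapMb: Long,             // spark.executor.memory
      overheadMb: Option[Long], // spark.executor.memoryOverhead, if set explicitly
      offHeapEnabled: Boolean,  // spark.memory.offHeap.enabled
      offHeapMb: Long,          // spark.memory.offHeap.size
      pysparkMb: Long)          // PySpark worker memory, already a separate term

  // Default overhead when the user sets none: max(factor * heap, floor).
  def defaultOverheadMb(heapMb: Long): Long =
    math.max((OverheadFactor * heapMb).toLong, OverheadMinMb)

  private def overheadOf(m: ExecutorMemory): Long =
    m.overheadMb.getOrElse(defaultOverheadMb(m.heapMb))

  private def offHeapOf(m: ExecutorMemory): Long =
    if (m.offHeapEnabled) m.offHeapMb else 0L

  // Model A (as in the diff above): off-heap is part of the overhead, so the
  // overhead is raised to at least the off-heap size.
  def containerMbFolded(m: ExecutorMemory): Long =
    m.heapMb + math.max(overheadOf(m), offHeapOf(m)) + m.pysparkMb

  // Model B (off-heap configured independently): off-heap is its own term,
  // added on top of the overhead, the same way PySpark memory is.
  def containerMbSeparate(m: ExecutorMemory): Long =
    m.heapMb + overheadOf(m) + offHeapOf(m) + m.pysparkMb
}
```

   With a 4096 MB heap, no explicit overhead and 2048 MB of off-heap memory, 
model A requests 4096 + max(409, 2048) = 6144 MB while model B requests 
4096 + 409 + 2048 = 6553 MB. Under model B, a job that already sized its 
overhead to include off-heap memory would get a larger container unless the 
user lowers `spark.executor.memoryOverhead`, which is the change in user 
habits mentioned above.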

