LuciferYang commented on a change in pull request #25309:
[SPARK-28577][YARN]Resource capability requested for each executor add offHeapMemorySize
URL: https://github.com/apache/spark/pull/25309#discussion_r310190148
##########
File path: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
##########
@@ -184,4 +184,29 @@ object YarnSparkHadoopUtil {
ConverterUtils.toContainerId(containerIdString)
}
+  /**
+   * If MEMORY_OFFHEAP_ENABLED is true, we should ensure executorOverheadMemory requested value
+   * is not less than MEMORY_OFFHEAP_SIZE, otherwise the memory resource requested for executor
+   * may be not enough.
+   */
+  def executorMemoryOverheadRequested(sparkConf: SparkConf): Int = {
+    val executorMemory = sparkConf.get(EXECUTOR_MEMORY).toInt
+    val overhead = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse(
+      math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN)).toInt
+    val offHeap = if (sparkConf.get(MEMORY_OFFHEAP_ENABLED)) {
+      val size =
+        sparkConf.getSizeAsMb(MEMORY_OFFHEAP_SIZE.key, MEMORY_OFFHEAP_SIZE.defaultValueString)
+      require(size > 0,
+        s"${MEMORY_OFFHEAP_SIZE.key} must be > 0 when ${MEMORY_OFFHEAP_ENABLED.key} == true")
+      if (size > overhead) {
+        logWarning(s"The value of ${MEMORY_OFFHEAP_SIZE.key}(${size}MB) will be used as " +
+          s"executorMemoryOverhead to request resource to ensure that Executor has enough memory " +
+          s"to use. It is recommended that the configuration value of " +
+          s"${EXECUTOR_MEMORY_OVERHEAD.key} should be no less than ${MEMORY_OFFHEAP_SIZE.key} " +
+          s"when ${MEMORY_OFFHEAP_ENABLED.key} is true.")
+      }
+      size
+    } else 0
+    math.max(overhead, offHeap).toInt
Review comment:
@tgravescs In our configuration doc I found that `spark.executor.memoryOverhead` is described as `Amount of non-heap memory to be allocated per executor process in cluster mode` and `Non-heap memory includes off-heap memory (when spark.memory.offHeap.enabled=true) and memory used by other executor processes (e.g. python process that goes with a PySpark executor) and memory used by other non-executor processes running in the same container.`.
So in the past, memoryOverhead = offHeapMemory + pysparkWorkerMemory + otherMemory. Now pysparkWorkerMemory is configured independently, and configuring offHeapMemory independently as well would be consistent with that. But it does change user habits compared with older versions of Spark.
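To make the effect of the patch concrete, here is a minimal Scala sketch of the request arithmetic implied by the diff above. This is not the PR's code: the object and method names are hypothetical, and the 0.10 factor and 384 MiB floor are my reading of the MEMORY_OVERHEAD_FACTOR / MEMORY_OVERHEAD_MIN defaults.

```scala
// Hypothetical sketch of how the requested executor memory overhead would be derived
// under this change, mirroring the math.max(overhead, offHeap) logic in the diff above.
object OverheadRequestSketch {
  private val OverheadFactor = 0.10   // assumed MEMORY_OVERHEAD_FACTOR
  private val OverheadMinMiB = 384L   // assumed MEMORY_OVERHEAD_MIN (MiB)

  def requestedOverheadMiB(
      executorMemoryMiB: Long,
      configuredOverheadMiB: Option[Long],
      offHeapEnabled: Boolean,
      offHeapSizeMiB: Long): Long = {
    // Default overhead: max(10% of the heap, 384 MiB), unless explicitly configured.
    val overhead = configuredOverheadMiB.getOrElse(
      math.max((OverheadFactor * executorMemoryMiB).toLong, OverheadMinMiB))
    val offHeap = if (offHeapEnabled) offHeapSizeMiB else 0L
    // The patched helper requests max(overhead, offHeap), so a large off-heap region
    // raises the container request even when memoryOverhead is left at its default.
    math.max(overhead, offHeap)
  }

  def main(args: Array[String]): Unit = {
    // 4 GiB heap, default overhead, off-heap disabled: 409 MiB of overhead requested.
    println(requestedOverheadMiB(4096, None, offHeapEnabled = false, offHeapSizeMiB = 0))
    // Same executor with 2 GiB off-heap enabled: the overhead request grows to 2048 MiB.
    println(requestedOverheadMiB(4096, None, offHeapEnabled = true, offHeapSizeMiB = 2048))
  }
}
```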