tgravescs commented on code in PR #45240:
URL: https://github.com/apache/spark/pull/45240#discussion_r1511443512
##########
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##########
@@ -117,6 +117,14 @@ package object config {
.bytesConf(ByteUnit.MiB)
.createOptional
+ private[spark] val DRIVER_MIN_MEMORY_OVERHEAD =
+ ConfigBuilder("spark.driver.minMemoryOverhead")
+ .doc("The minimum amount of non-heap memory to be allocated per driver in cluster mode, " +
+ "in MiB unless otherwise specified. Overrides the default value of 384Mib." +
+ " This value is ignored if spark.driver.memoryOverhead is set directly.")
+ .version("3.5.2")
Review Comment:
This should be 4.0.0. This is a new feature, so it generally wouldn't be backported, at least not without discussion.
##########
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala:
##########
@@ -772,6 +775,40 @@ class YarnAllocatorSuite extends SparkFunSuite
assert(memory == (executorMemory * 1.4).toLong)
} finally {
sparkConf.set(EXECUTOR_MEMORY_OVERHEAD_FACTOR, 0.1)
+ sparkConf.remove(EXECUTOR_MEMORY_OVERHEAD)
+ }
+ }
+
+ test("SPARK-47208: User can override the minimum memory overhead of the
executor") {
+ val executorMemory = sparkConf.get(EXECUTOR_MEMORY)
+ try {
+ sparkConf
+ .set(EXECUTOR_MIN_MEMORY_OVERHEAD, 500L)
+ val (handler, _) = createAllocator(maxExecutors = 1,
+ additionalConfigs = Map(EXECUTOR_MEMORY.key -> executorMemory.toString))
+ val defaultResource = handler.rpIdToYarnResource.get(defaultRPId)
+ val memory = defaultResource.getMemorySize
+ assert(memory == (executorMemory + 500))
+ } finally {
+ sparkConf
+ .remove(EXECUTOR_MIN_MEMORY_OVERHEAD)
+ }
+ }
+
+ test("SPARK-47208: Explicit overhead takes precedence over minimum
overhead") {
+ val executorMemory = sparkConf.get(EXECUTOR_MEMORY)
+ try {
+ sparkConf
+ .set(EXECUTOR_MIN_MEMORY_OVERHEAD, 500L)
+ .set(EXECUTOR_MEMORY_OVERHEAD, 100L)
+ val (handler, _) = createAllocator(maxExecutors = 1,
+ additionalConfigs = Map(EXECUTOR_MEMORY.key -> executorMemory.toString))
+ val defaultResource = handler.rpIdToYarnResource.get(defaultRPId)
+ val memory = defaultResource.getMemorySize
+ assert(memory == (executorMemory + 100))
+ } finally {
+ sparkConf
+ .remove(EXECUTOR_MIN_MEMORY_OVERHEAD)
Review Comment:
sparkConf.remove(EXECUTOR_MEMORY_OVERHEAD)
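For illustration, a sketch of the finally block with that extra cleanup (assuming both configs set in the test body should be restored; this is not the PR's code, and since SparkConf.remove returns the conf, the calls chain):

    } finally {
      sparkConf
        .remove(EXECUTOR_MIN_MEMORY_OVERHEAD)
        .remove(EXECUTOR_MEMORY_OVERHEAD)
    }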
##########
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala:
##########
@@ -706,6 +706,39 @@ class ClientSuite extends SparkFunSuite
assert(client.getPreloadedStatCache(sparkConf.get(JARS_TO_DISTRIBUTE),
mockFsLookup).size === 2)
}
+ Seq(
+ "client",
+ "cluster"
+ ).foreach { case (deployMode) =>
+ test(s"SPARK-47208: minimum memory overhead is correctly set in
($deployMode mode)") {
+ val sparkConf = new SparkConf()
+ .set("spark.app.name", "foo-test-app")
+ .set(SUBMIT_DEPLOY_MODE, deployMode)
+ .set(DRIVER_MIN_MEMORY_OVERHEAD, 500L)
+ val args = new ClientArguments(Array())
+
+ val appContext = Records.newRecord(classOf[ApplicationSubmissionContext])
+ val getNewApplicationResponse = Records.newRecord(classOf[GetNewApplicationResponse])
+ val containerLaunchContext = Records.newRecord(classOf[ContainerLaunchContext])
+
+ val client = new Client(args, sparkConf, null)
+ client.createApplicationSubmissionContext(
+ new YarnClientApplication(getNewApplicationResponse, appContext),
+ containerLaunchContext)
+
+ appContext.getApplicationName should be("foo-test-app")
+ // flag should only work for cluster mode
+ if (deployMode=="cluster") {
+ // 1Gb driver default + 500 overridden minimum default overhead
+ appContext.getResource should be(Resource.newInstance(1524L, 1))
+ } else {
+ // 512 driver default (non-cluster) + 384 overhead default
+ // that can't be changed in non cluster mode.
Review Comment:
nit: extra space before "that"
##########
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala:
##########
@@ -128,6 +128,9 @@ class YarnAllocatorSuite extends SparkFunSuite
.set(EXECUTOR_CORES, 5)
.set(EXECUTOR_MEMORY, 2048L)
+ // scalastyle:off println
+ sparkConfClone.getAll.foreach(println)
Review Comment:
assume this was debugging? remove
##########
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##########
@@ -358,6 +366,15 @@ package object config {
.bytesConf(ByteUnit.MiB)
.createOptional
+ private[spark] val EXECUTOR_MIN_MEMORY_OVERHEAD =
+ ConfigBuilder("spark.executor.minMemoryOverhead")
+ .doc("The minimum amount of non-heap memory to be allocated per executor "
+
+ "in MiB unless otherwise specified. Overrides the default value of
384Mib." +
+ " This value is ignored if spark.executor.memoryOverhead is set
directly.")
+ .version("3.5.2")
Review Comment:
4.0.0
##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -75,7 +75,7 @@ private[spark] class Client(
private val yarnClient = YarnClient.createYarnClient
private val hadoopConf = new YarnConfiguration(SparkHadoopUtil.newConfiguration(sparkConf))
- private val isClusterMode = sparkConf.get(SUBMIT_DEPLOY_MODE) == "cluster"
+ private val isClusterMode = sparkConf.get(SUBMIT_DEPLOY_MODE) ==  "cluster"
Review Comment:
remove extra spaces
##########
docs/configuration.md:
##########
@@ -202,6 +202,15 @@ of the most common options to set are:
</td>
<td>2.3.0</td>
</tr>
+<tr>
+ <td><code>spark.driver.minMemoryOverhead</code></td>
+ <td>None</td>
+ <td>
+ The minimum amount of non-heap memory to be allocated per driver process in cluster mode, in MiB unless otherwise specified, if <code>spark.driver.memoryOverhead</code> is not defined.
+ This option is currently supported on YARN and Kubernetes.
+ </td>
+ <td>3.5.2</td>
Review Comment:
oops, answered this in a different comment, should be 4.0.0
##########
core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala:
##########
@@ -489,10 +489,13 @@ object ResourceProfile extends Logging {
private[spark] def calculateOverHeadMemory(
overHeadMemFromConf: Option[Long],
+ minimumOverHeadMemoryFromConf: Option[Long],
executorMemoryMiB: Long,
overheadFactor: Double): Long = {
+ val minMemoryOverhead =
+ minimumOverHeadMemoryFromConf.getOrElse(ResourceProfile.MEMORY_OVERHEAD_MIN_MIB);
overHeadMemFromConf.getOrElse(math.max((overheadFactor * executorMemoryMiB).toInt,
- ResourceProfile.MEMORY_OVERHEAD_MIN_MIB))
+ minMemoryOverhead))
Review Comment:
I think it would be cleaner to just have the configs default to 384.
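For illustration only, one way to read that suggestion is to bake the 384 MiB default into the config entries themselves, so the caller no longer needs an Option plus getOrElse (a sketch against the ConfigBuilder API in this file, not the PR's code):

    private[spark] val EXECUTOR_MIN_MEMORY_OVERHEAD =
      ConfigBuilder("spark.executor.minMemoryOverhead")
        .doc("The minimum amount of non-heap memory to be allocated per executor, " +
          "in MiB unless otherwise specified. This value is ignored if " +
          "spark.executor.memoryOverhead is set directly.")
        .version("4.0.0")
        .bytesConf(ByteUnit.MiB)
        .createWithDefault(384)

calculateOverHeadMemory could then take the resolved minimum as a plain Long (parameter name below is illustrative):

    private[spark] def calculateOverHeadMemory(
        overHeadMemFromConf: Option[Long],
        minimumOverheadMemory: Long,
        executorMemoryMiB: Long,
        overheadFactor: Double): Long = {
      overHeadMemFromConf.getOrElse(
        math.max((overheadFactor * executorMemoryMiB).toLong, minimumOverheadMemory))
    }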
##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -96,12 +96,21 @@ private[spark] class Client(
} else {
sparkConf.get(AM_MEMORY).toInt
}
+
+ private val driverMinimumMemoryOverhead =
+ if (isClusterMode && sparkConf.contains(DRIVER_MIN_MEMORY_OVERHEAD)) {
+ sparkConf.get(DRIVER_MIN_MEMORY_OVERHEAD).get
+ } else {
+ ResourceProfile.MEMORY_OVERHEAD_MIN_MIB
+ }
+
private val amMemoryOverhead = {
val amMemoryOverheadEntry = if (isClusterMode) DRIVER_MEMORY_OVERHEAD else AM_MEMORY_OVERHEAD
sparkConf.get(amMemoryOverheadEntry).getOrElse(
math.max((amMemoryOverheadFactor * amMemory).toLong,
- ResourceProfile.MEMORY_OVERHEAD_MIN_MIB)).toInt
+ driverMinimumMemoryOverhead)).toInt
Review Comment:
The driver and the AM aren't necessarily the same process. In cluster mode they are, but in client mode they aren't (see spark.yarn.am.memoryOverhead) - https://spark.apache.org/docs/latest/running-on-yarn.html.
Unless you have a use case for the application master doing native stuff, which I don't see, I think this should stay the same (384); or, if you do have a use case for it, we should add a separate config for it, since it's a YARN-specific thing.
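Read literally, "stay the same (384)" would mean leaving the AM overhead computation as it is on master, roughly (illustrative sketch, taken from the pre-change lines of this hunk):

    private val amMemoryOverhead = {
      val amMemoryOverheadEntry = if (isClusterMode) DRIVER_MEMORY_OVERHEAD else AM_MEMORY_OVERHEAD
      sparkConf.get(amMemoryOverheadEntry).getOrElse(
        math.max((amMemoryOverheadFactor * amMemory).toLong,
          ResourceProfile.MEMORY_OVERHEAD_MIN_MIB)).toInt
    }

The alternative the comment raises, a separate YARN-only knob (a name like spark.yarn.am.minMemoryOverhead would be hypothetical, not an existing config), would only change which minimum feeds that math.max.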
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]