tgravescs commented on code in PR #45240:
URL: https://github.com/apache/spark/pull/45240#discussion_r1511443512


##########
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##########
@@ -117,6 +117,14 @@ package object config {
     .bytesConf(ByteUnit.MiB)
     .createOptional
 
+  private[spark] val DRIVER_MIN_MEMORY_OVERHEAD = 
ConfigBuilder("spark.driver.minMemoryOverhead")
+    .doc("The minimum amount of non-heap memory to be allocated per driver in 
cluster mode, " +
+      "in MiB unless otherwise specified. Overrides the default value of 
384Mib." +
+      " This value is ignored if spark.driver.memoryOverhead is set directly.")
+    .version("3.5.2")

Review Comment:
   This should be 4.0.0. This is a new feature, so it generally wouldn't be 
backported, at least not without discussion.



##########
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala:
##########
@@ -772,6 +775,40 @@ class YarnAllocatorSuite extends SparkFunSuite
       assert(memory == (executorMemory * 1.4).toLong)
     } finally {
       sparkConf.set(EXECUTOR_MEMORY_OVERHEAD_FACTOR, 0.1)
+      sparkConf.remove(EXECUTOR_MEMORY_OVERHEAD)
+    }
+  }
+
+  test("SPARK-47208: User can override the minimum memory overhead of the 
executor") {
+    val executorMemory = sparkConf.get(EXECUTOR_MEMORY)
+    try {
+      sparkConf
+        .set(EXECUTOR_MIN_MEMORY_OVERHEAD, 500L)
+      val (handler, _) = createAllocator(maxExecutors = 1,
+        additionalConfigs = Map(EXECUTOR_MEMORY.key -> 
executorMemory.toString))
+      val defaultResource = handler.rpIdToYarnResource.get(defaultRPId)
+      val memory = defaultResource.getMemorySize
+      assert(memory == (executorMemory + 500))
+    } finally {
+      sparkConf
+        .remove(EXECUTOR_MIN_MEMORY_OVERHEAD)
+    }
+  }
+
+  test("SPARK-47208: Explicit overhead takes precedence over minimum 
overhead") {
+    val executorMemory = sparkConf.get(EXECUTOR_MEMORY)
+    try {
+      sparkConf
+        .set(EXECUTOR_MIN_MEMORY_OVERHEAD, 500L)
+        .set(EXECUTOR_MEMORY_OVERHEAD, 100L)
+      val (handler, _) = createAllocator(maxExecutors = 1,
+        additionalConfigs = Map(EXECUTOR_MEMORY.key -> 
executorMemory.toString))
+      val defaultResource = handler.rpIdToYarnResource.get(defaultRPId)
+      val memory = defaultResource.getMemorySize
+      assert(memory == (executorMemory + 100))
+    } finally {
+      sparkConf
+        .remove(EXECUTOR_MIN_MEMORY_OVERHEAD)

Review Comment:
    sparkConf.remove(EXECUTOR_MEMORY_OVERHEAD)
   



##########
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala:
##########
@@ -706,6 +706,39 @@ class ClientSuite extends SparkFunSuite
     assert(client.getPreloadedStatCache(sparkConf.get(JARS_TO_DISTRIBUTE), 
mockFsLookup).size === 2)
   }
 
+  Seq(
+      "client",
+      "cluster"
+    ).foreach { case (deployMode) =>
+      test(s"SPARK-47208: minimum memory overhead is correctly set in 
($deployMode mode)") {
+        val sparkConf = new SparkConf()
+          .set("spark.app.name", "foo-test-app")
+          .set(SUBMIT_DEPLOY_MODE, deployMode)
+          .set(DRIVER_MIN_MEMORY_OVERHEAD, 500L)
+        val args = new ClientArguments(Array())
+
+        val appContext = 
Records.newRecord(classOf[ApplicationSubmissionContext])
+        val getNewApplicationResponse = 
Records.newRecord(classOf[GetNewApplicationResponse])
+        val containerLaunchContext = 
Records.newRecord(classOf[ContainerLaunchContext])
+
+        val client = new Client(args, sparkConf, null)
+        client.createApplicationSubmissionContext(
+          new YarnClientApplication(getNewApplicationResponse, appContext),
+          containerLaunchContext)
+
+        appContext.getApplicationName should be("foo-test-app")
+        // flag should only work for cluster mode
+        if (deployMode=="cluster") {
+          // 1Gb driver default + 500 overridden minimum default overhead
+          appContext.getResource should be(Resource.newInstance(1524L, 1))
+        } else {
+          // 512 driver default (non-cluster) + 384 overhead default
+          //  that can't be changed in non cluster mode.

Review Comment:
   nit: extra space before "that"



##########
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala:
##########
@@ -128,6 +128,9 @@ class YarnAllocatorSuite extends SparkFunSuite
       .set(EXECUTOR_CORES, 5)
       .set(EXECUTOR_MEMORY, 2048L)
 
+    // scalastyle:off println
+    sparkConfClone.getAll.foreach(println)

Review Comment:
   I assume this was for debugging? Please remove it.



##########
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##########
@@ -358,6 +366,15 @@ package object config {
     .bytesConf(ByteUnit.MiB)
     .createOptional
 
+  private[spark] val EXECUTOR_MIN_MEMORY_OVERHEAD =
+    ConfigBuilder("spark.executor.minMemoryOverhead")
+    .doc("The minimum amount of non-heap memory to be allocated per executor " 
+
+      "in MiB unless otherwise specified. Overrides the default value of 
384Mib." +
+      " This value is ignored if spark.executor.memoryOverhead is set 
directly.")
+    .version("3.5.2")

Review Comment:
   4.0.0



##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -75,7 +75,7 @@ private[spark] class Client(
   private val yarnClient = YarnClient.createYarnClient
   private val hadoopConf = new 
YarnConfiguration(SparkHadoopUtil.newConfiguration(sparkConf))
 
-  private val isClusterMode = sparkConf.get(SUBMIT_DEPLOY_MODE) == "cluster"
+      private val isClusterMode = sparkConf.get(SUBMIT_DEPLOY_MODE) == 
"cluster"

Review Comment:
   remove extra spaces



##########
docs/configuration.md:
##########
@@ -202,6 +202,15 @@ of the most common options to set are:
   </td>
   <td>2.3.0</td>
 </tr>
+<tr>
+  <td><code>spark.driver.minMemoryOverhead</code></td>
+  <td>None</td>
+  <td>
+    The minimum amount of non-heap memory to be allocated per driver process 
in cluster mode, in MiB unless otherwise specified, if 
<code>spark.driver.memoryOverhead</code> is not defined.
+    This option is currently supported on YARN and Kubernetes.
+  </td>
+  <td>3.5.2</td>

Review Comment:
   oops, answered this in a different comment, should be 4.0.0



##########
core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala:
##########
@@ -489,10 +489,13 @@ object ResourceProfile extends Logging {
 
   private[spark] def calculateOverHeadMemory(
       overHeadMemFromConf: Option[Long],
+      minimumOverHeadMemoryFromConf: Option[Long],
       executorMemoryMiB: Long,
       overheadFactor: Double): Long = {
+    val minMemoryOverhead =
+      
minimumOverHeadMemoryFromConf.getOrElse(ResourceProfile.MEMORY_OVERHEAD_MIN_MIB);
     overHeadMemFromConf.getOrElse(math.max((overheadFactor * 
executorMemoryMiB).toInt,
-        ResourceProfile.MEMORY_OVERHEAD_MIN_MIB))
+      minMemoryOverhead))

Review Comment:
   I think it would be cleaner just to have the defaults of the configs 384.



##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -96,12 +96,21 @@ private[spark] class Client(
   } else {
     sparkConf.get(AM_MEMORY).toInt
   }
+
+  private val driverMinimumMemoryOverhead =
+    if (isClusterMode && sparkConf.contains(DRIVER_MIN_MEMORY_OVERHEAD)) {
+      sparkConf.get(DRIVER_MIN_MEMORY_OVERHEAD).get
+    } else {
+      ResourceProfile.MEMORY_OVERHEAD_MIN_MIB
+    }
+
   private val amMemoryOverhead = {
     val amMemoryOverheadEntry = if (isClusterMode) DRIVER_MEMORY_OVERHEAD else 
AM_MEMORY_OVERHEAD
     sparkConf.get(amMemoryOverheadEntry).getOrElse(
       math.max((amMemoryOverheadFactor * amMemory).toLong,
-        ResourceProfile.MEMORY_OVERHEAD_MIN_MIB)).toInt
+        driverMinimumMemoryOverhead)).toInt

Review Comment:
   The driver and the AM aren't necessarily the same process. In cluster mode 
they are, but in client mode they aren't (see spark.yarn.am.memoryOverhead) - 
https://spark.apache.org/docs/latest/running-on-yarn.html.
   Unless you have a use case for the app master doing native stuff, which I 
don't see, I think this should stay the same (384). If you do have a use case 
for it, we should have a separate config for it, since it's a YARN-specific 
thing.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to