tgravescs commented on code in PR #45240:
URL: https://github.com/apache/spark/pull/45240#discussion_r1523328549


##########
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala:
##########
@@ -706,6 +706,39 @@ class ClientSuite extends SparkFunSuite
     assert(client.getPreloadedStatCache(sparkConf.get(JARS_TO_DISTRIBUTE), 
mockFsLookup).size === 2)
   }
 
+  Seq(
+      "client",
+      "cluster"
+    ).foreach { case (deployMode) =>
+      test(s"SPARK-47208: minimum memory overhead is correctly set in 
($deployMode mode)") {
+        val sparkConf = new SparkConf()
+          .set("spark.app.name", "foo-test-app")
+          .set(SUBMIT_DEPLOY_MODE, deployMode)
+          .set(DRIVER_MIN_MEMORY_OVERHEAD, 500L)
+        val args = new ClientArguments(Array())
+
+        val appContext = 
Records.newRecord(classOf[ApplicationSubmissionContext])
+        val getNewApplicationResponse = 
Records.newRecord(classOf[GetNewApplicationResponse])
+        val containerLaunchContext = 
Records.newRecord(classOf[ContainerLaunchContext])
+
+        val client = new Client(args, sparkConf, null)
+        client.createApplicationSubmissionContext(
+          new YarnClientApplication(getNewApplicationResponse, appContext),
+          containerLaunchContext)
+
+        appContext.getApplicationName should be("foo-test-app")
+        // flag should only work for cluster mode
+        if (deployMode=="cluster") {

Review Comment:
   nit: add spaces around `==`



##########
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala:
##########
@@ -772,6 +773,42 @@ class YarnAllocatorSuite extends SparkFunSuite
       assert(memory == (executorMemory * 1.4).toLong)
     } finally {
       sparkConf.set(EXECUTOR_MEMORY_OVERHEAD_FACTOR, 0.1)
+      sparkConf.remove(EXECUTOR_MEMORY_OVERHEAD)
+    }
+  }
+
+  test("SPARK-47208: User can override the minimum memory overhead of the 
executor") {
+    val executorMemory = sparkConf.get(EXECUTOR_MEMORY)
+    try {
+      sparkConf
+        .set(EXECUTOR_MIN_MEMORY_OVERHEAD, 500L)
+      val (handler, _) = createAllocator(maxExecutors = 1,
+        additionalConfigs = Map(EXECUTOR_MEMORY.key -> 
executorMemory.toString))
+      val defaultResource = handler.rpIdToYarnResource.get(defaultRPId)
+      val memory = defaultResource.getMemorySize
+      assert(memory == (executorMemory + 500))
+    } finally {
+      sparkConf
+        .remove(EXECUTOR_MIN_MEMORY_OVERHEAD)
+    }
+  }
+
+  test("SPARK-47208: Explicit overhead takes precedence over minimum 
overhead") {
+    val executorMemory = sparkConf.get(EXECUTOR_MEMORY)
+    try {
+      sparkConf
+        .set(EXECUTOR_MIN_MEMORY_OVERHEAD, 500L)
+        .set(EXECUTOR_MEMORY_OVERHEAD, 100L)
+      val (handler, _) = createAllocator(maxExecutors = 1,
+        additionalConfigs = Map(EXECUTOR_MEMORY.key -> 
executorMemory.toString))
+      val defaultResource = handler.rpIdToYarnResource.get(defaultRPId)
+      val memory = defaultResource.getMemorySize
+      assert(memory == (executorMemory + 100))
+    } finally {
+      sparkConf
+        .remove(EXECUTOR_MIN_MEMORY_OVERHEAD)
+      sparkConf
+        .remove(EXECUTOR_MIN_MEMORY_OVERHEAD)

Review Comment:
   you are removing EXECUTOR_MIN_MEMORY_OVERHEAD twice; one of these should be 
EXECUTOR_MEMORY_OVERHEAD, right?



##########
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala:
##########
@@ -706,6 +706,39 @@ class ClientSuite extends SparkFunSuite
     assert(client.getPreloadedStatCache(sparkConf.get(JARS_TO_DISTRIBUTE), 
mockFsLookup).size === 2)
   }
 
+  Seq(
+      "client",
+      "cluster"
+    ).foreach { case (deployMode) =>
+      test(s"SPARK-47208: minimum memory overhead is correctly set in 
($deployMode mode)") {
+        val sparkConf = new SparkConf()
+          .set("spark.app.name", "foo-test-app")
+          .set(SUBMIT_DEPLOY_MODE, deployMode)
+          .set(DRIVER_MIN_MEMORY_OVERHEAD, 500L)
+        val args = new ClientArguments(Array())
+
+        val appContext = 
Records.newRecord(classOf[ApplicationSubmissionContext])
+        val getNewApplicationResponse = 
Records.newRecord(classOf[GetNewApplicationResponse])
+        val containerLaunchContext = 
Records.newRecord(classOf[ContainerLaunchContext])
+
+        val client = new Client(args, sparkConf, null)
+        client.createApplicationSubmissionContext(
+          new YarnClientApplication(getNewApplicationResponse, appContext),
+          containerLaunchContext)
+
+        appContext.getApplicationName should be("foo-test-app")
+        // flag should only work for cluster mode
+        if (deployMode=="cluster") {
+          // 1Gb driver default + 500 overridden minimum default overhead
+          appContext.getResource should be(Resource.newInstance(1524L, 1))

Review Comment:
   nit: add a space after "should be" for readability



##########
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala:
##########
@@ -772,6 +773,42 @@ class YarnAllocatorSuite extends SparkFunSuite
       assert(memory == (executorMemory * 1.4).toLong)
     } finally {
       sparkConf.set(EXECUTOR_MEMORY_OVERHEAD_FACTOR, 0.1)
+      sparkConf.remove(EXECUTOR_MEMORY_OVERHEAD)
+    }
+  }
+
+  test("SPARK-47208: User can override the minimum memory overhead of the 
executor") {
+    val executorMemory = sparkConf.get(EXECUTOR_MEMORY)
+    try {
+      sparkConf
+        .set(EXECUTOR_MIN_MEMORY_OVERHEAD, 500L)
+      val (handler, _) = createAllocator(maxExecutors = 1,
+        additionalConfigs = Map(EXECUTOR_MEMORY.key -> 
executorMemory.toString))
+      val defaultResource = handler.rpIdToYarnResource.get(defaultRPId)
+      val memory = defaultResource.getMemorySize
+      assert(memory == (executorMemory + 500))
+    } finally {
+      sparkConf
+        .remove(EXECUTOR_MIN_MEMORY_OVERHEAD)
+    }
+  }
+
+  test("SPARK-47208: Explicit overhead takes precedence over minimum 
overhead") {
+    val executorMemory = sparkConf.get(EXECUTOR_MEMORY)
+    try {
+      sparkConf
+        .set(EXECUTOR_MIN_MEMORY_OVERHEAD, 500L)
+        .set(EXECUTOR_MEMORY_OVERHEAD, 100L)
+      val (handler, _) = createAllocator(maxExecutors = 1,
+        additionalConfigs = Map(EXECUTOR_MEMORY.key -> 
executorMemory.toString))
+      val defaultResource = handler.rpIdToYarnResource.get(defaultRPId)
+      val memory = defaultResource.getMemorySize
+      assert(memory == (executorMemory + 100))
+    } finally {
+      sparkConf

Review Comment:
   nit: the .remove should be on the same line as sparkConf, as it's not too long
   
   



##########
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala:
##########
@@ -772,6 +773,42 @@ class YarnAllocatorSuite extends SparkFunSuite
       assert(memory == (executorMemory * 1.4).toLong)
     } finally {
       sparkConf.set(EXECUTOR_MEMORY_OVERHEAD_FACTOR, 0.1)
+      sparkConf.remove(EXECUTOR_MEMORY_OVERHEAD)
+    }
+  }
+
+  test("SPARK-47208: User can override the minimum memory overhead of the 
executor") {
+    val executorMemory = sparkConf.get(EXECUTOR_MEMORY)
+    try {
+      sparkConf
+        .set(EXECUTOR_MIN_MEMORY_OVERHEAD, 500L)
+      val (handler, _) = createAllocator(maxExecutors = 1,
+        additionalConfigs = Map(EXECUTOR_MEMORY.key -> 
executorMemory.toString))
+      val defaultResource = handler.rpIdToYarnResource.get(defaultRPId)
+      val memory = defaultResource.getMemorySize
+      assert(memory == (executorMemory + 500))
+    } finally {
+      sparkConf

Review Comment:
   nit: put the .remove on the same line as sparkConf



##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -96,12 +96,21 @@ private[spark] class Client(
   } else {
     sparkConf.get(AM_MEMORY).toInt
   }
+
+  private val driverMinimumMemoryOverhead =
+    if (isClusterMode && sparkConf.contains(DRIVER_MIN_MEMORY_OVERHEAD)) {
+      sparkConf.get(DRIVER_MIN_MEMORY_OVERHEAD).get
+    } else {
+      ResourceProfile.MEMORY_OVERHEAD_MIN_MIB
+    }
+
   private val amMemoryOverhead = {
     val amMemoryOverheadEntry = if (isClusterMode) DRIVER_MEMORY_OVERHEAD else 
AM_MEMORY_OVERHEAD
     sparkConf.get(amMemoryOverheadEntry).getOrElse(
       math.max((amMemoryOverheadFactor * amMemory).toLong,
-        ResourceProfile.MEMORY_OVERHEAD_MIN_MIB)).toInt
+        driverMinimumMemoryOverhead)).toInt

Review Comment:
   ah yes that works



##########
docs/configuration.md:
##########
@@ -202,6 +202,15 @@ of the most common options to set are:
   </td>
   <td>2.3.0</td>
 </tr>
+<tr>
+  <td><code>spark.driver.minMemoryOverhead</code></td>
+  <td>384Mib</td>

Review Comment:
    I think I would put it as "384m" to be more consistent with the other docs and 
to match the format a user would specify when changing it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to