tgravescs commented on code in PR #45240:
URL: https://github.com/apache/spark/pull/45240#discussion_r1523328549
##########
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala:
##########
@@ -706,6 +706,39 @@ class ClientSuite extends SparkFunSuite
assert(client.getPreloadedStatCache(sparkConf.get(JARS_TO_DISTRIBUTE),
mockFsLookup).size === 2)
}
+ Seq(
+ "client",
+ "cluster"
+ ).foreach { case (deployMode) =>
+ test(s"SPARK-47208: minimum memory overhead is correctly set in
($deployMode mode)") {
+ val sparkConf = new SparkConf()
+ .set("spark.app.name", "foo-test-app")
+ .set(SUBMIT_DEPLOY_MODE, deployMode)
+ .set(DRIVER_MIN_MEMORY_OVERHEAD, 500L)
+ val args = new ClientArguments(Array())
+
+ val appContext =
Records.newRecord(classOf[ApplicationSubmissionContext])
+ val getNewApplicationResponse =
Records.newRecord(classOf[GetNewApplicationResponse])
+ val containerLaunchContext =
Records.newRecord(classOf[ContainerLaunchContext])
+
+ val client = new Client(args, sparkConf, null)
+ client.createApplicationSubmissionContext(
+ new YarnClientApplication(getNewApplicationResponse, appContext),
+ containerLaunchContext)
+
+ appContext.getApplicationName should be("foo-test-app")
+ // flag should only work for cluster mode
+ if (deployMode=="cluster") {
Review Comment:
nit spaces around " == "
##########
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala:
##########
@@ -772,6 +773,42 @@ class YarnAllocatorSuite extends SparkFunSuite
assert(memory == (executorMemory * 1.4).toLong)
} finally {
sparkConf.set(EXECUTOR_MEMORY_OVERHEAD_FACTOR, 0.1)
+ sparkConf.remove(EXECUTOR_MEMORY_OVERHEAD)
+ }
+ }
+
+ test("SPARK-47208: User can override the minimum memory overhead of the
executor") {
+ val executorMemory = sparkConf.get(EXECUTOR_MEMORY)
+ try {
+ sparkConf
+ .set(EXECUTOR_MIN_MEMORY_OVERHEAD, 500L)
+ val (handler, _) = createAllocator(maxExecutors = 1,
+ additionalConfigs = Map(EXECUTOR_MEMORY.key ->
executorMemory.toString))
+ val defaultResource = handler.rpIdToYarnResource.get(defaultRPId)
+ val memory = defaultResource.getMemorySize
+ assert(memory == (executorMemory + 500))
+ } finally {
+ sparkConf
+ .remove(EXECUTOR_MIN_MEMORY_OVERHEAD)
+ }
+ }
+
+ test("SPARK-47208: Explicit overhead takes precedence over minimum
overhead") {
+ val executorMemory = sparkConf.get(EXECUTOR_MEMORY)
+ try {
+ sparkConf
+ .set(EXECUTOR_MIN_MEMORY_OVERHEAD, 500L)
+ .set(EXECUTOR_MEMORY_OVERHEAD, 100L)
+ val (handler, _) = createAllocator(maxExecutors = 1,
+ additionalConfigs = Map(EXECUTOR_MEMORY.key ->
executorMemory.toString))
+ val defaultResource = handler.rpIdToYarnResource.get(defaultRPId)
+ val memory = defaultResource.getMemorySize
+ assert(memory == (executorMemory + 100))
+ } finally {
+ sparkConf
+ .remove(EXECUTOR_MIN_MEMORY_OVERHEAD)
+ sparkConf
+ .remove(EXECUTOR_MIN_MEMORY_OVERHEAD)
Review Comment:
you are removing EXECUTOR_MIN_MEMORY_OVERHEAD twice, one of these should be
EXECUTOR_MEMORY_OVERHEAD right?
##########
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala:
##########
@@ -706,6 +706,39 @@ class ClientSuite extends SparkFunSuite
assert(client.getPreloadedStatCache(sparkConf.get(JARS_TO_DISTRIBUTE),
mockFsLookup).size === 2)
}
+ Seq(
+ "client",
+ "cluster"
+ ).foreach { case (deployMode) =>
+ test(s"SPARK-47208: minimum memory overhead is correctly set in
($deployMode mode)") {
+ val sparkConf = new SparkConf()
+ .set("spark.app.name", "foo-test-app")
+ .set(SUBMIT_DEPLOY_MODE, deployMode)
+ .set(DRIVER_MIN_MEMORY_OVERHEAD, 500L)
+ val args = new ClientArguments(Array())
+
+ val appContext =
Records.newRecord(classOf[ApplicationSubmissionContext])
+ val getNewApplicationResponse =
Records.newRecord(classOf[GetNewApplicationResponse])
+ val containerLaunchContext =
Records.newRecord(classOf[ContainerLaunchContext])
+
+ val client = new Client(args, sparkConf, null)
+ client.createApplicationSubmissionContext(
+ new YarnClientApplication(getNewApplicationResponse, appContext),
+ containerLaunchContext)
+
+ appContext.getApplicationName should be("foo-test-app")
+ // flag should only work for cluster mode
+ if (deployMode=="cluster") {
+ // 1Gb driver default + 500 overridden minimum default overhead
+ appContext.getResource should be(Resource.newInstance(1524L, 1))
Review Comment:
nit space after "should be " for readability
##########
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala:
##########
@@ -772,6 +773,42 @@ class YarnAllocatorSuite extends SparkFunSuite
assert(memory == (executorMemory * 1.4).toLong)
} finally {
sparkConf.set(EXECUTOR_MEMORY_OVERHEAD_FACTOR, 0.1)
+ sparkConf.remove(EXECUTOR_MEMORY_OVERHEAD)
+ }
+ }
+
+ test("SPARK-47208: User can override the minimum memory overhead of the
executor") {
+ val executorMemory = sparkConf.get(EXECUTOR_MEMORY)
+ try {
+ sparkConf
+ .set(EXECUTOR_MIN_MEMORY_OVERHEAD, 500L)
+ val (handler, _) = createAllocator(maxExecutors = 1,
+ additionalConfigs = Map(EXECUTOR_MEMORY.key ->
executorMemory.toString))
+ val defaultResource = handler.rpIdToYarnResource.get(defaultRPId)
+ val memory = defaultResource.getMemorySize
+ assert(memory == (executorMemory + 500))
+ } finally {
+ sparkConf
+ .remove(EXECUTOR_MIN_MEMORY_OVERHEAD)
+ }
+ }
+
+ test("SPARK-47208: Explicit overhead takes precedence over minimum
overhead") {
+ val executorMemory = sparkConf.get(EXECUTOR_MEMORY)
+ try {
+ sparkConf
+ .set(EXECUTOR_MIN_MEMORY_OVERHEAD, 500L)
+ .set(EXECUTOR_MEMORY_OVERHEAD, 100L)
+ val (handler, _) = createAllocator(maxExecutors = 1,
+ additionalConfigs = Map(EXECUTOR_MEMORY.key ->
executorMemory.toString))
+ val defaultResource = handler.rpIdToYarnResource.get(defaultRPId)
+ val memory = defaultResource.getMemorySize
+ assert(memory == (executorMemory + 100))
+ } finally {
+ sparkConf
Review Comment:
nit: the .remove should be on the same line as sparkConf since it's not too long
##########
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala:
##########
@@ -772,6 +773,42 @@ class YarnAllocatorSuite extends SparkFunSuite
assert(memory == (executorMemory * 1.4).toLong)
} finally {
sparkConf.set(EXECUTOR_MEMORY_OVERHEAD_FACTOR, 0.1)
+ sparkConf.remove(EXECUTOR_MEMORY_OVERHEAD)
+ }
+ }
+
+ test("SPARK-47208: User can override the minimum memory overhead of the
executor") {
+ val executorMemory = sparkConf.get(EXECUTOR_MEMORY)
+ try {
+ sparkConf
+ .set(EXECUTOR_MIN_MEMORY_OVERHEAD, 500L)
+ val (handler, _) = createAllocator(maxExecutors = 1,
+ additionalConfigs = Map(EXECUTOR_MEMORY.key ->
executorMemory.toString))
+ val defaultResource = handler.rpIdToYarnResource.get(defaultRPId)
+ val memory = defaultResource.getMemorySize
+ assert(memory == (executorMemory + 500))
+ } finally {
+ sparkConf
Review Comment:
nit: put the .remove on the same line as sparkConf
##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -96,12 +96,21 @@ private[spark] class Client(
} else {
sparkConf.get(AM_MEMORY).toInt
}
+
+ private val driverMinimumMemoryOverhead =
+ if (isClusterMode && sparkConf.contains(DRIVER_MIN_MEMORY_OVERHEAD)) {
+ sparkConf.get(DRIVER_MIN_MEMORY_OVERHEAD).get
+ } else {
+ ResourceProfile.MEMORY_OVERHEAD_MIN_MIB
+ }
+
private val amMemoryOverhead = {
val amMemoryOverheadEntry = if (isClusterMode) DRIVER_MEMORY_OVERHEAD else
AM_MEMORY_OVERHEAD
sparkConf.get(amMemoryOverheadEntry).getOrElse(
math.max((amMemoryOverheadFactor * amMemory).toLong,
- ResourceProfile.MEMORY_OVERHEAD_MIN_MIB)).toInt
+ driverMinimumMemoryOverhead)).toInt
Review Comment:
ah yes that works
##########
docs/configuration.md:
##########
@@ -202,6 +202,15 @@ of the most common options to set are:
</td>
<td>2.3.0</td>
</tr>
+<tr>
+ <td><code>spark.driver.minMemoryOverhead</code></td>
+ <td>384Mib</td>
Review Comment:
I think I would put it as "384m" to be more consistent with the other docs and
to match the format a user would specify when changing it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]