CodingCat commented on code in PR #53190:
URL: https://github.com/apache/spark/pull/53190#discussion_r2554539131


##########
core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala:
##########
@@ -559,12 +563,71 @@ object ResourceProfile extends Logging {
     } else {
       0L
     }
-    val totalMemMiB =
-      (executorMemoryMiB + memoryOverheadMiB + memoryOffHeapMiB + pysparkMemToUseMiB)
-    ExecutorResourcesOrDefaults(cores, executorMemoryMiB, memoryOffHeapMiB,
-      pysparkMemToUseMiB, memoryOverheadMiB, totalMemMiB, finalCustomResources)
+
+    if (!conf.get(EXECUTOR_BURSTY_MEMORY_OVERHEAD_ENABLED)) {
+      val totalMemMiB =
+        (executorMemoryMiB + memoryOverheadMiB + memoryOffHeapMiB + pysparkMemToUseMiB)
+      ExecutorResourcesOrDefaults(cores, executorMemoryMiB, memoryOffHeapMiB,
+        pysparkMemToUseMiB, memoryOverheadMiB, totalMemMiB, totalMemMiBLimit = None,
+        finalCustomResources)
+    } else {
+      val burstyControlFactor = conf.get(EXECUTOR_BURSTY_MEMORY_OVERHEAD_FACTOR)
+      val newMemoryOverheadMiB = (memoryOverheadMiB - math.min(
+        (executorMemoryMiB + memoryOverheadMiB) * (burstyControlFactor - 1.0), memoryOverheadMiB))
+        .toLong
+      val totalMemMiBLimit = executorMemoryMiB + memoryOverheadMiB + memoryOffHeapMiB +
+        pysparkMemToUseMiB
+      val totalMemMiBRequest = executorMemoryMiB + newMemoryOverheadMiB + memoryOffHeapMiB +
+        pysparkMemToUseMiB
+      logInfo(s"reduce memoryoverhead request from $memoryOverheadMiB MB to" +
+        s" $newMemoryOverheadMiB MB")
+      updateEventLogAndUI(newMemoryOverheadMiB, conf)
+      ExecutorResourcesOrDefaults(cores, executorMemoryMiB, memoryOffHeapMiB,
+        pysparkMemToUseMiB, newMemoryOverheadMiB, totalMemMiBRequest,
+        totalMemMiBLimit = Some(totalMemMiBLimit), finalCustomResources)
+    }
+  }
+
+  private def updateEventLogAndUI(newMemoryOverheadMiB: Long, conf: SparkConf): Unit = {
+    conf.set(EXECUTOR_BURSTY_MEMORY_OVERHEAD, newMemoryOverheadMiB)
+    val sparkContextOption = SparkContext.getActive
+    if (sparkContextOption.isDefined) {
+      val sparkContext = sparkContextOption.get
+      val klass = classOf[ApplicationEnvironmentInfoWrapper]
+      val currentAppEnvironment = sparkContext._statusStore.store.read(klass, klass.getName()).info
+      logInfo(s"currentAppEnvironment spark properties count:" +
+        s" ${currentAppEnvironment.sparkProperties.size}")

Review Comment:
   this logging can be removed
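   For context on the hunk above: only the *requested* overhead is reduced; the original total is kept as `totalMemMiBLimit`. A minimal worked example of the arithmetic, using the numbers from the "default memoryOverhead profile" test below (64g heap; the 6553 MiB overhead assumes Spark's default 10% rule with integer truncation):

   ```scala
   val executorMemoryMiB = 64L * 1024                        // 65536
   val memoryOverheadMiB = (0.1 * executorMemoryMiB).toLong  // 6553, assumed default
   val burstyControlFactor = 1.2

   // Same expression as the hunk: the reduction is capped at the full overhead.
   val newMemoryOverheadMiB = (memoryOverheadMiB - math.min(
     (executorMemoryMiB + memoryOverheadMiB) * (burstyControlFactor - 1.0),
     memoryOverheadMiB.toDouble)).toLong
   // min(72089 * 0.2, 6553) = 6553, so newMemoryOverheadMiB = 0:
   // request = 65536 MiB, limit = 65536 + 6553 = 72089 MiB.
   ```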



##########
resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala:
##########
@@ -528,6 +529,105 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
 
   }
 
+  test("when turning on bursty memory overhead, configure request and limit correctly with" +
+    " default memoryOverhead profile") {
+    baseConf.remove(KUBERNETES_EXECUTOR_POD_NAME_PREFIX)
+    baseConf.set("spark.app.name", "xyz.abc _i_am_a_app_name_w/_some_abbrs")
+    val basePod = SparkPod.initialPod()
+    // scalastyle:off

Review Comment:
   this can be removed



##########
resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala:
##########
@@ -528,6 +529,105 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
 
   }
 
+  test("when turning on bursty memory overhead, configure request and limit correctly with" +
+    " default memoryOverhead profile") {
+    baseConf.remove(KUBERNETES_EXECUTOR_POD_NAME_PREFIX)
+    baseConf.set("spark.app.name", "xyz.abc _i_am_a_app_name_w/_some_abbrs")
+    val basePod = SparkPod.initialPod()
+    // scalastyle:off
+
+    val smallMemoryOverheadConf = baseConf.clone
+      .set(EXECUTOR_BURSTY_MEMORY_OVERHEAD_ENABLED, true)
+      .set(EXECUTOR_BURSTY_MEMORY_OVERHEAD_FACTOR, 1.2)
+      .set("spark.executor.memory", "64g")
+    initDefaultProfile(smallMemoryOverheadConf)
+    val step = new BasicExecutorFeatureStep(
+      newExecutorConf(sparkConf = Some(smallMemoryOverheadConf)),
+      new SecurityManager(smallMemoryOverheadConf), defaultProfile)
+
+    val podConfigured = step.configurePod(basePod)
+    val resource = podConfigured.container.getResources
+    assert(defaultProfile.executorResources("memory").amount === 64 * 1024)
+    // assert(defaultProfile.executorResources("memoryOverhead").amount === 6.4 * 1024)
+    assert(resource.getLimits.get("memory").getAmount.toLong === math.floor((64 + 6.4) * 1024))
+    assert(resource.getRequests.get("memory").getAmount.toLong === 64 * 1024)
+  }
+
+  test("when turning on bursty memory overhead, configure request and limit correctly with" +
+    " small memoryOverhead profile") {
+    baseConf.remove(KUBERNETES_EXECUTOR_POD_NAME_PREFIX)
+    baseConf.set("spark.app.name", "xyz.abc _i_am_a_app_name_w/_some_abbrs")
+    val basePod = SparkPod.initialPod()
+    // scalastyle:off

Review Comment:
   this can be removed
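   (Likely also why the `6.4 * 1024` assert above is commented out: with the assumed 10% default, the profile stores the overhead as a truncated Long, e.g. `(0.1 * 64 * 1024).toLong == 6553`, not `6553.6 == 6.4 * 1024`.)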



##########
resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala:
##########
@@ -528,6 +529,105 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
 
   }
 
+  test("when turning on bursty memory overhead, configure request and limit correctly with" +
+    " default memoryOverhead profile") {
+    baseConf.remove(KUBERNETES_EXECUTOR_POD_NAME_PREFIX)
+    baseConf.set("spark.app.name", "xyz.abc _i_am_a_app_name_w/_some_abbrs")
+    val basePod = SparkPod.initialPod()
+    // scalastyle:off
+
+    val smallMemoryOverheadConf = baseConf.clone
+      .set(EXECUTOR_BURSTY_MEMORY_OVERHEAD_ENABLED, true)
+      .set(EXECUTOR_BURSTY_MEMORY_OVERHEAD_FACTOR, 1.2)
+      .set("spark.executor.memory", "64g")
+    initDefaultProfile(smallMemoryOverheadConf)
+    val step = new BasicExecutorFeatureStep(
+      newExecutorConf(sparkConf = Some(smallMemoryOverheadConf)),
+      new SecurityManager(smallMemoryOverheadConf), defaultProfile)
+
+    val podConfigured = step.configurePod(basePod)
+    val resource = podConfigured.container.getResources
+    assert(defaultProfile.executorResources("memory").amount === 64 * 1024)
+    // assert(defaultProfile.executorResources("memoryOverhead").amount === 6.4 * 1024)
+    assert(resource.getLimits.get("memory").getAmount.toLong === math.floor((64 + 6.4) * 1024))
+    assert(resource.getRequests.get("memory").getAmount.toLong === 64 * 1024)
+  }
+
+  test("when turning on bursty memory overhead, configure request and limit correctly with" +
+    " small memoryOverhead profile") {
+    baseConf.remove(KUBERNETES_EXECUTOR_POD_NAME_PREFIX)
+    baseConf.set("spark.app.name", "xyz.abc _i_am_a_app_name_w/_some_abbrs")
+    val basePod = SparkPod.initialPod()
+    // scalastyle:off
+
+    val smallMemoryOverheadConf = baseConf.clone
+      .set(EXECUTOR_BURSTY_MEMORY_OVERHEAD_ENABLED, true)
+      .set(EXECUTOR_BURSTY_MEMORY_OVERHEAD_FACTOR, 1.2)
+      .set("spark.executor.memory", "64g")
+      .set("spark.executor.memoryOverhead", "10g")
+    initDefaultProfile(smallMemoryOverheadConf)
+    val step = new BasicExecutorFeatureStep(
+      newExecutorConf(sparkConf = Some(smallMemoryOverheadConf)),
+      new SecurityManager(smallMemoryOverheadConf), defaultProfile)
+
+    val podConfigured = step.configurePod(basePod)
+    val resource = podConfigured.container.getResources
+    assert(defaultProfile.executorResources("memory").amount === 64 * 1024)
+    assert(defaultProfile.executorResources("memoryOverhead").amount === 10240)
+    assert(resource.getLimits.get("memory").getAmount.toLong === 74 * 1024)
+    assert(resource.getRequests.get("memory").getAmount.toLong === 64 * 1024)
+  }
+
+  test("when turning on bursty memory overhead, configure request and limit correctly with" +
+    " big memoryOverhead profile") {
+    baseConf.remove(KUBERNETES_EXECUTOR_POD_NAME_PREFIX)
+    baseConf.set("spark.app.name", "xyz.abc _i_am_a_app_name_w/_some_abbrs")
+    val basePod = SparkPod.initialPod()
+    // scalastyle:off

Review Comment:
   this can be removed
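   For readers less familiar with the K8s side: these tests assert that the reduced total becomes the container's memory *request* while the original total stays as its *limit*. A minimal fabric8 sketch of the resulting shape, with the values from the "small memoryOverhead" case (how `BasicExecutorFeatureStep` wires `totalMemMiBLimit` into the pod spec is inferred from the assertions, not shown in these hunks):

   ```scala
   import io.fabric8.kubernetes.api.model.{Quantity, ResourceRequirementsBuilder}

   // 64g heap + 10g overhead, factor 1.2: the entire overhead is shaved off the
   // request (65536Mi) while the limit keeps the full total (75776Mi).
   val executorMemoryResources = new ResourceRequirementsBuilder()
     .addToRequests("memory", new Quantity("65536Mi"))
     .addToLimits("memory", new Quantity("75776Mi"))
     .build()
   ```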



##########
core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala:
##########
@@ -559,12 +563,71 @@ object ResourceProfile extends Logging {
     } else {
       0L
     }
-    val totalMemMiB =
-      (executorMemoryMiB + memoryOverheadMiB + memoryOffHeapMiB + pysparkMemToUseMiB)
-    ExecutorResourcesOrDefaults(cores, executorMemoryMiB, memoryOffHeapMiB,
-      pysparkMemToUseMiB, memoryOverheadMiB, totalMemMiB, finalCustomResources)
+
+    if (!conf.get(EXECUTOR_BURSTY_MEMORY_OVERHEAD_ENABLED)) {
+      val totalMemMiB =
+        (executorMemoryMiB + memoryOverheadMiB + memoryOffHeapMiB + pysparkMemToUseMiB)
+      ExecutorResourcesOrDefaults(cores, executorMemoryMiB, memoryOffHeapMiB,
+        pysparkMemToUseMiB, memoryOverheadMiB, totalMemMiB, totalMemMiBLimit = None,
+        finalCustomResources)
+    } else {
+      val burstyControlFactor = conf.get(EXECUTOR_BURSTY_MEMORY_OVERHEAD_FACTOR)
+      val newMemoryOverheadMiB = (memoryOverheadMiB - math.min(
+        (executorMemoryMiB + memoryOverheadMiB) * (burstyControlFactor - 1.0), memoryOverheadMiB))
+        .toLong
+      val totalMemMiBLimit = executorMemoryMiB + memoryOverheadMiB + memoryOffHeapMiB +
+        pysparkMemToUseMiB
+      val totalMemMiBRequest = executorMemoryMiB + newMemoryOverheadMiB + memoryOffHeapMiB +
+        pysparkMemToUseMiB
+      logInfo(s"reduce memoryoverhead request from $memoryOverheadMiB MB to" +
+        s" $newMemoryOverheadMiB MB")
+      updateEventLogAndUI(newMemoryOverheadMiB, conf)
+      ExecutorResourcesOrDefaults(cores, executorMemoryMiB, memoryOffHeapMiB,
+        pysparkMemToUseMiB, newMemoryOverheadMiB, totalMemMiBRequest,
+        totalMemMiBLimit = Some(totalMemMiBLimit), finalCustomResources)
+    }
+  }
+
+  private def updateEventLogAndUI(newMemoryOverheadMiB: Long, conf: SparkConf): Unit = {
+    conf.set(EXECUTOR_BURSTY_MEMORY_OVERHEAD, newMemoryOverheadMiB)
+    val sparkContextOption = SparkContext.getActive
+    if (sparkContextOption.isDefined) {
+      val sparkContext = sparkContextOption.get
+      val klass = classOf[ApplicationEnvironmentInfoWrapper]
+      val currentAppEnvironment = sparkContext._statusStore.store.read(klass, klass.getName()).info
+      logInfo(s"currentAppEnvironment spark properties count:" +
+        s" ${currentAppEnvironment.sparkProperties.size}")
+      val newAppEnvironment = ApplicationEnvironmentInfo.create(currentAppEnvironment,
+        newSparkProperties = Map(EXECUTOR_BURSTY_MEMORY_OVERHEAD.key ->
+          newMemoryOverheadMiB.toString))
+      logInfo(s"newAppEnvironment spark properties count:" +
+        s" ${newAppEnvironment.sparkProperties.size}")
+      sparkContext._statusStore.store.write(new ApplicationEnvironmentInfoWrapper(
+        newAppEnvironment))
+      // we have to post full information here, but need ensure that the downstream pipeline can
+      // consume duplicate entries properly

Review Comment:
   this comment can also be removed 



##########
core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala:
##########
@@ -559,12 +563,71 @@ object ResourceProfile extends Logging {
     } else {
       0L
     }
-    val totalMemMiB =
-      (executorMemoryMiB + memoryOverheadMiB + memoryOffHeapMiB + pysparkMemToUseMiB)
-    ExecutorResourcesOrDefaults(cores, executorMemoryMiB, memoryOffHeapMiB,
-      pysparkMemToUseMiB, memoryOverheadMiB, totalMemMiB, finalCustomResources)
+
+    if (!conf.get(EXECUTOR_BURSTY_MEMORY_OVERHEAD_ENABLED)) {
+      val totalMemMiB =
+        (executorMemoryMiB + memoryOverheadMiB + memoryOffHeapMiB + pysparkMemToUseMiB)
+      ExecutorResourcesOrDefaults(cores, executorMemoryMiB, memoryOffHeapMiB,
+        pysparkMemToUseMiB, memoryOverheadMiB, totalMemMiB, totalMemMiBLimit = None,
+        finalCustomResources)
+    } else {
+      val burstyControlFactor = conf.get(EXECUTOR_BURSTY_MEMORY_OVERHEAD_FACTOR)
+      val newMemoryOverheadMiB = (memoryOverheadMiB - math.min(
+        (executorMemoryMiB + memoryOverheadMiB) * (burstyControlFactor - 1.0), memoryOverheadMiB))
+        .toLong
+      val totalMemMiBLimit = executorMemoryMiB + memoryOverheadMiB + memoryOffHeapMiB +
+        pysparkMemToUseMiB
+      val totalMemMiBRequest = executorMemoryMiB + newMemoryOverheadMiB + memoryOffHeapMiB +
+        pysparkMemToUseMiB
+      logInfo(s"reduce memoryoverhead request from $memoryOverheadMiB MB to" +
+        s" $newMemoryOverheadMiB MB")
+      updateEventLogAndUI(newMemoryOverheadMiB, conf)
+      ExecutorResourcesOrDefaults(cores, executorMemoryMiB, memoryOffHeapMiB,
+        pysparkMemToUseMiB, newMemoryOverheadMiB, totalMemMiBRequest,
+        totalMemMiBLimit = Some(totalMemMiBLimit), finalCustomResources)
+    }
+  }
+
+  private def updateEventLogAndUI(newMemoryOverheadMiB: Long, conf: SparkConf): Unit = {
+    conf.set(EXECUTOR_BURSTY_MEMORY_OVERHEAD, newMemoryOverheadMiB)
+    val sparkContextOption = SparkContext.getActive
+    if (sparkContextOption.isDefined) {
+      val sparkContext = sparkContextOption.get
+      val klass = classOf[ApplicationEnvironmentInfoWrapper]
+      val currentAppEnvironment = sparkContext._statusStore.store.read(klass, klass.getName()).info
+      logInfo(s"currentAppEnvironment spark properties count:" +
+        s" ${currentAppEnvironment.sparkProperties.size}")
+      val newAppEnvironment = ApplicationEnvironmentInfo.create(currentAppEnvironment,
+        newSparkProperties = Map(EXECUTOR_BURSTY_MEMORY_OVERHEAD.key ->
+          newMemoryOverheadMiB.toString))
+      logInfo(s"newAppEnvironment spark properties count:" +
+        s" ${newAppEnvironment.sparkProperties.size}")
+      sparkContext._statusStore.store.write(new ApplicationEnvironmentInfoWrapper(
+        newAppEnvironment))
+      // we have to post full information here, but need ensure that the downstream pipeline can
+      // consume duplicate entries properly
+      this.synchronized {
+        if (!loggedBurstyMemoryOverhead) {
+          SparkContext.getActive.get.eventLogger.foreach { logger =>
+            logger.onEnvironmentUpdate(SparkListenerEnvironmentUpdate(
+              Map("Spark Properties" -> newAppEnvironment.sparkProperties,
+                "JVM Information" -> Seq(("Java Version", newAppEnvironment.runtime.javaVersion),
+                  ("Java Home", newAppEnvironment.runtime.javaHome),
+                  ("Scala Version", newAppEnvironment.runtime.scalaVersion)),
+                "Hadoop Properties" -> newAppEnvironment.hadoopProperties,
+                "System Properties" -> newAppEnvironment.systemProperties,
+                "Classpath Entries" -> newAppEnvironment.classpathEntries)
+            ))
+            loggedBurstyMemoryOverhead = true
+            logInfo("make a event log for bursty memory overhead")
+          }
+        }
+      }
+      logInfo(s"posted memoryoverhead update event")

Review Comment:
   this can be removed



##########
core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala:
##########
@@ -559,12 +563,71 @@ object ResourceProfile extends Logging {
     } else {
       0L
     }
-    val totalMemMiB =
-      (executorMemoryMiB + memoryOverheadMiB + memoryOffHeapMiB + pysparkMemToUseMiB)
-    ExecutorResourcesOrDefaults(cores, executorMemoryMiB, memoryOffHeapMiB,
-      pysparkMemToUseMiB, memoryOverheadMiB, totalMemMiB, finalCustomResources)
+
+    if (!conf.get(EXECUTOR_BURSTY_MEMORY_OVERHEAD_ENABLED)) {
+      val totalMemMiB =
+        (executorMemoryMiB + memoryOverheadMiB + memoryOffHeapMiB + pysparkMemToUseMiB)
+      ExecutorResourcesOrDefaults(cores, executorMemoryMiB, memoryOffHeapMiB,
+        pysparkMemToUseMiB, memoryOverheadMiB, totalMemMiB, totalMemMiBLimit = None,
+        finalCustomResources)
+    } else {
+      val burstyControlFactor = conf.get(EXECUTOR_BURSTY_MEMORY_OVERHEAD_FACTOR)
+      val newMemoryOverheadMiB = (memoryOverheadMiB - math.min(
+        (executorMemoryMiB + memoryOverheadMiB) * (burstyControlFactor - 1.0), memoryOverheadMiB))
+        .toLong
+      val totalMemMiBLimit = executorMemoryMiB + memoryOverheadMiB + memoryOffHeapMiB +
+        pysparkMemToUseMiB
+      val totalMemMiBRequest = executorMemoryMiB + newMemoryOverheadMiB + memoryOffHeapMiB +
+        pysparkMemToUseMiB
+      logInfo(s"reduce memoryoverhead request from $memoryOverheadMiB MB to" +
+        s" $newMemoryOverheadMiB MB")
+      updateEventLogAndUI(newMemoryOverheadMiB, conf)
+      ExecutorResourcesOrDefaults(cores, executorMemoryMiB, memoryOffHeapMiB,
+        pysparkMemToUseMiB, newMemoryOverheadMiB, totalMemMiBRequest,
+        totalMemMiBLimit = Some(totalMemMiBLimit), finalCustomResources)
+    }
+  }
+
+  private def updateEventLogAndUI(newMemoryOverheadMiB: Long, conf: SparkConf): Unit = {
+    conf.set(EXECUTOR_BURSTY_MEMORY_OVERHEAD, newMemoryOverheadMiB)
+    val sparkContextOption = SparkContext.getActive
+    if (sparkContextOption.isDefined) {
+      val sparkContext = sparkContextOption.get
+      val klass = classOf[ApplicationEnvironmentInfoWrapper]
+      val currentAppEnvironment = sparkContext._statusStore.store.read(klass, klass.getName()).info
+      logInfo(s"currentAppEnvironment spark properties count:" +
+        s" ${currentAppEnvironment.sparkProperties.size}")
+      val newAppEnvironment = ApplicationEnvironmentInfo.create(currentAppEnvironment,
+        newSparkProperties = Map(EXECUTOR_BURSTY_MEMORY_OVERHEAD.key ->
+          newMemoryOverheadMiB.toString))
+      logInfo(s"newAppEnvironment spark properties count:" +
+        s" ${newAppEnvironment.sparkProperties.size}")

Review Comment:
   this logging can be removed



##########
resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala:
##########
@@ -528,6 +529,105 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
 
   }
 
+  test("when turning on bursty memory overhead, configure request and limit correctly with" +
+    " default memoryOverhead profile") {
+    baseConf.remove(KUBERNETES_EXECUTOR_POD_NAME_PREFIX)
+    baseConf.set("spark.app.name", "xyz.abc _i_am_a_app_name_w/_some_abbrs")
+    val basePod = SparkPod.initialPod()
+    // scalastyle:off
+
+    val smallMemoryOverheadConf = baseConf.clone
+      .set(EXECUTOR_BURSTY_MEMORY_OVERHEAD_ENABLED, true)
+      .set(EXECUTOR_BURSTY_MEMORY_OVERHEAD_FACTOR, 1.2)
+      .set("spark.executor.memory", "64g")
+    initDefaultProfile(smallMemoryOverheadConf)
+    val step = new BasicExecutorFeatureStep(
+      newExecutorConf(sparkConf = Some(smallMemoryOverheadConf)),
+      new SecurityManager(smallMemoryOverheadConf), defaultProfile)
+
+    val podConfigured = step.configurePod(basePod)
+    val resource = podConfigured.container.getResources
+    assert(defaultProfile.executorResources("memory").amount === 64 * 1024)
+    // assert(defaultProfile.executorResources("memoryOverhead").amount === 6.4 * 1024)
+    assert(resource.getLimits.get("memory").getAmount.toLong === math.floor((64 + 6.4) * 1024))
+    assert(resource.getRequests.get("memory").getAmount.toLong === 64 * 1024)
+  }
+
+  test("when turning on bursty memory overhead, configure request and limit correctly with" +
+    " small memoryOverhead profile") {
+    baseConf.remove(KUBERNETES_EXECUTOR_POD_NAME_PREFIX)
+    baseConf.set("spark.app.name", "xyz.abc _i_am_a_app_name_w/_some_abbrs")
+    val basePod = SparkPod.initialPod()
+    // scalastyle:off
+
+    val smallMemoryOverheadConf = baseConf.clone
+      .set(EXECUTOR_BURSTY_MEMORY_OVERHEAD_ENABLED, true)
+      .set(EXECUTOR_BURSTY_MEMORY_OVERHEAD_FACTOR, 1.2)
+      .set("spark.executor.memory", "64g")
+      .set("spark.executor.memoryOverhead", "10g")
+    initDefaultProfile(smallMemoryOverheadConf)
+    val step = new BasicExecutorFeatureStep(
+      newExecutorConf(sparkConf = Some(smallMemoryOverheadConf)),
+      new SecurityManager(smallMemoryOverheadConf), defaultProfile)
+
+    val podConfigured = step.configurePod(basePod)
+    val resource = podConfigured.container.getResources
+    assert(defaultProfile.executorResources("memory").amount === 64 * 1024)
+    assert(defaultProfile.executorResources("memoryOverhead").amount === 10240)
+    assert(resource.getLimits.get("memory").getAmount.toLong === 74 * 1024)
+    assert(resource.getRequests.get("memory").getAmount.toLong === 64 * 1024)
+  }
+
+  test("when turning on bursty memory overhead, configure request and limit correctly with" +
+    " big memoryOverhead profile") {
+    baseConf.remove(KUBERNETES_EXECUTOR_POD_NAME_PREFIX)
+    baseConf.set("spark.app.name", "xyz.abc _i_am_a_app_name_w/_some_abbrs")
+    val basePod = SparkPod.initialPod()
+    // scalastyle:off
+
+    val bigMemoryOverheadConf = baseConf.clone
+      .set(EXECUTOR_BURSTY_MEMORY_OVERHEAD_ENABLED, true)
+      .set(EXECUTOR_BURSTY_MEMORY_OVERHEAD_FACTOR, 1.2)
+      .set("spark.executor.memory", "64g")
+      .set("spark.executor.memoryOverhead", "20g")
+    initDefaultProfile(bigMemoryOverheadConf)
+    val step = new BasicExecutorFeatureStep(
+      newExecutorConf(sparkConf = Some(bigMemoryOverheadConf)),
+      new SecurityManager(bigMemoryOverheadConf), defaultProfile)
+
+    val podConfigured = step.configurePod(basePod)
+    val resource = podConfigured.container.getResources
+    assert(defaultProfile.executorResources("memory").amount === 64 * 1024)
+    assert(defaultProfile.executorResources("memoryOverhead").amount === 20480)
+    assert(resource.getLimits.get("memory").getAmount.toLong === 84 * 1024)
+    assert(resource.getRequests.get("memory").getAmount.toLong === math.floor((64 + 3.2) * 1024))
+  }
+
+  test("when turning on bursty memory overhead, configure request and limit correctly with" +
+    " big memoryOverhead profile and non-default factor") {
+    baseConf.remove(KUBERNETES_EXECUTOR_POD_NAME_PREFIX)
+    baseConf.set("spark.app.name", "xyz.abc _i_am_a_app_name_w/_some_abbrs")
+    val basePod = SparkPod.initialPod()
+    // scalastyle:off

Review Comment:
   this can be removed
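   As a cross-check, the asserted values in the three complete tests above follow directly from the formula in the ResourceProfile.scala hunk. A standalone recomputation (the 6553 MiB default overhead again assumes the 10% rule with integer truncation):

   ```scala
   // Returns (requestMiB, limitMiB): the request drops by up to the full
   // overhead; the limit keeps the original total.
   def burstyRequestAndLimit(heapMiB: Long, overheadMiB: Long, factor: Double): (Long, Long) = {
     val reduced = (overheadMiB - math.min(
       (heapMiB + overheadMiB) * (factor - 1.0), overheadMiB.toDouble)).toLong
     (heapMiB + reduced, heapMiB + overheadMiB)
   }

   val heapMiB = 64L * 1024
   burstyRequestAndLimit(heapMiB, 6553, 1.2)   // (65536, 72089): default profile
   burstyRequestAndLimit(heapMiB, 10240, 1.2)  // (65536, 75776): small overhead
   burstyRequestAndLimit(heapMiB, 20480, 1.2)  // (68812, 86016): big overhead
   ```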



##########
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##########
@@ -445,6 +445,33 @@ package object config {
         "Ensure that memory overhead is a double greater than 0")
       .createWithDefault(0.1)
 
+  private[spark] val EXECUTOR_BURSTY_MEMORY_OVERHEAD_ENABLED =
+    ConfigBuilder("spark.executor.memoryOverheadBursty.enabled")
+      .doc("Whether to enable memory overhead bursty")
+      .version("3.2.0")

Review Comment:
   let's change these versions to 4.2.0?
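   Applied to the entry above, that would read as follows (the `booleanConf` and default-value tail is an assumption; the hunk is truncated before it):

   ```scala
   private[spark] val EXECUTOR_BURSTY_MEMORY_OVERHEAD_ENABLED =
     ConfigBuilder("spark.executor.memoryOverheadBursty.enabled")
       .doc("Whether to enable memory overhead bursty")
       .version("4.2.0")
       .booleanConf                // assumed; not shown in the hunk
       .createWithDefault(false)   // assumed default
   ```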



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

