mridulm commented on code in PR #53190:
URL: https://github.com/apache/spark/pull/53190#discussion_r2723933667
##########
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##########
@@ -445,6 +445,34 @@ package object config {
"Ensure that memory overhead is a double greater than 0")
.createWithDefault(0.1)
+ private[spark] val EXECUTOR_BURSTY_MEMORY_OVERHEAD_ENABLED =
+ ConfigBuilder("spark.executor.memoryOverheadBursty.enabled")
+ .doc("Whether to enable memory overhead bursty")
+ .version("4.2.0")
+ .booleanConf
+ .createWithDefault(false)
+
+ private[spark] val EXECUTOR_BURSTY_MEMORY_OVERHEAD_FACTOR =
+ ConfigBuilder("spark.executor.memoryOverheadBurstyFactor")
+ .doc("the bursty control factor controlling the size of memory overhead
space shared with" +
+ s" other processes, newMemoryOverhead=oldMemoryOverhead-MIN((onheap +
memoryoverhead) *" +
+ s" (this value - 1), oldMemoryOverhead)")
+ .version("4.2.0")
+ .doubleConf
+ .checkValue((v: Double) => v >= 1.0,
+ "the value of bursty control factor has to be no less than 1")
+ .createWithDefault(1.2)
+
+ private[spark] val EXECUTOR_BURSTY_MEMORY_OVERHEAD = ConfigBuilder(
+ "spark.executor.burstyMemoryOverhead")
+ .doc(s"The adjusted amount of memoryOverhead to be allocated per executor"
+
+ s" (the adjustment happens if
${EXECUTOR_BURSTY_MEMORY_OVERHEAD_ENABLED.key} is enabled," +
+ " in MiB unless otherwise specified. This parameter is here only for UI
demonstration," +
Review Comment:
I agree with @dongjoon-hyun - we can drop this config.
It is not something we expect users to specify (as it is overwritten when
this feature is enabled), but just for showcasing what the computed value is.
##########
core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala:
##########
@@ -559,12 +563,63 @@ object ResourceProfile extends Logging {
} else {
0L
}
- val totalMemMiB =
- (executorMemoryMiB + memoryOverheadMiB + memoryOffHeapMiB +
pysparkMemToUseMiB)
- ExecutorResourcesOrDefaults(cores, executorMemoryMiB, memoryOffHeapMiB,
- pysparkMemToUseMiB, memoryOverheadMiB, totalMemMiB, finalCustomResources)
+
+ if (!conf.get(EXECUTOR_BURSTY_MEMORY_OVERHEAD_ENABLED)) {
+ val totalMemMiB =
+ (executorMemoryMiB + memoryOverheadMiB + memoryOffHeapMiB +
pysparkMemToUseMiB)
+ ExecutorResourcesOrDefaults(cores, executorMemoryMiB, memoryOffHeapMiB,
+ pysparkMemToUseMiB, memoryOverheadMiB, totalMemMiB, totalMemMiBLimit =
None,
+ finalCustomResources)
+ } else {
+ val burstyControlFactor =
conf.get(EXECUTOR_BURSTY_MEMORY_OVERHEAD_FACTOR)
+ val newMemoryOverheadMiB = (memoryOverheadMiB - math.min(
+ ((executorMemoryMiB + memoryOverheadMiB) * (burstyControlFactor -
1.0)).toLong,
+ memoryOverheadMiB))
+ val totalMemMiBLimit = executorMemoryMiB + memoryOverheadMiB +
memoryOffHeapMiB +
+ pysparkMemToUseMiB
+ val totalMemMiBRequest = executorMemoryMiB + newMemoryOverheadMiB +
memoryOffHeapMiB +
+ pysparkMemToUseMiB
+ logInfo(s"reduce memoryoverhead request from $memoryOverheadMiB MB to" +
+ s" $newMemoryOverheadMiB MB")
+ updateEventLogAndUI(newMemoryOverheadMiB, conf)
+ ExecutorResourcesOrDefaults(cores, executorMemoryMiB, memoryOffHeapMiB,
+ pysparkMemToUseMiB, newMemoryOverheadMiB, totalMemMiBRequest,
+ totalMemMiBLimit = Some(totalMemMiBLimit), finalCustomResources)
+ }
+ }
+
+ private def updateEventLogAndUI(newMemoryOverheadMiB: Long, conf:
SparkConf): Unit = {
+ conf.set(EXECUTOR_BURSTY_MEMORY_OVERHEAD, newMemoryOverheadMiB)
+ val sparkContextOption = SparkContext.getActive
+ if (sparkContextOption.isDefined) {
+ val sparkContext = sparkContextOption.get
+ val klass = classOf[ApplicationEnvironmentInfoWrapper]
+ val currentAppEnvironment = sparkContext.statusStore.store.read(klass,
klass.getName()).info
+ val newAppEnvironment =
ApplicationEnvironmentInfo.create(currentAppEnvironment,
+ newSparkProperties = Map(EXECUTOR_BURSTY_MEMORY_OVERHEAD.key ->
+ newMemoryOverheadMiB.toString))
+ sparkContext.statusStore.store.write(new
ApplicationEnvironmentInfoWrapper(
+ newAppEnvironment))
+ this.synchronized {
+ if (!loggedBurstyMemoryOverhead) {
+ SparkContext.getActive.get.eventLogger.foreach { logger =>
+ logger.onEnvironmentUpdate(SparkListenerEnvironmentUpdate(
+ Map("Spark Properties" -> newAppEnvironment.sparkProperties,
+ "JVM Information" -> Seq(("Java Version",
newAppEnvironment.runtime.javaVersion),
+ ("Java Home", newAppEnvironment.runtime.javaHome),
+ ("Scala Version", newAppEnvironment.runtime.scalaVersion)),
+ "Hadoop Properties" -> newAppEnvironment.hadoopProperties,
+ "System Properties" -> newAppEnvironment.systemProperties,
+ "Classpath Entries" -> newAppEnvironment.classpathEntries)
+ ))
+ loggedBurstyMemoryOverhead = true
+ }
+ }
Review Comment:
Fire an event (`sc.postEnvironmentUpdate`) - and ensure the listener updates
state: instead of trying to handle it here.
This will miss history server, and other consumers of spark events
##########
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##########
@@ -445,6 +445,34 @@ package object config {
"Ensure that memory overhead is a double greater than 0")
.createWithDefault(0.1)
+ private[spark] val EXECUTOR_BURSTY_MEMORY_OVERHEAD_ENABLED =
+ ConfigBuilder("spark.executor.memoryOverheadBursty.enabled")
+ .doc("Whether to enable memory overhead bursty")
+ .version("4.2.0")
+ .booleanConf
+ .createWithDefault(false)
+
+ private[spark] val EXECUTOR_BURSTY_MEMORY_OVERHEAD_FACTOR =
+ ConfigBuilder("spark.executor.memoryOverheadBurstyFactor")
+ .doc("the bursty control factor controlling the size of memory overhead
space shared with" +
+ s" other processes, newMemoryOverhead=oldMemoryOverhead-MIN((onheap +
memoryoverhead) *" +
+ s" (this value - 1), oldMemoryOverhead)")
+ .version("4.2.0")
+ .doubleConf
+ .checkValue((v: Double) => v >= 1.0,
+ "the value of bursty control factor has to be no less than 1")
+ .createWithDefault(1.2)
+
+ private[spark] val EXECUTOR_BURSTY_MEMORY_OVERHEAD = ConfigBuilder(
+ "spark.executor.burstyMemoryOverhead")
+ .doc(s"The adjusted amount of memoryOverhead to be allocated per executor"
+
+ s" (the adjustment happens if
${EXECUTOR_BURSTY_MEMORY_OVERHEAD_ENABLED.key} is enabled," +
+ " in MiB unless otherwise specified. This parameter is here only for UI
demonstration," +
+ " there is not effect when user sets it directly")
+ .version("4.2.0")
+ .bytesConf(ByteUnit.MiB)
+ .createOptional
+
Review Comment:
Sounds good, we can keep it k8s only for now.
##########
core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala:
##########
@@ -48,7 +51,7 @@ import org.apache.spark.util.Utils
@Evolving
@Since("3.1.0")
class ResourceProfile(
- val executorResources: Map[String, ExecutorResourceRequest],
+ var executorResources: Map[String, ExecutorResourceRequest],
Review Comment:
Please revert this, it changes expectation w.r.t `ResourceProfile`, which is
immutable - especially given it is public api.
Inferring overhead, when not specified by users, is an existing behavior -
if the intention is to explicitly show it in UI when not configured , this can
be done in UI code explicitly - instead of mutating RP.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]