dongjoon-hyun commented on a change in pull request #32766:
URL: https://github.com/apache/spark/pull/32766#discussion_r645259464
##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -2023,6 +2023,22 @@ package object config {
.stringConf
.createWithDefaultString("PWR")
+ private[spark] val EXECUTOR_DECOMMISSION_BATCH_INTERVAL =
+ ConfigBuilder("spark.executor.decommission.batchInterval")
+ .doc("Executors are decommissioned in batched to avoid overloading
network bandwidth in" +
+ " migrating rdd and shuffle data. This config sets the interval
between batches.")
+ .version("3.2.0")
+ .timeConf(TimeUnit.MILLISECONDS)
+ .createWithDefault(30000)
+
+ private[spark] val EXECUTOR_DECOMMISSION_BATCH_SIZE =
+ ConfigBuilder("spark.executor.decommission.batchSize")
+ .doc("Executors are decommissioned in batched to avoid overloading
network bandwidth in" +
+ " migrating rdd and shuffle data. This config sets the interval
between batches.")
Review comment:
`the interval between batches` looks like a typo because this is a batch
size, not an interval.
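For example, the `batchSize` doc could be reworded along these lines (just a suggested wording, not the author's final text):
```scala
.doc("Executors are decommissioned in batches to avoid overloading network bandwidth" +
  " when migrating RDD and shuffle data. This config sets the number of executors" +
  " decommissioned in each batch.")
```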
##########
File path: core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala
##########
@@ -83,4 +85,29 @@ class WorkerDecommissionSuite extends SparkFunSuite with LocalSparkContext {
val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 20.seconds)
assert(asyncCountResult === 10)
}
+
+ test("Verify executors are decommissioned in batches") {
Review comment:
Could you add `SPARK-35627: ` as the test name prefix?
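For example (sketch of the rename only; the test body stays as written in the PR):
```scala
test("SPARK-35627: Verify executors are decommissioned in batches") {
  // existing test body unchanged
}
```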
##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -2023,6 +2023,22 @@ package object config {
.stringConf
.createWithDefaultString("PWR")
+ private[spark] val EXECUTOR_DECOMMISSION_BATCH_INTERVAL =
+ ConfigBuilder("spark.executor.decommission.batchInterval")
+ .doc("Executors are decommissioned in batched to avoid overloading
network bandwidth in" +
+ " migrating rdd and shuffle data. This config sets the interval
between batches.")
+ .version("3.2.0")
+ .timeConf(TimeUnit.MILLISECONDS)
+ .createWithDefault(30000)
+
+ private[spark] val EXECUTOR_DECOMMISSION_BATCH_SIZE =
+ ConfigBuilder("spark.executor.decommission.batchSize")
+ .doc("Executors are decommissioned in batched to avoid overloading
network bandwidth in" +
+ " migrating rdd and shuffle data. This config sets the interval
between batches.")
+ .version("3.2.0")
+ .intConf
+ .createWithDefault(3)
Review comment:
So, only 3 executors can be decommissioned per batch by default? To avoid a regression, I'd like to recommend setting the default to `Int.MaxValue`.
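For example, a sketch of the suggested default (keeping the rest of the config entry as in the PR):
```scala
      .version("3.2.0")
      .intConf
      // A default of Int.MaxValue keeps the prior behavior of decommissioning
      // all executors at once, so the new batching is effectively opt-in.
      .createWithDefault(Int.MaxValue)
```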