q2w commented on a change in pull request #32766:
URL: https://github.com/apache/spark/pull/32766#discussion_r645280979
##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -2023,6 +2023,22 @@ package object config {
.stringConf
.createWithDefaultString("PWR")
+ private[spark] val EXECUTOR_DECOMMISSION_BATCH_INTERVAL =
+ ConfigBuilder("spark.executor.decommission.batchInterval")
+ .doc("Executors are decommissioned in batched to avoid overloading network bandwidth in" +
+ " migrating rdd and shuffle data. This config sets the interval between batches.")
+ .version("3.2.0")
+ .timeConf(TimeUnit.MILLISECONDS)
+ .createWithDefault(30000)
Review comment:
Thanks for reviewing this. I have set this interval to a much smaller
value, 3s.
I am not sure a single value can work in all scenarios. The right interval
depends on the nature of the application, and ultimately the user is free
to set it.
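To make the role of the interval concrete, here is a minimal sketch of how a batch size and an interval could translate into start times for each batch. It is not code from this PR: `BatchIntervalSketch`, `schedule`, and the batch size of 2 used in the usage note are hypothetical names and values chosen for illustration; only the 3s interval comes from the comment above.

```scala
// Hypothetical sketch; the names here are illustrative and not taken from the PR.
object BatchIntervalSketch {

  /** Pairs each batch of executor IDs with the delay (in ms) before it would start. */
  def schedule(
      executorIds: Seq[String],
      batchSize: Int,
      intervalMs: Long): List[(Long, Seq[String])] = {
    require(batchSize > 0, "batchSize must be positive")
    require(intervalMs >= 0, "intervalMs must be non-negative")
    if (executorIds.isEmpty) {
      Nil
    } else {
      // Clamp so a very large batchSize never asks for more than the list holds.
      val size = math.min(batchSize, executorIds.size)
      executorIds.grouped(size).zipWithIndex.map { case (batch, index) =>
        // Batch 0 starts immediately; each later batch waits one more interval.
        (index * intervalMs, batch)
      }.toList
    }
  }
}
```

For example, `BatchIntervalSketch.schedule(Seq("1", "2", "3", "4", "5"), 2, 3000L)` would place executors 1 and 2 at 0 ms, 3 and 4 at 3000 ms, and 5 at 6000 ms. A shorter interval finishes decommissioning sooner but leaves less time for each batch to migrate its data, which is why the best value is likely application-dependent.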
##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -2023,6 +2023,22 @@ package object config {
.stringConf
.createWithDefaultString("PWR")
+ private[spark] val EXECUTOR_DECOMMISSION_BATCH_INTERVAL =
+ ConfigBuilder("spark.executor.decommission.batchInterval")
+ .doc("Executors are decommissioned in batched to avoid overloading network bandwidth in" +
+ " migrating rdd and shuffle data. This config sets the interval between batches.")
+ .version("3.2.0")
+ .timeConf(TimeUnit.MILLISECONDS)
+ .createWithDefault(30000)
+
+ private[spark] val EXECUTOR_DECOMMISSION_BATCH_SIZE =
+ ConfigBuilder("spark.executor.decommission.batchSize")
+ .doc("Executors are decommissioned in batched to avoid overloading network bandwidth in" +
+ " migrating rdd and shuffle data. This config sets the interval between batches.")
Review comment:
Thanks for pointing it out. I have fixed it.
##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -2023,6 +2023,22 @@ package object config {
.stringConf
.createWithDefaultString("PWR")
+ private[spark] val EXECUTOR_DECOMMISSION_BATCH_INTERVAL =
+ ConfigBuilder("spark.executor.decommission.batchInterval")
+ .doc("Executors are decommissioned in batched to avoid overloading network bandwidth in" +
+ " migrating rdd and shuffle data. This config sets the interval between batches.")
+ .version("3.2.0")
+ .timeConf(TimeUnit.MILLISECONDS)
+ .createWithDefault(30000)
+
+ private[spark] val EXECUTOR_DECOMMISSION_BATCH_SIZE =
+ ConfigBuilder("spark.executor.decommission.batchSize")
+ .doc("Executors are decommissioned in batched to avoid overloading network bandwidth in" +
+ " migrating rdd and shuffle data. This config sets the interval between batches.")
+ .version("3.2.0")
+ .intConf
+ .createWithDefault(3)
Review comment:
This makes sense. Let's keep it as Int.MaxValue. I have updated the
default accordingly.
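One reading of the Int.MaxValue default is that, unless a user lowers the batch size, every executor falls into a single batch and no batching delay applies. The sketch below only illustrates that arithmetic. `BatchSizeSketch` and `batchCount` are hypothetical names and are not part of this PR.

```scala
// Hypothetical sketch; the names here are illustrative and not taken from the PR.
object BatchSizeSketch {

  /** Number of batches needed to cover executorCount executors (ceiling division). */
  def batchCount(executorCount: Int, batchSize: Int): Int = {
    require(executorCount >= 0, "executorCount must be non-negative")
    require(batchSize > 0, "batchSize must be positive")
    // Use Long arithmetic so the sum cannot overflow when batchSize is Int.MaxValue.
    ((executorCount.toLong + batchSize - 1) / batchSize).toInt
  }
}
```

Under these assumptions, `BatchSizeSketch.batchCount(10, Int.MaxValue)` gives 1, while `BatchSizeSketch.batchCount(10, 3)` gives 4.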
##########
File path: core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala
##########
@@ -83,4 +85,29 @@ class WorkerDecommissionSuite extends SparkFunSuite with LocalSparkContext {
val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 20.seconds)
assert(asyncCountResult === 10)
}
+
+ test("Verify executors are decommissioned in batches") {
Review comment:
Done. Thanks.