q2w commented on a change in pull request #32766:
URL: https://github.com/apache/spark/pull/32766#discussion_r645280979



##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -2023,6 +2023,22 @@ package object config {
       .stringConf
       .createWithDefaultString("PWR")
 
+  private[spark] val EXECUTOR_DECOMMISSION_BATCH_INTERVAL =
+    ConfigBuilder("spark.executor.decommission.batchInterval")
+      .doc("Executors are decommissioned in batched to avoid overloading 
network bandwidth in" +
+        " migrating rdd and shuffle data. This config sets the interval 
between batches.")
+      .version("3.2.0")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefault(30000)

Review comment:
       Thanks for reviewing this. I have set this interval to a much smaller value of 3s.
   
   I am not sure we can pick a single value that works in all scenarios. It depends on the nature of the application, and ultimately the user is free to tune this.
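
   For reference, here is a minimal sketch of how a user could tune these two settings. The config keys come from this PR; the master, app name and values below are only illustrative placeholders, not recommendations:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical tuning example: adapt the values to the application's data
// volume and the available network bandwidth.
val conf = new SparkConf()
  .setMaster("local[2]")  // placeholder; decommissioning matters on a real cluster manager
  .setAppName("decommission-batching-example")
  .set("spark.executor.decommission.batchInterval", "3s") // time to wait between batches
  .set("spark.executor.decommission.batchSize", "5")      // executors decommissioned per batch

val sc = new SparkContext(conf)
```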

##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -2023,6 +2023,22 @@ package object config {
       .stringConf
       .createWithDefaultString("PWR")
 
+  private[spark] val EXECUTOR_DECOMMISSION_BATCH_INTERVAL =
+    ConfigBuilder("spark.executor.decommission.batchInterval")
+      .doc("Executors are decommissioned in batched to avoid overloading 
network bandwidth in" +
+        " migrating rdd and shuffle data. This config sets the interval 
between batches.")
+      .version("3.2.0")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefault(30000)
+
+  private[spark] val EXECUTOR_DECOMMISSION_BATCH_SIZE =
+    ConfigBuilder("spark.executor.decommission.batchSize")
+      .doc("Executors are decommissioned in batched to avoid overloading 
network bandwidth in" +
+        " migrating rdd and shuffle data. This config sets the interval 
between batches.")

Review comment:
       Thanks for pointing it out. I have fixed it.

##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -2023,6 +2023,22 @@ package object config {
       .stringConf
       .createWithDefaultString("PWR")
 
+  private[spark] val EXECUTOR_DECOMMISSION_BATCH_INTERVAL =
+    ConfigBuilder("spark.executor.decommission.batchInterval")
+      .doc("Executors are decommissioned in batched to avoid overloading 
network bandwidth in" +
+        " migrating rdd and shuffle data. This config sets the interval 
between batches.")
+      .version("3.2.0")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefault(30000)
+
+  private[spark] val EXECUTOR_DECOMMISSION_BATCH_SIZE =
+    ConfigBuilder("spark.executor.decommission.batchSize")
+      .doc("Executors are decommissioned in batched to avoid overloading 
network bandwidth in" +
+        " migrating rdd and shuffle data. This config sets the interval 
between batches.")

Review comment:
       Thanks for pointing it out. I have fixed it.
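
   For clarity, the corrected definition would read roughly like the following. This is my paraphrase of the fix, not necessarily the exact wording pushed in the update, and the Int.MaxValue default reflects the later discussion in this thread:

```scala
// Sketch of the corrected batchSize config: the doc now describes the batch
// size rather than the interval copied from batchInterval.
private[spark] val EXECUTOR_DECOMMISSION_BATCH_SIZE =
  ConfigBuilder("spark.executor.decommission.batchSize")
    .doc("Executors are decommissioned in batches to avoid overloading network bandwidth in" +
      " migrating rdd and shuffle data. This config sets the number of executors" +
      " decommissioned in each batch.")
    .version("3.2.0")
    .intConf
    .createWithDefault(Int.MaxValue)
```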

##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -2023,6 +2023,22 @@ package object config {
       .stringConf
       .createWithDefaultString("PWR")
 
+  private[spark] val EXECUTOR_DECOMMISSION_BATCH_INTERVAL =
+    ConfigBuilder("spark.executor.decommission.batchInterval")
+      .doc("Executors are decommissioned in batched to avoid overloading 
network bandwidth in" +
+        " migrating rdd and shuffle data. This config sets the interval 
between batches.")
+      .version("3.2.0")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefault(30000)
+
+  private[spark] val EXECUTOR_DECOMMISSION_BATCH_SIZE =
+    ConfigBuilder("spark.executor.decommission.batchSize")
+      .doc("Executors are decommissioned in batched to avoid overloading 
network bandwidth in" +
+        " migrating rdd and shuffle data. This config sets the interval 
between batches.")
+      .version("3.2.0")
+      .intConf
+      .createWithDefault(3)

Review comment:
       This makes sense. Let's keep it as Int.MaxValue.

##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -2023,6 +2023,22 @@ package object config {
       .stringConf
       .createWithDefaultString("PWR")
 
+  private[spark] val EXECUTOR_DECOMMISSION_BATCH_INTERVAL =
+    ConfigBuilder("spark.executor.decommission.batchInterval")
+      .doc("Executors are decommissioned in batched to avoid overloading 
network bandwidth in" +
+        " migrating rdd and shuffle data. This config sets the interval 
between batches.")
+      .version("3.2.0")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefault(30000)
+
+  private[spark] val EXECUTOR_DECOMMISSION_BATCH_SIZE =
+    ConfigBuilder("spark.executor.decommission.batchSize")
+      .doc("Executors are decommissioned in batched to avoid overloading 
network bandwidth in" +
+        " migrating rdd and shuffle data. This config sets the interval 
between batches.")
+      .version("3.2.0")
+      .intConf
+      .createWithDefault(3)

Review comment:
       This makes sense. Let's keep it as Int.MaxValue. I have updated it accordingly.
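
   To make the default concrete: with Int.MaxValue, every decommissioning executor falls into a single batch, so batching is effectively disabled unless the user opts in; a smaller value spreads the work out, one batchInterval apart. A rough sketch of the grouping idea (illustrative only, with made-up names, not the actual scheduler code in this PR):

```scala
import java.util.concurrent.TimeUnit

// Splits executors into groups of `batchSize` and spaces the groups by
// `batchIntervalMs`. With batchSize = Int.MaxValue there is exactly one group.
def decommissionInBatches(
    executorIds: Seq[String],
    batchSize: Int,          // spark.executor.decommission.batchSize
    batchIntervalMs: Long,   // spark.executor.decommission.batchInterval
    decommission: Seq[String] => Unit): Unit = {
  executorIds.grouped(batchSize).zipWithIndex.foreach { case (batch, i) =>
    if (i > 0) TimeUnit.MILLISECONDS.sleep(batchIntervalMs)
    decommission(batch)
  }
}
```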

##########
File path: core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala
##########
@@ -83,4 +85,29 @@ class WorkerDecommissionSuite extends SparkFunSuite with LocalSparkContext {
     val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 20.seconds)
     assert(asyncCountResult === 10)
   }
+
+  test("Verify executors are decommissioned in batches") {

Review comment:
       Done. Thanks.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
