holdenk commented on a change in pull request #32766:
URL: https://github.com/apache/spark/pull/32766#discussion_r707614453
##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -2023,6 +2023,22 @@ package object config {
.stringConf
.createWithDefaultString("PWR")
+  private[spark] val EXECUTOR_DECOMMISSION_BATCH_INTERVAL =
+    ConfigBuilder("spark.executor.decommission.batchInterval")
+      .doc("Executors are decommissioned in batches to avoid overloading network bandwidth in" +
+        " migrating rdd and shuffle data. This config sets the interval between batches.")
+      .version("3.2.0")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefault(3000)
+
+  private[spark] val EXECUTOR_DECOMMISSION_BATCH_SIZE =
+    ConfigBuilder("spark.executor.decommission.batchSize")
+      .doc("Executors are decommissioned in batches to avoid overloading network bandwidth in" +
+        " migrating rdd and shuffle data. This config sets the size of a batch.")
+      .version("3.2.0")
Review comment:
Let's change this to 3.3.0 since this is now too late for 3.2 (sorry for
not reviewing this sooner).
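Concretely, the suggestion is just to bump the version string on the new entries, e.g. (a sketch of the requested change, reusing the builder chain from the diff above):

    private[spark] val EXECUTOR_DECOMMISSION_BATCH_INTERVAL =
      ConfigBuilder("spark.executor.decommission.batchInterval")
        .doc("Executors are decommissioned in batches to avoid overloading network bandwidth in" +
          " migrating rdd and shuffle data. This config sets the interval between batches.")
        .version("3.3.0")  // was 3.2.0; the feature will first ship in 3.3.0
        .timeConf(TimeUnit.MILLISECONDS)
        .createWithDefault(3000)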
##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -2023,6 +2023,22 @@ package object config {
.stringConf
.createWithDefaultString("PWR")
+  private[spark] val EXECUTOR_DECOMMISSION_BATCH_INTERVAL =
+    ConfigBuilder("spark.executor.decommission.batchInterval")
Review comment:
I like this idea of setting a maximum number of concurrent nodes for Spark-commanded decommissioning. That being said, if the cluster manager / cloud provider triggers a SIGPWR or otherwise marks a node for decommissioning, I think we shouldn't limit the number of decommissioning executors, since the CM/cloud provider likely has its own time limits and we need to do what we can. I know this probably complicates the code; if we want, we could do this as a multi-part PR.
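If it helps, a rough sketch of how that split might look (purely hypothetical: `isUnderTimePressure` is an illustrative parameter, not something in this PR; `executorsToDecommissionInBatches`, `executorDataMap` and `DecommissionExecutor` are taken from the diffs below):

    // Rate-limit only Spark-chosen decommissions; CM/cloud-triggered ones go out immediately.
    private def scheduleDecommission(executorIds: Seq[String], isUnderTimePressure: Boolean): Unit = {
      if (isUnderTimePressure) {
        // The CM / cloud provider has its own deadline, so don't batch these.
        executorIds.foreach { executorId =>
          executorDataMap(executorId).executorEndpoint.send(DecommissionExecutor)
        }
      } else {
        // Spark-chosen: queue into the batched set, drained by the periodic task.
        executorsToDecommissionInBatches ++= executorIds
      }
    }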
##########
File path: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
##########
@@ -478,6 +478,45 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   protected def minRegisteredRatio: Double = _minRegisteredRatio
+  @GuardedBy("CoarseGrainedSchedulerBackend.this")
+  private val executorsToDecommissionInBatches = new HashSet[String]
+
+  private val executorDecommissionEnabled = conf.get(config.DECOMMISSION_ENABLED)
+  private var periodicBatchExecutorsDecommissionThread: Option[ScheduledExecutorService] = None
+  private var periodicBatchExecutorsDecommissionTask: Option[ScheduledFuture[_]] = None
+
+  if (executorDecommissionEnabled) {
+    /**
+     * Start a thread which runs a task periodically to decommission executors.
Review comment:
I think it could make sense for us to distinguish between executors where Spark has chosen decommissioning and we are not under time pressure, and those where the cloud provider / cluster manager has triggered decommissioning and we are under time pressure. In the second case I don't think we should apply any rate limiting.
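Since the quoted hunk cuts off right at that comment, here is a minimal sketch of what a batched drain loop of that shape could look like (assumed shape only, not necessarily the PR's actual body; it assumes EXECUTOR_DECOMMISSION_BATCH_SIZE is an int config and reuses the other names from the diffs):

    // Periodically pull up to batchSize executors from the pending set and decommission them.
    private def startPeriodicBatchDecommission(): Unit = {
      val intervalMs = conf.get(config.EXECUTOR_DECOMMISSION_BATCH_INTERVAL)
      val batchSize = conf.get(config.EXECUTOR_DECOMMISSION_BATCH_SIZE)
      val thread = ThreadUtils.newDaemonSingleThreadScheduledExecutor("executor-decommission-batcher")
      val task = thread.scheduleAtFixedRate(() => CoarseGrainedSchedulerBackend.this.synchronized {
        val batch = executorsToDecommissionInBatches.take(batchSize).toSeq
        executorsToDecommissionInBatches --= batch
        batch.foreach { executorId =>
          logInfo(s"Notify executor $executorId to decommissioning.")
          executorDataMap(executorId).executorEndpoint.send(DecommissionExecutor)
        }
      }, intervalMs, intervalMs, TimeUnit.MILLISECONDS)
      periodicBatchExecutorsDecommissionThread = Some(thread)
      periodicBatchExecutorsDecommissionTask = Some(task)
    }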
##########
File path: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
##########
@@ -519,10 +558,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     scheduler.sc.env.blockManager.master.decommissionBlockManagers(executorsToDecommission)
     if (!triggeredByExecutor) {
-      executorsToDecommission.foreach { executorId =>
-        logInfo(s"Notify executor $executorId to decommissioning.")
-        executorDataMap(executorId).executorEndpoint.send(DecommissionExecutor)
-      }
Review comment:
I would also love more information, if you're able to share it @q2w, about the issues you ran into in production.
##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -2023,6 +2023,22 @@ package object config {
.stringConf
.createWithDefaultString("PWR")
+  private[spark] val EXECUTOR_DECOMMISSION_BATCH_INTERVAL =
+    ConfigBuilder("spark.executor.decommission.batchInterval")
+      .doc("Executors are decommissioned in batches to avoid overloading network bandwidth in" +
+        " migrating rdd and shuffle data. This config sets the interval between batches.")
+      .version("3.2.0")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefault(3000)
+
+  private[spark] val EXECUTOR_DECOMMISSION_BATCH_SIZE =
Review comment:
I think it's hard to do this well. Ideally we could introspect the block manager and shuffle manager, since they know where the data is, and configure a maximum number of blocks per batch, but I'm of the belief that perfect is the enemy of done and I _think_ this could be an OK approximation, especially as we add better logic to pick which executors to decommission.
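For reference, the "perfect case" would look something like this purely hypothetical helper (none of these names exist in Spark; the block counts would have to come from block/shuffle manager introspection, which is exactly the part that's hard to get right today):

    // Greedily fill a decommission batch up to an estimated block budget instead of a fixed count.
    private def pickBatchByBlockBudget(
        candidates: Seq[String],
        blocksOnExecutor: String => Int,
        maxBlocksPerBatch: Int): Seq[String] = {
      val batch = scala.collection.mutable.ArrayBuffer.empty[String]
      var remainingBudget = maxBlocksPerBatch
      candidates.foreach { executorId =>
        val blocks = blocksOnExecutor(executorId)
        if (blocks <= remainingBudget) {
          remainingBudget -= blocks
          batch += executorId
        }
      }
      batch.toSeq
    }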