Ngone51 commented on a change in pull request #29367:
URL: https://github.com/apache/spark/pull/29367#discussion_r466361486



##########
File path: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
##########
@@ -503,6 +505,100 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
 
   protected def minRegisteredRatio: Double = _minRegisteredRatio
 
+  /**
+   * Request that the cluster manager decommission the specified executors.
+   *
+   * @param executors Identifiers of executors & decommission info.
+   * @param adjustTargetNumExecutors whether the target number of executors will be adjusted down
+   *                                 after these executors have been decommissioned.
+   * @return the ids of the executors acknowledged by the cluster manager to be removed.
+   */
+  override def decommissionExecutors(
+      executors: Seq[(String, ExecutorDecommissionInfo)],
+      adjustTargetNumExecutors: Boolean): Seq[String] = {
+
+    val executorsToDecommission = executors.filter{case (executorId, _) =>

Review comment:
       nit: need spaces around `{`
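
For reference, a minimal standalone sketch of the spacing being asked for (the data below is hypothetical and only illustrates the `filter { case ... }` formatting, not the PR's actual types):

```scala
// Hypothetical stand-in data; only the brace spacing matters here.
object BraceSpacingExample {
  def main(args: Array[String]): Unit = {
    val executors: Seq[(String, String)] = Seq(("1", "host going away"), ("2", "spot preemption"))
    val alive = Set("1")

    // Spaces around `{`, as the nit suggests:
    val executorsToDecommission = executors.filter { case (executorId, _) =>
      alive.contains(executorId)
    }
    println(executorsToDecommission)
  }
}
```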

##########
File path: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
##########
@@ -503,6 +505,100 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
 
   protected def minRegisteredRatio: Double = _minRegisteredRatio
 
+  /**
+   * Request that the cluster manager decommission the specified executors.
+   *
+   * @param executors Identifiers of executors & decommission info.
+   * @param adjustTargetNumExecutors whether the target number of executors will be adjusted down
+   *                                 after these executors have been decommissioned.
+   * @return the ids of the executors acknowledged by the cluster manager to be removed.
+   */
+  override def decommissionExecutors(
+      executors: Seq[(String, ExecutorDecommissionInfo)],
+      adjustTargetNumExecutors: Boolean): Seq[String] = {
+
+    val executorsToDecommission = executors.filter{case (executorId, _) =>
+      CoarseGrainedSchedulerBackend.this.synchronized {
+        // Only bother decommissioning executors which are alive.
+        if (isExecutorActive(executorId)) {
+          executorsPendingDecommission += executorId
+          true
+        } else {
+          false
+        }
+      }
+    }
+
+    // If we don't want to replace the executors we are decommissioning
+    if (adjustTargetNumExecutors) {
+      executorsToDecommission.foreach { case (exec, _) =>
+        val rpId = executorDataMap(exec).resourceProfileId
+        val rp = scheduler.sc.resourceProfileManager.resourceProfileFromId(rpId)
+        if (requestedTotalExecutorsPerResourceProfile.isEmpty) {
+          // Assume that we are killing an executor that was started by default and
+          // not through the request api
+          requestedTotalExecutorsPerResourceProfile(rp) = 0
+        } else {
+          val requestedTotalForRp = requestedTotalExecutorsPerResourceProfile(rp)
+          requestedTotalExecutorsPerResourceProfile(rp) = math.max(requestedTotalForRp - 1, 0)
+        }
+      }
+      doRequestTotalExecutors(requestedTotalExecutorsPerResourceProfile.toMap)
+    }
+
+    val decommissioned = executorsToDecommission.filter{case (executorId, decomInfo) =>
+      doDecommission(executorId, decomInfo)
+    }.map(_._1)
+    decommissioned
+  }
+
+
+  private def doDecommission(executorId: String,
+      decomInfo: ExecutorDecommissionInfo): Boolean = {
+
+    logInfo(s"Starting decommissioning executor $executorId.")

Review comment:
       How about "Ask to decommissioning executor $executorId"?  "Start" and 
"Finish" just sounds like decommission finishes in seconds.

##########
File path: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
##########
@@ -204,7 +205,12 @@ private[spark] class ExecutorAllocationManager(
         s"s${DYN_ALLOCATION_SUSTAINED_SCHEDULER_BACKLOG_TIMEOUT.key} must be > 
0!")
     }
     if (!conf.get(config.SHUFFLE_SERVICE_ENABLED)) {
-      if (conf.get(config.DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED)) {
+      // If dynamic allocation shuffle tracking or worker decommissioning along with
+      // storage shuffle decommissioning is enabled we have *experimental* support for
+      // decommissioning without a shuffle service.
+      if (conf.get(config.DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED) ||
+          (conf.get(WORKER_DECOMMISSION_ENABLED) &&

Review comment:
       Is the worker here specific to the Standalone worker?
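
For context, a sketch of the *experimental* combination the new comment describes, e.g. pasted into `spark-shell`. The literal config keys are my assumption (the diff only shows the config constants), so they may not match this branch exactly:

```scala
import org.apache.spark.SparkConf

// Dynamic allocation without an external shuffle service (experimental).
val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "false")
  // either shuffle tracking ...
  .set("spark.dynamicAllocation.shuffleTracking.enabled", "true")
  // ... or decommissioning plus shuffle block migration (assumed keys):
  .set("spark.worker.decommission.enabled", "true")
  .set("spark.storage.decommission.shuffleBlocks.enabled", "true")
```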

##########
File path: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
##########
@@ -503,6 +505,100 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
 
   protected def minRegisteredRatio: Double = _minRegisteredRatio
 
+  /**
+   * Request that the cluster manager decommission the specified executors.
+   *
+   * @param executors Identifiers of executors & decommission info.
+   * @param adjustTargetNumExecutors whether the target number of executors will be adjusted down
+   *                                 after these executors have been decommissioned.
+   * @return the ids of the executors acknowledged by the cluster manager to be removed.
+   */
+  override def decommissionExecutors(
+      executors: Seq[(String, ExecutorDecommissionInfo)],
+      adjustTargetNumExecutors: Boolean): Seq[String] = {
+
+    val executorsToDecommission = executors.filter{case (executorId, _) =>
+      CoarseGrainedSchedulerBackend.this.synchronized {
+        // Only bother decommissioning executors which are alive.
+        if (isExecutorActive(executorId)) {
+          executorsPendingDecommission += executorId
+          true
+        } else {
+          false
+        }
+      }
+    }
+
+    // If we don't want to replace the executors we are decommissioning
+    if (adjustTargetNumExecutors) {
+      executorsToDecommission.foreach { case (exec, _) =>
+        val rpId = executorDataMap(exec).resourceProfileId

Review comment:
       `executorDataMap` here needs the `synchronized` protection. 
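
A minimal sketch of one way to do that (take the lookup under the backend lock, the way other `executorDataMap` reads in this class are guarded; not necessarily how the PR should structure it):

```scala
// Sketch only: read executorDataMap while holding the backend lock.
val rpId = CoarseGrainedSchedulerBackend.this.synchronized {
  executorDataMap(exec).resourceProfileId
}
val rp = scheduler.sc.resourceProfileManager.resourceProfileFromId(rpId)
```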

##########
File path: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
##########
@@ -503,6 +505,100 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
 
   protected def minRegisteredRatio: Double = _minRegisteredRatio
 
+  /**
+   * Request that the cluster manager decommission the specified executors.
+   *
+   * @param executors Identifiers of executors & decommission info.
+   * @param adjustTargetNumExecutors whether the target number of executors will be adjusted down
+   *                                 after these executors have been decommissioned.
+   * @return the ids of the executors acknowledged by the cluster manager to be removed.
+   */
+  override def decommissionExecutors(
+      executors: Seq[(String, ExecutorDecommissionInfo)],
+      adjustTargetNumExecutors: Boolean): Seq[String] = {
+
+    val executorsToDecommission = executors.filter{case (executorId, _) =>
+      CoarseGrainedSchedulerBackend.this.synchronized {
+        // Only bother decommissioning executors which are alive.
+        if (isExecutorActive(executorId)) {
+          executorsPendingDecommission += executorId
+          true
+        } else {
+          false
+        }
+      }
+    }
+
+    // If we don't want to replace the executors we are decommissioning
+    if (adjustTargetNumExecutors) {
+      executorsToDecommission.foreach { case (exec, _) =>
+        val rpId = executorDataMap(exec).resourceProfileId
+        val rp = scheduler.sc.resourceProfileManager.resourceProfileFromId(rpId)
+        if (requestedTotalExecutorsPerResourceProfile.isEmpty) {
+          // Assume that we are killing an executor that was started by default and
+          // not through the request api
+          requestedTotalExecutorsPerResourceProfile(rp) = 0
+        } else {
+          val requestedTotalForRp = requestedTotalExecutorsPerResourceProfile(rp)
+          requestedTotalExecutorsPerResourceProfile(rp) = math.max(requestedTotalForRp - 1, 0)
+        }
+      }
+      doRequestTotalExecutors(requestedTotalExecutorsPerResourceProfile.toMap)
+    }
+
+    val decommissioned = executorsToDecommission.filter{case (executorId, decomInfo) =>

Review comment:
       nit: need spaces around `{`
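
Same nit as above; the quoted line would become:

```scala
val decommissioned = executorsToDecommission.filter { case (executorId, decomInfo) =>
```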
   
   



