agrawaldevesh commented on a change in pull request #29367:
URL: https://github.com/apache/spark/pull/29367#discussion_r468955452



##########
File path: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
##########
@@ -503,6 +450,88 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
 
   protected def minRegisteredRatio: Double = _minRegisteredRatio
 
+  /**
+   * Request that the cluster manager decommission the specified executors.
+   *
+   * @param executorsAndDecomInfo Identifiers of executors & decommission info.
+   * @param adjustTargetNumExecutors whether the target number of executors 
will be adjusted down
+   *                                 after these executors have been 
decommissioned.
+   * @return the ids of the executors acknowledged by the cluster manager to 
be removed.
+   */
+  override def decommissionExecutors(
+      executorsAndDecomInfo: Array[(String, ExecutorDecommissionInfo)],
+      adjustTargetNumExecutors: Boolean): Seq[String] = {
+
+    val executorsToDecommission = executorsAndDecomInfo.filter { case 
(executorId, _) =>
+      CoarseGrainedSchedulerBackend.this.synchronized {
+        // Only bother decommissioning executors which are alive.
+        if (isExecutorActive(executorId)) {
+          executorsPendingDecommission += executorId
+          true
+        } else {
+          false
+        }
+      }
+    }
+
+    // If we don't want to replace the executors we are decommissioning
+    if (adjustTargetNumExecutors) {

Review comment:
       should there be a check for executorsToDecommission.notEmpty ? 
Otherwise, we will request executors again with no change in the 
adjustExecutors helper method. Could again lead to some unnecessary strain on 
the driver. 

##########
File path: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
##########
@@ -503,6 +450,88 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
 
   protected def minRegisteredRatio: Double = _minRegisteredRatio
 
+  /**
+   * Request that the cluster manager decommission the specified executors.
+   *
+   * @param executorsAndDecomInfo Identifiers of executors & decommission info.
+   * @param adjustTargetNumExecutors whether the target number of executors 
will be adjusted down
+   *                                 after these executors have been 
decommissioned.
+   * @return the ids of the executors acknowledged by the cluster manager to 
be removed.
+   */
+  override def decommissionExecutors(
+      executorsAndDecomInfo: Array[(String, ExecutorDecommissionInfo)],
+      adjustTargetNumExecutors: Boolean): Seq[String] = {
+
+    val executorsToDecommission = executorsAndDecomInfo.filter { case 
(executorId, _) =>
+      CoarseGrainedSchedulerBackend.this.synchronized {
+        // Only bother decommissioning executors which are alive.
+        if (isExecutorActive(executorId)) {
+          executorsPendingDecommission += executorId
+          true
+        } else {
+          false
+        }
+      }
+    }
+
+    // If we don't want to replace the executors we are decommissioning
+    if (adjustTargetNumExecutors) {
+      adjustExecutors(executorsToDecommission.map(_._1))
+    }
+
+    val decommissioned = executorsToDecommission.filter { case (executorId, 
decomInfo) =>
+      doDecommission(executorId, decomInfo)
+    }.map(_._1)
+    decommissioned
+  }
+
+
+  private def doDecommission(executorId: String,
+      decomInfo: ExecutorDecommissionInfo): Boolean = {
+
+    logInfo(s"Asking executor $executorId to decommissioning.")
+    try {
+      scheduler.executorDecommission(executorId, decomInfo)
+      if (driverEndpoint != null) {
+        logInfo("Propagating executor decommission to driver.")
+        driverEndpoint.send(DecommissionExecutor(executorId, decomInfo))

Review comment:
       You are so RIGHT !!!. Sorry, for not spotting that. In which case I 
think this is not just fine but also neat in that the code paths for 
decommissioned are pretty nicely converged now. 

##########
File path: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
##########
@@ -503,6 +450,88 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
 
   protected def minRegisteredRatio: Double = _minRegisteredRatio
 
+  /**
+   * Request that the cluster manager decommission the specified executors.
+   *
+   * @param executorsAndDecomInfo Identifiers of executors & decommission info.
+   * @param adjustTargetNumExecutors whether the target number of executors 
will be adjusted down
+   *                                 after these executors have been 
decommissioned.
+   * @return the ids of the executors acknowledged by the cluster manager to 
be removed.
+   */
+  override def decommissionExecutors(
+      executorsAndDecomInfo: Array[(String, ExecutorDecommissionInfo)],
+      adjustTargetNumExecutors: Boolean): Seq[String] = {
+
+    val executorsToDecommission = executorsAndDecomInfo.filter { case 
(executorId, _) =>
+      CoarseGrainedSchedulerBackend.this.synchronized {
+        // Only bother decommissioning executors which are alive.
+        if (isExecutorActive(executorId)) {
+          executorsPendingDecommission += executorId
+          true
+        } else {
+          false
+        }
+      }
+    }
+
+    // If we don't want to replace the executors we are decommissioning
+    if (adjustTargetNumExecutors) {
+      adjustExecutors(executorsToDecommission.map(_._1))
+    }
+
+    val decommissioned = executorsToDecommission.filter { case (executorId, 
decomInfo) =>
+      doDecommission(executorId, decomInfo)
+    }.map(_._1)
+    decommissioned

Review comment:
       nit: This looks like it was put for debugging. Can we omit the temporary 
val decommissioned ?

##########
File path: 
core/src/test/scala/org/apache/spark/deploy/DecommissionWorkerSuite.scala
##########
@@ -242,8 +242,10 @@ class DecommissionWorkerSuite
       assert(jobResult === 2)
     }
     // 6 tasks: 2 from first stage, 2 rerun again from first stage, 2nd stage 
attempt 1 and 2.
-    val tasksSeen = listener.getTasksFinished()

Review comment:
       Lets undo this change then. I am rerunning this PR locally to debug. 
Thanks for sharing the GHA link. It helps.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to