Ngone51 commented on a change in pull request #29817:
URL: https://github.com/apache/spark/pull/29817#discussion_r502912206



##########
File path: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
##########
@@ -465,72 +464,50 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
    * @param executorsAndDecomInfo Identifiers of executors & decommission info.
    * @param adjustTargetNumExecutors whether the target number of executors will be adjusted down
    *                                 after these executors have been decommissioned.
+   * @param triggeredByExecutor whether the decommission is triggered at executor.
    * @return the ids of the executors acknowledged by the cluster manager to be removed.
    */
   override def decommissionExecutors(
       executorsAndDecomInfo: Array[(String, ExecutorDecommissionInfo)],
-      adjustTargetNumExecutors: Boolean): Seq[String] = {
-
+      adjustTargetNumExecutors: Boolean,
+      triggeredByExecutor: Boolean): Seq[String] = withLock {
     // Do not change this code without running the K8s integration suites
-    val executorsToDecommission = executorsAndDecomInfo.filter { case (executorId, decomInfo) =>
-      CoarseGrainedSchedulerBackend.this.synchronized {
-        // Only bother decommissioning executors which are alive.
-        if (isExecutorActive(executorId)) {
-          executorsPendingDecommission(executorId) = decomInfo.workerHost
-          true
-        } else {
-          false
-        }
+    val executorsToDecommission = executorsAndDecomInfo.flatMap { case (executorId, decomInfo) =>
+      // Only bother decommissioning executors which are alive.
+      if (isExecutorActive(executorId)) {
+        scheduler.executorDecommission(executorId, decomInfo)
+        executorsPendingDecommission(executorId) = decomInfo.workerHost
+        Some(executorId)
+      } else {
+        None
       }
     }
 
     // If we don't want to replace the executors we are decommissioning
     if (adjustTargetNumExecutors) {
-      adjustExecutors(executorsToDecommission.map(_._1))
+      adjustExecutors(executorsToDecommission)
     }
 
-    executorsToDecommission.filter { case (executorId, decomInfo) =>
-      doDecommission(executorId, decomInfo)
-    }.map(_._1)
-  }
-
-  // Do not change this code without running the K8s integration suites
-  private def doDecommission(executorId: String,
-      decomInfo: ExecutorDecommissionInfo): Boolean = {
-
-    logInfo(s"Asking executor $executorId to decommissioning.")
-    scheduler.executorDecommission(executorId, decomInfo)
-    // Send decommission message to the executor (it could have originated on the executor
-    // but not necessarily).
-    CoarseGrainedSchedulerBackend.this.synchronized {
-      executorDataMap.get(executorId) match {
-        case Some(executorInfo) =>
-          executorInfo.executorEndpoint.send(DecommissionSelf)
-        case None =>
-          // Ignoring the executor since it is not registered.
-          logWarning(s"Attempted to decommission unknown executor $executorId.")
-          return false
-      }
+    // Mark those corresponding BlockManagers as decommissioned first before we sending
+    // decommission notification to executors. So, it's less likely to lead to the race
+    // condition where `getPeer` request from the decommissioned executor comes first
+    // before the BlockManagers are marked as decommissioned.
+    if (conf.get(STORAGE_DECOMMISSION_ENABLED)) {
+      logInfo(s"Asking BlockManagers on executors (${executorsToDecommission.mkString(", ")}) " +
+        s"to decommissioning.")
+      scheduler.sc.env.blockManager.master.decommissionBlockManagers(executorsToDecommission)
     }
-    logInfo(s"Asked executor $executorId to decommission.")
 
-    if (conf.get(STORAGE_DECOMMISSION_ENABLED)) {
-      try {
-        logInfo(s"Asking block manager corresponding to executor $executorId to decommission.")
-        scheduler.sc.env.blockManager.master.decommissionBlockManagers(Seq(executorId))
-      } catch {
-        case e: Exception =>
-          logError("Unexpected error during block manager " +
-            s"decommissioning for executor $executorId: ${e.toString}", e)
-          return false
+    if (!triggeredByExecutor) {

Review comment:
       We don't actually migrate any blocks when decommissioning the BlockManagers above; we only mark them as decommissioning on the driver side. So I think the problem you mentioned won't exist.
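
       For illustration only, here is a minimal sketch of the driver-side bookkeeping I mean. This is not Spark's actual `BlockManagerMasterEndpoint` code; the class, fields, and method bodies below are assumptions made up for the sketch. The point is that `decommissionBlockManagers` only records the ids, and peer lookups filter them out, so no block data is moved by that call:

       ```scala
       // Hypothetical simplification of the driver-side state; not the real
       // BlockManagerMasterEndpoint implementation.
       case class BlockManagerId(executorId: String, host: String, port: Int)

       class DriverSideDecommissionSketch {
         import scala.collection.mutable

         private val blockManagers = mutable.Set.empty[BlockManagerId]
         private val decommissioning = mutable.Set.empty[BlockManagerId]

         def register(id: BlockManagerId): Unit = synchronized { blockManagers += id }

         // "Decommissioning" a BlockManager here only marks it; no block data is
         // migrated by this call. Migration, if enabled, is driven by the executors.
         def decommissionBlockManagers(executorIds: Seq[String]): Unit = synchronized {
           decommissioning ++= blockManagers.filter(bm => executorIds.contains(bm.executorId))
         }

         // A decommissioning executor asking for migration targets is not offered
         // peers that are themselves marked, which is why the marking happens
         // before the executors are told to decommission.
         def getPeers(requester: BlockManagerId): Seq[BlockManagerId] = synchronized {
           blockManagers.diff(decommissioning).filterNot(_ == requester).toSeq
         }
       }
       ```

       With that ordering, by the time an executor receives the decommission notification and starts asking for peers, the other decommissioning BlockManagers are already excluded from the answer.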



