Ngone51 commented on a change in pull request #29722:
URL: https://github.com/apache/spark/pull/29722#discussion_r487664347



##########
File path: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
##########
@@ -467,67 +469,44 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
    */
   override def decommissionExecutors(
       executorsAndDecomInfo: Array[(String, ExecutorDecommissionInfo)],
-      adjustTargetNumExecutors: Boolean): Seq[String] = {
-
-    val executorsToDecommission = executorsAndDecomInfo.filter { case 
(executorId, decomInfo) =>
-      CoarseGrainedSchedulerBackend.this.synchronized {
+      adjustTargetNumExecutors: Boolean,
+      decommissionFromDriver: Boolean): Seq[String] = {
+    val executorsToDecommission = withLock {
+      executorsAndDecomInfo.flatMap { case (executorId, decomInfo) =>
         // Only bother decommissioning executors which are alive.
         if (isExecutorActive(executorId)) {
+          scheduler.executorDecommission(executorId, decomInfo)
           executorsPendingDecommission(executorId) = decomInfo.workerHost
-          true
+          Some(executorId)
         } else {
-          false
+          None
         }
       }
     }
 
     // If we don't want to replace the executors we are decommissioning
     if (adjustTargetNumExecutors) {
-      adjustExecutors(executorsToDecommission.map(_._1))
+      adjustExecutors(executorsToDecommission)
     }
 
-    executorsToDecommission.filter { case (executorId, decomInfo) =>
-      doDecommission(executorId, decomInfo)
-    }.map(_._1)
-  }
-
+    // Mark those corresponding BlockManagers as decommissioned first before 
we sending
+    // decommission notification to executors. So, it's less likely to lead to 
the race
+    // condition where `getPeer` request from the decommissioned executor 
comes first
+    // before the BlockManagers are marked as decommissioned.
+    
scheduler.sc.env.blockManager.master.decommissionBlockManagers(executorsToDecommission)
 
-  private def doDecommission(executorId: String,
-      decomInfo: ExecutorDecommissionInfo): Boolean = {
-
-    logInfo(s"Asking executor $executorId to decommissioning.")
-    scheduler.executorDecommission(executorId, decomInfo)
-    // Send decommission message to the executor (it could have originated on 
the executor
-    // but not necessarily).
-    CoarseGrainedSchedulerBackend.this.synchronized {
-      executorDataMap.get(executorId) match {
-        case Some(executorInfo) =>
-          executorInfo.executorEndpoint.send(DecommissionSelf)
-        case None =>
-          // Ignoring the executor since it is not registered.
-          logWarning(s"Attempted to decommission unknown executor 
$executorId.")
-          return false
-      }
-    }
-    logInfo(s"Asked executor $executorId to decommission.")
-
-    if (conf.get(STORAGE_DECOMMISSION_ENABLED)) {
-      try {
-        logInfo(s"Asking block manager corresponding to executor $executorId 
to decommission.")
-        
scheduler.sc.env.blockManager.master.decommissionBlockManagers(Seq(executorId))

Review comment:
       I think it won't cause problems. Actually, I think the only difference 
is this: for a given BlockManager, previously it was always marked as 
`decommissioned` first and only then did block migration start, but now block 
migration could start first when the decommission is triggered on the executor. 
That's OK, though, since the decommissioned executor only migrates blocks to 
other executors, never to itself.
   
   Note that in the case where the decommission notification is sent from the 
driver to the executor, the `decommissioned` status is still marked before 
block migration starts, so it doesn't make any difference.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to