Ngone51 commented on a change in pull request #29722:
URL: https://github.com/apache/spark/pull/29722#discussion_r487668029
##########
File path:
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
##########
@@ -467,67 +469,44 @@ class CoarseGrainedSchedulerBackend(scheduler:
TaskSchedulerImpl, val rpcEnv: Rp
*/
override def decommissionExecutors(
executorsAndDecomInfo: Array[(String, ExecutorDecommissionInfo)],
- adjustTargetNumExecutors: Boolean): Seq[String] = {
-
- val executorsToDecommission = executorsAndDecomInfo.filter { case
(executorId, decomInfo) =>
- CoarseGrainedSchedulerBackend.this.synchronized {
+ adjustTargetNumExecutors: Boolean,
+ decommissionFromDriver: Boolean): Seq[String] = {
+ val executorsToDecommission = withLock {
+ executorsAndDecomInfo.flatMap { case (executorId, decomInfo) =>
// Only bother decommissioning executors which are alive.
if (isExecutorActive(executorId)) {
+ scheduler.executorDecommission(executorId, decomInfo)
executorsPendingDecommission(executorId) = decomInfo.workerHost
- true
+ Some(executorId)
} else {
- false
+ None
}
}
}
// If we don't want to replace the executors we are decommissioning
if (adjustTargetNumExecutors) {
- adjustExecutors(executorsToDecommission.map(_._1))
+ adjustExecutors(executorsToDecommission)
}
- executorsToDecommission.filter { case (executorId, decomInfo) =>
- doDecommission(executorId, decomInfo)
- }.map(_._1)
- }
-
+ // Mark those corresponding BlockManagers as decommissioned first before
we sending
+ // decommission notification to executors. So, it's less likely to lead to
the race
+ // condition where `getPeer` request from the decommissioned executor
comes first
+ // before the BlockManagers are marked as decommissioned.
+
scheduler.sc.env.blockManager.master.decommissionBlockManagers(executorsToDecommission)
Review comment:
> Should this check be guarded by conf.get(STORAGE_DECOMMISSION_ENABLED)
?
Oops, I missed it.
> Also should this only be done in case of decommissionFromDriver == true ?
No. BlockManagerMaster is the only place that has the global info of
BlockManagers in the cluster. So wherever the decommission is triggered, we
need to update the status(e.g., mark as decommissioned in this case) of the
corresponding BlockManagers.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]