agrawaldevesh commented on a change in pull request #29367:
URL: https://github.com/apache/spark/pull/29367#discussion_r467509103



##########
File path: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
##########
@@ -503,6 +504,102 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
 
   protected def minRegisteredRatio: Double = _minRegisteredRatio
 
+  /**
+   * Request that the cluster manager decommission the specified executors.
+   *
+   * @param executorsAndDecomInfo Identifiers of executors & decommission info.
+   * @param adjustTargetNumExecutors whether the target number of executors 
will be adjusted down
+   *                                 after these executors have been 
decommissioned.
+   * @return the ids of the executors acknowledged by the cluster manager to 
be removed.
+   */
+  override def decommissionExecutors(
+      executorsAndDecomInfo: Seq[(String, ExecutorDecommissionInfo)],
+      adjustTargetNumExecutors: Boolean): Seq[String] = {
+
+    val executorsToDecommission = executorsAndDecomInfo.filter { case 
(executorId, _) =>
+      CoarseGrainedSchedulerBackend.this.synchronized {
+        // Only bother decommissioning executors which are alive.
+        if (isExecutorActive(executorId)) {
+          executorsPendingDecommission += executorId
+          true
+        } else {
+          false
+        }
+      }
+    }
+
+    // If we don't want to replace the executors we are decommissioning
+    if (adjustTargetNumExecutors) {
+      executorsToDecommission.foreach { case (exec, _) =>
+        val rpId = withLock {
+          executorDataMap(exec).resourceProfileId
+        }

Review comment:
       There is a bug here (which is absent in `killExecutors` from where this 
is copy pasted): the `withLock` is terminated early and thus access to 
`requestedTotalExecutorsPerResourceProfile` is unprotected.
   
   Refactoring would help not have this bug.
   
   [Blocker]

##########
File path: 
core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
##########
@@ -326,18 +352,33 @@ private[spark] class ExecutorMonitor(
     val removed = executors.remove(event.executorId)
     if (removed != null) {
       decrementExecResourceProfileCount(removed.resourceProfileId)
-      if (!removed.pendingRemoval) {
+      if (!removed.pendingRemoval || !removed.pendingDecommissioning) {
         nextTimeout.set(Long.MinValue)
       }
     }
   }
 
   override def onBlockUpdated(event: SparkListenerBlockUpdated): Unit = {
-    if (!event.blockUpdatedInfo.blockId.isInstanceOf[RDDBlockId]) {
-      return
-    }
     val exec = 
ensureExecutorIsTracked(event.blockUpdatedInfo.blockManagerId.executorId,
       UNKNOWN_RESOURCE_PROFILE_ID)
+

Review comment:
       Got it.

##########
File path: 
core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
##########
@@ -190,7 +190,8 @@ class BlockManagerDecommissionIntegrationSuite extends 
SparkFunSuite with LocalS
     logInfo(s"Decommissioning executor ${execToDecommission}")
     sched.decommissionExecutor(
       execToDecommission,
-      ExecutorDecommissionInfo("", isHostDecommissioned = false))
+      ExecutorDecommissionInfo("", isHostDecommissioned = false),
+      adjustTargetNumExecutors = true)

Review comment:
       Copy pasting the response from the similar comment in 
`WorkerDecommissionSuite.scala`:
   
   > I did reread the code and now I am sure I understand it: I think 
adjustTargetNumExecutors flag should continue to remain false here, so that it 
matches the old semantics:
   >
   > Before this PR: decommissioning wouldn't change the requested executors. 
They remain whatever the test application requested previously. This should 
continue to hold: This test does not use dynamic allocation and I don't see a 
need to change this test behavior.
   

##########
File path: 
core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
##########
@@ -114,7 +114,8 @@ private[spark] class ExecutorMonitor(
 
       var newNextTimeout = Long.MaxValue
       timedOutExecs = executors.asScala
-        .filter { case (_, exec) => !exec.pendingRemoval && 
!exec.hasActiveShuffle }
+        .filter { case (_, exec) =>
+          !exec.pendingRemoval && !exec.hasActiveShuffle && 
!exec.decommissioning}

Review comment:
       I went through all of the usages of executor.pendingRemoval and 
executor.decommissioning flag: They are treated identically right now. That is 
for all practical purposes an executor being decommissioned is treated the same 
as an executor pending to be removed. 
   
   Do you have a use case in mind of why you would like to distinguish b/w 
these two states ? If you don't need to distinguish, the change would become 
simpler if you treat a decommissioned executor as pending removal. 
   
   I cannot see where this distinction is relevant in this PR, so perhaps you 
have a future use case in mind for this distinction ?

##########
File path: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
##########
@@ -565,8 +573,14 @@ private[spark] class ExecutorAllocationManager(
     } else {
       // We don't want to change our target number of executors, because we 
already did that
       // when the task backlog decreased.
-      client.killExecutors(executorIdsToBeRemoved.toSeq, 
adjustTargetNumExecutors = false,
-        countFailures = false, force = false)
+      if (conf.get(WORKER_DECOMMISSION_ENABLED)) {

Review comment:
       I see that we are not saving the config value at initialization time and 
are instead checking the conf on each round -- is that so that you can disable 
this setting mid flight and revert to killing (instead of decommissioning) 
without restarting the SparkContext ? 

##########
File path: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
##########
@@ -565,8 +573,14 @@ private[spark] class ExecutorAllocationManager(
     } else {
       // We don't want to change our target number of executors, because we 
already did that
       // when the task backlog decreased.
-      client.killExecutors(executorIdsToBeRemoved.toSeq, 
adjustTargetNumExecutors = false,
-        countFailures = false, force = false)
+      if (conf.get(WORKER_DECOMMISSION_ENABLED)) {
+        val executorIdsWithoutHostLoss = executorIdsToBeRemoved.toSeq.map(
+          id => (id, ExecutorDecommissionInfo("spark scale down", false)))
+        client.decommissionExecutors(executorIdsWithoutHostLoss, 
adjustTargetNumExecutors = false)
+      } else {
+        client.killExecutors(executorIdsToBeRemoved.toSeq, 
adjustTargetNumExecutors = false,

Review comment:
       Actually there is a key big difference between `killExecutors` and 
`decommissionExecutors`: The former does the actual killing and the latter does 
'graceful death': Graceful death waits for all the migrations etc to happen and 
then the executor exits. I think this may lead to an overcommit: The old 
executor is still being decommissioned and migrating things, and a new one 
could be spun up. The two executors are alive at the same time (perhaps on the 
same node). Would this cause issues ?
   
   This problem does not exist for `killExecutors` so much: Kill executor will 
kill the executor soon enough and thus the overlap window is smaller. 

##########
File path: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
##########
@@ -503,6 +504,102 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
 
   protected def minRegisteredRatio: Double = _minRegisteredRatio
 
+  /**
+   * Request that the cluster manager decommission the specified executors.
+   *
+   * @param executorsAndDecomInfo Identifiers of executors & decommission info.
+   * @param adjustTargetNumExecutors whether the target number of executors 
will be adjusted down
+   *                                 after these executors have been 
decommissioned.
+   * @return the ids of the executors acknowledged by the cluster manager to 
be removed.
+   */
+  override def decommissionExecutors(
+      executorsAndDecomInfo: Seq[(String, ExecutorDecommissionInfo)],
+      adjustTargetNumExecutors: Boolean): Seq[String] = {
+
+    val executorsToDecommission = executorsAndDecomInfo.filter { case 
(executorId, _) =>
+      CoarseGrainedSchedulerBackend.this.synchronized {
+        // Only bother decommissioning executors which are alive.
+        if (isExecutorActive(executorId)) {
+          executorsPendingDecommission += executorId
+          true
+        } else {
+          false
+        }
+      }
+    }
+
+    // If we don't want to replace the executors we are decommissioning
+    if (adjustTargetNumExecutors) {
+      executorsToDecommission.foreach { case (exec, _) =>
+        val rpId = withLock {
+          executorDataMap(exec).resourceProfileId
+        }
+        val rp = 
scheduler.sc.resourceProfileManager.resourceProfileFromId(rpId)
+        if (requestedTotalExecutorsPerResourceProfile.isEmpty) {
+          // Assume that we are killing an executor that was started by 
default and
+          // not through the request api
+          requestedTotalExecutorsPerResourceProfile(rp) = 0
+        } else {
+          val requestedTotalForRp = 
requestedTotalExecutorsPerResourceProfile(rp)
+          requestedTotalExecutorsPerResourceProfile(rp) = 
math.max(requestedTotalForRp - 1, 0)
+        }
+      }
+      doRequestTotalExecutors(requestedTotalExecutorsPerResourceProfile.toMap)
+    }
+
+    val decommissioned = executorsToDecommission.filter { case (executorId, 
decomInfo) =>
+      doDecommission(executorId, decomInfo)
+    }.map(_._1)
+    decommissioned
+  }
+
+
+  private def doDecommission(executorId: String,

Review comment:
       Why is this duplicating the `decommissionExecutor` call above ? I think 
you are decommissioning twice now: This method is called synchronously by the 
ExecutorAllocationManager: It sends a message `DecommissionExecutor` to itself, 
which then calls `decommissionExecutor` on line 425 in this file. This does not 
smell right to me: both the code duplication and also the double 
decommissioning.
   
   I think you want to simply send the message here and let the DriverEndpoint 
work do the actual decommissioning as before. Ie. only do this: 
`driverEndpoint.send(DecommissionExecutor(executorId, decomInfo))`
   
   [Blocker]

##########
File path: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
##########
@@ -204,7 +205,12 @@ private[spark] class ExecutorAllocationManager(
         s"s${DYN_ALLOCATION_SUSTAINED_SCHEDULER_BACKLOG_TIMEOUT.key} must be > 
0!")
     }
     if (!conf.get(config.SHUFFLE_SERVICE_ENABLED)) {
-      if (conf.get(config.DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED)) {
+      // If dynamic allocation shuffle tracking or worker decommissioning 
along with
+      // storage shuffle decommissioning is enabled we have *experimental* 
support for
+      // decommissioning without a shuffle service.
+      if (conf.get(config.DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED) ||
+          (conf.get(WORKER_DECOMMISSION_ENABLED) &&
+            conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED))) {
         logWarning("Dynamic allocation without a shuffle service is an 
experimental feature.")

Review comment:
       Okay. I understand the intention now. Nevermind :-) 

##########
File path: 
core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
##########
@@ -326,18 +352,33 @@ private[spark] class ExecutorMonitor(
     val removed = executors.remove(event.executorId)
     if (removed != null) {
       decrementExecResourceProfileCount(removed.resourceProfileId)
-      if (!removed.pendingRemoval) {
+      if (!removed.pendingRemoval || !removed.decommissioning) {

Review comment:
       Thanks for clarifying that !

##########
File path: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
##########
@@ -503,6 +504,102 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
 
   protected def minRegisteredRatio: Double = _minRegisteredRatio
 
+  /**
+   * Request that the cluster manager decommission the specified executors.
+   *
+   * @param executorsAndDecomInfo Identifiers of executors & decommission info.
+   * @param adjustTargetNumExecutors whether the target number of executors 
will be adjusted down
+   *                                 after these executors have been 
decommissioned.
+   * @return the ids of the executors acknowledged by the cluster manager to 
be removed.
+   */
+  override def decommissionExecutors(
+      executorsAndDecomInfo: Seq[(String, ExecutorDecommissionInfo)],
+      adjustTargetNumExecutors: Boolean): Seq[String] = {
+
+    val executorsToDecommission = executorsAndDecomInfo.filter { case 
(executorId, _) =>
+      CoarseGrainedSchedulerBackend.this.synchronized {
+        // Only bother decommissioning executors which are alive.
+        if (isExecutorActive(executorId)) {
+          executorsPendingDecommission += executorId
+          true
+        } else {
+          false
+        }
+      }
+    }
+
+    // If we don't want to replace the executors we are decommissioning
+    if (adjustTargetNumExecutors) {

Review comment:
       Okay, I understand this better now. `adjustTargetNumExecutors=true` 
means that the scheduler will not try to replenish the executor. Otherwise, the 
scheduler still thinks that the application wants the same number of executors 
as before.
   
   I would strongly recommend that you please consider extracting out this 
piece of common code (shared with `killExecutors`) into a separate helper 
method. Its fairly subtle and having it one place will both help the 
readability of this PR and future changes.

##########
File path: 
core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala
##########
@@ -76,7 +76,9 @@ class WorkerDecommissionSuite extends SparkFunSuite with 
LocalSparkContext {
     // decom.sh message passing is tested manually.
     val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend]
     val execs = sched.getExecutorIds()
-    execs.foreach(execId => sched.decommissionExecutor(execId, 
ExecutorDecommissionInfo("", false)))
+    // Make the executors decommission, finish, exit, and not be replaced.
+    val execsAndDecomInfo = execs.map((_, ExecutorDecommissionInfo("", false)))
+    sched.decommissionExecutors(execsAndDecomInfo, adjustTargetNumExecutors = 
true)

Review comment:
       I did reread the code and now I am sure I understand it: I think 
`adjustTargetNumExecutors` flag should continue to remain false here, so that 
it matches the old semantics. This code above does not need to match 
"killExecutors" implementation because it is testing decommissioning and not 
killing. It didn't even do killing before.
   
   Before this PR: decommissioning wouldn't change the requested executors. 
They remain whatever the test application requested previously. This should 
continue to hold: This test does not use dynamic allocation and I don't see a 
need to change this test behavior. 
   
   Typically you don't want to change test behavior unless you are changing the 
product code associated with it too. Tests typically exist for catching 
regression and their expectations shouldn't just change in the middle for no 
reason. 
   
   I would consider this a soft blocker. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to