agrawaldevesh commented on a change in pull request #29367:
URL: https://github.com/apache/spark/pull/29367#discussion_r468139402
##########
File path:
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
##########
@@ -503,6 +504,102 @@ class CoarseGrainedSchedulerBackend(scheduler:
TaskSchedulerImpl, val rpcEnv: Rp
protected def minRegisteredRatio: Double = _minRegisteredRatio
+ /**
+ * Request that the cluster manager decommission the specified executors.
+ *
+ * @param executorsAndDecomInfo Identifiers of executors & decommission info.
+ * @param adjustTargetNumExecutors whether the target number of executors
will be adjusted down
+ * after these executors have been
decommissioned.
+ * @return the ids of the executors acknowledged by the cluster manager to
be removed.
+ */
+ override def decommissionExecutors(
+ executorsAndDecomInfo: Seq[(String, ExecutorDecommissionInfo)],
+ adjustTargetNumExecutors: Boolean): Seq[String] = {
+
+ val executorsToDecommission = executorsAndDecomInfo.filter { case
(executorId, _) =>
+ CoarseGrainedSchedulerBackend.this.synchronized {
+ // Only bother decommissioning executors which are alive.
+ if (isExecutorActive(executorId)) {
+ executorsPendingDecommission += executorId
+ true
+ } else {
+ false
+ }
+ }
+ }
+
+ // If we don't want to replace the executors we are decommissioning
+ if (adjustTargetNumExecutors) {
Review comment:
Thanks :-P
##########
File path:
core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala
##########
@@ -76,7 +76,9 @@ class WorkerDecommissionSuite extends SparkFunSuite with
LocalSparkContext {
// decom.sh message passing is tested manually.
val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend]
val execs = sched.getExecutorIds()
- execs.foreach(execId => sched.decommissionExecutor(execId,
ExecutorDecommissionInfo("", false)))
+ // Make the executors decommission, finish, exit, and not be replaced.
+ val execsAndDecomInfo = execs.map((_, ExecutorDecommissionInfo("", false)))
+ sched.decommissionExecutors(execsAndDecomInfo, adjustTargetNumExecutors =
true)
Review comment:
^^^ @holdenk ... any thoughts/followup on this ?
##########
File path: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
##########
@@ -565,8 +573,14 @@ private[spark] class ExecutorAllocationManager(
} else {
// We don't want to change our target number of executors, because we
already did that
// when the task backlog decreased.
- client.killExecutors(executorIdsToBeRemoved.toSeq,
adjustTargetNumExecutors = false,
- countFailures = false, force = false)
+ if (conf.get(WORKER_DECOMMISSION_ENABLED)) {
Review comment:
I think a private val would make it easier to read and also more consistent:
we would know upfront whether decommissioning is enabled, without the risk
that some code paths decommission while later ones do not (if the config
changes in between).
##########
File path: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
##########
@@ -565,8 +573,14 @@ private[spark] class ExecutorAllocationManager(
} else {
// We don't want to change our target number of executors, because we
already did that
// when the task backlog decreased.
- client.killExecutors(executorIdsToBeRemoved.toSeq,
adjustTargetNumExecutors = false,
- countFailures = false, force = false)
+ if (conf.get(WORKER_DECOMMISSION_ENABLED)) {
+ val executorIdsWithoutHostLoss = executorIdsToBeRemoved.toSeq.map(
+ id => (id, ExecutorDecommissionInfo("spark scale down", false)))
+ client.decommissionExecutors(executorIdsWithoutHostLoss,
adjustTargetNumExecutors = false)
+ } else {
+ client.killExecutors(executorIdsToBeRemoved.toSeq,
adjustTargetNumExecutors = false,
Review comment:
Okay. It is a little bit unsettling to me personally that we don't
have a timeout on the time that an executor would take to "migrate and finally
die", but I guess we can live with that shortcoming for a while. It just
reduces the effective cluster capacity for an indeterminate amount of time,
which is what makes me slightly queasy about it.
##########
File path:
core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
##########
@@ -190,7 +190,8 @@ class BlockManagerDecommissionIntegrationSuite extends
SparkFunSuite with LocalS
logInfo(s"Decommissioning executor ${execToDecommission}")
sched.decommissionExecutor(
execToDecommission,
- ExecutorDecommissionInfo("", isHostDecommissioned = false))
+ ExecutorDecommissionInfo("", isHostDecommissioned = false),
+ adjustTargetNumExecutors = true)
Review comment:
^^^ @holdenk ... any thoughts/followup on this ?
##########
File path:
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
##########
@@ -503,6 +504,102 @@ class CoarseGrainedSchedulerBackend(scheduler:
TaskSchedulerImpl, val rpcEnv: Rp
protected def minRegisteredRatio: Double = _minRegisteredRatio
+ /**
+ * Request that the cluster manager decommission the specified executors.
+ *
+ * @param executorsAndDecomInfo Identifiers of executors & decommission info.
+ * @param adjustTargetNumExecutors whether the target number of executors
will be adjusted down
+ * after these executors have been
decommissioned.
+ * @return the ids of the executors acknowledged by the cluster manager to
be removed.
+ */
+ override def decommissionExecutors(
+ executorsAndDecomInfo: Seq[(String, ExecutorDecommissionInfo)],
+ adjustTargetNumExecutors: Boolean): Seq[String] = {
+
+ val executorsToDecommission = executorsAndDecomInfo.filter { case
(executorId, _) =>
+ CoarseGrainedSchedulerBackend.this.synchronized {
+ // Only bother decommissioning executors which are alive.
+ if (isExecutorActive(executorId)) {
+ executorsPendingDecommission += executorId
+ true
+ } else {
+ false
+ }
+ }
+ }
+
+ // If we don't want to replace the executors we are decommissioning
+ if (adjustTargetNumExecutors) {
+ executorsToDecommission.foreach { case (exec, _) =>
+ val rpId = withLock {
+ executorDataMap(exec).resourceProfileId
+ }
+ val rp =
scheduler.sc.resourceProfileManager.resourceProfileFromId(rpId)
+ if (requestedTotalExecutorsPerResourceProfile.isEmpty) {
+ // Assume that we are killing an executor that was started by
default and
+ // not through the request api
+ requestedTotalExecutorsPerResourceProfile(rp) = 0
+ } else {
+ val requestedTotalForRp =
requestedTotalExecutorsPerResourceProfile(rp)
+ requestedTotalExecutorsPerResourceProfile(rp) =
math.max(requestedTotalForRp - 1, 0)
+ }
+ }
+ doRequestTotalExecutors(requestedTotalExecutorsPerResourceProfile.toMap)
+ }
+
+ val decommissioned = executorsToDecommission.filter { case (executorId,
decomInfo) =>
+ doDecommission(executorId, decomInfo)
+ }.map(_._1)
+ decommissioned
+ }
+
+
+ private def doDecommission(executorId: String,
Review comment:
^^^ @holdenk ... any thoughts/followup on this ?
##########
File path:
core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
##########
@@ -114,7 +114,8 @@ private[spark] class ExecutorMonitor(
var newNextTimeout = Long.MaxValue
timedOutExecs = executors.asScala
- .filter { case (_, exec) => !exec.pendingRemoval &&
!exec.hasActiveShuffle }
+ .filter { case (_, exec) =>
+ !exec.pendingRemoval && !exec.hasActiveShuffle &&
!exec.decommissioning}
Review comment:
^^^ @holdenk ... any thoughts/followup on this ?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]