agrawaldevesh commented on a change in pull request #29367:
URL: https://github.com/apache/spark/pull/29367#discussion_r467509103
##########
File path: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
##########
@@ -503,6 +504,102 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
  protected def minRegisteredRatio: Double = _minRegisteredRatio
+  /**
+   * Request that the cluster manager decommission the specified executors.
+   *
+   * @param executorsAndDecomInfo Identifiers of executors & decommission info.
+   * @param adjustTargetNumExecutors whether the target number of executors will be adjusted down
+   *                                 after these executors have been decommissioned.
+   * @return the ids of the executors acknowledged by the cluster manager to be removed.
+   */
+  override def decommissionExecutors(
+      executorsAndDecomInfo: Seq[(String, ExecutorDecommissionInfo)],
+      adjustTargetNumExecutors: Boolean): Seq[String] = {
+
+    val executorsToDecommission = executorsAndDecomInfo.filter { case (executorId, _) =>
+      CoarseGrainedSchedulerBackend.this.synchronized {
+        // Only bother decommissioning executors which are alive.
+        if (isExecutorActive(executorId)) {
+          executorsPendingDecommission += executorId
+          true
+        } else {
+          false
+        }
+      }
+    }
+
+    // If we don't want to replace the executors we are decommissioning
+    if (adjustTargetNumExecutors) {
+      executorsToDecommission.foreach { case (exec, _) =>
+        val rpId = withLock {
+          executorDataMap(exec).resourceProfileId
+        }
Review comment:
There is a bug here (which is absent in `killExecutors`, from where this was copy-pasted): the `withLock` block ends too early, so the subsequent access to `requestedTotalExecutorsPerResourceProfile` is unprotected.
Refactoring to share this code with `killExecutors` would avoid the bug; see the sketch below.
[Blocker]
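A sketch of the narrower fix, assuming the lock taken by `withLock` is the same one that guards `requestedTotalExecutorsPerResourceProfile` (as it is in `killExecutors`): hold it across the whole adjustment rather than only the `executorDataMap` lookup.

```scala
// Sketch only: keep the lock held for the entire target adjustment, mirroring
// killExecutors, instead of releasing it right after the executorDataMap read.
if (adjustTargetNumExecutors) {
  withLock {
    executorsToDecommission.foreach { case (exec, _) =>
      val rpId = executorDataMap(exec).resourceProfileId
      // ... decrement requestedTotalExecutorsPerResourceProfile for rpId, as above ...
    }
    doRequestTotalExecutors(requestedTotalExecutorsPerResourceProfile.toMap)
  }
}
```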
##########
File path: core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
##########
@@ -326,18 +352,33 @@ private[spark] class ExecutorMonitor(
    val removed = executors.remove(event.executorId)
    if (removed != null) {
      decrementExecResourceProfileCount(removed.resourceProfileId)
-      if (!removed.pendingRemoval) {
+      if (!removed.pendingRemoval || !removed.pendingDecommissioning) {
        nextTimeout.set(Long.MinValue)
      }
    }
  }

  override def onBlockUpdated(event: SparkListenerBlockUpdated): Unit = {
-    if (!event.blockUpdatedInfo.blockId.isInstanceOf[RDDBlockId]) {
-      return
-    }
    val exec = ensureExecutorIsTracked(event.blockUpdatedInfo.blockManagerId.executorId,
      UNKNOWN_RESOURCE_PROFILE_ID)
+
Review comment:
Got it.
##########
File path: core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
##########
@@ -190,7 +190,8 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
      logInfo(s"Decommissioning executor ${execToDecommission}")
      sched.decommissionExecutor(
        execToDecommission,
-        ExecutorDecommissionInfo("", isHostDecommissioned = false))
+        ExecutorDecommissionInfo("", isHostDecommissioned = false),
+        adjustTargetNumExecutors = true)
Review comment:
Copy-pasting the response from the similar comment in `WorkerDecommissionSuite.scala`:
> I did reread the code and now I am sure I understand it: I think the `adjustTargetNumExecutors` flag should remain false here, so that it matches the old semantics:
>
> Before this PR, decommissioning wouldn't change the requested executors; they remain whatever the test application requested previously. This should continue to hold: this test does not use dynamic allocation, and I don't see a need to change its behavior.
##########
File path: core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
##########
@@ -114,7 +114,8 @@ private[spark] class ExecutorMonitor(
    var newNextTimeout = Long.MaxValue
    timedOutExecs = executors.asScala
-      .filter { case (_, exec) => !exec.pendingRemoval && !exec.hasActiveShuffle }
+      .filter { case (_, exec) =>
+        !exec.pendingRemoval && !exec.hasActiveShuffle && !exec.decommissioning}
Review comment:
I went through all of the usages of the `executor.pendingRemoval` and `executor.decommissioning` flags: they are treated identically right now. That is, for all practical purposes an executor being decommissioned is treated the same as an executor pending removal.
Do you have a use case in mind for distinguishing between these two states? If not, the change would become simpler if you treat a decommissioning executor as pending removal, as sketched below.
I cannot see where this distinction is relevant in this PR, so perhaps you have a future use case in mind for it?
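To illustrate the simplification (a sketch only, assuming the two flags really are interchangeable): if a decommissioning executor were simply marked `pendingRemoval`, this filter would not need to change at all.

```scala
// Sketch: with decommissioning folded into pendingRemoval, the original condition suffices.
timedOutExecs = executors.asScala
  .filter { case (_, exec) => !exec.pendingRemoval && !exec.hasActiveShuffle }
```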
##########
File path: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
##########
@@ -565,8 +573,14 @@ private[spark] class ExecutorAllocationManager(
    } else {
      // We don't want to change our target number of executors, because we already did that
      // when the task backlog decreased.
-      client.killExecutors(executorIdsToBeRemoved.toSeq, adjustTargetNumExecutors = false,
-        countFailures = false, force = false)
+      if (conf.get(WORKER_DECOMMISSION_ENABLED)) {
Review comment:
I see that we are not saving the config value at initialization time and are instead checking the conf on each round -- is that so that you can disable this setting mid-flight and revert to killing (instead of decommissioning) without restarting the SparkContext?
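If mid-flight toggling is not the intent, the alternative would be roughly this (field name is illustrative only):

```scala
// Hypothetical: snapshot the flag once when the ExecutorAllocationManager is
// constructed, rather than calling conf.get(WORKER_DECOMMISSION_ENABLED) every round.
private val decommissionEnabled = conf.get(WORKER_DECOMMISSION_ENABLED)
```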
##########
File path: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
##########
@@ -565,8 +573,14 @@ private[spark] class ExecutorAllocationManager(
    } else {
      // We don't want to change our target number of executors, because we already did that
      // when the task backlog decreased.
-      client.killExecutors(executorIdsToBeRemoved.toSeq, adjustTargetNumExecutors = false,
-        countFailures = false, force = false)
+      if (conf.get(WORKER_DECOMMISSION_ENABLED)) {
+        val executorIdsWithoutHostLoss = executorIdsToBeRemoved.toSeq.map(
+          id => (id, ExecutorDecommissionInfo("spark scale down", false)))
+        client.decommissionExecutors(executorIdsWithoutHostLoss, adjustTargetNumExecutors = false)
+      } else {
+        client.killExecutors(executorIdsToBeRemoved.toSeq, adjustTargetNumExecutors = false,
Review comment:
Actually there is a key difference between `killExecutors` and `decommissionExecutors`: the former does the actual killing, while the latter does a 'graceful death', i.e. it waits for all the migrations etc. to happen before the executor exits. I think this may lead to an overcommit: the old executor is still being decommissioned and migrating things while a new one could be spun up, so the two executors are alive at the same time (perhaps on the same node). Would this cause issues?
This problem largely does not exist for `killExecutors`: it kills the executor soon enough, so the overlap window is smaller.
##########
File path: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
##########
@@ -503,6 +504,102 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
  protected def minRegisteredRatio: Double = _minRegisteredRatio
+  /**
+   * Request that the cluster manager decommission the specified executors.
+   *
+   * @param executorsAndDecomInfo Identifiers of executors & decommission info.
+   * @param adjustTargetNumExecutors whether the target number of executors will be adjusted down
+   *                                 after these executors have been decommissioned.
+   * @return the ids of the executors acknowledged by the cluster manager to be removed.
+   */
+  override def decommissionExecutors(
+      executorsAndDecomInfo: Seq[(String, ExecutorDecommissionInfo)],
+      adjustTargetNumExecutors: Boolean): Seq[String] = {
+
+    val executorsToDecommission = executorsAndDecomInfo.filter { case (executorId, _) =>
+      CoarseGrainedSchedulerBackend.this.synchronized {
+        // Only bother decommissioning executors which are alive.
+        if (isExecutorActive(executorId)) {
+          executorsPendingDecommission += executorId
+          true
+        } else {
+          false
+        }
+      }
+    }
+
+    // If we don't want to replace the executors we are decommissioning
+    if (adjustTargetNumExecutors) {
+      executorsToDecommission.foreach { case (exec, _) =>
+        val rpId = withLock {
+          executorDataMap(exec).resourceProfileId
+        }
+        val rp = scheduler.sc.resourceProfileManager.resourceProfileFromId(rpId)
+        if (requestedTotalExecutorsPerResourceProfile.isEmpty) {
+          // Assume that we are killing an executor that was started by default and
+          // not through the request api
+          requestedTotalExecutorsPerResourceProfile(rp) = 0
+        } else {
+          val requestedTotalForRp = requestedTotalExecutorsPerResourceProfile(rp)
+          requestedTotalExecutorsPerResourceProfile(rp) = math.max(requestedTotalForRp - 1, 0)
+        }
+      }
+      doRequestTotalExecutors(requestedTotalExecutorsPerResourceProfile.toMap)
+    }
+
+    val decommissioned = executorsToDecommission.filter { case (executorId, decomInfo) =>
+      doDecommission(executorId, decomInfo)
+    }.map(_._1)
+    decommissioned
+  }
+
+
+  private def doDecommission(executorId: String,
Review comment:
Why is this duplicating the `decommissionExecutor` call above? I think you are decommissioning twice now: this method is called synchronously by the ExecutorAllocationManager, and it sends a `DecommissionExecutor` message to itself, which then calls `decommissionExecutor` on line 425 in this file. This does not smell right to me: both the code duplication and the double decommissioning.
I think you want to simply send the message here and let the DriverEndpoint do the actual decommissioning as before, i.e. only do this:
`driverEndpoint.send(DecommissionExecutor(executorId, decomInfo))`
[Blocker]
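Roughly something like this (a sketch only; the `Boolean` return is illustrative, to keep the `filter` above compiling):

```scala
// Sketch: doDecommission only notifies the DriverEndpoint, which already handles
// DecommissionExecutor and performs the actual decommissioning.
private def doDecommission(
    executorId: String,
    decomInfo: ExecutorDecommissionInfo): Boolean = {
  driverEndpoint.send(DecommissionExecutor(executorId, decomInfo))
  true
}
```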
##########
File path: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
##########
@@ -204,7 +205,12 @@ private[spark] class ExecutorAllocationManager(
s"s${DYN_ALLOCATION_SUSTAINED_SCHEDULER_BACKLOG_TIMEOUT.key} must be >
0!")
}
if (!conf.get(config.SHUFFLE_SERVICE_ENABLED)) {
- if (conf.get(config.DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED)) {
+ // If dynamic allocation shuffle tracking or worker decommissioning
along with
+ // storage shuffle decommissioning is enabled we have *experimental*
support for
+ // decommissioning without a shuffle service.
+ if (conf.get(config.DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED) ||
+ (conf.get(WORKER_DECOMMISSION_ENABLED) &&
+ conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED))) {
logWarning("Dynamic allocation without a shuffle service is an
experimental feature.")
Review comment:
Okay. I understand the intention now. Nevermind :-)
##########
File path: core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
##########
@@ -326,18 +352,33 @@ private[spark] class ExecutorMonitor(
    val removed = executors.remove(event.executorId)
    if (removed != null) {
      decrementExecResourceProfileCount(removed.resourceProfileId)
-      if (!removed.pendingRemoval) {
+      if (!removed.pendingRemoval || !removed.decommissioning) {
Review comment:
Thanks for clarifying that!
##########
File path: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
##########
@@ -503,6 +504,102 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
  protected def minRegisteredRatio: Double = _minRegisteredRatio
+  /**
+   * Request that the cluster manager decommission the specified executors.
+   *
+   * @param executorsAndDecomInfo Identifiers of executors & decommission info.
+   * @param adjustTargetNumExecutors whether the target number of executors will be adjusted down
+   *                                 after these executors have been decommissioned.
+   * @return the ids of the executors acknowledged by the cluster manager to be removed.
+   */
+  override def decommissionExecutors(
+      executorsAndDecomInfo: Seq[(String, ExecutorDecommissionInfo)],
+      adjustTargetNumExecutors: Boolean): Seq[String] = {
+
+    val executorsToDecommission = executorsAndDecomInfo.filter { case (executorId, _) =>
+      CoarseGrainedSchedulerBackend.this.synchronized {
+        // Only bother decommissioning executors which are alive.
+        if (isExecutorActive(executorId)) {
+          executorsPendingDecommission += executorId
+          true
+        } else {
+          false
+        }
+      }
+    }
+
+    // If we don't want to replace the executors we are decommissioning
+    if (adjustTargetNumExecutors) {
Review comment:
Okay, I understand this better now. `adjustTargetNumExecutors = true` means that the scheduler will not try to replenish the executor; otherwise, the scheduler still thinks that the application wants the same number of executors as before.
I would strongly recommend extracting this piece of common code (shared with `killExecutors`) into a separate helper method, as sketched below. It's fairly subtle, and having it in one place will help both the readability of this PR and future changes.
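For example, roughly along these lines (helper name and exact signature are illustrative only):

```scala
// Sketch of a shared helper: both killExecutors and decommissionExecutors would call
// this, under the lock, to lower the per-resource-profile executor target.
private def adjustExecutorTargets(executorIds: Seq[String]): Unit = withLock {
  executorIds.foreach { exec =>
    val rpId = executorDataMap(exec).resourceProfileId
    val rp = scheduler.sc.resourceProfileManager.resourceProfileFromId(rpId)
    if (requestedTotalExecutorsPerResourceProfile.isEmpty) {
      // Assume the executor was started by default and not through the request API.
      requestedTotalExecutorsPerResourceProfile(rp) = 0
    } else {
      requestedTotalExecutorsPerResourceProfile(rp) =
        math.max(requestedTotalExecutorsPerResourceProfile(rp) - 1, 0)
    }
  }
  doRequestTotalExecutors(requestedTotalExecutorsPerResourceProfile.toMap)
}
```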
##########
File path: core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala
##########
@@ -76,7 +76,9 @@ class WorkerDecommissionSuite extends SparkFunSuite with LocalSparkContext {
    // decom.sh message passing is tested manually.
    val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend]
    val execs = sched.getExecutorIds()
-    execs.foreach(execId => sched.decommissionExecutor(execId, ExecutorDecommissionInfo("", false)))
+    // Make the executors decommission, finish, exit, and not be replaced.
+    val execsAndDecomInfo = execs.map((_, ExecutorDecommissionInfo("", false)))
+    sched.decommissionExecutors(execsAndDecomInfo, adjustTargetNumExecutors = true)
Review comment:
I did reread the code and now I am sure I understand it: I think the `adjustTargetNumExecutors` flag should remain false here, so that it matches the old semantics. The code above does not need to match the `killExecutors` implementation because it is testing decommissioning, not killing; it didn't even do killing before.
Before this PR, decommissioning wouldn't change the requested executors; they remain whatever the test application requested previously. This should continue to hold: this test does not use dynamic allocation, and I don't see a need to change its behavior.
Typically you don't want to change test behavior unless you are also changing the product code associated with it. Tests exist for catching regressions, and their expectations shouldn't just change for no reason.
I would consider this a soft blocker.