agrawaldevesh commented on a change in pull request #29367:
URL: https://github.com/apache/spark/pull/29367#discussion_r467485361
##########
File path: core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
##########
@@ -326,18 +352,33 @@ private[spark] class ExecutorMonitor(
val removed = executors.remove(event.executorId)
if (removed != null) {
decrementExecResourceProfileCount(removed.resourceProfileId)
- if (!removed.pendingRemoval) {
+ if (!removed.pendingRemoval || !removed.decommissioning) {
Review comment:
I am just trying to follow along with this code, so pardon me if this is a
n00b question: why are we tracking pendingRemoval and decommissioning
separately (rough sketch of what I would have naively expected is below)?
Two questions about that:
- If an executor is marked as decommissioned here, when is it actually
removed? (Outside of dynamic allocation that happens when the executor
eventually has a heartbeat failure.)
- Is my understanding correct that if graceful decommissioning is plugged
into dynamic allocation (this feature) AND the cluster manager supports
decommissioning, then pendingRemoval would be empty, i.e. executors would only
ever be decommissioned?
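To make the first question concrete, here is the rough, purely hypothetical
single shutdown state I would have naively expected instead of two booleans;
all names here are made up and not from this PR, so I am probably missing why
the two flags need to stay separate:
```
// Hypothetical sketch only, not the PR's API: one "why is this executor going
// away" state instead of two independent booleans on the tracker.
object ShutdownState extends Enumeration {
  val Active, PendingRemoval, Decommissioning = Value
}

class TrackerSketch(var shutdownState: ShutdownState.Value = ShutdownState.Active) {
  // Either form of shutdown would stop the idle-timeout bookkeeping in one place.
  def pendingShutdown: Boolean = shutdownState != ShutdownState.Active
}
```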
##########
File path:
core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
##########
@@ -298,6 +323,7 @@ private[spark] class ExecutorMonitor(
//
// This means that an executor may be marked as having shuffle data, and thus prevented
// from being removed, even though the data may not be used.
+ // TODO: Only track used files (SPARK-31974)
Review comment:
Is this comment change intended?
##########
File path: core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
##########
@@ -326,18 +352,33 @@ private[spark] class ExecutorMonitor(
val removed = executors.remove(event.executorId)
if (removed != null) {
decrementExecResourceProfileCount(removed.resourceProfileId)
- if (!removed.pendingRemoval) {
+ if (!removed.pendingRemoval || !removed.decommissioning) {
nextTimeout.set(Long.MinValue)
}
}
}
override def onBlockUpdated(event: SparkListenerBlockUpdated): Unit = {
- if (!event.blockUpdatedInfo.blockId.isInstanceOf[RDDBlockId]) {
- return
- }
val exec = ensureExecutorIsTracked(event.blockUpdatedInfo.blockManagerId.executorId,
UNKNOWN_RESOURCE_PROFILE_ID)
+
+ // Check if it is a shuffle file, or RDD to pick the correct codepath for update
+ if (event.blockUpdatedInfo.blockId.isInstanceOf[ShuffleDataBlockId] && shuffleTrackingEnabled) {
+ /**
+ * The executor monitor keeps track of locations of cache and shuffle blocks and this can be
+ * used to decide which executor(s) Spark should shutdown first. Since we move shuffle blocks
+ * around now this wires it up so that it keeps track of it. We only do this for data blocks
+ * as index and other blocks do not necessarily mean the entire block has been
+ * committed.
+ */
+ event.blockUpdatedInfo.blockId match {
+ case ShuffleDataBlockId(shuffleId, _, _) => exec.addShuffle(shuffleId)
+ case _ => // For now we only update on data blocks
+ }
+ return
+ } else if (!event.blockUpdatedInfo.blockId.isInstanceOf[RDDBlockId]) {
Review comment:
Just a code nit to avoid two returns: would it make sense to put this shuffle
block check inside the if-branch of not-instanceof-RDDBlockId? Something like
the snippet below (with a slightly fuller sketch right after it):
```
if (!event.blockUpdatedInfo.blockId.isInstanceOf[RDDBlockId]) {
  if (event.blockUpdatedInfo.blockId.isInstanceOf[ShuffleDataBlockId] && shuffleTrackingEnabled) {
    ....
  }
  return
}
```
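Spelling that out with the match block from this diff, purely as an untested
sketch of the reshuffled control flow (same behavior intended, method body
only):
```
override def onBlockUpdated(event: SparkListenerBlockUpdated): Unit = {
  val exec = ensureExecutorIsTracked(event.blockUpdatedInfo.blockManagerId.executorId,
    UNKNOWN_RESOURCE_PROFILE_ID)

  if (!event.blockUpdatedInfo.blockId.isInstanceOf[RDDBlockId]) {
    // Shuffle tracking: only data blocks count, since index and other blocks
    // do not necessarily mean the entire block has been committed.
    if (shuffleTrackingEnabled) {
      event.blockUpdatedInfo.blockId match {
        case ShuffleDataBlockId(shuffleId, _, _) => exec.addShuffle(shuffleId)
        case _ => // For now we only update on data blocks
      }
    }
    // Single early return for all non-RDD blocks.
    return
  }
  // ... existing RDD block handling stays as-is below ...
}
```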
##########
File path: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
##########
@@ -503,6 +504,102 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
protected def minRegisteredRatio: Double = _minRegisteredRatio
+ /**
+ * Request that the cluster manager decommission the specified executors.
+ *
+ * @param executorsAndDecomInfo Identifiers of executors & decommission info.
+ * @param adjustTargetNumExecutors whether the target number of executors will be adjusted down
+ * after these executors have been decommissioned.
+ * @return the ids of the executors acknowledged by the cluster manager to be removed.
+ */
+ override def decommissionExecutors(
+ executorsAndDecomInfo: Seq[(String, ExecutorDecommissionInfo)],
+ adjustTargetNumExecutors: Boolean): Seq[String] = {
+
+ val executorsToDecommission = executorsAndDecomInfo.filter { case (executorId, _) =>
+ CoarseGrainedSchedulerBackend.this.synchronized {
+ // Only bother decommissioning executors which are alive.
+ if (isExecutorActive(executorId)) {
+ executorsPendingDecommission += executorId
+ true
+ } else {
+ false
+ }
+ }
+ }
+
+ // If we don't want to replace the executors we are decommissioning
+ if (adjustTargetNumExecutors) {
Review comment:
Is the comment above accurate? It seems we are indeed replacing the executors
that are decommissioned when `adjustTargetNumExecutors = true`.
On a related note, should `adjustTargetNumExecutors` simply be renamed to
`replaceDecommissionedExecutors`, to make the meaning more direct?
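Just to illustrate the rename I have in mind, as a hypothetical signature only
(no behavior change intended, and `replaceDecommissionedExecutors` is a made-up
name):
```
override def decommissionExecutors(
    executorsAndDecomInfo: Seq[(String, ExecutorDecommissionInfo)],
    replaceDecommissionedExecutors: Boolean): Seq[String]
```
Call sites would then spell out the intent directly, e.g.
`decommissionExecutors(execsAndDecomInfo, replaceDecommissionedExecutors = false)`.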
##########
File path: streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala
##########
@@ -133,7 +135,13 @@ private[streaming] class ExecutorAllocationManager(
logDebug(s"Removable executors (${removableExecIds.size}):
${removableExecIds}")
if (removableExecIds.nonEmpty) {
val execIdToRemove = removableExecIds(Random.nextInt(removableExecIds.size))
- client.killExecutor(execIdToRemove)
+ if (conf.get(WORKER_DECOMMISSION_ENABLED)) {
+ client.decommissionExecutor(execIdToRemove,
+ ExecutorDecommissionInfo("spark scale down", false),
+ adjustTargetNumExecutors = true)
Review comment:
I feel like this is the ONLY place where `adjustTargetNumExecutors` should
be set to true.
##########
File path: streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
##########
@@ -83,12 +96,26 @@ class ExecutorAllocationManagerSuite extends TestSuiteBase
Map.empty)}
}
- /** Verify that a particular executor was killed */
+ /** Verify that a particular executor was scaled down. */
def verifyKilledExec(expectedKilledExec: Option[String]): Unit = {
Review comment:
Do you want to rename the method to `verifyScaledDownExec`, to match the
comment change?
##########
File path: core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala
##########
@@ -76,7 +76,9 @@ class WorkerDecommissionSuite extends SparkFunSuite with LocalSparkContext {
// decom.sh message passing is tested manually.
val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend]
val execs = sched.getExecutorIds()
- execs.foreach(execId => sched.decommissionExecutor(execId, ExecutorDecommissionInfo("", false)))
+ // Make the executors decommission, finish, exit, and not be replaced.
+ val execsAndDecomInfo = execs.map((_, ExecutorDecommissionInfo("", false)))
+ sched.decommissionExecutors(execsAndDecomInfo, adjustTargetNumExecutors = true)
Review comment:
I am confused here: `adjustTargetNumExecutors = true` means that the
executors should be replaced (IIUC), whereas the comment above says "not be
replaced".
##########
File path: core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
##########
@@ -190,7 +190,8 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
logInfo(s"Decommissioning executor ${execToDecommission}")
sched.decommissionExecutor(
execToDecommission,
- ExecutorDecommissionInfo("", isHostDecommissioned = false))
+ ExecutorDecommissionInfo("", isHostDecommissioned = false),
+ adjustTargetNumExecutors = true)
Review comment:
Why is this true and not false? We explicitly want to kill and discard the
executor here without replacing it. The test does not truly care, but still,
why the change?