mridulm commented on code in PR #38441:
URL: https://github.com/apache/spark/pull/38441#discussion_r1020853081
##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -2193,9 +2193,8 @@ private[spark] class DAGScheduler(
* Return true when:
* 1. Waiting for decommission start
* 2. Under decommission process
- * Return false when:
- * 1. Stopped or terminated after finishing decommission
- * 2. Under decommission process, then removed by driver with other reasons
+ * 3. Stopped or terminated after finishing decommission
+ * 4. Under decommission process, then removed by driver with other reasons
*/
Review Comment:
Reformulate the comment to reflect the recent changes?
If decommissioned, this will return `true` only until the retained state expires.
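One possible reformulation, as a sketch (exact wording is up to you; it just needs to capture the expiry behavior):
```scala
/**
 * Return true while the executor is waiting for decommission to start, is in
 * the middle of decommissioning, or has already stopped or been removed after
 * decommissioning. In the last case this only holds until the retained
 * decommission state expires.
 */
```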
##########
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala:
##########
@@ -1117,7 +1124,14 @@ private[spark] class TaskSchedulerImpl(
}
}
- executorsPendingDecommission.remove(executorId)
+ val removedDecomState = executorsPendingDecommission.remove(executorId)
+ removedDecomState.foreach(
+ s => {
+ removedExecutorsDueToDecom.put(executorId, s)
+ if (removedExecutorsDueToDecom.size >= REMOVED_EXECUTORS_MAX_SIZE) {
+ removedExecutorsDueToDecom.remove(removedExecutorsDueToDecom.head._1)
+ }
+ })
Review Comment:
```suggestion
      executorsPendingDecommission.remove(executorId).foreach(removedExecutorsDueToDecom.put(executorId, _))
```
The change to the `Cache` (above) will take care of eviction, etc.
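If the map becomes a `Cache`, the lookup side needs a matching tweak; a rough sketch (the method shape is assumed from the existing code, and `Option(...)` wraps Guava's nullable `getIfPresent`):
```scala
// Sketch: consult the live decommission state first, then fall back to the
// bounded cache of executors already removed due to decommission.
def getExecutorDecommissionState(executorId: String): Option[ExecutorDecommissionState] =
  synchronized {
    executorsPendingDecommission.get(executorId)
      .orElse(Option(removedExecutorsDueToDecom.getIfPresent(executorId)))
  }
```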
##########
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala:
##########
@@ -145,6 +145,13 @@ private[spark] class TaskSchedulerImpl(
   // continue to run even after being asked to decommission, but they will eventually exit.
   val executorsPendingDecommission = new HashMap[String, ExecutorDecommissionState]
+  // Keep removed executors due to decommission, so getExecutorDecommissionState
+  // still returns the correct value even after the executor is lost
+  val removedExecutorsDueToDecom = new mutable.LinkedHashMap[String, ExecutorDecommissionState]
Review Comment:
Instead of manually enforcing this, why not use `CacheBuilder`?
Something like:
`CacheBuilder.newBuilder().maximumSize(10000).expireAfterAccess(1, TimeUnit.MINUTES).build()`
(Use suitable constants above.)
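A minimal sketch of the declaration, assuming Guava's `CacheBuilder` and placeholder constants (the real values should come from the internal config discussed further down):
```scala
import java.util.concurrent.TimeUnit

import com.google.common.cache.CacheBuilder

// Placeholder constants; ideally read from an internal config.
private val removedExecutorsCacheSize = 10000L
private val removedExecutorsCacheTtlMinutes = 1L

// Bounded, self-expiring store for decommission states of removed executors;
// replaces the manual LinkedHashMap plus hand-rolled eviction.
val removedExecutorsDueToDecom =
  CacheBuilder.newBuilder()
    .maximumSize(removedExecutorsCacheSize)
    .expireAfterAccess(removedExecutorsCacheTtlMinutes, TimeUnit.MINUTES)
    .build[String, ExecutorDecommissionState]()
```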
##########
core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala:
##########
@@ -2004,7 +2004,8 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext
// executor1 is eventually lost
scheduler.executorLost("executor1", ExecutorExited(0, false, "normal"))
- assert(scheduler.executorsPendingDecommission.isEmpty)
+ // [SPARK-40481] after an executor is lost, its decommission state is still kept
+ assert(scheduler.getExecutorDecommissionState("executor1").isDefined)
Review Comment:
Also validate the cleanup behavior.
For example, set the size to 2 and decommission 3 executors to validate that the first one got evicted - this ensures the size stays bounded as the code evolves.
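A rough sketch of such a test, assuming the retention limit is made configurable (see the config comment below) and set to 2 for this test, and that the three executors were registered via `resourceOffers` beforehand:
```scala
// Decommission and then lose three executors with a retention bound of 2.
Seq("executor1", "executor2", "executor3").foreach { id =>
  scheduler.executorDecommission(id, ExecutorDecommissionInfo("decom test"))
  scheduler.executorLost(id, ExecutorExited(0, exitCausedByApp = false, "normal"))
}
// The oldest entry should have been evicted once the third was added.
assert(scheduler.getExecutorDecommissionState("executor1").isEmpty)
assert(scheduler.getExecutorDecommissionState("executor2").isDefined)
assert(scheduler.getExecutorDecommissionState("executor3").isDefined)
```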
##########
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala:
##########
@@ -145,6 +145,13 @@ private[spark] class TaskSchedulerImpl(
   // continue to run even after being asked to decommission, but they will eventually exit.
   val executorsPendingDecommission = new HashMap[String, ExecutorDecommissionState]
+  // Keep removed executors due to decommission, so getExecutorDecommissionState
+  // still returns the correct value even after the executor is lost
+  val removedExecutorsDueToDecom = new mutable.LinkedHashMap[String, ExecutorDecommissionState]
+
+  // Max size to keep removed executors due to decommission
+  val REMOVED_EXECUTORS_MAX_SIZE = 10000
Review Comment:
Make this configurable via an internal config, with the default of 10k as here.
The total cluster size need not be large, but the number of executors can be much higher (dynamic resource allocation, resource asks sized to the node for rolling upgrades, spot instances coming in and out, etc.).
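A hypothetical shape for that config, following the `ConfigBuilder` pattern in `org.apache.spark.internal.config` (the key name and version here are placeholders):
```scala
private[spark] val SCHEDULER_MAX_RETAINED_REMOVED_DECOMMISSION_EXECUTORS =
  ConfigBuilder("spark.scheduler.maxRetainedRemovedDecommissionExecutors")
    .internal()
    .doc("Max number of decommission states to retain for executors that " +
      "have already been removed, so their state can still be queried.")
    .version("3.4.0")
    .intConf
    .createWithDefault(10000)
```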
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]