mridulm commented on code in PR #38441:
URL: https://github.com/apache/spark/pull/38441#discussion_r1020853081


##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -2193,9 +2193,8 @@ private[spark] class DAGScheduler(
    * Return true when:
    *  1. Waiting for decommission start
    *  2. Under decommission process
-   * Return false when:
-   *  1. Stopped or terminated after finishing decommission
-   *  2. Under decommission process, then removed by driver with other reasons
+   *  3. Stopped or terminated after finishing decommission
+   *  4. Under decommission process, then removed by driver with other reasons
    */

Review Comment:
   Can you reformulate the comment to reflect the recent changes?
   If decommissioned, this will return `true` only until the entry expires.



##########
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala:
##########
@@ -1117,7 +1124,14 @@ private[spark] class TaskSchedulerImpl(
       }
     }
 
-    executorsPendingDecommission.remove(executorId)
+    val removedDecomState = executorsPendingDecommission.remove(executorId)
+    removedDecomState.foreach(
+      s => {
+        removedExecutorsDueToDecom.put(executorId, s)
+        if (removedExecutorsDueToDecom.size >= REMOVED_EXECUTORS_MAX_SIZE) {
+          removedExecutorsDueToDecom.remove(removedExecutorsDueToDecom.head._1)
+        }
+      })

Review Comment:
   ```suggestion
       executorsPendingDecommission.remove(executorId).foreach(removedExecutorsDueToDecom.put(executorId, _))
   ```
   
   Switching to a `Cache` (see the comment on the field declaration) will take care of eviction, etc.



##########
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala:
##########
@@ -145,6 +145,13 @@ private[spark] class TaskSchedulerImpl(
   // continue to run even after being asked to decommission, but they will eventually exit.
   val executorsPendingDecommission = new HashMap[String, ExecutorDecommissionState]
 
+  // Keep removed executors due to decommission, so getExecutorDecommissionState
+  // still return correct value even after executor is lost
+  val removedExecutorsDueToDecom = new mutable.LinkedHashMap[String, ExecutorDecommissionState]

Review Comment:
   Instead of enforcing this manually, why not use `CacheBuilder`?
   Something like:
   `CacheBuilder.newBuilder().maximumSize(10000).expireAfterAccess(1, TimeUnit.MINUTES).build()`
   
   (Use suitable constants in place of the literals above.)
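
To make the `CacheBuilder` idea concrete, here is a minimal sketch, assuming Guava is on the classpath. `DecomState`, `RemovedDecomCacheSketch`, `MaxRetained` and `ExpireMinutes` are hypothetical names used only for illustration (`DecomState` stands in for `ExecutorDecommissionState`); this is not the PR's actual code.

```scala
import java.util.concurrent.TimeUnit

import scala.collection.mutable

import com.google.common.cache.{Cache, CacheBuilder}

// Hypothetical stand-in for Spark's ExecutorDecommissionState so the sketch compiles alone.
case class DecomState(startTime: Long, workerHost: Option[String] = None)

object RemovedDecomCacheSketch {
  // Placeholder constants (assumptions); real code would use named constants or configs.
  val MaxRetained = 10000L
  val ExpireMinutes = 1L

  // Size-bounded cache whose entries also expire some time after their last access.
  val removedExecutorsDueToDecom: Cache[String, DecomState] =
    CacheBuilder.newBuilder()
      .maximumSize(MaxRetained)
      .expireAfterAccess(ExpireMinutes, TimeUnit.MINUTES)
      .build[String, DecomState]()

  // On executor removal, move any pending decommission state into the cache.
  def onExecutorRemoved(
      pending: mutable.HashMap[String, DecomState],
      executorId: String): Unit = {
    pending.remove(executorId).foreach(removedExecutorsDueToDecom.put(executorId, _))
  }

  // getIfPresent returns null on a miss, so wrap the result in Option.
  def lookup(executorId: String): Option[DecomState] =
    Option(removedExecutorsDueToDecom.getIfPresent(executorId))
}
```

With this shape the cache handles both the size bound and expiry, so no manual eviction is needed at the call site. Note that with `expireAfterAccess`, each lookup counts as an access and restarts the expiry timer for that entry.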



##########
core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala:
##########
@@ -2004,7 +2004,8 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext
 
     // executor1 is eventually lost
     scheduler.executorLost("executor1", ExecutorExited(0, false, "normal"))
-    assert(scheduler.executorsPendingDecommission.isEmpty)
+    // [SPARK-40481] after executor lost, decommission state still be kept
+    assert(scheduler.getExecutorDecommissionState("executor1").isDefined)

Review Comment:
   Also validate the cleanup behavior.
   For example, set the size to 2 and decommission 3 executors to validate that the first one gets evicted. This ensures the size stays bounded as the code evolves.
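
The shape of such a test could look roughly like the sketch below. It uses a stdlib-only stand-in (`BoundedRemovedExecutors`, a hypothetical class built on `java.util.LinkedHashMap`) rather than `TaskSchedulerImpl`, so it only illustrates the "bound of 2, insert 3, first is gone" check and is not the suite's actual code.

```scala
import java.util.{LinkedHashMap => JLinkedHashMap, Map => JMap}

// Hypothetical stand-in for the bounded container; not Spark code.
// Drops the oldest inserted entry once more than maxSize entries are present.
class BoundedRemovedExecutors(maxSize: Int) extends JLinkedHashMap[String, String] {
  override protected def removeEldestEntry(eldest: JMap.Entry[String, String]): Boolean =
    size() > maxSize
}

object EvictionCheckSketch {
  def main(args: Array[String]): Unit = {
    // Bound of 2, then record three removed executors in order.
    val removed = new BoundedRemovedExecutors(2)
    Seq("executor1", "executor2", "executor3").foreach { id =>
      removed.put(id, "decommissioned")
    }

    // The size stays bounded and the first executor has been evicted.
    assert(removed.size() == 2)
    assert(!removed.containsKey("executor1"))
    assert(removed.containsKey("executor2") && removed.containsKey("executor3"))
  }
}
```

In the real suite the same assertions would go through `scheduler.getExecutorDecommissionState(...)` after decommissioning and losing each executor.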
   



##########
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala:
##########
@@ -145,6 +145,13 @@ private[spark] class TaskSchedulerImpl(
   // continue to run even after being asked to decommission, but they will eventually exit.
   val executorsPendingDecommission = new HashMap[String, ExecutorDecommissionState]
 
+  // Keep removed executors due to decommission, so getExecutorDecommissionState
+  // still return correct value even after executor is lost
+  val removedExecutorsDueToDecom = new mutable.LinkedHashMap[String, ExecutorDecommissionState]
+
+  // Max size to keep removed executors due to decommission
+  val REMOVED_EXECUTORS_MAX_SIZE = 10000

Review Comment:
   Make this configurable via an internal config, with a default of 10k as here.
   The total cluster size need not be large, but the number of executors can be higher (dynamic resource allocation, resource ask + size of node for rolling upgrades, spot instances coming in and out, etc.).
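
As a rough sketch of what such an internal config entry might look like: the object name, constant name, config key and doc text below are assumptions for illustration, not names taken from the PR. `ConfigBuilder` is `private[spark]`, so this fragment only compiles inside Spark's own source tree.

```scala
package org.apache.spark.internal.config

// Fragment only: ConfigBuilder lives in this package and is private[spark].
private[spark] object RemovedDecomConfigSketch {

  // Hypothetical key and constant name (assumptions); default of 10000 as in the diff.
  val MAX_RETAINED_REMOVED_DECOM_EXECUTORS =
    ConfigBuilder("spark.scheduler.maxRetainedRemovedDecommissionExecutors")
      .internal()
      .doc("Max number of removed executors, lost due to decommission, whose " +
        "decommission state is retained by the task scheduler.")
      .intConf
      .checkValue(_ >= 0, "The retained count must be non-negative.")
      .createWithDefault(10000)
}
```

`TaskSchedulerImpl` could then read the bound with something like `conf.get(MAX_RETAINED_REMOVED_DECOM_EXECUTORS)` instead of the hard-coded `REMOVED_EXECUTORS_MAX_SIZE`.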



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

