mridulm commented on a change in pull request #27313: [SPARK-29148][CORE] Add stage level scheduling dynamic allocation and scheduler backend changes
URL: https://github.com/apache/spark/pull/27313#discussion_r376760827
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
 ##########
 @@ -423,21 +510,30 @@ private[spark] class ExecutorAllocationManager(
    */
   private def removeExecutors(executors: Seq[String]): Seq[String] = synchronized {
     val executorIdsToBeRemoved = new ArrayBuffer[String]
-
     logDebug(s"Request to remove executorIds: ${executors.mkString(", ")}")
-    val numExistingExecutors = executorMonitor.executorCount - executorMonitor.pendingRemovalCount
-
-    var newExecutorTotal = numExistingExecutors
+    val numExecutorsTotalPerRpId = mutable.Map[Int, Int]()
     executors.foreach { executorIdToBeRemoved =>
-      if (newExecutorTotal - 1 < minNumExecutors) {
-        logDebug(s"Not removing idle executor $executorIdToBeRemoved because there are only " +
-          s"$newExecutorTotal executor(s) left (minimum number of executor limit $minNumExecutors)")
-      } else if (newExecutorTotal - 1 < numExecutorsTarget) {
-        logDebug(s"Not removing idle executor $executorIdToBeRemoved because there are only " +
-          s"$newExecutorTotal executor(s) left (number of executor target $numExecutorsTarget)")
+      val rpId = getResourceProfileIdOfExecutor(executorIdToBeRemoved)
+      if (rpId == UNKNOWN_RESOURCE_PROFILE_ID) {
+        logWarning(s"Not removing executor $executorIdsToBeRemoved because couldn't find " +
+          "ResourceProfile for it!")
 
 Review comment:
   Essentially, wherever we introduce resource profiles, a `Set[E]` becomes a `Map[Int, Set[E]]` and an `Int` becomes a `Map[Int, Int]`. Under normal circumstances, I do not expect these to be a problem.
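   To illustrate the shape of that change, here is a toy Scala sketch (not the actual Spark code; `numExecutorsTargetPerRpId` and `addTarget` are hypothetical names): a single scalar target becomes a map of targets keyed by resource profile id.

   ```scala
   import scala.collection.mutable

   object PerProfileTargets {
     // hypothetical per-profile executor targets, keyed by resource profile id (rpId)
     val numExecutorsTargetPerRpId: mutable.Map[Int, Int] =
       mutable.Map[Int, Int]().withDefaultValue(0)

     // bump the target for one profile and return its new value
     def addTarget(rpId: Int, delta: Int): Int = {
       numExecutorsTargetPerRpId(rpId) = numExecutorsTargetPerRpId(rpId) + delta
       numExecutorsTargetPerRpId(rpId)
     }
   }
   ```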
   But given that we don't have named resource profiles, I am not sure how this interacts with a loop. For example, in an ML loop:
   ```
   val inputRdd = prepare()
   while (condition) {
     val computeRdd = needGpuResources(inputRdd.map().foo.bar)
     ...
   }
   ```
   
   Here, if I understood correctly, each iteration will create a new resource profile. Depending on the number of iterations, we could end up with ever-increasing memory usage (degenerate case: if this is used in streaming).
   Note: this is not a regression - for the default profile, there is no usage increase (other than a negligible one).
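   To make the concern concrete, here is a toy Scala model (explicitly not the Spark API; `ProfileRegistry` and `buildProfile` are invented for illustration): every build call mints a profile with a fresh id and registers it, so building inside the loop grows the registry by one entry per iteration, while hoisting the profile out of the loop keeps it bounded.

   ```scala
   import scala.collection.mutable

   object ProfileRegistry {
     private var nextId = 0
     // stands in for the internal map of known resource profiles
     val registry = mutable.Map[Int, String]()

     // mimics creating an unnamed resource profile: always a fresh id,
     // even for an identical resource specification
     def buildProfile(spec: String): Int = {
       val id = nextId
       nextId += 1
       registry(id) = spec
       id
     }
   }

   object LoopDemo {
     // building a profile inside the loop: registry grows with iterations
     def naiveLoop(iterations: Int): Int = {
       for (_ <- 0 until iterations) ProfileRegistry.buildProfile("gpu=1")
       ProfileRegistry.registry.size
     }

     // hoisting the profile out of the loop: one entry regardless of iterations
     def hoistedLoop(iterations: Int): Int = {
       val rpId = ProfileRegistry.buildProfile("gpu=1")
       for (_ <- 0 until iterations) { /* reuse rpId each iteration */ }
       ProfileRegistry.registry.size
     }
   }
   ```

   If identical profiles were deduplicated, or profiles could be named and reused, repeated identical requests could map back to a single id instead.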
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
