tgravescs commented on a change in pull request #30204:
URL: https://github.com/apache/spark/pull/30204#discussion_r517708086



##########
File path: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
##########
@@ -147,136 +157,171 @@ private[spark] class ExecutorPodsAllocator(
       lastSnapshot = snapshots.last
     }
 
-    val currentRunningCount = lastSnapshot.executorPods.values.count {
-      case PodRunning(_) => true
-      case _ => false
-    }
-
-    val currentPendingExecutors = lastSnapshot.executorPods
-      .filter {
-        case (_, PodPending(_)) => true
-        case _ => false
-      }
-
     // Make a local, non-volatile copy of the reference since it's used 
multiple times. This
     // is the only method that modifies the list, so this is safe.
     var _deletedExecutorIds = deletedExecutorIds
-
     if (snapshots.nonEmpty) {
-      logDebug(s"Pod allocation status: $currentRunningCount running, " +
-        s"${currentPendingExecutors.size} pending, " +
-        s"${newlyCreatedExecutors.size} unacknowledged.")
-
       val existingExecs = lastSnapshot.executorPods.keySet
       _deletedExecutorIds = _deletedExecutorIds.filter(existingExecs.contains)
     }
 
-    val currentTotalExpectedExecutors = totalExpectedExecutors.get
-
-    // This variable is used later to print some debug logs. It's updated when 
cleaning up
-    // excess pod requests, since currentPendingExecutors is immutable.
-    var knownPendingCount = currentPendingExecutors.size
-
-    // It's possible that we have outstanding pods that are outdated when 
dynamic allocation
-    // decides to downscale the application. So check if we can release any 
pending pods early
-    // instead of waiting for them to time out. Drop them first from the 
unacknowledged list,
-    // then from the pending. However, in order to prevent too frequent 
fluctuation, newly
-    // requested pods are protected during executorIdleTimeout period.
-    //
-    // TODO: with dynamic allocation off, handle edge cases if we end up with 
more running
-    // executors than expected.
-    val knownPodCount = currentRunningCount + currentPendingExecutors.size +
-      newlyCreatedExecutors.size
-    if (knownPodCount > currentTotalExpectedExecutors) {
-      val excess = knownPodCount - currentTotalExpectedExecutors
-      val knownPendingToDelete = currentPendingExecutors
-        .filter(x => isExecutorIdleTimedOut(x._2, currentTime))
-        .map { case (id, _) => id }
-        .take(excess - newlyCreatedExecutors.size)
-      val toDelete = newlyCreatedExecutors
-        .filter(x => currentTime - x._2 > executorIdleTimeout)
-        .keys.take(excess).toList ++ knownPendingToDelete
-
-      if (toDelete.nonEmpty) {
-        logInfo(s"Deleting ${toDelete.size} excess pod requests 
(${toDelete.mkString(",")}).")
-        _deletedExecutorIds = _deletedExecutorIds ++ toDelete
+    // Map the pods into per ResourceProfile id so we can check per 
ResourceProfile,
+    // add a fast path if not using other ResourceProfiles.
+    val rpIdToExecsAndPodState =
+      mutable.HashMap[Int, mutable.LinkedHashMap[Long, ExecutorPodState]]()

Review comment:
       no, I'll change it




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to