squito commented on a change in pull request #25236: [SPARK-28487][k8s] More responsive dynamic allocation with K8S.
URL: https://github.com/apache/spark/pull/25236#discussion_r309730279
 
 

 ##########
 File path: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
 ##########
 @@ -66,97 +66,167 @@ private[spark] class ExecutorPodsAllocator(
   // snapshot yet. Mapped to the timestamp when they were created.
   private val newlyCreatedExecutors = mutable.Map.empty[Long, Long]
 
+  private val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(conf)
+
+  private val hasPendingPods = new AtomicBoolean()
+
+  private var lastSnapshot = ExecutorPodsSnapshot(Nil)
+
   def start(applicationId: String): Unit = {
     snapshotsStore.addSubscriber(podAllocationDelay) {
       onNewSnapshots(applicationId, _)
     }
   }
 
-  def setTotalExpectedExecutors(total: Int): Unit = totalExpectedExecutors.set(total)
+  def setTotalExpectedExecutors(total: Int): Unit = {
+    totalExpectedExecutors.set(total)
+    if (!hasPendingPods.get()) {
+      snapshotsStore.notifySubscribers()
+    }
+  }
 
-  private def onNewSnapshots(applicationId: String, snapshots: Seq[ExecutorPodsSnapshot]): Unit = {
+  private def onNewSnapshots(
+      applicationId: String,
+      snapshots: Seq[ExecutorPodsSnapshot]): Unit = synchronized {
     newlyCreatedExecutors --= snapshots.flatMap(_.executorPods.keys)
     // For all executors we've created against the API but have not seen in a snapshot
     // yet - check the current time. If the current time has exceeded some threshold,
     // assume that the pod was either never created (the API server never properly
     // handled the creation request), or the API server created the pod but we missed
     // both the creation and deletion events. In either case, delete the missing pod
     // if possible, and mark such a pod to be rescheduled below.
-    newlyCreatedExecutors.foreach { case (execId, timeCreated) =>
-      val currentTime = clock.getTimeMillis()
+    val currentTime = clock.getTimeMillis()
+    val timedOut = newlyCreatedExecutors.flatMap { case (execId, timeCreated) =>
       if (currentTime - timeCreated > podCreationTimeout) {
-        logWarning(s"Executor with id $execId was not detected in the Kubernetes" +
-          s" cluster after $podCreationTimeout milliseconds despite the fact that a" +
-          " previous allocation attempt tried to create it. The executor may have been" +
-          " deleted but the application missed the deletion event.")
-
-        if (shouldDeleteExecutors) {
-          Utils.tryLogNonFatalError {
-            kubernetesClient
-              .pods()
-              .withLabel(SPARK_APP_ID_LABEL, applicationId)
-              .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
-              .withLabel(SPARK_EXECUTOR_ID_LABEL, execId.toString)
-              .delete()
-          }
-        }
-        newlyCreatedExecutors -= execId
+        Some(execId)
       } else {
         logDebug(s"Executor with id $execId was not found in the Kubernetes 
cluster since it" +
           s" was created ${currentTime - timeCreated} milliseconds ago.")
+        None
+      }
+    }
+
+    if (timedOut.nonEmpty) {
+      logWarning(s"Executors with ids ${timedOut.mkString(",")} were not 
detected in the" +
+        s" Kubernetes cluster after $podCreationTimeout ms despite the fact 
that a previous" +
+        " allocation attempt tried to create them. The executors may have been 
deleted but the" +
+        " application missed the deletion event.")
+
+      newlyCreatedExecutors --= timedOut
+      if (shouldDeleteExecutors) {
+        Utils.tryLogNonFatalError {
+          kubernetesClient
+            .pods()
+            .withLabel(SPARK_APP_ID_LABEL, applicationId)
+            .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
+            .withLabelIn(SPARK_EXECUTOR_ID_LABEL, timedOut.toSeq.map(_.toString): _*)
+            .delete()
+        }
       }
     }
 
     if (snapshots.nonEmpty) {
-      // Only need to examine the cluster as of the latest snapshot, the "current" state, to see if
-      // we need to allocate more executors or not.
-      val latestSnapshot = snapshots.last
-      val currentRunningExecutors = latestSnapshot.executorPods.values.count {
-        case PodRunning(_) => true
+      lastSnapshot = snapshots.last
+    }
+
+    val currentRunningExecutors = lastSnapshot.executorPods.values.count {
 
 Review comment:
   minor: on a quick read, the naming of these variables is a bit confusing on whether it's a list of execs or just a count -- would be nice to have the counts consistently use `...Count` or `num...`
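   For illustration, a minimal sketch of that convention (the names used here are hypothetical, not part of the patch; `ExecutorPodsSnapshot` and `PodRunning` already live in this package, so nothing extra needs to be imported inside the file):

```scala
// Sketch only: suffix counts with `Count` (or prefix with `num`) so a reader can
// immediately tell a count apart from a collection of executor IDs or pods.
private def runningExecutorCount(snapshot: ExecutorPodsSnapshot): Int =
  snapshot.executorPods.values.count {
    case PodRunning(_) => true  // pods currently in the Running state
    case _ => false             // pending, failed, deleted, or unknown states
  }
```

   At the call site this would read as `val currentRunningCount = runningExecutorCount(lastSnapshot)`, which makes it clear the value is a count rather than a set of pods.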
