Github user dvogelbacher commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21366#discussion_r194177422
  
    --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala ---
    @@ -49,48 +54,58 @@ private[spark] class ExecutorPodsAllocator(
         .withName(kubernetesDriverPodName)
         .get()
     
    -  // Use sets of ids instead of counters to be able to handle duplicate events.
    -
    -  // Executor IDs that have been requested from Kubernetes but are not running yet.
    -  private val pendingExecutors = mutable.Set.empty[Long]
    -
    -  // We could use CoarseGrainedSchedulerBackend#totalRegisteredExecutors here for tallying the
    -  // executors that are running. But, here we choose instead to maintain all state within this
    -  // class from the perspective of the k8s API. Therefore whether or not this scheduler loop
    -  // believes an executor is running is dictated by the K8s API rather than Spark's RPC events.
    -  // We may need to consider where these perspectives may differ and which perspective should
    -  // take precedence.
    -  private val runningExecutors = mutable.Set.empty[Long]
    +  // Executor IDs that have been requested from Kubernetes but have not been detected in any
    +  // snapshot yet. Mapped to the timestamp when they were created.
    +  private val newlyCreatedExecutors = mutable.Map.empty[Long, Long]
     
       def start(applicationId: String): Unit = {
    -    eventQueue.addSubscriber(
    -      podAllocationDelay,
    -      new ExecutorPodBatchSubscriber(
    -        processUpdatedPod(applicationId),
    -        () => postProcessBatch(applicationId)))
    +    snapshotsStore.addSubscriber(podAllocationDelay) {
    +      processSnapshot(applicationId, _)
    +    }
       }
     
       def setTotalExpectedExecutors(total: Int): Unit = totalExpectedExecutors.set(total)
     
    -  private def processUpdatedPod(applicationId: String): PartialFunction[ExecutorPodState, Unit] = {
    -    case running @ PodRunning(_) =>
    -      pendingExecutors -= running.execId()
    -      runningExecutors += running.execId()
    -    case completed @ (PodSucceeded(_) | PodDeleted(_) | PodFailed(_)) =>
    -      pendingExecutors -= completed.execId()
    -      runningExecutors -= completed.execId()
    -    case _ =>
    -  }
    +  private def processSnapshot(applicationId: String, snapshot: ExecutorPodsSnapshot): Unit = {
    +    snapshot.executorPods.keys.foreach { newlyCreatedExecutors -= _ }
    +
    +    // For all executors we've created against the API but have not seen in a snapshot
    +    // yet - check the current time. If the current time has exceeded some threshold,
    +    // assume that the pod was either never created (the API server never properly
    +    // handled the creation request), or the API server created the pod but we missed
    +    // both the creation and deletion events. In either case, delete the missing pod
    +    // if possible, and mark such a pod to be rescheduled below.
    +    (newlyCreatedExecutors.keySet -- snapshot.executorPods.keySet).foreach { execId =>
    --- End diff ---
    
    `newlyCreatedExecutors.keySet -- snapshot.executorPods.keySet` is unnecessary: the line above already removes every snapshot pod ID from `newlyCreatedExecutors`, so the set difference is a no-op and you can iterate `newlyCreatedExecutors.keySet` directly.
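    
    To make the point concrete, here is a minimal self-contained sketch (the object name, the sample IDs, and `snapshotPodIds` are illustrative stand-ins for the real allocator state, not actual Spark code):
    
    ```scala
    import scala.collection.mutable
    
    object SnapshotDiffSketch extends App {
      // Executors created against the API, mapped to their creation timestamps.
      val newlyCreatedExecutors = mutable.Map(1L -> 100L, 2L -> 105L, 3L -> 110L)
    
      // Executor IDs reported by the latest snapshot.
      val snapshotPodIds = Set(1L, 2L)
    
      // First statement of processSnapshot: drop every ID the snapshot reports.
      snapshotPodIds.foreach { newlyCreatedExecutors -= _ }
    
      // After that, the set difference can never remove anything more ...
      assert((newlyCreatedExecutors.keySet -- snapshotPodIds) == newlyCreatedExecutors.keySet)
    
      // ... so iterating the key set directly is equivalent:
      newlyCreatedExecutors.keySet.foreach { execId =>
        println(s"executor $execId not seen in any snapshot yet") // prints only 3
      }
    }
    ```
    
    Dropping the set difference also avoids building an intermediate set on every snapshot.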

