Github user dvogelbacher commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r194177422 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala --- @@ -49,48 +54,58 @@ private[spark] class ExecutorPodsAllocator( .withName(kubernetesDriverPodName) .get() - // Use sets of ids instead of counters to be able to handle duplicate events. - - // Executor IDs that have been requested from Kubernetes but are not running yet. - private val pendingExecutors = mutable.Set.empty[Long] - - // We could use CoarseGrainedSchedulerBackend#totalRegisteredExecutors here for tallying the - // executors that are running. But, here we choose instead to maintain all state within this - // class from the perspective of the k8s API. Therefore whether or not this scheduler loop - // believes an executor is running is dictated by the K8s API rather than Spark's RPC events. - // We may need to consider where these perspectives may differ and which perspective should - // take precedence. - private val runningExecutors = mutable.Set.empty[Long] + // Executor IDs that have been requested from Kubernetes but have not been detected in any + // snapshot yet. Mapped to the timestamp when they were created. 
+ private val newlyCreatedExecutors = mutable.Map.empty[Long, Long] def start(applicationId: String): Unit = { - eventQueue.addSubscriber( - podAllocationDelay, - new ExecutorPodBatchSubscriber( - processUpdatedPod(applicationId), - () => postProcessBatch(applicationId))) + snapshotsStore.addSubscriber(podAllocationDelay) { + processSnapshot(applicationId, _) + } } def setTotalExpectedExecutors(total: Int): Unit = totalExpectedExecutors.set(total) - private def processUpdatedPod(applicationId: String): PartialFunction[ExecutorPodState, Unit] = { - case running @ PodRunning(_) => - pendingExecutors -= running.execId() - runningExecutors += running.execId() - case completed @ (PodSucceeded(_) | PodDeleted(_) | PodFailed(_)) => - pendingExecutors -= completed.execId() - runningExecutors -= completed.execId() - case _ => - } + private def processSnapshot(applicationId: String, snapshot: ExecutorPodsSnapshot): Unit = { + snapshot.executorPods.keys.foreach { newlyCreatedExecutors -= _ } + + // For all executors we've created against the API but have not seen in a snapshot + // yet - check the current time. If the current time has exceeded some threshold, + // assume that the pod was either never created (the API server never properly + // handled the creation request), or the API server created the pod but we missed + // both the creation and deletion events. In either case, delete the missing pod + // if possible, and mark such a pod to be rescheduled below. + (newlyCreatedExecutors.keySet -- snapshot.executorPods.keySet).foreach { execId => --- End diff -- `newlyCreatedExecutors.keySet -- snapshot.executorPods.keySet` is unnecessary as you remove all snapshot pods from `newlyCreatedExecutors` in the line above
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org