Github user dvogelbacher commented on a diff in the pull request:
https://github.com/apache/spark/pull/21366#discussion_r194177422
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala ---
@@ -49,48 +54,58 @@ private[spark] class ExecutorPodsAllocator(
     .withName(kubernetesDriverPodName)
     .get()
-  // Use sets of ids instead of counters to be able to handle duplicate events.
-
-  // Executor IDs that have been requested from Kubernetes but are not running yet.
-  private val pendingExecutors = mutable.Set.empty[Long]
-
-  // We could use CoarseGrainedSchedulerBackend#totalRegisteredExecutors here for tallying the
-  // executors that are running. But, here we choose instead to maintain all state within this
-  // class from the perspective of the k8s API. Therefore whether or not this scheduler loop
-  // believes an executor is running is dictated by the K8s API rather than Spark's RPC events.
-  // We may need to consider where these perspectives may differ and which perspective should
-  // take precedence.
-  private val runningExecutors = mutable.Set.empty[Long]
+  // Executor IDs that have been requested from Kubernetes but have not been detected in any
+  // snapshot yet. Mapped to the timestamp when they were created.
+  private val newlyCreatedExecutors = mutable.Map.empty[Long, Long]
   def start(applicationId: String): Unit = {
-    eventQueue.addSubscriber(
-      podAllocationDelay,
-      new ExecutorPodBatchSubscriber(
-        processUpdatedPod(applicationId),
-        () => postProcessBatch(applicationId)))
+    snapshotsStore.addSubscriber(podAllocationDelay) {
+      processSnapshot(applicationId, _)
+    }
   }
   def setTotalExpectedExecutors(total: Int): Unit = totalExpectedExecutors.set(total)
-  private def processUpdatedPod(applicationId: String): PartialFunction[ExecutorPodState, Unit] = {
-    case running @ PodRunning(_) =>
-      pendingExecutors -= running.execId()
-      runningExecutors += running.execId()
-    case completed @ (PodSucceeded(_) | PodDeleted(_) | PodFailed(_)) =>
-      pendingExecutors -= completed.execId()
-      runningExecutors -= completed.execId()
-    case _ =>
-  }
+  private def processSnapshot(applicationId: String, snapshot: ExecutorPodsSnapshot): Unit = {
+    snapshot.executorPods.keys.foreach { newlyCreatedExecutors -= _ }
+
+    // For all executors we've created against the API but have not seen in a snapshot
+    // yet - check the current time. If the current time has exceeded some threshold,
+    // assume that the pod was either never created (the API server never properly
+    // handled the creation request), or the API server created the pod but we missed
+    // both the creation and deletion events. In either case, delete the missing pod
+    // if possible, and mark such a pod to be rescheduled below.
+    (newlyCreatedExecutors.keySet -- snapshot.executorPods.keySet).foreach { execId =>
--- End diff --
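
For readers following the thread, here is a minimal standalone sketch of the timeout bookkeeping that the new comment in the diff describes. The names `StalePodBookkeeping`, `collectStaleExecutors`, and `podCreationTimeout` are invented for illustration only and are not the PR's code:

```scala
// Illustrative only: hypothetical names, not the code under review.
import scala.collection.mutable

object StalePodBookkeeping {
  // Assumed threshold (ms) after which an unacknowledged pod is considered lost.
  val podCreationTimeout: Long = 60000L

  // execId -> timestamp at which the pod creation request was issued.
  val newlyCreatedExecutors = mutable.Map.empty[Long, Long]

  // Returns the executor ids whose pods were requested but never showed up in a
  // snapshot within the timeout, removing them from the bookkeeping so the
  // allocator can request replacements.
  def collectStaleExecutors(snapshotExecIds: Set[Long], now: Long): Set[Long] = {
    // Anything present in the snapshot has been acknowledged by the API server.
    snapshotExecIds.foreach(newlyCreatedExecutors -= _)
    val stale = newlyCreatedExecutors.collect {
      case (execId, requestedAt) if now - requestedAt > podCreationTimeout => execId
    }.toSet
    stale.foreach(newlyCreatedExecutors -= _)
    stale
  }
}
```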
`newlyCreatedExecutors.keySet -- snapshot.executorPods.keySet` is unnecessary: the `foreach` just above it already removes every pod id in the snapshot from `newlyCreatedExecutors`.
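
To make the point concrete, a minimal sketch with toy data (not the PR's code) showing that after the `foreach` removal the extra set difference is a no-op:

```scala
import scala.collection.mutable

// Toy stand-ins for the allocator's state and a snapshot's pod ids.
val newlyCreatedExecutors = mutable.Map(1L -> 100L, 2L -> 200L, 3L -> 300L)
val snapshotExecIds = Set(2L, 3L)

// Equivalent of `snapshot.executorPods.keys.foreach { newlyCreatedExecutors -= _ }`.
snapshotExecIds.foreach(newlyCreatedExecutors -= _)

// Every snapshot id has already been removed, so subtracting them again
// changes nothing: the difference equals newlyCreatedExecutors.keySet itself.
assert(newlyCreatedExecutors.keySet -- snapshotExecIds == newlyCreatedExecutors.keySet)
```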
---