Github user liyinan926 commented on a diff in the pull request:
https://github.com/apache/spark/pull/21366#discussion_r190354568
--- Diff:
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsEventHandler.scala
---
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.concurrent.{Future, LinkedBlockingQueue,
ScheduledExecutorService, TimeUnit}
+import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
+
+import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}
+import io.fabric8.kubernetes.client.KubernetesClient
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.KubernetesConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.ExecutorExited
+import org.apache.spark.util.Utils
+
+private[spark] class ExecutorPodsEventHandler(
+ conf: SparkConf,
+ executorBuilder: KubernetesExecutorBuilder,
+ kubernetesClient: KubernetesClient,
+ eventProcessorExecutor: ScheduledExecutorService) extends Logging {
+
+ import ExecutorPodsEventHandler._
+
+ private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
+
+ private val totalExpectedExecutors = new AtomicInteger(0)
+
+ private val eventQueue = new LinkedBlockingQueue[Seq[Pod]]()
+
+ private val podAllocationSize =
conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE)
+
+ private val podAllocationDelay =
conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY)
+
+ private val kubernetesDriverPodName = conf
+ .get(KUBERNETES_DRIVER_POD_NAME)
+ .getOrElse(throw new SparkException("Must specify the driver pod
name"))
+
+ private val driverPod = kubernetesClient.pods()
+ .withName(kubernetesDriverPodName)
+ .get()
+
+ // Use sets of ids instead of counters to be able to handle duplicate
events.
+
+ // Executor IDs that have been requested from Kubernetes but are not
running yet.
+ private val pendingExecutors = mutable.Set.empty[Long]
+
+ // We could use CoarseGrainedSchedulerBackend#totalRegisteredExecutors
here for tallying the
+ // executors that are running. But, here we choose instead to maintain
all state within this
+ // class from the perspective of the k8s API. Therefore whether or not
this scheduler loop
+ // believes an executor is running is dictated by the K8s API rather
than Spark's RPC events.
+ // We may need to consider where these perspectives may differ and which
perspective should
+ // take precedence.
+ private val runningExecutors = mutable.Set.empty[Long]
+
+ private var eventProcessorFuture: Future[_] = _
+
+ def start(applicationId: String, schedulerBackend:
KubernetesClusterSchedulerBackend): Unit = {
+ require(eventProcessorFuture == null, "Cannot start event processing
twice.")
+ logInfo(s"Starting Kubernetes executor pods event handler for
application with" +
+ s" id $applicationId.")
+ val eventProcessor = new Runnable {
+ override def run(): Unit = {
+ Utils.tryLogNonFatalError {
+ processEvents(applicationId, schedulerBackend)
+ }
+ }
+ }
+ eventProcessorFuture = eventProcessorExecutor.scheduleWithFixedDelay(
+ eventProcessor, 0L, podAllocationDelay, TimeUnit.MILLISECONDS)
+ }
+
+ def stop(): Unit = {
+ if (eventProcessorFuture != null) {
+ eventProcessorFuture.cancel(true)
+ eventProcessorFuture = null
+ }
+ }
+
+ private def processEvents(
+ applicationId: String, schedulerBackend:
KubernetesClusterSchedulerBackend) {
+ val currentEvents = new
java.util.ArrayList[Seq[Pod]](eventQueue.size())
+ eventQueue.drainTo(currentEvents)
+ currentEvents.asScala.flatten.foreach { updatedPod =>
+ val execId =
updatedPod.getMetadata.getLabels.get(SPARK_EXECUTOR_ID_LABEL).toLong
+ val podPhase = updatedPod.getStatus.getPhase.toLowerCase
+ if (isDeleted(updatedPod)) {
+ removeExecutorFromSpark(schedulerBackend, updatedPod, execId)
+ } else {
+ updatedPod.getStatus.getPhase.toLowerCase match {
+ case "running" =>
+ // If clause is for resilience to out-of-order operations -
executor must be
+ // pending and first reach running. Without this check you may
e.g. process a
+ // deletion event followed by some arbitrary modification
event - we want the
+ // deletion event to "stick".
+ if (pendingExecutors.contains(execId)) {
+ pendingExecutors.remove(execId)
+ runningExecutors.add(execId)
+ }
+ // TODO (SPARK-24135) - handle more classes of errors
+ case "error" | "failed" | "succeeded" =>
+ // If deletion failed on a previous try, we can try again if
resync informs us the pod
+ // is still around.
+ // Delete as best attempt - duplicate deletes will throw an
exception but the end state
+ // of getting rid of the pod is what matters.
+ if (!isDeleted(updatedPod)) {
+ Utils.tryLogNonFatalError {
+ kubernetesClient
+ .pods()
+ .withName(updatedPod.getMetadata.getName)
+ .delete()
+ }
+ }
+ removeExecutorFromSpark(schedulerBackend, updatedPod, execId)
+ }
+ }
+ }
+
+ val currentRunningExecutors = runningExecutors.size
--- End diff --
I think you can move the code for creating new executors to its own
executor service that can actually run concurrently with the executor service
for processing pod events. This way you can use independent schedules for the
two. I think this is less confusing and allows pod event processing to run with
a higher frequency than pod allocation.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]