Github user mccheah commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21366#discussion_r190365332
  
    --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsEventHandler.scala ---
    @@ -0,0 +1,229 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.scheduler.cluster.k8s
    +
    +import java.util.concurrent.{Future, LinkedBlockingQueue, ScheduledExecutorService, TimeUnit}
    +import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
    +
    +import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}
    +import io.fabric8.kubernetes.client.KubernetesClient
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable
    +
    +import org.apache.spark.{SparkConf, SparkException}
    +import org.apache.spark.deploy.k8s.Config._
    +import org.apache.spark.deploy.k8s.Constants._
    +import org.apache.spark.deploy.k8s.KubernetesConf
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.scheduler.ExecutorExited
    +import org.apache.spark.util.Utils
    +
    +private[spark] class ExecutorPodsEventHandler(
    +    conf: SparkConf,
    +    executorBuilder: KubernetesExecutorBuilder,
    +    kubernetesClient: KubernetesClient,
    +    eventProcessorExecutor: ScheduledExecutorService) extends Logging {
    +
    +  import ExecutorPodsEventHandler._
    +
    +  private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
    +
    +  private val totalExpectedExecutors = new AtomicInteger(0)
    +
    +  private val eventQueue = new LinkedBlockingQueue[Seq[Pod]]()
    +
    +  private val podAllocationSize = conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE)
    +
    +  private val podAllocationDelay = conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY)
    +
    +  private val kubernetesDriverPodName = conf
    +    .get(KUBERNETES_DRIVER_POD_NAME)
    +    .getOrElse(throw new SparkException("Must specify the driver pod name"))
    +
    +  private val driverPod = kubernetesClient.pods()
    +    .withName(kubernetesDriverPodName)
    +    .get()
    +
    +  // Use sets of ids instead of counters to be able to handle duplicate events.
    +
    +  // Executor IDs that have been requested from Kubernetes but are not running yet.
    +  private val pendingExecutors = mutable.Set.empty[Long]
    +
    +  // We could use CoarseGrainedSchedulerBackend#totalRegisteredExecutors here for tallying the
    +  // executors that are running. But here we choose instead to maintain all state within this
    +  // class from the perspective of the k8s API. Therefore whether or not this scheduler loop
    +  // believes an executor is running is dictated by the K8s API rather than Spark's RPC events.
    +  // We may need to consider where these perspectives may differ and which perspective should
    +  // take precedence.
    +  private val runningExecutors = mutable.Set.empty[Long]
    +
    +  private var eventProcessorFuture: Future[_] = _
    +
    +  def start(applicationId: String, schedulerBackend: KubernetesClusterSchedulerBackend): Unit = {
    +    require(eventProcessorFuture == null, "Cannot start event processing twice.")
    +    logInfo(s"Starting Kubernetes executor pods event handler for application with" +
    +      s" id $applicationId.")
    +    val eventProcessor = new Runnable {
    +      override def run(): Unit = {
    +        Utils.tryLogNonFatalError {
    +          processEvents(applicationId, schedulerBackend)
    +        }
    +      }
    +    }
    +    eventProcessorFuture = eventProcessorExecutor.scheduleWithFixedDelay(
    +      eventProcessor, 0L, podAllocationDelay, TimeUnit.MILLISECONDS)
    +  }
    +
    +  def stop(): Unit = {
    +    if (eventProcessorFuture != null) {
    +      eventProcessorFuture.cancel(true)
    +      eventProcessorFuture = null
    +    }
    +  }
    +
    +  private def processEvents(
    +      applicationId: String, schedulerBackend: KubernetesClusterSchedulerBackend) {
    +    val currentEvents = new java.util.ArrayList[Seq[Pod]](eventQueue.size())
    +    eventQueue.drainTo(currentEvents)
    +    currentEvents.asScala.flatten.foreach { updatedPod =>
    +      val execId = updatedPod.getMetadata.getLabels.get(SPARK_EXECUTOR_ID_LABEL).toLong
    +      val podPhase = updatedPod.getStatus.getPhase.toLowerCase
    +      if (isDeleted(updatedPod)) {
    +        removeExecutorFromSpark(schedulerBackend, updatedPod, execId)
    +      } else {
    +        podPhase match {
    +          case "running" =>
    +            // The if clause is for resilience to out-of-order operations - an executor must be
    +            // pending before it first reaches running. Without this check you may e.g. process a
    +            // deletion event followed by some arbitrary modification event - we want the
    +            // deletion event to "stick".
    +            if (pendingExecutors.contains(execId)) {
    +              pendingExecutors.remove(execId)
    +              runningExecutors.add(execId)
    +            }
    +          // TODO (SPARK-24135) - handle more classes of errors
    +          case "error" | "failed" | "succeeded" =>
    +            // If deletion failed on a previous try, we can try again if resync informs us the
    +            // pod is still around.
    +            // Delete as a best-effort attempt - duplicate deletes will throw an exception, but
    +            // the end state of getting rid of the pod is what matters.
    +            if (!isDeleted(updatedPod)) {
    +              Utils.tryLogNonFatalError {
    +                kubernetesClient
    +                  .pods()
    +                  .withName(updatedPod.getMetadata.getName)
    +                  .delete()
    +              }
    +            }
    +            removeExecutorFromSpark(schedulerBackend, updatedPod, execId)
    +        }
    +      }
    +    }
    +
    +    val currentRunningExecutors = runningExecutors.size
    --- End diff --
    
    I thought about this, and prefer the current approach because it makes the synchronization easier to reason about. One of the downsides of the previous approach was the need to synchronize state and reason about thread safety in multiple places. With a separate executor service that reads and writes the `pendingExecutorIds` and `runningExecutorIds` fields, we'd have to reason about how the two executors would share access to these data structures under the right thread-safety model.
    
    But what do you think? Would the concurrency complexity be manageable and worth the upside of having separate schedules?

