Github user mccheah commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21067#discussion_r190974019
  
    --- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingJobStatusWatcher.scala
 ---
    @@ -16,55 +16,45 @@
      */
     package org.apache.spark.deploy.k8s.submit
     
    -import java.util.concurrent.{CountDownLatch, TimeUnit}
    +import java.util.concurrent.CountDownLatch
     
     import scala.collection.JavaConverters._
     
    -import io.fabric8.kubernetes.api.model.{ContainerStateRunning, 
ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod, Time}
    -import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher}
    +import io.fabric8.kubernetes.api.model.{ContainerStateRunning, 
ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Job, Pod, 
Time}
    +import io.fabric8.kubernetes.client.{KubernetesClient, 
KubernetesClientException, Watcher}
     import io.fabric8.kubernetes.client.Watcher.Action
     
     import org.apache.spark.SparkException
     import org.apache.spark.internal.Logging
    -import org.apache.spark.util.ThreadUtils
     
    -private[k8s] trait LoggingPodStatusWatcher extends Watcher[Pod] {
    +private[k8s] trait LoggingJobStatusWatcher extends Watcher[Job] {
       def awaitCompletion(): Unit
     }
     
    -/**
    - * A monitor for the running Kubernetes pod of a Spark application. Status 
logging occurs on
    - * every state change and also at an interval for liveness.
    - *
    - * @param appId application ID.
    - * @param maybeLoggingInterval ms between each state request. If provided, 
must be a positive
    - *                             number.
    - */
    -private[k8s] class LoggingPodStatusWatcherImpl(
    -    appId: String,
    -    maybeLoggingInterval: Option[Long])
    -  extends LoggingPodStatusWatcher with Logging {
    -
    -  private val podCompletedFuture = new CountDownLatch(1)
    -  // start timer for periodic logging
    -  private val scheduler =
    -    
ThreadUtils.newDaemonSingleThreadScheduledExecutor("logging-pod-status-watcher")
    -  private val logRunnable: Runnable = new Runnable {
    -    override def run() = logShortStatus()
    -  }
    -
    -  private var pod = Option.empty[Pod]
    -
    -  private def phase: String = 
pod.map(_.getStatus.getPhase).getOrElse("unknown")
    -
    -  def start(): Unit = {
    -    maybeLoggingInterval.foreach { interval =>
    -      scheduler.scheduleAtFixedRate(logRunnable, 0, interval, 
TimeUnit.MILLISECONDS)
    + /**
    +  * A monitor for the running Kubernetes pod of a Spark application. 
Status logging occurs on
    +  * every state change and also at an interval for liveness.
    +  *
    +  * @param appId application ID.
    +  * @param kubernetesClient kubernetes client.
    +  */
    +private[k8s] class LoggingJobStatusWatcherImpl(
    +                                                appId: String,
    --- End diff --
    
    Indentation


---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to