Github user mccheah commented on a diff in the pull request:
https://github.com/apache/spark/pull/21067#discussion_r190974019
--- Diff:
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingJobStatusWatcher.scala
---
@@ -16,55 +16,45 @@
*/
package org.apache.spark.deploy.k8s.submit
-import java.util.concurrent.{CountDownLatch, TimeUnit}
+import java.util.concurrent.CountDownLatch
import scala.collection.JavaConverters._
-import io.fabric8.kubernetes.api.model.{ContainerStateRunning,
ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod, Time}
-import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher}
+import io.fabric8.kubernetes.api.model.{ContainerStateRunning,
ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Job, Pod,
Time}
+import io.fabric8.kubernetes.client.{KubernetesClient,
KubernetesClientException, Watcher}
import io.fabric8.kubernetes.client.Watcher.Action
import org.apache.spark.SparkException
import org.apache.spark.internal.Logging
-import org.apache.spark.util.ThreadUtils
-private[k8s] trait LoggingPodStatusWatcher extends Watcher[Pod] {
+private[k8s] trait LoggingJobStatusWatcher extends Watcher[Job] {
def awaitCompletion(): Unit
}
-/**
- * A monitor for the running Kubernetes pod of a Spark application. Status
logging occurs on
- * every state change and also at an interval for liveness.
- *
- * @param appId application ID.
- * @param maybeLoggingInterval ms between each state request. If provided,
must be a positive
- * number.
- */
-private[k8s] class LoggingPodStatusWatcherImpl(
- appId: String,
- maybeLoggingInterval: Option[Long])
- extends LoggingPodStatusWatcher with Logging {
-
- private val podCompletedFuture = new CountDownLatch(1)
- // start timer for periodic logging
- private val scheduler =
-
ThreadUtils.newDaemonSingleThreadScheduledExecutor("logging-pod-status-watcher")
- private val logRunnable: Runnable = new Runnable {
- override def run() = logShortStatus()
- }
-
- private var pod = Option.empty[Pod]
-
- private def phase: String =
pod.map(_.getStatus.getPhase).getOrElse("unknown")
-
- def start(): Unit = {
- maybeLoggingInterval.foreach { interval =>
- scheduler.scheduleAtFixedRate(logRunnable, 0, interval,
TimeUnit.MILLISECONDS)
+ /**
+ * A monitor for the running Kubernetes pod of a Spark application.
Status logging occurs on
+ * every state change and also at an interval for liveness.
+ *
+ * @param appId application ID.
+ * @param kubernetesClient kubernetes client.
+ */
+private[k8s] class LoggingJobStatusWatcherImpl(
+ appId: String,
--- End diff --
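Indentation: the Scaladoc block added above is indented, but the class declaration it documents (`private[k8s] class LoggingJobStatusWatcherImpl`) starts at column 0. Please align the doc comment with the class declaration.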
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]