This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 02c5b4f  [SPARK-28947][K8S] Status logging not happens at an interval 
for liveness
02c5b4f is described below

commit 02c5b4f76337cc3901b8741887292bb4478931f3
Author: Kent Yao <yaooq...@hotmail.com>
AuthorDate: Tue Oct 15 12:34:39 2019 -0700

    [SPARK-28947][K8S] Status logging not happens at an interval for liveness
    
    ### What changes were proposed in this pull request?
    
    This PR invokes the start method of `LoggingPodStatusWatcherImpl` for status 
logging at intervals.
    
    ### Why are the changes needed?
    
    The start method of `LoggingPodStatusWatcherImpl` is declared but never 
called; this PR fixes that.
    
    ### Does this PR introduce any user-facing change?
    
    no
    
    ### How was this patch tested?
    
    Manually tested.
    
    Closes #25648 from yaooqinn/SPARK-28947.
    
    Authored-by: Kent Yao <yaooq...@hotmail.com>
    Signed-off-by: Marcelo Vanzin <van...@cloudera.com>
---
 .../k8s/submit/KubernetesClientApplication.scala   | 25 ++-------
 .../k8s/submit/LoggingPodStatusWatcher.scala       | 61 ++++++++++------------
 .../spark/deploy/k8s/submit/ClientSuite.scala      |  5 +-
 3 files changed, 33 insertions(+), 58 deletions(-)

diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
index 11bbad9..8e5532d 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
@@ -86,15 +86,12 @@ private[spark] object ClientArguments {
  * @param builder Responsible for building the base driver pod based on a 
composition of
  *                implemented features.
  * @param kubernetesClient the client to talk to the Kubernetes API server
- * @param waitForAppCompletion a flag indicating whether the client should 
wait for the application
- *                             to complete
  * @param watcher a watcher that monitors and logs the application status
  */
 private[spark] class Client(
     conf: KubernetesDriverConf,
     builder: KubernetesDriverBuilder,
     kubernetesClient: KubernetesClient,
-    waitForAppCompletion: Boolean,
     watcher: LoggingPodStatusWatcher) extends Logging {
 
   def run(): Unit = {
@@ -124,10 +121,11 @@ private[spark] class Client(
           .endVolume()
         .endSpec()
       .build()
+    val driverPodName = resolvedDriverPod.getMetadata.getName
     Utils.tryWithResource(
       kubernetesClient
         .pods()
-        .withName(resolvedDriverPod.getMetadata.getName)
+        .withName(driverPodName)
         .watch(watcher)) { _ =>
       val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
       try {
@@ -141,16 +139,8 @@ private[spark] class Client(
           throw e
       }
 
-      val sId = s"${Option(conf.namespace).map(_ + ":").getOrElse("")}" +
-        s"${resolvedDriverPod.getMetadata.getName}"
-      if (waitForAppCompletion) {
-        logInfo(s"Waiting for application ${conf.appName} with submission ID 
${sId} to finish...")
-        watcher.awaitCompletion()
-        logInfo(s"Application ${conf.appName} with submission ID ${sId} 
finished.")
-      } else {
-        logInfo(s"Deployed Spark application ${conf.appName} with " +
-          s"submission ID ${sId} into Kubernetes.")
-      }
+      val sId = Seq(conf.namespace, driverPodName).mkString(":")
+      watcher.watchOrStop(sId)
     }
   }
 
@@ -199,13 +189,11 @@ private[spark] class KubernetesClientApplication extends 
SparkApplication {
   }
 
   private def run(clientArguments: ClientArguments, sparkConf: SparkConf): 
Unit = {
-    val appName = sparkConf.getOption("spark.app.name").getOrElse("spark")
     // For constructing the app ID, we can't use the Spark application name, 
as the app ID is going
     // to be added as a label to group resources belonging to the same 
application. Label values are
     // considerably restrictive, e.g. must be no longer than 63 characters in 
length. So we generate
     // a unique app ID (captured by spark.app.id) in the format below.
     val kubernetesAppId = s"spark-${UUID.randomUUID().toString.replaceAll("-", 
"")}"
-    val waitForAppCompletion = sparkConf.get(WAIT_FOR_APP_COMPLETION)
     val kubernetesConf = KubernetesConf.createDriverConf(
       sparkConf,
       kubernetesAppId,
@@ -215,9 +203,7 @@ private[spark] class KubernetesClientApplication extends 
SparkApplication {
     // The master URL has been checked for validity already in SparkSubmit.
     // We just need to get rid of the "k8s://" prefix here.
     val master = KubernetesUtils.parseMasterUrl(sparkConf.get("spark.master"))
-    val loggingInterval = if (waitForAppCompletion) 
Some(sparkConf.get(REPORT_INTERVAL)) else None
-
-    val watcher = new LoggingPodStatusWatcherImpl(kubernetesAppId, 
loggingInterval)
+    val watcher = new LoggingPodStatusWatcherImpl(kubernetesConf)
 
     Utils.tryWithResource(SparkKubernetesClientFactory.createKubernetesClient(
       master,
@@ -231,7 +217,6 @@ private[spark] class KubernetesClientApplication extends 
SparkApplication {
           kubernetesConf,
           new KubernetesDriverBuilder(),
           kubernetesClient,
-          waitForAppCompletion,
           watcher)
         client.run()
     }
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala
index f16d1f3..ce3c80c 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala
@@ -16,49 +16,36 @@
  */
 package org.apache.spark.deploy.k8s.submit
 
-import java.util.concurrent.{CountDownLatch, TimeUnit}
-
 import io.fabric8.kubernetes.api.model.Pod
 import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher}
 import io.fabric8.kubernetes.client.Watcher.Action
 
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.KubernetesDriverConf
 import org.apache.spark.deploy.k8s.KubernetesUtils._
 import org.apache.spark.internal.Logging
-import org.apache.spark.util.ThreadUtils
 
 private[k8s] trait LoggingPodStatusWatcher extends Watcher[Pod] {
-  def awaitCompletion(): Unit
+  def watchOrStop(submissionId: String): Unit
 }
 
 /**
  * A monitor for the running Kubernetes pod of a Spark application. Status 
logging occurs on
  * every state change and also at an interval for liveness.
  *
- * @param appId application ID.
- * @param maybeLoggingInterval ms between each state request. If provided, 
must be a positive
- *                             number.
+ * @param conf kubernetes driver conf.
  */
-private[k8s] class LoggingPodStatusWatcherImpl(
-    appId: String,
-    maybeLoggingInterval: Option[Long])
+private[k8s] class LoggingPodStatusWatcherImpl(conf: KubernetesDriverConf)
   extends LoggingPodStatusWatcher with Logging {
 
-  private val podCompletedFuture = new CountDownLatch(1)
-  // start timer for periodic logging
-  private val scheduler =
-    
ThreadUtils.newDaemonSingleThreadScheduledExecutor("logging-pod-status-watcher")
-  private val logRunnable: Runnable = () => logShortStatus()
+  private val appId = conf.appId
+
+  private var podCompleted = false
 
   private var pod = Option.empty[Pod]
 
   private def phase: String = 
pod.map(_.getStatus.getPhase).getOrElse("unknown")
 
-  def start(): Unit = {
-    maybeLoggingInterval.foreach { interval =>
-      scheduler.scheduleAtFixedRate(logRunnable, 0, interval, 
TimeUnit.MILLISECONDS)
-    }
-  }
-
   override def eventReceived(action: Action, pod: Pod): Unit = {
     this.pod = Option(pod)
     action match {
@@ -78,11 +65,7 @@ private[k8s] class LoggingPodStatusWatcherImpl(
     closeWatch()
   }
 
-  private def logShortStatus() = {
-    logInfo(s"Application status for $appId (phase: $phase)")
-  }
-
-  private def logLongStatus() = {
+  private def logLongStatus(): Unit = {
     logInfo("State changed, new state: " + 
pod.map(formatPodState).getOrElse("unknown"))
   }
 
@@ -90,15 +73,25 @@ private[k8s] class LoggingPodStatusWatcherImpl(
     phase == "Succeeded" || phase == "Failed"
   }
 
-  private def closeWatch(): Unit = {
-    podCompletedFuture.countDown()
-    scheduler.shutdown()
+  private def closeWatch(): Unit = synchronized {
+    podCompleted = true
+    this.notifyAll()
   }
 
-  override def awaitCompletion(): Unit = {
-    podCompletedFuture.await()
-    logInfo(pod.map { p =>
-      s"Container final statuses:\n\n${containersDescription(p)}"
-    }.getOrElse("No containers were found in the driver pod."))
+  override def watchOrStop(sId: String): Unit = if 
(conf.get(WAIT_FOR_APP_COMPLETION)) {
+    logInfo(s"Waiting for application ${conf.appName} with submission ID $sId 
to finish...")
+    val interval = conf.get(REPORT_INTERVAL)
+    synchronized {
+      while (!podCompleted) {
+        wait(interval)
+        logInfo(s"Application status for $appId (phase: $phase)")
+      }
+    }
+    logInfo(
+      pod.map { p => s"Container final 
statuses:\n\n${containersDescription(p)}" }
+        .getOrElse("No containers were found in the driver pod."))
+    logInfo(s"Application ${conf.appName} with submission ID $sId finished")
+  } else {
+    logInfo(s"Deployed Spark application ${conf.appName} with submission ID 
$sId into Kubernetes")
   }
 }
diff --git 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
index 2cc7f8e..5d49ac0 100644
--- 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
+++ 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
@@ -146,7 +146,6 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter 
{
       kconf,
       driverBuilder,
       kubernetesClient,
-      false,
       loggingPodStatusWatcher)
     submissionClient.run()
     verify(podOperations).create(FULL_EXPECTED_POD)
@@ -157,7 +156,6 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter 
{
       kconf,
       driverBuilder,
       kubernetesClient,
-      false,
       loggingPodStatusWatcher)
     submissionClient.run()
     val otherCreatedResources = createdResourcesArgumentCaptor.getAllValues
@@ -181,9 +179,8 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter 
{
       kconf,
       driverBuilder,
       kubernetesClient,
-      true,
       loggingPodStatusWatcher)
     submissionClient.run()
-    verify(loggingPodStatusWatcher).awaitCompletion()
+    verify(loggingPodStatusWatcher).watchOrStop(kconf.namespace + ":driver")
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to