This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new ba1badd2adf [SPARK-42813][K8S] Print application info when 
waitAppCompletion is false
ba1badd2adf is described below

commit ba1badd2adfd175c6680dff90e14b8aaa6cecd78
Author: Cheng Pan <cheng...@apache.org>
AuthorDate: Tue Mar 21 09:08:18 2023 -0700

    [SPARK-42813][K8S] Print application info when waitAppCompletion is false
    
    ### What changes were proposed in this pull request?
    
    On K8s cluster mode,
    
    1. when `spark.kubernetes.submission.waitAppCompletion=false`, print the 
application information on `spark-submit` exit, as it did before 
[SPARK-35174](https://issues.apache.org/jira/browse/SPARK-35174)
    
    2. add `appId` in the output message
    
    ### Why are the changes needed?
    
    On K8s cluster mode, when 
`spark.kubernetes.submission.waitAppCompletion=false`,
    before [SPARK-35174](https://issues.apache.org/jira/browse/SPARK-35174), 
`spark-submit` would exit quickly after logging the basic application information.
    ```
    logInfo(s"Deployed Spark application ${conf.appName} with submission ID 
$sId into Kubernetes")
    ```
    
    After [SPARK-35174](https://issues.apache.org/jira/browse/SPARK-35174), 
that part of the code is unreachable, so nothing is output.
    
    This PR also proposes adding `appId` to the output message, to make it 
consistent w/ the surrounding code (in `LoggingPodStatusWatcherImpl` this 
message is the exception; `... application $appId ...` is used in other 
places) and w/ YARN.
    
    
https://github.com/apache/spark/blob/8860f69455e5a722626194c4797b4b42cccd4510/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala#L1311-L1318
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, changes contain
    1) when `spark.kubernetes.submission.waitAppCompletion=false`, the user can 
see the app information when `spark-submit` exits.
    2) the message printed at the end of `spark-submit` now contains the app ID, 
which is consistent w/ the context and other resource managers like YARN.
    
    ### How was this patch tested?
    
    Pass CI.
    
    Closes #40444 from pan3793/SPARK-42813.
    
    Authored-by: Cheng Pan <cheng...@apache.org>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../k8s/submit/KubernetesClientApplication.scala   |  9 +++++-
 .../k8s/submit/LoggingPodStatusWatcher.scala       | 12 ++++----
 .../spark/deploy/k8s/submit/ClientSuite.scala      | 32 ++++++++++++++++++++--
 3 files changed, 43 insertions(+), 10 deletions(-)

diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
index 14d3c4d1f42..9f9b5655e26 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
@@ -180,8 +180,8 @@ private[spark] class Client(
         throw e
     }
 
+    val sId = Client.submissionId(conf.namespace, driverPodName)
     if (conf.get(WAIT_FOR_APP_COMPLETION)) {
-      val sId = Seq(conf.namespace, driverPodName).mkString(":")
       breakable {
         while (true) {
           val podWithName = kubernetesClient
@@ -202,10 +202,17 @@ private[spark] class Client(
           }
         }
       }
+    } else {
+      logInfo(s"Deployed Spark application ${conf.appName} with application ID 
${conf.appId} " +
+        s"and submission ID $sId into Kubernetes")
     }
   }
 }
 
+private[spark] object Client {
+  def submissionId(namespace: String, driverPodName: String): String = 
s"$namespace:$driverPodName"
+}
+
 /**
  * Main class and entry point of application submission in KUBERNETES mode.
  */
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala
index 81d91457253..bc8b023b5ec 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala
@@ -95,8 +95,9 @@ private[k8s] class LoggingPodStatusWatcherImpl(conf: 
KubernetesDriverConf)
     this.notifyAll()
   }
 
-  override def watchOrStop(sId: String): Boolean = if 
(conf.get(WAIT_FOR_APP_COMPLETION)) {
-    logInfo(s"Waiting for application ${conf.appName} with submission ID $sId 
to finish...")
+  override def watchOrStop(sId: String): Boolean = {
+    logInfo(s"Waiting for application ${conf.appName} with application ID 
${conf.appId} " +
+      s"and submission ID $sId to finish...")
     val interval = conf.get(REPORT_INTERVAL)
     synchronized {
       while (!podCompleted && !resourceTooOldReceived) {
@@ -109,12 +110,9 @@ private[k8s] class LoggingPodStatusWatcherImpl(conf: 
KubernetesDriverConf)
       logInfo(
         pod.map { p => s"Container final 
statuses:\n\n${containersDescription(p)}" }
           .getOrElse("No containers were found in the driver pod."))
-      logInfo(s"Application ${conf.appName} with submission ID $sId finished")
+      logInfo(s"Application ${conf.appName} with application ID ${conf.appId} 
" +
+        s"and submission ID $sId finished")
     }
     podCompleted
-  } else {
-    logInfo(s"Deployed Spark application ${conf.appName} with submission ID 
$sId into Kubernetes")
-    // Always act like the application has completed since we don't want to 
wait for app completion
-    true
   }
 }
diff --git 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
index a8c25ab5002..8c2be6c142d 100644
--- 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
+++ 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
@@ -33,8 +33,10 @@ import org.scalatestplus.mockito.MockitoSugar._
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.deploy.k8s.{Config, _}
+import org.apache.spark.deploy.k8s.Config.WAIT_FOR_APP_COMPLETION
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.k8s.Fabric8Aliases._
+import org.apache.spark.deploy.k8s.submit.Client.submissionId
 import org.apache.spark.util.Utils
 
 class ClientSuite extends SparkFunSuite with BeforeAndAfter {
@@ -181,7 +183,8 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter 
{
     when(podsWithNamespace.resource(fullExpectedPod())).thenReturn(namedPods)
     when(namedPods.create()).thenReturn(podWithOwnerReference())
     when(namedPods.watch(loggingPodStatusWatcher)).thenReturn(mock[Watch])
-    when(loggingPodStatusWatcher.watchOrStop(kconf.namespace + ":" + 
POD_NAME)).thenReturn(true)
+    val sId = submissionId(kconf.namespace, POD_NAME)
+    when(loggingPodStatusWatcher.watchOrStop(sId)).thenReturn(true)
     doReturn(resourceList)
       .when(kubernetesClient)
       .resourceList(createdResourcesArgumentCaptor.capture())
@@ -343,6 +346,31 @@ class ClientSuite extends SparkFunSuite with 
BeforeAndAfter {
       kubernetesClient,
       loggingPodStatusWatcher)
     submissionClient.run()
-    verify(loggingPodStatusWatcher).watchOrStop(kconf.namespace + ":driver")
+    verify(loggingPodStatusWatcher).watchOrStop(submissionId(kconf.namespace, 
POD_NAME))
+  }
+
+  test("SPARK-42813: Print application info when waitAppCompletion is false") {
+    val appName = "SPARK-42813"
+    val logAppender = new LogAppender
+    withLogAppender(logAppender) {
+      val sparkConf = new SparkConf(loadDefaults = false)
+        .set("spark.app.name", appName)
+        .set(WAIT_FOR_APP_COMPLETION, false)
+      kconf = KubernetesTestConf.createDriverConf(sparkConf = sparkConf,
+        resourceNamePrefix = Some(KUBERNETES_RESOURCE_PREFIX))
+      when(driverBuilder.buildFromFeatures(kconf, kubernetesClient))
+        .thenReturn(BUILT_KUBERNETES_SPEC)
+      val submissionClient = new Client(
+        kconf,
+        driverBuilder,
+        kubernetesClient,
+        loggingPodStatusWatcher)
+      submissionClient.run()
+    }
+    val appId = KubernetesTestConf.APP_ID
+    val sId = submissionId(kconf.namespace, POD_NAME)
+    
assert(logAppender.loggingEvents.map(_.getMessage.getFormattedMessage).contains(
+      s"Deployed Spark application $appName with application ID $appId " +
+      s"and submission ID $sId into Kubernetes"))
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to