srowen commented on a change in pull request #28258:
URL: https://github.com/apache/spark/pull/28258#discussion_r416043932



##########
File path: core/src/main/scala/org/apache/spark/deploy/Client.scala
##########
@@ -124,38 +127,57 @@ private class ClientEndpoint(
     }
   }
 
-  /* Find out driver status then exit the JVM */
+  /**
+   * Find out driver status then exit the JVM. If the waitAppCompletion is set to true, monitors
+   * the application until it finishes, fails or is killed.
+   */
   def pollAndReportStatus(driverId: String): Unit = {
     // Since ClientEndpoint is the only RpcEndpoint in the process, blocking the event loop thread
     // is fine.
     logInfo("... waiting before polling master for driver state")
     Thread.sleep(5000)
     logInfo("... polling master for driver state")
-    val statusResponse =
-      activeMasterEndpoint.askSync[DriverStatusResponse](RequestDriverStatus(driverId))
-    if (statusResponse.found) {
-      logInfo(s"State of $driverId is ${statusResponse.state.get}")
-      // Worker node, if present
-      (statusResponse.workerId, statusResponse.workerHostPort, statusResponse.state) match {
-        case (Some(id), Some(hostPort), Some(DriverState.RUNNING)) =>
-          logInfo(s"Driver running on $hostPort ($id)")
-        case _ =>
-      }
-      // Exception, if present
-      statusResponse.exception match {
-        case Some(e) =>
-          logError(s"Exception from cluster was: $e")
-          e.printStackTrace()
-          System.exit(-1)
-        case _ =>
-          System.exit(0)
+    while (true) {
+      val statusResponse =
+        activeMasterEndpoint.askSync[DriverStatusResponse](RequestDriverStatus(driverId))
+      if (statusResponse.found) {
+        logInfo(s"State of $driverId is ${statusResponse.state.get}")
+        // Worker node, if present
+        (statusResponse.workerId, statusResponse.workerHostPort, statusResponse.state) match {
+          case (Some(id), Some(hostPort), Some(DriverState.RUNNING)) =>
+            logInfo(s"Driver running on $hostPort ($id)")
+          case _ =>
+        }
+        // Exception, if present
+        statusResponse.exception match {
+          case Some(e) =>
+            logError(s"Exception from cluster was: $e")
+            e.printStackTrace()
+            System.exit(-1)
+          case _ =>
+            if (!waitAppCompletion) {
+              logInfo(s"No exception found and waitAppCompletion is false, " +
+                s"exiting spark-submit JVM.")
+              System.exit(0)
+            } else if (statusResponse.state.get == DriverState.FINISHED ||

Review comment:
       Just use a match statement to simplify the next 10 lines or so
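
       A rough sketch of what that could look like (waitAppCompletion, statusResponse and driverId come from the surrounding method in the diff above; the log wording is illustrative, not the final implementation):

       ```scala
       // Collapse the if/else-if chain into a single match on the driver state.
       statusResponse.state.get match {
         case _ if !waitAppCompletion =>
           // Not asked to wait for the application, exit right away.
           logInfo("Not waiting for application completion, exiting spark-submit JVM.")
           System.exit(0)
         case DriverState.FINISHED | DriverState.FAILED |
             DriverState.ERROR | DriverState.KILLED =>
           // Driver reached a terminal state, stop monitoring.
           logInfo(s"State of driver $driverId is ${statusResponse.state.get}, " +
             "exiting spark-submit JVM.")
           System.exit(0)
         case state =>
           // Driver still submitted or running, keep polling.
           logTrace(s"State of driver $driverId is $state, continuing to monitor.")
       }
       ```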

##########
File path: core/src/main/scala/org/apache/spark/deploy/Client.scala
##########
@@ -124,38 +127,57 @@ private class ClientEndpoint(
     }
   }
 
-  /* Find out driver status then exit the JVM */
+  /**
+   * Find out driver status then exit the JVM. If the waitAppCompletion is set to true, monitors
+   * the application until it finishes, fails or is killed.
+   */
   def pollAndReportStatus(driverId: String): Unit = {
     // Since ClientEndpoint is the only RpcEndpoint in the process, blocking the event loop thread
     // is fine.
     logInfo("... waiting before polling master for driver state")
     Thread.sleep(5000)
     logInfo("... polling master for driver state")
-    val statusResponse =
-      activeMasterEndpoint.askSync[DriverStatusResponse](RequestDriverStatus(driverId))
-    if (statusResponse.found) {
-      logInfo(s"State of $driverId is ${statusResponse.state.get}")
-      // Worker node, if present
-      (statusResponse.workerId, statusResponse.workerHostPort, statusResponse.state) match {
-        case (Some(id), Some(hostPort), Some(DriverState.RUNNING)) =>
-          logInfo(s"Driver running on $hostPort ($id)")
-        case _ =>
-      }
-      // Exception, if present
-      statusResponse.exception match {
-        case Some(e) =>
-          logError(s"Exception from cluster was: $e")
-          e.printStackTrace()
-          System.exit(-1)
-        case _ =>
-          System.exit(0)
+    while (true) {
+      val statusResponse =
+        activeMasterEndpoint.askSync[DriverStatusResponse](RequestDriverStatus(driverId))
+      if (statusResponse.found) {
+        logInfo(s"State of $driverId is ${statusResponse.state.get}")
+        // Worker node, if present
+        (statusResponse.workerId, statusResponse.workerHostPort, statusResponse.state) match {
+          case (Some(id), Some(hostPort), Some(DriverState.RUNNING)) =>
+            logInfo(s"Driver running on $hostPort ($id)")
+          case _ =>
+        }
+        // Exception, if present
+        statusResponse.exception match {
+          case Some(e) =>
+            logError(s"Exception from cluster was: $e")
+            e.printStackTrace()
+            System.exit(-1)
+          case _ =>
+            if (!waitAppCompletion) {
+              logInfo(s"No exception found and waitAppCompletion is false, " +
+                s"exiting spark-submit JVM.")
+              System.exit(0)
+            } else if (statusResponse.state.get == DriverState.FINISHED ||
+              statusResponse.state.get == DriverState.FAILED ||
+              statusResponse.state.get == DriverState.ERROR ||
+              statusResponse.state.get == DriverState.KILLED) {
+              logInfo(s"waitAppCompletion is true, state is 
${statusResponse.state.get}, " +
+                s"exiting spark-submit JVM.")
+              System.exit(0)
+            } else {
+              logTrace(s"waitAppCompletion is true, state is 
${statusResponse.state.get}," +

Review comment:
       I don't know if you need to say waitAppCompletion in all of these; that's not even the property name
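
       For example, the messages could just report the driver state (wording is illustrative; driverId and statusResponse come from the method above):

       ```scala
       logInfo(s"State of driver $driverId is ${statusResponse.state.get}, " +
         "exiting spark-submit JVM.")
       logTrace(s"State of driver $driverId is ${statusResponse.state.get}, " +
         "continue monitoring driver status.")
       ```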

##########
File path: docs/spark-standalone.md
##########
@@ -240,6 +240,16 @@ SPARK_MASTER_OPTS supports the following system properties:
   </td>
   <td>1.6.3</td>
 </tr>
+<tr>
+  <td><code>spark.submit.waitAppCompletion</code></td>
+  <td><code>true</code></td>

Review comment:
       Default is false, right?

##########
File path: docs/spark-standalone.md
##########
@@ -240,6 +240,16 @@ SPARK_MASTER_OPTS supports the following system properties:
   </td>
   <td>1.6.3</td>
 </tr>
+<tr>
+  <td><code>spark.submit.waitAppCompletion</code></td>

Review comment:
       I think this is the wrong place; this table shows properties you can set on the master, but this is an app setting. Right? Or it should be?



