[GitHub] [spark] Ngone51 commented on a change in pull request #28258: [SPARK-31486] [CORE] spark.submit.waitAppCompletion flag to control spark-submit exit in Standalone Cluster Mode
Ngone51 commented on a change in pull request #28258: URL: https://github.com/apache/spark/pull/28258#discussion_r433591599 ## File path: core/src/main/scala/org/apache/spark/deploy/Client.scala ## @@ -124,58 +138,84 @@ private class ClientEndpoint( } } - /* Find out driver status then exit the JVM */ - def pollAndReportStatus(driverId: String): Unit = { -// Since ClientEndpoint is the only RpcEndpoint in the process, blocking the event loop thread -// is fine. -logInfo("... waiting before polling master for driver state") -Thread.sleep(5000) -logInfo("... polling master for driver state") -val statusResponse = - activeMasterEndpoint.askSync[DriverStatusResponse](RequestDriverStatus(driverId)) -if (statusResponse.found) { - logInfo(s"State of $driverId is ${statusResponse.state.get}") - // Worker node, if present - (statusResponse.workerId, statusResponse.workerHostPort, statusResponse.state) match { -case (Some(id), Some(hostPort), Some(DriverState.RUNNING)) => - logInfo(s"Driver running on $hostPort ($id)") -case _ => + private def monitorDriverStatus(): Unit = { +if (submittedDriverID != "") { + asyncSendToMasterAndForwardReply[DriverStatusResponse](RequestDriverStatus(submittedDriverID)) +} + } + + /** + * Processes and reports the driver status then exit the JVM if the + * waitAppCompletion is set to false, else reports the driver status + * if debug logs are enabled. + */ + def reportDriverStatus(found: Boolean, state: Option[DriverState], + workerId: Option[String], + workerHostPort: Option[String], + exception: Option[Exception]): Unit = { Review comment: nit: ```suggestion def reportDriverStatus( found: Boolean, state: Option[DriverState], workerId: Option[String], workerHostPort: Option[String], exception: Option[Exception]): Unit = { ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] Ngone51 commented on a change in pull request #28258: [SPARK-31486] [CORE] spark.submit.waitAppCompletion flag to control spark-submit exit in Standalone Cluster Mode
Ngone51 commented on a change in pull request #28258: URL: https://github.com/apache/spark/pull/28258#discussion_r428729898 ## File path: core/src/main/scala/org/apache/spark/deploy/Client.scala ## @@ -176,6 +202,22 @@ private class ClientEndpoint( } else if (!Utils.responseFromBackup(message)) { System.exit(-1) } + +case DriverStatusResponse(found, state, _, _, _) => Review comment: Oh, sorry, I missed that. Yes, thank you!
[GitHub] [spark] Ngone51 commented on a change in pull request #28258: [SPARK-31486] [CORE] spark.submit.waitAppCompletion flag to control spark-submit exit in Standalone Cluster Mode
Ngone51 commented on a change in pull request #28258: URL: https://github.com/apache/spark/pull/28258#discussion_r428704776 ## File path: core/src/main/scala/org/apache/spark/deploy/Client.scala ## @@ -176,6 +202,22 @@ private class ClientEndpoint( } else if (!Utils.responseFromBackup(message)) { System.exit(-1) } + +case DriverStatusResponse(found, state, _, _, _) => Review comment: We don't need to add `forwardMessageThread.scheduleAtFixedRate(...)` to any of the `case` branches; just keep it as a single global schedule, as you do now. I think it still works for `case "kill"`.
[GitHub] [spark] Ngone51 commented on a change in pull request #28258: [SPARK-31486] [CORE] spark.submit.waitAppCompletion flag to control spark-submit exit in Standalone Cluster Mode
Ngone51 commented on a change in pull request #28258: URL: https://github.com/apache/spark/pull/28258#discussion_r428693738 ## File path: core/src/main/scala/org/apache/spark/deploy/Client.scala ## @@ -176,6 +202,22 @@ private class ClientEndpoint( } else if (!Utils.responseFromBackup(message)) { System.exit(-1) } + +case DriverStatusResponse(found, state, _, _, _) => Review comment: That's why I said we need to change the initial delay to a non-zero value (e.g. 5s) instead of 0, for both submitting and killing.
[GitHub] [spark] Ngone51 commented on a change in pull request #28258: [SPARK-31486] [CORE] spark.submit.waitAppCompletion flag to control spark-submit exit in Standalone Cluster Mode
Ngone51 commented on a change in pull request #28258: URL: https://github.com/apache/spark/pull/28258#discussion_r428661567 ## File path: core/src/main/scala/org/apache/spark/deploy/Client.scala ## @@ -176,6 +202,22 @@ private class ClientEndpoint( } else if (!Utils.responseFromBackup(message)) { System.exit(-1) } + +case DriverStatusResponse(found, state, _, _, _) => Review comment: > If we removing polling from that, what logic should be handled there? We use this: ``` forwardMessageThread.scheduleAtFixedRate(() => Utils.tryLogNonFatalError { MonitorDriverStatus() }, 0, REPORT_DRIVER_STATUS_INTERVAL, TimeUnit.MILLISECONDS) ``` (but the initial delay needs to change). This way, submitting or killing a driver will still use it, but only once, when `waitAppCompletion=false`.
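The scheduling approach suggested in this comment can be sketched without Spark, using a plain `ScheduledExecutorService`. This is only a minimal illustration, not the PR's code: `ScheduledPollSketch`, `runPolls`, and the millisecond values are hypothetical, and the `try`/`catch` merely stands in for `Utils.tryLogNonFatalError`.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object ScheduledPollSketch {
  // Hypothetical values, shortened so the sketch finishes quickly.
  val InitialDelayMs = 50L // stands in for the non-zero initial delay (e.g. 5s)
  val PollIntervalMs = 20L // stands in for REPORT_DRIVER_STATUS_INTERVAL

  /** Runs `poll` exactly `times` times on a scheduler thread and returns the run count. */
  def runPolls(times: Int)(poll: () => Unit): Int = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    val done = new CountDownLatch(times)
    val count = new AtomicInteger(0)
    val task = new Runnable {
      override def run(): Unit = {
        if (done.getCount > 0) {
          // Swallow ordinary failures so one bad poll does not cancel the schedule.
          try poll() catch { case _: Exception => () } // a real client would log here
          count.incrementAndGet()
          done.countDown()
        }
      }
    }
    // The first run happens after InitialDelayMs, then every PollIntervalMs.
    scheduler.scheduleAtFixedRate(task, InitialDelayMs, PollIntervalMs, TimeUnit.MILLISECONDS)
    done.await(5, TimeUnit.SECONDS)
    scheduler.shutdownNow()
    count.get()
  }
}
```

Because the task runs on the scheduler's own thread, the code that handles replies is never put to sleep between polls.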
[GitHub] [spark] Ngone51 commented on a change in pull request #28258: [SPARK-31486] [CORE] spark.submit.waitAppCompletion flag to control spark-submit exit in Standalone Cluster Mode
Ngone51 commented on a change in pull request #28258: URL: https://github.com/apache/spark/pull/28258#discussion_r428645814 ## File path: core/src/main/scala/org/apache/spark/deploy/Client.scala ## @@ -176,6 +202,22 @@ private class ClientEndpoint( } else if (!Utils.responseFromBackup(message)) { System.exit(-1) } + +case DriverStatusResponse(found, state, _, _, _) => Review comment: It would be better to refactor `pollAndReportStatus` to reduce duplicated logic. For example, we could call `pollAndReportStatus` only here and remove the other invocations in `SubmitDriverResponse`/`KillDriverResponse`. And, of course, `pollAndReportStatus` (which also needs a new name) would no longer poll the status.
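One way to read this refactoring suggestion is to split "decide what to do with a status" from "fetch the status". The sketch below is an assumption-laden illustration only: `ReportStatusSketch`, `StatusResponse`, and `exitCodeFor` are hypothetical names, the types are simplified stand-ins for Spark's `DriverState` and `DriverStatusResponse`, and the exit code for an unknown driver is assumed rather than taken from the PR.

```scala
object ReportStatusSketch {
  // Simplified stand-ins for Spark's driver states (assumption, not Spark's enum).
  sealed trait DriverState
  case object Running extends DriverState
  case object Finished extends DriverState
  case object Failed extends DriverState
  case object Error extends DriverState
  case object Killed extends DriverState

  final case class StatusResponse(
      found: Boolean,
      state: Option[DriverState],
      exception: Option[Exception])

  /** Returns Some(exitCode) when the client should exit, None when it should keep waiting. */
  def exitCodeFor(response: StatusResponse, waitAppCompletion: Boolean): Option[Int] = {
    if (!response.found) {
      Some(-1) // assumed: a driver unknown to the master is treated as an error
    } else if (response.exception.isDefined) {
      Some(-1) // the cluster reported an exception
    } else if (!waitAppCompletion) {
      Some(0) // report once and exit, as before the new flag
    } else {
      response.state match {
        case Some(Finished) | Some(Failed) | Some(Error) | Some(Killed) => Some(0)
        case _ => None // still running: wait for the next scheduled poll
      }
    }
  }
}
```

With the decision isolated like this, the reply handler only has to call the function and act on the result; no polling happens inside it.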
[GitHub] [spark] Ngone51 commented on a change in pull request #28258: [SPARK-31486] [CORE] spark.submit.waitAppCompletion flag to control spark-submit exit in Standalone Cluster Mode
Ngone51 commented on a change in pull request #28258: URL: https://github.com/apache/spark/pull/28258#discussion_r428643339 ## File path: core/src/main/scala/org/apache/spark/deploy/Client.scala ## @@ -123,16 +134,24 @@ private class ClientEndpoint( }(forwardMessageExecutionContext) } } + private def MonitorDriverStatus(): Unit = { Review comment: nit: method names should start with a lowercase letter.
[GitHub] [spark] Ngone51 commented on a change in pull request #28258: [SPARK-31486] [CORE] spark.submit.waitAppCompletion flag to control spark-submit exit in Standalone Cluster Mode
Ngone51 commented on a change in pull request #28258: URL: https://github.com/apache/spark/pull/28258#discussion_r428643084 ## File path: core/src/main/scala/org/apache/spark/deploy/Client.scala ## @@ -123,16 +134,24 @@ private class ClientEndpoint( }(forwardMessageExecutionContext) } } + private def MonitorDriverStatus(): Unit = { Review comment: nit: needs an empty line above.
[GitHub] [spark] Ngone51 commented on a change in pull request #28258: [SPARK-31486] [CORE] spark.submit.waitAppCompletion flag to control spark-submit exit in Standalone Cluster Mode
Ngone51 commented on a change in pull request #28258: URL: https://github.com/apache/spark/pull/28258#discussion_r422744560 ## File path: core/src/main/scala/org/apache/spark/deploy/Client.scala ## @@ -176,6 +190,25 @@ private class ClientEndpoint( } else if (!Utils.responseFromBackup(message)) { System.exit(-1) } + +case DriverStatusResponse(found, state, _, _, _) => + if (found) { +state.get match { + case DriverState.FINISHED | DriverState.FAILED | + DriverState.ERROR | DriverState.KILLED => +logInfo(s"State of $submittedDriverID is ${state.get}, " + Review comment: nit: s"State of **driver** $submittedDriverID ..." ## File path: core/src/main/scala/org/apache/spark/deploy/Client.scala ## @@ -176,6 +190,25 @@ private class ClientEndpoint( } else if (!Utils.responseFromBackup(message)) { System.exit(-1) } + +case DriverStatusResponse(found, state, _, _, _) => + if (found) { +state.get match { + case DriverState.FINISHED | DriverState.FAILED | + DriverState.ERROR | DriverState.KILLED => +logInfo(s"State of $submittedDriverID is ${state.get}, " + + s"exiting spark-submit JVM.") +System.exit(0) + case _ => +Thread.sleep(REPORT_DRIVER_STATUS_INTERVAL) +logInfo(s"State of $submittedDriverID is ${state.get}, " + Review comment: nit: s"State of driver $submittedDriverID ..." ## File path: core/src/main/scala/org/apache/spark/deploy/Client.scala ## @@ -176,6 +190,25 @@ private class ClientEndpoint( } else if (!Utils.responseFromBackup(message)) { System.exit(-1) } + +case DriverStatusResponse(found, state, _, _, _) => + if (found) { +state.get match { + case DriverState.FINISHED | DriverState.FAILED | + DriverState.ERROR | DriverState.KILLED => +logInfo(s"State of $submittedDriverID is ${state.get}, " + + s"exiting spark-submit JVM.") +System.exit(0) + case _ => +Thread.sleep(REPORT_DRIVER_STATUS_INTERVAL) Review comment: `Thread.sleep` could still have the same issue: imagine the network drop happens while the thread is sleeping.
We should move the periodic sending logic out of `receive`. We could mimic `CoarseGrainedSchedulerBackend` to do the same work here: https://github.com/apache/spark/blob/9faad07ce706890008a8a3ce675fa95b0bdf7c14/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala#L137-L139 ## File path: core/src/main/scala/org/apache/spark/deploy/Client.scala ## @@ -176,6 +190,25 @@ private class ClientEndpoint( } else if (!Utils.responseFromBackup(message)) { System.exit(-1) } + +case DriverStatusResponse(found, state, _, _, _) => + if (found) { +state.get match { + case DriverState.FINISHED | DriverState.FAILED | + DriverState.ERROR | DriverState.KILLED => +logInfo(s"State of $submittedDriverID is ${state.get}, " + + s"exiting spark-submit JVM.") +System.exit(0) + case _ => +Thread.sleep(REPORT_DRIVER_STATUS_INTERVAL) +logInfo(s"State of $submittedDriverID is ${state.get}, " + Review comment: Since status polling will happen every second, I'm afraid the logs could be too verbose. We could log only after a fixed number of polls, e.g. once every 60 polls. ## File path: docs/spark-standalone.md ## @@ -374,6 +374,25 @@ To run an interactive Spark shell against the cluster, run the following command You can also pass an option `--total-executor-cores ` to control the number of cores that spark-shell uses on the cluster. +# Client Properties + +Spark applications supports the following configuration properties specific to standalone mode: + + + Property NameDefault ValueMeaningSince Version + + spark.standalone.submit.waitAppCompletion + false + + In standalone cluster mode, controls whether the client waits to exit until the application completes. + If set to true, the client process will stay alive polling the application's status. Review comment: nit: `application's` or `driver`?
## File path: core/src/main/scala/org/apache/spark/deploy/Client.scala ## @@ -61,6 +61,10 @@ private class ClientEndpoint( private val lostMasters = new HashSet[RpcAddress] private var activeMasterEndpoint: RpcEndpointRef = null + private val waitAppCompletion = conf.getBoolean("spark.standalone.submit.waitAppCompletion", Review comment: We usually use `ConfigEntry` for a new conf. Could you add it too?
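The "log only every N polls" idea from this review can be sketched with a small counter wrapper. This is a hypothetical helper written for illustration (`ThrottledLogSketch` and `EveryNth` are not Spark names), and it assumes it is always called from a single thread, such as one scheduler thread.

```scala
object ThrottledLogSketch {
  /** Calls `log` on the first poll and then once every `every` polls. Not thread-safe. */
  final class EveryNth(every: Int, log: String => Unit) {
    require(every > 0, "every must be positive")
    private var polls = 0L

    // `message` is by-name so the string is only built when it is actually logged.
    def apply(message: => String): Unit = {
      if (polls % every == 0) log(message)
      polls += 1
    }
  }
}
```

With a one-second polling interval and `every = 60`, this would emit roughly one status line per minute while still logging the very first poll.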
[GitHub] [spark] Ngone51 commented on a change in pull request #28258: [SPARK-31486] [CORE] spark.submit.waitAppCompletion flag to control spark-submit exit in Standalone Cluster Mode
Ngone51 commented on a change in pull request #28258: URL: https://github.com/apache/spark/pull/28258#discussion_r422503865 ## File path: core/src/main/scala/org/apache/spark/deploy/Client.scala ## @@ -124,38 +127,57 @@ private class ClientEndpoint( } } - /* Find out driver status then exit the JVM */ + /** + * Find out driver status then exit the JVM. If the waitAppCompletion is set to true, monitors + * the application until it finishes, fails or is killed. + */ def pollAndReportStatus(driverId: String): Unit = { // Since ClientEndpoint is the only RpcEndpoint in the process, blocking the event loop thread // is fine. logInfo("... waiting before polling master for driver state") Thread.sleep(5000) logInfo("... polling master for driver state") -val statusResponse = - activeMasterEndpoint.askSync[DriverStatusResponse](RequestDriverStatus(driverId)) -if (statusResponse.found) { - logInfo(s"State of $driverId is ${statusResponse.state.get}") - // Worker node, if present - (statusResponse.workerId, statusResponse.workerHostPort, statusResponse.state) match { -case (Some(id), Some(hostPort), Some(DriverState.RUNNING)) => - logInfo(s"Driver running on $hostPort ($id)") -case _ => - } - // Exception, if present - statusResponse.exception match { -case Some(e) => - logError(s"Exception from cluster was: $e") - e.printStackTrace() - System.exit(-1) -case _ => - System.exit(0) +while (true) { Review comment: > In both cases, I see the following in driver logs. I couldn't find any difference in logs. Hi @akshatb1, those logs are from `StandaloneAppClient$ClientEndpoint` and `StandaloneSchedulerBackend` rather than `org.apache.spark.deploy.ClientEndpoint`. Can you check again? > Just to confirm, are you suggesting to do this in lin # 180 in pollAndReportStatus method? Or should we handle this outside? I think doing it just after line 180 should be OK.
[GitHub] [spark] Ngone51 commented on a change in pull request #28258: [SPARK-31486] [CORE] spark.submit.waitAppCompletion flag to control spark-submit exit in Standalone Cluster Mode
Ngone51 commented on a change in pull request #28258: URL: https://github.com/apache/spark/pull/28258#discussion_r419487418 ## File path: core/src/main/scala/org/apache/spark/deploy/Client.scala ## @@ -124,38 +127,57 @@ private class ClientEndpoint( } } - /* Find out driver status then exit the JVM */ + /** + * Find out driver status then exit the JVM. If the waitAppCompletion is set to true, monitors + * the application until it finishes, fails or is killed. + */ def pollAndReportStatus(driverId: String): Unit = { // Since ClientEndpoint is the only RpcEndpoint in the process, blocking the event loop thread // is fine. logInfo("... waiting before polling master for driver state") Thread.sleep(5000) logInfo("... polling master for driver state") -val statusResponse = - activeMasterEndpoint.askSync[DriverStatusResponse](RequestDriverStatus(driverId)) -if (statusResponse.found) { - logInfo(s"State of $driverId is ${statusResponse.state.get}") - // Worker node, if present - (statusResponse.workerId, statusResponse.workerHostPort, statusResponse.state) match { -case (Some(id), Some(hostPort), Some(DriverState.RUNNING)) => - logInfo(s"Driver running on $hostPort ($id)") -case _ => - } - // Exception, if present - statusResponse.exception match { -case Some(e) => - logError(s"Exception from cluster was: $e") - e.printStackTrace() - System.exit(-1) -case _ => - System.exit(0) +while (true) { Review comment: A possible way to verify this is to launch a long-running application, then shut down the Master midway and see whether `onDisconnected` is called immediately.
[GitHub] [spark] Ngone51 commented on a change in pull request #28258: [SPARK-31486] [CORE] spark.submit.waitAppCompletion flag to control spark-submit exit in Standalone Cluster Mode
Ngone51 commented on a change in pull request #28258: URL: https://github.com/apache/spark/pull/28258#discussion_r419486010 ## File path: core/src/main/scala/org/apache/spark/deploy/Client.scala ## @@ -124,38 +127,57 @@ private class ClientEndpoint( } } - /* Find out driver status then exit the JVM */ + /** + * Find out driver status then exit the JVM. If the waitAppCompletion is set to true, monitors + * the application until it finishes, fails or is killed. + */ def pollAndReportStatus(driverId: String): Unit = { // Since ClientEndpoint is the only RpcEndpoint in the process, blocking the event loop thread // is fine. logInfo("... waiting before polling master for driver state") Thread.sleep(5000) logInfo("... polling master for driver state") -val statusResponse = - activeMasterEndpoint.askSync[DriverStatusResponse](RequestDriverStatus(driverId)) -if (statusResponse.found) { - logInfo(s"State of $driverId is ${statusResponse.state.get}") - // Worker node, if present - (statusResponse.workerId, statusResponse.workerHostPort, statusResponse.state) match { -case (Some(id), Some(hostPort), Some(DriverState.RUNNING)) => - logInfo(s"Driver running on $hostPort ($id)") -case _ => - } - // Exception, if present - statusResponse.exception match { -case Some(e) => - logError(s"Exception from cluster was: $e") - e.printStackTrace() - System.exit(-1) -case _ => - System.exit(0) +while (true) { Review comment: We can periodically send a message (e.g. after `Thread.sleep(REPORT_DRIVER_STATUS_INTERVAL)`) to `ClientEndpoint` itself to check the driver's status.
[GitHub] [spark] Ngone51 commented on a change in pull request #28258: [SPARK-31486] [CORE] spark.submit.waitAppCompletion flag to control spark-submit exit in Standalone Cluster Mode
Ngone51 commented on a change in pull request #28258: URL: https://github.com/apache/spark/pull/28258#discussion_r419464952 ## File path: core/src/main/scala/org/apache/spark/deploy/Client.scala ## @@ -124,38 +127,57 @@ private class ClientEndpoint( } } - /* Find out driver status then exit the JVM */ + /** + * Find out driver status then exit the JVM. If the waitAppCompletion is set to true, monitors + * the application until it finishes, fails or is killed. + */ def pollAndReportStatus(driverId: String): Unit = { // Since ClientEndpoint is the only RpcEndpoint in the process, blocking the event loop thread // is fine. logInfo("... waiting before polling master for driver state") Thread.sleep(5000) logInfo("... polling master for driver state") -val statusResponse = - activeMasterEndpoint.askSync[DriverStatusResponse](RequestDriverStatus(driverId)) -if (statusResponse.found) { - logInfo(s"State of $driverId is ${statusResponse.state.get}") - // Worker node, if present - (statusResponse.workerId, statusResponse.workerHostPort, statusResponse.state) match { -case (Some(id), Some(hostPort), Some(DriverState.RUNNING)) => - logInfo(s"Driver running on $hostPort ($id)") -case _ => - } - // Exception, if present - statusResponse.exception match { -case Some(e) => - logError(s"Exception from cluster was: $e") - e.printStackTrace() - System.exit(-1) -case _ => - System.exit(0) +while (true) { Review comment: Hey, please pay attention to my comment here. I believe the current implementation could block `ClientEndpoint` because it's a `ThreadSafeRpcEndpoint`. With `waitAppCompletion` enabled, `ClientEndpoint` would keep handling the `SubmitDriverResponse` message until the application finishes. So `ClientEndpoint` cannot handle other messages, e.g. `RemoteProcessDisconnected` or `RemoteProcessConnectionError`, in the meantime, which breaks the current behaviour. Furthermore, it could also block messages from backup masters, though that is not fatal in this case.
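The blocking concern raised here can be reproduced in miniature. The sketch below is not Spark's RPC layer: it models an endpoint that handles one message at a time with a single-threaded executor, and measures how long a later "disconnected" message waits behind a slow handler. `BlockedLoopSketch` and `disconnectLatencyMs` are hypothetical names introduced for this illustration.

```scala
import java.util.concurrent.{Callable, Executors, TimeUnit}

object BlockedLoopSketch {
  /** Returns how many ms a later message waits behind a handler that takes `handlerMs`. */
  def disconnectLatencyMs(handlerMs: Long): Long = {
    // One thread models an endpoint that processes its inbox one message at a time.
    val eventLoop = Executors.newSingleThreadExecutor()
    try {
      val enqueued = System.nanoTime()
      // First message: a handler that blocks, like a polling loop inside `receive`.
      eventLoop.submit(new Runnable {
        override def run(): Unit = Thread.sleep(handlerMs)
      })
      // Second message: stands in for a disconnect notification queued right behind it.
      val handledAt = eventLoop.submit(new Callable[Long] {
        override def call(): Long = System.nanoTime()
      })
      TimeUnit.NANOSECONDS.toMillis(handledAt.get() - enqueued)
    } finally {
      eventLoop.shutdownNow()
    }
  }
}
```

The second message is only handled after the first handler returns, so a handler that loops until the application finishes would delay the disconnect notification for just as long.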
[GitHub] [spark] Ngone51 commented on a change in pull request #28258: [SPARK-31486] [CORE] spark.submit.waitAppCompletion flag to control spark-submit exit in Standalone Cluster Mode
Ngone51 commented on a change in pull request #28258: URL: https://github.com/apache/spark/pull/28258#discussion_r417057392 ## File path: core/src/main/scala/org/apache/spark/deploy/Client.scala ## @@ -124,38 +127,57 @@ private class ClientEndpoint( } } - /* Find out driver status then exit the JVM */ + /** + * Find out driver status then exit the JVM. If the waitAppCompletion is set to true, monitors + * the application until it finishes, fails or is killed. + */ def pollAndReportStatus(driverId: String): Unit = { // Since ClientEndpoint is the only RpcEndpoint in the process, blocking the event loop thread // is fine. logInfo("... waiting before polling master for driver state") Thread.sleep(5000) logInfo("... polling master for driver state") -val statusResponse = - activeMasterEndpoint.askSync[DriverStatusResponse](RequestDriverStatus(driverId)) -if (statusResponse.found) { - logInfo(s"State of $driverId is ${statusResponse.state.get}") - // Worker node, if present - (statusResponse.workerId, statusResponse.workerHostPort, statusResponse.state) match { -case (Some(id), Some(hostPort), Some(DriverState.RUNNING)) => - logInfo(s"Driver running on $hostPort ($id)") -case _ => - } - // Exception, if present - statusResponse.exception match { -case Some(e) => - logError(s"Exception from cluster was: $e") - e.printStackTrace() - System.exit(-1) -case _ => - System.exit(0) +while (true) { Review comment: This could block `ClientEndpoint` when `waitAppCompletion=true`?