Github user vanzin commented on a diff in the pull request:
https://github.com/apache/spark/pull/19616#discussion_r165518845
--- Diff:
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
---
@@ -1104,14 +1117,39 @@ private[spark] class Client(
if (returnOnRunning && state == YarnApplicationState.RUNNING) {
return (state, report.getFinalApplicationStatus)
}
-
+      if (state == YarnApplicationState.ACCEPTED && isClientUnmanagedAMEnabled
+          && !amServiceStarted && report.getAMRMToken != null) {
+ amServiceStarted = true
+ startApplicationMasterService(report)
+ }
lastState = state
}
// Never reached, but keeps compiler happy
throw new SparkException("While loop is depleted! This should never
happen...")
}
+ private def startApplicationMasterService(report: ApplicationReport) = {
+ // Add AMRMToken to establish connection between RM and AM
+ val token = report.getAMRMToken
+    val amRMToken: org.apache.hadoop.security.token.Token[AMRMTokenIdentifier] =
+      new org.apache.hadoop.security.token.Token[AMRMTokenIdentifier](token
+        .getIdentifier().array(), token.getPassword().array,
+        new Text(token.getKind()), new Text(token.getService()))
+ val currentUGI = UserGroupInformation.getCurrentUser
+ currentUGI.addToken(amRMToken)
+
+    System.setProperty(ApplicationConstants.Environment.CONTAINER_ID.name(),
+      ContainerId.newContainerId(report.getCurrentApplicationAttemptId, 1).toString)
+    val amArgs = new ApplicationMasterArguments(Array("--arg",
+      sparkConf.get("spark.driver.host") + ":" + sparkConf.get("spark.driver.port")))
+    // Start Application Service in a separate thread and continue with application monitoring
+ new Thread() {
--- End diff --
Don't you want to keep a reference to this thread and join it at some
point, to make sure it really goes away? Should it be a daemon thread instead?
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]