[GitHub] spark pull request #19616: [SPARK-22404][YARN] Provide an option to use unmanaged AM in yarn client mode
Github user devaraj-kavali commented on a diff in the pull request: https://github.com/apache/spark/pull/19616#discussion_r168364882 --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala --- @@ -51,33 +52,16 @@ import org.apache.spark.util._ /** * Common application master functionality for Spark on Yarn. */ -private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends Logging { +private[spark] class ApplicationMaster(args: ApplicationMasterArguments, sparkConf: SparkConf, --- End diff -- I made changes to the default constructor and added another constructor. Please check and let me know anything can be done better. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19616: [SPARK-22404][YARN] Provide an option to use unmanaged AM in yarn client mode
Github user devaraj-kavali commented on a diff in the pull request: https://github.com/apache/spark/pull/19616#discussion_r168364718 --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala --- @@ -1104,14 +1117,39 @@ private[spark] class Client( if (returnOnRunning && state == YarnApplicationState.RUNNING) { return (state, report.getFinalApplicationStatus) } - + if (state == YarnApplicationState.ACCEPTED && isClientUnmanagedAMEnabled +&& !amServiceStarted && report.getAMRMToken != null) { +amServiceStarted = true +startApplicationMasterService(report) + } lastState = state } // Never reached, but keeps compiler happy throw new SparkException("While loop is depleted! This should never happen...") } + private def startApplicationMasterService(report: ApplicationReport) = { +// Add AMRMToken to establish connection between RM and AM +val token = report.getAMRMToken +val amRMToken: org.apache.hadoop.security.token.Token[AMRMTokenIdentifier] = + new org.apache.hadoop.security.token.Token[AMRMTokenIdentifier](token +.getIdentifier().array(), token.getPassword().array, new Text( +token.getKind()), new Text(token.getService())) +val currentUGI = UserGroupInformation.getCurrentUser +currentUGI.addToken(amRMToken) + +System.setProperty( + ApplicationConstants.Environment.CONTAINER_ID.name(), + ContainerId.newContainerId(report.getCurrentApplicationAttemptId, 1).toString) +val amArgs = new ApplicationMasterArguments(Array("--arg", --- End diff -- I added another constructor without `ApplicationMasterArguments` and takes `RpcEnv` to use the same instance in AM. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19616: [SPARK-22404][YARN] Provide an option to use unmanaged AM in yarn client mode
Github user devaraj-kavali commented on a diff in the pull request: https://github.com/apache/spark/pull/19616#discussion_r168364491 --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala --- @@ -1104,14 +1117,39 @@ private[spark] class Client( if (returnOnRunning && state == YarnApplicationState.RUNNING) { return (state, report.getFinalApplicationStatus) } - + if (state == YarnApplicationState.ACCEPTED && isClientUnmanagedAMEnabled +&& !amServiceStarted && report.getAMRMToken != null) { +amServiceStarted = true +startApplicationMasterService(report) + } lastState = state } // Never reached, but keeps compiler happy throw new SparkException("While loop is depleted! This should never happen...") } + private def startApplicationMasterService(report: ApplicationReport) = { +// Add AMRMToken to establish connection between RM and AM +val token = report.getAMRMToken +val amRMToken: org.apache.hadoop.security.token.Token[AMRMTokenIdentifier] = + new org.apache.hadoop.security.token.Token[AMRMTokenIdentifier](token +.getIdentifier().array(), token.getPassword().array, new Text( +token.getKind()), new Text(token.getService())) +val currentUGI = UserGroupInformation.getCurrentUser +currentUGI.addToken(amRMToken) + +System.setProperty( --- End diff -- I changed to set in sparkConf and use the same in ApplicationMaster while getting the containerId. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19616: [SPARK-22404][YARN] Provide an option to use unmanaged AM in yarn client mode
Github user devaraj-kavali commented on a diff in the pull request: https://github.com/apache/spark/pull/19616#discussion_r168364257 --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala --- @@ -784,6 +794,9 @@ private[spark] class Client( val env = new HashMap[String, String]() populateClasspath(args, hadoopConf, sparkConf, env, sparkConf.get(DRIVER_CLASS_PATH)) env("SPARK_YARN_STAGING_DIR") = stagingDirPath.toString +if (isClientUnmanagedAMEnabled) { + System.setProperty("SPARK_YARN_STAGING_DIR", stagingDirPath.toString) --- End diff -- Changed it to get from the spark conf and the application id. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19616: [SPARK-22404][YARN] Provide an option to use unmanaged AM in yarn client mode
Github user devaraj-kavali commented on a diff in the pull request: https://github.com/apache/spark/pull/19616#discussion_r168363855 --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala --- @@ -1104,14 +1117,39 @@ private[spark] class Client( if (returnOnRunning && state == YarnApplicationState.RUNNING) { return (state, report.getFinalApplicationStatus) } - + if (state == YarnApplicationState.ACCEPTED && isClientUnmanagedAMEnabled +&& !amServiceStarted && report.getAMRMToken != null) { +amServiceStarted = true +startApplicationMasterService(report) + } lastState = state } // Never reached, but keeps compiler happy throw new SparkException("While loop is depleted! This should never happen...") } + private def startApplicationMasterService(report: ApplicationReport) = { +// Add AMRMToken to establish connection between RM and AM +val token = report.getAMRMToken +val amRMToken: org.apache.hadoop.security.token.Token[AMRMTokenIdentifier] = + new org.apache.hadoop.security.token.Token[AMRMTokenIdentifier](token +.getIdentifier().array(), token.getPassword().array, new Text( +token.getKind()), new Text(token.getService())) +val currentUGI = UserGroupInformation.getCurrentUser +currentUGI.addToken(amRMToken) + +System.setProperty( + ApplicationConstants.Environment.CONTAINER_ID.name(), + ContainerId.newContainerId(report.getCurrentApplicationAttemptId, 1).toString) +val amArgs = new ApplicationMasterArguments(Array("--arg", + sparkConf.get("spark.driver.host") + ":" + sparkConf.get("spark.driver.port"))) +// Start Application Service in a separate thread and continue with application monitoring +new Thread() { --- End diff -- changed it as daemon thread. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19616: [SPARK-22404][YARN] Provide an option to use unmanaged AM in yarn client mode
Github user devaraj-kavali commented on a diff in the pull request: https://github.com/apache/spark/pull/19616#discussion_r168363726 --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala --- @@ -1104,14 +1117,39 @@ private[spark] class Client( if (returnOnRunning && state == YarnApplicationState.RUNNING) { return (state, report.getFinalApplicationStatus) } - + if (state == YarnApplicationState.ACCEPTED && isClientUnmanagedAMEnabled +&& !amServiceStarted && report.getAMRMToken != null) { +amServiceStarted = true +startApplicationMasterService(report) + } lastState = state } // Never reached, but keeps compiler happy throw new SparkException("While loop is depleted! This should never happen...") } + private def startApplicationMasterService(report: ApplicationReport) = { +// Add AMRMToken to establish connection between RM and AM +val token = report.getAMRMToken +val amRMToken: org.apache.hadoop.security.token.Token[AMRMTokenIdentifier] = --- End diff -- report.getAMRMToken gives org.apache.hadoop.yarn.api.records.Token type instance, but currentUGI.addToken expects org.apache.hadoop.security.token.Token type instance. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19616: [SPARK-22404][YARN] Provide an option to use unmanaged AM in yarn client mode
Github user devaraj-kavali commented on a diff in the pull request: https://github.com/apache/spark/pull/19616#discussion_r168363672 --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala --- @@ -656,7 +664,9 @@ private[spark] class Client( // Clear the cache-related entries from the configuration to avoid them polluting the // UI's environment page. This works for client mode; for cluster mode, this is handled // by the AM. -CACHE_CONFIGS.foreach(sparkConf.remove) +if (!isClientUnmanagedAMEnabled) { --- End diff -- It is clearing the classpath entries and leading to this error in Executors. ``` Error: Could not find or load main class org.apache.spark.executor.CoarseGrainedExecutorBackend ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19616: [SPARK-22404][YARN] Provide an option to use unmanaged AM in yarn client mode
Github user devaraj-kavali commented on a diff in the pull request: https://github.com/apache/spark/pull/19616#discussion_r168363561 --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala --- @@ -69,6 +70,10 @@ private[spark] class Client( private val isClusterMode = sparkConf.get("spark.submit.deployMode", "client") == "cluster" + private val isClientUnmanagedAMEnabled = +sparkConf.getBoolean("spark.yarn.un-managed-am", false) && !isClusterMode --- End diff -- Updated the config name and also added config constants. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19616: [SPARK-22404][YARN] Provide an option to use unmanaged AM in yarn client mode
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19616#discussion_r165518653 --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala --- @@ -1104,14 +1117,39 @@ private[spark] class Client( if (returnOnRunning && state == YarnApplicationState.RUNNING) { return (state, report.getFinalApplicationStatus) } - + if (state == YarnApplicationState.ACCEPTED && isClientUnmanagedAMEnabled +&& !amServiceStarted && report.getAMRMToken != null) { +amServiceStarted = true +startApplicationMasterService(report) + } lastState = state } // Never reached, but keeps compiler happy throw new SparkException("While loop is depleted! This should never happen...") } + private def startApplicationMasterService(report: ApplicationReport) = { +// Add AMRMToken to establish connection between RM and AM +val token = report.getAMRMToken +val amRMToken: org.apache.hadoop.security.token.Token[AMRMTokenIdentifier] = + new org.apache.hadoop.security.token.Token[AMRMTokenIdentifier](token +.getIdentifier().array(), token.getPassword().array, new Text( +token.getKind()), new Text(token.getService())) +val currentUGI = UserGroupInformation.getCurrentUser +currentUGI.addToken(amRMToken) + +System.setProperty( + ApplicationConstants.Environment.CONTAINER_ID.name(), + ContainerId.newContainerId(report.getCurrentApplicationAttemptId, 1).toString) +val amArgs = new ApplicationMasterArguments(Array("--arg", --- End diff -- This is pretty weird, I'd make this an explicit constructor argument for the AM instead. But if I understand this correctly, this is the address the AM will be connecting back to the driver, right? It seems like there's an opportunity for better code here, since now they'd both be running in the same process. Like in the cluster mode case, where the AM uses the same `RpcEnv` instance as the driver (see `runDriver()`). 
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19616: [SPARK-22404][YARN] Provide an option to use unmanaged AM in yarn client mode
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19616#discussion_r165516803 --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala --- @@ -51,33 +52,16 @@ import org.apache.spark.util._ /** * Common application master functionality for Spark on Yarn. */ -private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends Logging { +private[spark] class ApplicationMaster(args: ApplicationMasterArguments, sparkConf: SparkConf, --- End diff -- This doesn't follow Spark's convention for multi-line arguments. This also looks a little odd now, because there are conflicting arguments. `ApplicationMasterArguments` is now only used in cluster mode, and everything else is expected to be provided in the other parameters. So while this is the simpler change, it's also a little ugly. I don't really have a good suggestion right now, but it's something to think about. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19616: [SPARK-22404][YARN] Provide an option to use unmanaged AM in yarn client mode
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19616#discussion_r165516090 --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala --- @@ -784,6 +794,9 @@ private[spark] class Client( val env = new HashMap[String, String]() populateClasspath(args, hadoopConf, sparkConf, env, sparkConf.get(DRIVER_CLASS_PATH)) env("SPARK_YARN_STAGING_DIR") = stagingDirPath.toString +if (isClientUnmanagedAMEnabled) { + System.setProperty("SPARK_YARN_STAGING_DIR", stagingDirPath.toString) --- End diff -- Can this be propagated some other way? Using system properties is kinda hacky, and makes it dangerous to run another Spark app later in the same JVM. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19616: [SPARK-22404][YARN] Provide an option to use unmanaged AM in yarn client mode
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19616#discussion_r165516542 --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala --- @@ -1104,14 +1117,39 @@ private[spark] class Client( if (returnOnRunning && state == YarnApplicationState.RUNNING) { return (state, report.getFinalApplicationStatus) } - + if (state == YarnApplicationState.ACCEPTED && isClientUnmanagedAMEnabled +&& !amServiceStarted && report.getAMRMToken != null) { +amServiceStarted = true +startApplicationMasterService(report) + } lastState = state } // Never reached, but keeps compiler happy throw new SparkException("While loop is depleted! This should never happen...") } + private def startApplicationMasterService(report: ApplicationReport) = { +// Add AMRMToken to establish connection between RM and AM +val token = report.getAMRMToken +val amRMToken: org.apache.hadoop.security.token.Token[AMRMTokenIdentifier] = + new org.apache.hadoop.security.token.Token[AMRMTokenIdentifier](token +.getIdentifier().array(), token.getPassword().array, new Text( +token.getKind()), new Text(token.getService())) +val currentUGI = UserGroupInformation.getCurrentUser +currentUGI.addToken(amRMToken) + +System.setProperty( --- End diff -- Same question about using system properties. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19616: [SPARK-22404][YARN] Provide an option to use unmanaged AM in yarn client mode
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19616#discussion_r165515886 --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala --- @@ -656,7 +664,9 @@ private[spark] class Client( // Clear the cache-related entries from the configuration to avoid them polluting the // UI's environment page. This works for client mode; for cluster mode, this is handled // by the AM. -CACHE_CONFIGS.foreach(sparkConf.remove) +if (!isClientUnmanagedAMEnabled) { --- End diff -- Why is this needed in the new mode? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19616: [SPARK-22404][YARN] Provide an option to use unmanaged AM in yarn client mode
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19616#discussion_r165518845 --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala --- @@ -1104,14 +1117,39 @@ private[spark] class Client( if (returnOnRunning && state == YarnApplicationState.RUNNING) { return (state, report.getFinalApplicationStatus) } - + if (state == YarnApplicationState.ACCEPTED && isClientUnmanagedAMEnabled +&& !amServiceStarted && report.getAMRMToken != null) { +amServiceStarted = true +startApplicationMasterService(report) + } lastState = state } // Never reached, but keeps compiler happy throw new SparkException("While loop is depleted! This should never happen...") } + private def startApplicationMasterService(report: ApplicationReport) = { +// Add AMRMToken to establish connection between RM and AM +val token = report.getAMRMToken +val amRMToken: org.apache.hadoop.security.token.Token[AMRMTokenIdentifier] = + new org.apache.hadoop.security.token.Token[AMRMTokenIdentifier](token +.getIdentifier().array(), token.getPassword().array, new Text( +token.getKind()), new Text(token.getService())) +val currentUGI = UserGroupInformation.getCurrentUser +currentUGI.addToken(amRMToken) + +System.setProperty( + ApplicationConstants.Environment.CONTAINER_ID.name(), + ContainerId.newContainerId(report.getCurrentApplicationAttemptId, 1).toString) +val amArgs = new ApplicationMasterArguments(Array("--arg", + sparkConf.get("spark.driver.host") + ":" + sparkConf.get("spark.driver.port"))) +// Start Application Service in a separate thread and continue with application monitoring +new Thread() { --- End diff -- Don't you want to keep a reference to this thread and join it at some point, to make sure it really goes away? Should it be a daemon thread instead? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19616: [SPARK-22404][YARN] Provide an option to use unmanaged AM in yarn client mode
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19616#discussion_r165515603 --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala --- @@ -69,6 +70,10 @@ private[spark] class Client( private val isClusterMode = sparkConf.get("spark.submit.deployMode", "client") == "cluster" + private val isClientUnmanagedAMEnabled = +sparkConf.getBoolean("spark.yarn.un-managed-am", false) && !isClusterMode --- End diff -- This should be a config constant. Also `unmanagedAM` is more in line with other config names. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19616: [SPARK-22404][YARN] Provide an option to use unmanaged AM in yarn client mode
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19616#discussion_r165516482 --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala --- @@ -1104,14 +1117,39 @@ private[spark] class Client( if (returnOnRunning && state == YarnApplicationState.RUNNING) { return (state, report.getFinalApplicationStatus) } - + if (state == YarnApplicationState.ACCEPTED && isClientUnmanagedAMEnabled +&& !amServiceStarted && report.getAMRMToken != null) { +amServiceStarted = true +startApplicationMasterService(report) + } lastState = state } // Never reached, but keeps compiler happy throw new SparkException("While loop is depleted! This should never happen...") } + private def startApplicationMasterService(report: ApplicationReport) = { +// Add AMRMToken to establish connection between RM and AM +val token = report.getAMRMToken +val amRMToken: org.apache.hadoop.security.token.Token[AMRMTokenIdentifier] = --- End diff -- Why do you need to make this copy? Isn't the `Token` above enough? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org