[GitHub] spark pull request #19616: [SPARK-22404][YARN] Provide an option to use unma...

2018-02-14 Thread devaraj-kavali
Github user devaraj-kavali commented on a diff in the pull request:

https://github.com/apache/spark/pull/19616#discussion_r168364882
  
--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
@@ -51,33 +52,16 @@ import org.apache.spark.util._
 /**
  * Common application master functionality for Spark on Yarn.
  */
-private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends Logging {
+private[spark] class ApplicationMaster(args: ApplicationMasterArguments, sparkConf: SparkConf,
--- End diff --

I made changes to the default constructor and added another constructor. Please check and let me know if anything can be done better.
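
A rough sketch of that shape, assuming the primary constructor now takes the `SparkConf` and an auxiliary constructor keeps the old single-argument entry point (the exact parameter list here is illustrative, not the final code):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging

private[spark] class ApplicationMaster(
    args: ApplicationMasterArguments,
    sparkConf: SparkConf) extends Logging {

  // Auxiliary constructor preserving the original single-argument entry point,
  // building a fresh SparkConf for the cluster-mode launch path.
  def this(args: ApplicationMasterArguments) = this(args, new SparkConf())

  // ... rest of the AM unchanged ...
}
```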


---




[GitHub] spark pull request #19616: [SPARK-22404][YARN] Provide an option to use unma...

2018-02-14 Thread devaraj-kavali
Github user devaraj-kavali commented on a diff in the pull request:

https://github.com/apache/spark/pull/19616#discussion_r168364718
  
--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala ---
@@ -1104,14 +1117,39 @@ private[spark] class Client(
   if (returnOnRunning && state == YarnApplicationState.RUNNING) {
 return (state, report.getFinalApplicationStatus)
   }
-
+  if (state == YarnApplicationState.ACCEPTED && isClientUnmanagedAMEnabled
+&& !amServiceStarted && report.getAMRMToken != null) {
+amServiceStarted = true
+startApplicationMasterService(report)
+  }
   lastState = state
 }
 
 // Never reached, but keeps compiler happy
 throw new SparkException("While loop is depleted! This should never happen...")
   }
 
+  private def startApplicationMasterService(report: ApplicationReport) = {
+// Add AMRMToken to establish connection between RM and AM
+val token = report.getAMRMToken
+val amRMToken: org.apache.hadoop.security.token.Token[AMRMTokenIdentifier] =
+  new org.apache.hadoop.security.token.Token[AMRMTokenIdentifier](token
+.getIdentifier().array(), token.getPassword().array, new Text(
+token.getKind()), new Text(token.getService()))
+val currentUGI = UserGroupInformation.getCurrentUser
+currentUGI.addToken(amRMToken)
+
+System.setProperty(
+  ApplicationConstants.Environment.CONTAINER_ID.name(),
+  ContainerId.newContainerId(report.getCurrentApplicationAttemptId, 1).toString)
+val amArgs = new ApplicationMasterArguments(Array("--arg",
--- End diff --

I added another constructor without `ApplicationMasterArguments` that takes an `RpcEnv`, so the same instance can be used in the AM.


---




[GitHub] spark pull request #19616: [SPARK-22404][YARN] Provide an option to use unma...

2018-02-14 Thread devaraj-kavali
Github user devaraj-kavali commented on a diff in the pull request:

https://github.com/apache/spark/pull/19616#discussion_r168364491
  
--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala ---
@@ -1104,14 +1117,39 @@ private[spark] class Client(
   if (returnOnRunning && state == YarnApplicationState.RUNNING) {
 return (state, report.getFinalApplicationStatus)
   }
-
+  if (state == YarnApplicationState.ACCEPTED && isClientUnmanagedAMEnabled
+&& !amServiceStarted && report.getAMRMToken != null) {
+amServiceStarted = true
+startApplicationMasterService(report)
+  }
   lastState = state
 }
 
 // Never reached, but keeps compiler happy
 throw new SparkException("While loop is depleted! This should never happen...")
   }
 
+  private def startApplicationMasterService(report: ApplicationReport) = {
+// Add AMRMToken to establish connection between RM and AM
+val token = report.getAMRMToken
+val amRMToken: org.apache.hadoop.security.token.Token[AMRMTokenIdentifier] =
+  new org.apache.hadoop.security.token.Token[AMRMTokenIdentifier](token
+.getIdentifier().array(), token.getPassword().array, new Text(
+token.getKind()), new Text(token.getService()))
+val currentUGI = UserGroupInformation.getCurrentUser
+currentUGI.addToken(amRMToken)
+
+System.setProperty(
--- End diff --

I changed it to set this in sparkConf and to read it back in ApplicationMaster when getting the containerId.
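
A rough sketch of that kind of propagation; the `spark.yarn.containerId` key below is a hypothetical internal name, not one taken from this thread:

```scala
import org.apache.hadoop.yarn.api.ApplicationConstants
import org.apache.hadoop.yarn.api.records.ContainerId

// Client side: record the synthetic container id in the SparkConf instead of a
// JVM-wide system property.
val containerIdStr =
  ContainerId.newContainerId(report.getCurrentApplicationAttemptId, 1).toString
sparkConf.set("spark.yarn.containerId", containerIdStr)

// AM side: prefer the conf value and fall back to the usual YARN env variable.
val amContainerId = sparkConf.getOption("spark.yarn.containerId")
  .getOrElse(System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name()))
```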


---




[GitHub] spark pull request #19616: [SPARK-22404][YARN] Provide an option to use unma...

2018-02-14 Thread devaraj-kavali
Github user devaraj-kavali commented on a diff in the pull request:

https://github.com/apache/spark/pull/19616#discussion_r168364257
  
--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala ---
@@ -784,6 +794,9 @@ private[spark] class Client(
 val env = new HashMap[String, String]()
 populateClasspath(args, hadoopConf, sparkConf, env, sparkConf.get(DRIVER_CLASS_PATH))
 env("SPARK_YARN_STAGING_DIR") = stagingDirPath.toString
+if (isClientUnmanagedAMEnabled) {
+  System.setProperty("SPARK_YARN_STAGING_DIR", stagingDirPath.toString)
--- End diff --

Changed it to derive the value from the Spark conf and the application id.


---




[GitHub] spark pull request #19616: [SPARK-22404][YARN] Provide an option to use unma...

2018-02-14 Thread devaraj-kavali
Github user devaraj-kavali commented on a diff in the pull request:

https://github.com/apache/spark/pull/19616#discussion_r168363855
  
--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala ---
@@ -1104,14 +1117,39 @@ private[spark] class Client(
   if (returnOnRunning && state == YarnApplicationState.RUNNING) {
 return (state, report.getFinalApplicationStatus)
   }
-
+  if (state == YarnApplicationState.ACCEPTED && isClientUnmanagedAMEnabled
+&& !amServiceStarted && report.getAMRMToken != null) {
+amServiceStarted = true
+startApplicationMasterService(report)
+  }
   lastState = state
 }
 
 // Never reached, but keeps compiler happy
 throw new SparkException("While loop is depleted! This should never happen...")
   }
 
+  private def startApplicationMasterService(report: ApplicationReport) = {
+// Add AMRMToken to establish connection between RM and AM
+val token = report.getAMRMToken
+val amRMToken: org.apache.hadoop.security.token.Token[AMRMTokenIdentifier] =
+  new org.apache.hadoop.security.token.Token[AMRMTokenIdentifier](token
+.getIdentifier().array(), token.getPassword().array, new Text(
+token.getKind()), new Text(token.getService()))
+val currentUGI = UserGroupInformation.getCurrentUser
+currentUGI.addToken(amRMToken)
+
+System.setProperty(
+  ApplicationConstants.Environment.CONTAINER_ID.name(),
+  ContainerId.newContainerId(report.getCurrentApplicationAttemptId, 1).toString)
+val amArgs = new ApplicationMasterArguments(Array("--arg",
+  sparkConf.get("spark.driver.host") + ":" + sparkConf.get("spark.driver.port")))
+// Start Application Service in a separate thread and continue with application monitoring
+new Thread() {
--- End diff --

Changed it to a daemon thread.


---




[GitHub] spark pull request #19616: [SPARK-22404][YARN] Provide an option to use unma...

2018-02-14 Thread devaraj-kavali
Github user devaraj-kavali commented on a diff in the pull request:

https://github.com/apache/spark/pull/19616#discussion_r168363726
  
--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala ---
@@ -1104,14 +1117,39 @@ private[spark] class Client(
   if (returnOnRunning && state == YarnApplicationState.RUNNING) {
 return (state, report.getFinalApplicationStatus)
   }
-
+  if (state == YarnApplicationState.ACCEPTED && isClientUnmanagedAMEnabled
+&& !amServiceStarted && report.getAMRMToken != null) {
+amServiceStarted = true
+startApplicationMasterService(report)
+  }
   lastState = state
 }
 
 // Never reached, but keeps compiler happy
 throw new SparkException("While loop is depleted! This should never happen...")
   }
 
+  private def startApplicationMasterService(report: ApplicationReport) = {
+// Add AMRMToken to establish connection between RM and AM
+val token = report.getAMRMToken
+val amRMToken: org.apache.hadoop.security.token.Token[AMRMTokenIdentifier] =
--- End diff --

`report.getAMRMToken` returns an `org.apache.hadoop.yarn.api.records.Token` instance, but `currentUGI.addToken` expects an `org.apache.hadoop.security.token.Token` instance.
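
For readers following along, a standalone sketch of that conversion (mirroring the code in the diff above; only the method wrapper and variable names are added here):

```scala
import org.apache.hadoop.io.Text
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.security.token.{Token => HadoopToken}
import org.apache.hadoop.yarn.api.records.ApplicationReport
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier

// report.getAMRMToken returns an org.apache.hadoop.yarn.api.records.Token (a
// protobuf-backed record), so it has to be rebuilt as a Hadoop security Token
// before it can be handed to the current UGI.
def addAmRmToken(report: ApplicationReport): Unit = {
  val yarnToken = report.getAMRMToken
  val amRMToken = new HadoopToken[AMRMTokenIdentifier](
    yarnToken.getIdentifier.array(),
    yarnToken.getPassword.array(),
    new Text(yarnToken.getKind),
    new Text(yarnToken.getService))
  UserGroupInformation.getCurrentUser.addToken(amRMToken)
}
```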


---




[GitHub] spark pull request #19616: [SPARK-22404][YARN] Provide an option to use unma...

2018-02-14 Thread devaraj-kavali
Github user devaraj-kavali commented on a diff in the pull request:

https://github.com/apache/spark/pull/19616#discussion_r168363672
  
--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala ---
@@ -656,7 +664,9 @@ private[spark] class Client(
 // Clear the cache-related entries from the configuration to avoid them polluting the
 // UI's environment page. This works for client mode; for cluster mode, this is handled
 // by the AM.
-CACHE_CONFIGS.foreach(sparkConf.remove)
+if (!isClientUnmanagedAMEnabled) {
--- End diff --

It is clearing the classpath entries, which leads to this error in the executors:
```
Error: Could not find or load main class org.apache.spark.executor.CoarseGrainedExecutorBackend
```
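
In other words, per the reply above, the in-process AM still needs those entries; a condensed view of the guard shown in the diff, with an explanatory comment added:

```scala
// In unmanaged-AM client mode the AM runs in this same JVM and still reads the
// cache-related entries from sparkConf when building the executors' launch
// context, so only drop them when a separate AM container will handle it.
if (!isClientUnmanagedAMEnabled) {
  CACHE_CONFIGS.foreach(sparkConf.remove)
}
```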


---




[GitHub] spark pull request #19616: [SPARK-22404][YARN] Provide an option to use unma...

2018-02-14 Thread devaraj-kavali
Github user devaraj-kavali commented on a diff in the pull request:

https://github.com/apache/spark/pull/19616#discussion_r168363561
  
--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala ---
@@ -69,6 +70,10 @@ private[spark] class Client(
 
   private val isClusterMode = sparkConf.get("spark.submit.deployMode", "client") == "cluster"
 
+  private val isClientUnmanagedAMEnabled =
+sparkConf.getBoolean("spark.yarn.un-managed-am", false) && !isClusterMode
--- End diff --

Updated the config name and also added config constants.


---




[GitHub] spark pull request #19616: [SPARK-22404][YARN] Provide an option to use unma...

2018-02-01 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19616#discussion_r165518653
  
--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala ---
@@ -1104,14 +1117,39 @@ private[spark] class Client(
   if (returnOnRunning && state == YarnApplicationState.RUNNING) {
 return (state, report.getFinalApplicationStatus)
   }
-
+  if (state == YarnApplicationState.ACCEPTED && isClientUnmanagedAMEnabled
+&& !amServiceStarted && report.getAMRMToken != null) {
+amServiceStarted = true
+startApplicationMasterService(report)
+  }
   lastState = state
 }
 
 // Never reached, but keeps compiler happy
 throw new SparkException("While loop is depleted! This should never happen...")
   }
 
+  private def startApplicationMasterService(report: ApplicationReport) = {
+// Add AMRMToken to establish connection between RM and AM
+val token = report.getAMRMToken
+val amRMToken: org.apache.hadoop.security.token.Token[AMRMTokenIdentifier] =
+  new org.apache.hadoop.security.token.Token[AMRMTokenIdentifier](token
+.getIdentifier().array(), token.getPassword().array, new Text(
+token.getKind()), new Text(token.getService()))
+val currentUGI = UserGroupInformation.getCurrentUser
+currentUGI.addToken(amRMToken)
+
+System.setProperty(
+  ApplicationConstants.Environment.CONTAINER_ID.name(),
+  ContainerId.newContainerId(report.getCurrentApplicationAttemptId, 1).toString)
+val amArgs = new ApplicationMasterArguments(Array("--arg",
--- End diff --

This is pretty weird; I'd make this an explicit constructor argument for the AM instead. But if I understand this correctly, this is the address the AM will use to connect back to the driver, right?

It seems like there's an opportunity for better code here, since they'd now both be running in the same process, like in the cluster mode case where the AM uses the same `RpcEnv` instance as the driver (see `runDriver()`).
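
A hedged sketch of the constructor shape this suggestion points at; the `rpcEnv` parameter and the comment are illustrative only, since the final signature isn't shown in this thread:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.rpc.RpcEnv

// In the unmanaged (client-mode) case the AM could receive the driver's RpcEnv
// directly and register its endpoints on it, instead of being handed
// "driverHost:driverPort" via --arg and reconnecting over the network, much
// like cluster mode's runDriver() shares the RpcEnv it creates.
private[spark] class ApplicationMaster(
    args: ApplicationMasterArguments,
    sparkConf: SparkConf,
    rpcEnv: RpcEnv) extends Logging {
  // ...
}
```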


---




[GitHub] spark pull request #19616: [SPARK-22404][YARN] Provide an option to use unma...

2018-02-01 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19616#discussion_r165516803
  
--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
@@ -51,33 +52,16 @@ import org.apache.spark.util._
 /**
  * Common application master functionality for Spark on Yarn.
  */
-private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends Logging {
+private[spark] class ApplicationMaster(args: ApplicationMasterArguments, sparkConf: SparkConf,
--- End diff --

This doesn't follow Spark's convention for multi-line arguments.

This also looks a little odd now, because there are conflicting arguments. 
`ApplicationMasterArguments` is now only used in cluster mode, and everything 
else is expected to be provided in the other parameters. So while this is the 
simpler change, it's also a little ugly.

I don't really have a good suggestion right now, but it's something to 
think about.


---




[GitHub] spark pull request #19616: [SPARK-22404][YARN] Provide an option to use unma...

2018-02-01 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19616#discussion_r165516090
  
--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala ---
@@ -784,6 +794,9 @@ private[spark] class Client(
 val env = new HashMap[String, String]()
 populateClasspath(args, hadoopConf, sparkConf, env, sparkConf.get(DRIVER_CLASS_PATH))
 env("SPARK_YARN_STAGING_DIR") = stagingDirPath.toString
+if (isClientUnmanagedAMEnabled) {
+  System.setProperty("SPARK_YARN_STAGING_DIR", stagingDirPath.toString)
--- End diff --

Can this be propagated some other way? Using system properties is kinda 
hacky, and makes it dangerous to run another Spark app later in the same JVM.
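
One sketch of a less global alternative, keeping the value in the `SparkConf` that the in-process AM already shares; the `spark.yarn.stagingDir.resolved` key is a made-up illustrative name:

```scala
import org.apache.hadoop.fs.Path

// Client side: stash the resolved staging dir in the conf rather than in a
// JVM-wide system property.
sparkConf.set("spark.yarn.stagingDir.resolved", stagingDirPath.toString)

// AM side (same process): read it back from the shared conf.
val stagingDirInAm = new Path(sparkConf.get("spark.yarn.stagingDir.resolved"))
```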


---




[GitHub] spark pull request #19616: [SPARK-22404][YARN] Provide an option to use unma...

2018-02-01 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19616#discussion_r165516542
  
--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala ---
@@ -1104,14 +1117,39 @@ private[spark] class Client(
   if (returnOnRunning && state == YarnApplicationState.RUNNING) {
 return (state, report.getFinalApplicationStatus)
   }
-
+  if (state == YarnApplicationState.ACCEPTED && isClientUnmanagedAMEnabled
+&& !amServiceStarted && report.getAMRMToken != null) {
+amServiceStarted = true
+startApplicationMasterService(report)
+  }
   lastState = state
 }
 
 // Never reached, but keeps compiler happy
 throw new SparkException("While loop is depleted! This should never happen...")
   }
 
+  private def startApplicationMasterService(report: ApplicationReport) = {
+// Add AMRMToken to establish connection between RM and AM
+val token = report.getAMRMToken
+val amRMToken: org.apache.hadoop.security.token.Token[AMRMTokenIdentifier] =
+  new org.apache.hadoop.security.token.Token[AMRMTokenIdentifier](token
+.getIdentifier().array(), token.getPassword().array, new Text(
+token.getKind()), new Text(token.getService()))
+val currentUGI = UserGroupInformation.getCurrentUser
+currentUGI.addToken(amRMToken)
+
+System.setProperty(
--- End diff --

Same question about using system properties.


---




[GitHub] spark pull request #19616: [SPARK-22404][YARN] Provide an option to use unma...

2018-02-01 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19616#discussion_r165515886
  
--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala ---
@@ -656,7 +664,9 @@ private[spark] class Client(
 // Clear the cache-related entries from the configuration to avoid them polluting the
 // UI's environment page. This works for client mode; for cluster mode, this is handled
 // by the AM.
-CACHE_CONFIGS.foreach(sparkConf.remove)
+if (!isClientUnmanagedAMEnabled) {
--- End diff --

Why is this needed in the new mode?


---




[GitHub] spark pull request #19616: [SPARK-22404][YARN] Provide an option to use unma...

2018-02-01 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19616#discussion_r165518845
  
--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala ---
@@ -1104,14 +1117,39 @@ private[spark] class Client(
   if (returnOnRunning && state == YarnApplicationState.RUNNING) {
 return (state, report.getFinalApplicationStatus)
   }
-
+  if (state == YarnApplicationState.ACCEPTED && isClientUnmanagedAMEnabled
+&& !amServiceStarted && report.getAMRMToken != null) {
+amServiceStarted = true
+startApplicationMasterService(report)
+  }
   lastState = state
 }
 
 // Never reached, but keeps compiler happy
 throw new SparkException("While loop is depleted! This should never happen...")
   }
 
+  private def startApplicationMasterService(report: ApplicationReport) = {
+// Add AMRMToken to establish connection between RM and AM
+val token = report.getAMRMToken
+val amRMToken: org.apache.hadoop.security.token.Token[AMRMTokenIdentifier] =
+  new org.apache.hadoop.security.token.Token[AMRMTokenIdentifier](token
+.getIdentifier().array(), token.getPassword().array, new Text(
+token.getKind()), new Text(token.getService()))
+val currentUGI = UserGroupInformation.getCurrentUser
+currentUGI.addToken(amRMToken)
+
+System.setProperty(
+  ApplicationConstants.Environment.CONTAINER_ID.name(),
+  ContainerId.newContainerId(report.getCurrentApplicationAttemptId, 1).toString)
+val amArgs = new ApplicationMasterArguments(Array("--arg",
+  sparkConf.get("spark.driver.host") + ":" + sparkConf.get("spark.driver.port")))
+// Start Application Service in a separate thread and continue with application monitoring
+new Thread() {
--- End diff --

Don't you want to keep a reference to this thread and join it at some 
point, to make sure it really goes away? Should it be a daemon thread instead?
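
A small sketch of that pattern, combining both suggestions (the field and method names here are illustrative):

```scala
// Keep a handle so the client can join the AM thread during shutdown; mark it
// as a daemon so it can never block JVM exit on its own.
private var unmanagedAmThread: Option[Thread] = None

private def launchAmService(am: ApplicationMaster): Unit = {
  val t = new Thread(new Runnable {
    override def run(): Unit = am.run()
  }, "unmanaged-am-service")
  t.setDaemon(true)
  t.start()
  unmanagedAmThread = Some(t)
}

// Called from the client's shutdown path: give the AM a bounded time to finish.
private def awaitAmService(): Unit = {
  unmanagedAmThread.foreach(_.join(10000L))  // 10s timeout, illustrative
}
```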


---




[GitHub] spark pull request #19616: [SPARK-22404][YARN] Provide an option to use unma...

2018-02-01 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19616#discussion_r165515603
  
--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala ---
@@ -69,6 +70,10 @@ private[spark] class Client(
 
   private val isClusterMode = sparkConf.get("spark.submit.deployMode", "client") == "cluster"
 
+  private val isClientUnmanagedAMEnabled =
+sparkConf.getBoolean("spark.yarn.un-managed-am", false) && !isClusterMode
--- End diff --

This should be a config constant. Also `unmanagedAM` is more in line with 
other config names.
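
A sketch of what that constant could look like with Spark's `ConfigBuilder`; the key name `spark.yarn.unmanagedAM.enabled` is an assumption here, since the renamed key isn't shown in this thread:

```scala
import org.apache.spark.internal.config.ConfigBuilder

// In the YARN module's config object (sketch; key name assumed).
private[spark] val YARN_UNMANAGED_AM = ConfigBuilder("spark.yarn.unmanagedAM.enabled")
  .doc("In client mode, whether to launch the ApplicationMaster service as part of the " +
    "client process, using an unmanaged AM, instead of in a separate YARN container.")
  .booleanConf
  .createWithDefault(false)

// In Client.scala
private val isClientUnmanagedAMEnabled = sparkConf.get(YARN_UNMANAGED_AM) && !isClusterMode
```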


---




[GitHub] spark pull request #19616: [SPARK-22404][YARN] Provide an option to use unma...

2018-02-01 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19616#discussion_r165516482
  
--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala ---
@@ -1104,14 +1117,39 @@ private[spark] class Client(
   if (returnOnRunning && state == YarnApplicationState.RUNNING) {
 return (state, report.getFinalApplicationStatus)
   }
-
+  if (state == YarnApplicationState.ACCEPTED && isClientUnmanagedAMEnabled
+&& !amServiceStarted && report.getAMRMToken != null) {
+amServiceStarted = true
+startApplicationMasterService(report)
+  }
   lastState = state
 }
 
 // Never reached, but keeps compiler happy
 throw new SparkException("While loop is depleted! This should never happen...")
   }
 
+  private def startApplicationMasterService(report: ApplicationReport) = {
+// Add AMRMToken to establish connection between RM and AM
+val token = report.getAMRMToken
+val amRMToken: org.apache.hadoop.security.token.Token[AMRMTokenIdentifier] =
--- End diff --

Why do you need to make this copy? Isn't the `Token` above enough?


---
