[SPARK-3477] Clean up code in Yarn Client / ClientBase

This is part of a broader effort to clean up the Yarn integration code after 
#2020.

The high-level changes in this PR include:
- Remove duplicate code, especially across the alpha and stable APIs
- Simplify unnecessarily complex method signatures and hierarchies
- Rename unclear variable and method names
- Organize logging output produced when the user runs Spark on Yarn
- Add extensive documentation
- Privatize classes where possible

I have tested the stable API on a Hadoop 2.4 cluster. I tested submitting a jar 
that references classes in other jars in both client and cluster mode. I also 
made changes in the alpha API, though I do not have access to an alpha cluster.
I have verified that it compiles, but it would be ideal if others could help
test it.

For detailed examples of these changes, please read on.

--------------------------------------------------------------------------------------------------------

***Appendix***

- The loop that calls `getApplicationReport` on the RM is duplicated in 4
places: in the stable `Client`, the alpha `Client`, and twice in
`YarnClientSchedulerBackend`. We should not have different loops for client and
cluster deploy modes.
- There are many fragmented small helper methods that are only used once and
should just be inlined. For instance, `ClientBase#getLocalPath` returns `null`
under certain conditions, and its only caller `ClientBase#addFileToClasspath`
checks whether the returned value is `null`. We could just have the caller
check that same condition to avoid passing `null`s around.
- In `YarnSparkHadoopUtil#addToEnvironment`, we take in an argument 
`classpathSeparator` that always has the same value upstream (i.e. 
`File.pathSeparator`). This argument is now removed from the signature and
from all upstream callers of this method.
- `ClientBase#copyRemoteFile` is now renamed to `copyFileToRemote`. It was
unclear whether we were copying a remote file to our local file system, or
copying a locally visible file to a remote file system. The variables inside
the method were also inaccurately named: `val remoteFs` signified the file
system of the locally visible file and `val fs` signified the remote,
destination file system. These are now renamed `srcFs` and `destFs`
respectively.
- We currently log the AM container's environment and resource mappings
directly as Scala collections. This is very hard to read and probably too
verbose for the average Spark user. In other modes (e.g. standalone), we also
don't log the launch commands by default, so this information is now logged at
the `DEBUG` level.
- None of these classes (`Client`, `ClientBase`, `YarnSparkHadoopUtil` etc.) is 
intended to be used by a Spark application (the user should go through Spark 
submit instead). At the very least they should be `private[spark]`.
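
As a rough illustration of the first point above, a single polling loop can
serve both deploy modes if the caller says when it wants to stop waiting. This
is a minimal sketch, not the code in this patch: `AppState`, `MonitorSketch`,
`monitor`, `fetchState` and `returnOnRunning` are hypothetical names, and the
states are simplified stand-ins for YARN's `YarnApplicationState`.

```scala
// Hypothetical, simplified stand-ins for YARN's application states.
sealed trait AppState
case object Accepted extends AppState
case object Running extends AppState
case object Finished extends AppState
case object Failed extends AppState
case object Killed extends AppState

object MonitorSketch {
  // States after which the application can no longer make progress.
  private val terminalStates: Set[AppState] = Set(Finished, Failed, Killed)

  /**
   * Poll `fetchState` every `intervalMs` milliseconds until the application
   * reaches a terminal state or, if `returnOnRunning` is set, until it is running.
   * In real code `fetchState` would wrap a call such as `getApplicationReport`.
   */
  @annotation.tailrec
  def monitor(
      fetchState: () => AppState,
      intervalMs: Long,
      returnOnRunning: Boolean = false): AppState = {
    Thread.sleep(intervalMs)
    val state = fetchState()
    if (terminalStates.contains(state) || (returnOnRunning && state == Running)) {
      state
    } else {
      monitor(fetchState, intervalMs, returnOnRunning)
    }
  }
}
```

Under this sketch, a caller that only needs the AM to be up could pass
`returnOnRunning = true`, while a caller that waits for the whole application
would use the default and block until a terminal state is reported.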

Author: Andrew Or <andrewo...@gmail.com>

Closes #2350 from andrewor14/yarn-cleanup and squashes the following commits:

39e8c7b [Andrew Or] Address review comments
6619f9b [Andrew Or] Merge branch 'master' of github.com:apache/spark into yarn-cleanup
2ca6d64 [Andrew Or] Improve logging in application monitor
a3b9693 [Andrew Or] Minor changes
7dd6298 [Andrew Or] Simplify ClientBase#monitorApplication
547487c [Andrew Or] Provide default values for null application report entries
a0ad1e9 [Andrew Or] Fix class not found error
1590141 [Andrew Or] Address review comments
45ccdea [Andrew Or] Remove usages of getAMMemory
d8e33b6 [Andrew Or] Merge branch 'master' of github.com:apache/spark into yarn-cleanup
ed0b42d [Andrew Or] Fix alpha compilation error
c0587b4 [Andrew Or] Merge branch 'master' of github.com:apache/spark into yarn-cleanup
6d74888 [Andrew Or] Minor comment changes
6573c1d [Andrew Or] Clean up, simplify and document code for setting classpaths
e4779b6 [Andrew Or] Clean up log messages + variable naming in ClientBase
8766d37 [Andrew Or] Heavily add documentation to Client* classes + various clean-ups
6c94d79 [Andrew Or] Various cleanups in ClientBase and ClientArguments
ef7069a [Andrew Or] Clean up YarnClientSchedulerBackend more
6de9072 [Andrew Or] Guard against potential NPE in debug logging mode
fabe4c4 [Andrew Or] Reuse more code in YarnClientSchedulerBackend
3f941dc [Andrew Or] First cut at simplifying the Client (stable and alpha)


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c4022dd5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c4022dd5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c4022dd5

Branch: refs/heads/master
Commit: c4022dd52b4827323ff956632dc7623f546da937
Parents: 14f8c34
Author: Andrew Or <andrewo...@gmail.com>
Authored: Tue Sep 23 11:20:52 2014 -0500
Committer: Thomas Graves <tgra...@apache.org>
Committed: Tue Sep 23 11:20:52 2014 -0500

----------------------------------------------------------------------
 .../org/apache/spark/deploy/yarn/Client.scala   | 145 ++--
 .../spark/deploy/yarn/ClientArguments.scala     |  67 +-
 .../apache/spark/deploy/yarn/ClientBase.scala   | 682 +++++++++++--------
 .../yarn/ClientDistributedCacheManager.scala    |  97 ++-
 .../deploy/yarn/ExecutorRunnableUtil.scala      |  16 +-
 .../spark/deploy/yarn/YarnSparkHadoopUtil.scala |  63 +-
 .../cluster/YarnClientSchedulerBackend.scala    | 145 ++--
 .../spark/deploy/yarn/ClientBaseSuite.scala     |  18 +-
 .../org/apache/spark/deploy/yarn/Client.scala   | 167 ++---
 9 files changed, 738 insertions(+), 662 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c4022dd5/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index aff9ab7..5a20532 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -23,13 +23,11 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io.DataOutputBuffer
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.yarn.api._
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
 import org.apache.hadoop.yarn.api.protocolrecords._
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.client.YarnClientImpl
 import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.ipc.YarnRPC
-import org.apache.hadoop.yarn.util.{Apps, Records}
+import org.apache.hadoop.yarn.util.Records
 
 import org.apache.spark.{Logging, SparkConf}
 import org.apache.spark.deploy.SparkHadoopUtil
@@ -37,7 +35,10 @@ import org.apache.spark.deploy.SparkHadoopUtil
 /**
  * Version of [[org.apache.spark.deploy.yarn.ClientBase]] tailored to YARN's alpha API.
  */
-class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: SparkConf)
+private[spark] class Client(
+    val args: ClientArguments,
+    val hadoopConf: Configuration,
+    val sparkConf: SparkConf)
   extends YarnClientImpl with ClientBase with Logging {
 
   def this(clientArgs: ClientArguments, spConf: SparkConf) =
@@ -45,112 +46,86 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa
 
   def this(clientArgs: ClientArguments) = this(clientArgs, new SparkConf())
 
-  val args = clientArgs
-  val conf = hadoopConf
-  val sparkConf = spConf
-  var rpc: YarnRPC = YarnRPC.create(conf)
-  val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
+  val yarnConf: YarnConfiguration = new YarnConfiguration(hadoopConf)
 
+  /* ------------------------------------------------------------------------------------- *
+   | The following methods have much in common in the stable and alpha versions of Client, |
+   | but cannot be implemented in the parent trait due to subtle API differences across    |
+   | hadoop versions.                                                                      |
+   * ------------------------------------------------------------------------------------- */
 
-  // for client user who want to monitor app status by itself.
-  def runApp() = {
-    validateArgs()
-
+  /** Submit an application running our ApplicationMaster to the ResourceManager. */
+  override def submitApplication(): ApplicationId = {
     init(yarnConf)
     start()
-    logClusterResourceDetails()
 
-    val newApp = super.getNewApplication()
-    val appId = newApp.getApplicationId()
+    logInfo("Requesting a new application from cluster with %d NodeManagers"
+      .format(getYarnClusterMetrics.getNumNodeManagers))
 
-    verifyClusterResources(newApp)
-    val appContext = createApplicationSubmissionContext(appId)
-    val appStagingDir = getAppStagingDir(appId)
-    val localResources = prepareLocalResources(appStagingDir)
-    val env = setupLaunchEnv(localResources, appStagingDir)
-    val amContainer = createContainerLaunchContext(newApp, localResources, env)
+    // Get a new application from our RM
+    val newAppResponse = getNewApplication()
+    val appId = newAppResponse.getApplicationId()
 
-    val capability = Records.newRecord(classOf[Resource]).asInstanceOf[Resource]
-    // Memory for the ApplicationMaster.
-    capability.setMemory(args.amMemory + memoryOverhead)
-    amContainer.setResource(capability)
+    // Verify whether the cluster has enough resources for our AM
+    verifyClusterResources(newAppResponse)
 
-    appContext.setQueue(args.amQueue)
-    appContext.setAMContainerSpec(amContainer)
-    appContext.setUser(UserGroupInformation.getCurrentUser().getShortUserName())
+    // Set up the appropriate contexts to launch our AM
+    val containerContext = createContainerLaunchContext(newAppResponse)
+    val appContext = createApplicationSubmissionContext(appId, containerContext)
 
-    submitApp(appContext)
+    // Finally, submit and monitor the application
+    logInfo(s"Submitting application ${appId.getId} to ResourceManager")
+    submitApplication(appContext)
     appId
   }
 
-  def run() {
-    val appId = runApp()
-    monitorApplication(appId)
-  }
-
-  def logClusterResourceDetails() {
-    val clusterMetrics: YarnClusterMetrics = super.getYarnClusterMetrics
-    logInfo("Got cluster metric info from ASM, numNodeManagers = " +
-      clusterMetrics.getNumNodeManagers)
+  /**
+   * Set up a context for launching our ApplicationMaster container.
+   * In the Yarn alpha API, the memory requirements of this container must be set in
+   * the ContainerLaunchContext instead of the ApplicationSubmissionContext.
+   */
+  override def createContainerLaunchContext(newAppResponse: GetNewApplicationResponse)
+      : ContainerLaunchContext = {
+    val containerContext = super.createContainerLaunchContext(newAppResponse)
+    val capability = Records.newRecord(classOf[Resource])
+    capability.setMemory(args.amMemory + amMemoryOverhead)
+    containerContext.setResource(capability)
+    containerContext
   }
 
-
-  def createApplicationSubmissionContext(appId: ApplicationId): ApplicationSubmissionContext = {
-    logInfo("Setting up application submission context for ASM")
+  /** Set up the context for submitting our ApplicationMaster. */
+  def createApplicationSubmissionContext(
+      appId: ApplicationId,
+      containerContext: ContainerLaunchContext): ApplicationSubmissionContext = {
     val appContext = Records.newRecord(classOf[ApplicationSubmissionContext])
     appContext.setApplicationId(appId)
     appContext.setApplicationName(args.appName)
+    appContext.setQueue(args.amQueue)
+    appContext.setAMContainerSpec(containerContext)
+    appContext.setUser(UserGroupInformation.getCurrentUser.getShortUserName)
     appContext
   }
 
-  def setupSecurityToken(amContainer: ContainerLaunchContext) = {
-    // Setup security tokens.
+  /**
+   * Set up security tokens for launching our ApplicationMaster container.
+   * ContainerLaunchContext#setContainerTokens is renamed `setTokens` in the stable API.
+   */
+  override def setupSecurityToken(amContainer: ContainerLaunchContext): Unit = {
     val dob = new DataOutputBuffer()
     credentials.writeTokenStorageToStream(dob)
     amContainer.setContainerTokens(ByteBuffer.wrap(dob.getData()))
   }
 
-  def submitApp(appContext: ApplicationSubmissionContext) = {
-    // Submit the application to the applications manager.
-    logInfo("Submitting application to ASM")
-    super.submitApplication(appContext)
-  }
-
-  def monitorApplication(appId: ApplicationId): Boolean = {
-    val interval = sparkConf.getLong("spark.yarn.report.interval", 1000)
-
-    while (true) {
-      Thread.sleep(interval)
-      val report = super.getApplicationReport(appId)
-
-      logInfo("Application report from ASM: \n" +
-        "\t application identifier: " + appId.toString() + "\n" +
-        "\t appId: " + appId.getId() + "\n" +
-        "\t clientToken: " + report.getClientToken() + "\n" +
-        "\t appDiagnostics: " + report.getDiagnostics() + "\n" +
-        "\t appMasterHost: " + report.getHost() + "\n" +
-        "\t appQueue: " + report.getQueue() + "\n" +
-        "\t appMasterRpcPort: " + report.getRpcPort() + "\n" +
-        "\t appStartTime: " + report.getStartTime() + "\n" +
-        "\t yarnAppState: " + report.getYarnApplicationState() + "\n" +
-        "\t distributedFinalState: " + report.getFinalApplicationStatus() + "\n" +
-        "\t appTrackingUrl: " + report.getTrackingUrl() + "\n" +
-        "\t appUser: " + report.getUser()
-      )
-
-      val state = report.getYarnApplicationState()
-      if (state == YarnApplicationState.FINISHED ||
-        state == YarnApplicationState.FAILED ||
-        state == YarnApplicationState.KILLED) {
-        return true
-      }
-    }
-    true
-  }
+  /**
+   * Return the security token used by this client to communicate with the ApplicationMaster.
+   * If no security is enabled, the token returned by the report is null.
+   * ApplicationReport#getClientToken is renamed `getClientToAMToken` in the stable API.
+   */
+  override def getClientToken(report: ApplicationReport): String =
+    Option(report.getClientToken).getOrElse("")
 }
 
 object Client {
-
   def main(argStrings: Array[String]) {
     if (!sys.props.contains("SPARK_SUBMIT")) {
       println("WARNING: This client is deprecated and will be removed in a " +
@@ -158,19 +133,17 @@ object Client {
     }
 
     // Set an env variable indicating we are running in YARN mode.
-    // Note that anything with SPARK prefix gets propagated to all (remote) processes
+    // Note that any env variable with the SPARK_ prefix gets propagated to all (remote) processes
     System.setProperty("SPARK_YARN_MODE", "true")
-
     val sparkConf = new SparkConf
 
     try {
       val args = new ClientArguments(argStrings, sparkConf)
       new Client(args, sparkConf).run()
     } catch {
-      case e: Exception => {
+      case e: Exception =>
         Console.err.println(e.getMessage)
         System.exit(1)
-      }
     }
 
     System.exit(0)

http://git-wip-us.apache.org/repos/asf/spark/blob/c4022dd5/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 40d8d6d..201b742 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -17,15 +17,14 @@
 
 package org.apache.spark.deploy.yarn
 
-import scala.collection.mutable.{ArrayBuffer, HashMap}
+import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.SparkConf
-import org.apache.spark.scheduler.InputFormatInfo
 import org.apache.spark.util.{Utils, IntParam, MemoryParam}
 
 
 // TODO: Add code and support for ensuring that yarn resource 'tasks' are 
location aware !
-class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
+private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) {
   var addJars: String = null
   var files: String = null
   var archives: String = null
@@ -35,28 +34,56 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
   var executorMemory = 1024 // MB
   var executorCores = 1
   var numExecutors = 2
-  var amQueue = sparkConf.get("QUEUE", "default")
+  var amQueue = sparkConf.get("spark.yarn.queue", "default")
   var amMemory: Int = 512 // MB
   var appName: String = "Spark"
   var priority = 0
 
-  parseArgs(args.toList)
+  // Additional memory to allocate to containers
+  // For now, use driver's memory overhead as our AM container's memory overhead
+  val amMemoryOverhead = sparkConf.getInt(
+    "spark.yarn.driver.memoryOverhead", YarnSparkHadoopUtil.DEFAULT_MEMORY_OVERHEAD)
+  val executorMemoryOverhead = sparkConf.getInt(
+    "spark.yarn.executor.memoryOverhead", YarnSparkHadoopUtil.DEFAULT_MEMORY_OVERHEAD)
 
-  // env variable SPARK_YARN_DIST_ARCHIVES/SPARK_YARN_DIST_FILES set in yarn-client then
-  // it should default to hdfs://
-  files = Option(files).getOrElse(sys.env.get("SPARK_YARN_DIST_FILES").orNull)
-  archives = Option(archives).getOrElse(sys.env.get("SPARK_YARN_DIST_ARCHIVES").orNull)
+  parseArgs(args.toList)
+  loadEnvironmentArgs()
+  validateArgs()
+
+  /** Load any default arguments provided through environment variables and Spark properties. */
+  private def loadEnvironmentArgs(): Unit = {
+    // For backward compatibility, SPARK_YARN_DIST_{ARCHIVES/FILES} should be resolved to hdfs://,
+    // while spark.yarn.dist.{archives/files} should be resolved to file:// (SPARK-2051).
+    files = Option(files)
+      .orElse(sys.env.get("SPARK_YARN_DIST_FILES"))
+      .orElse(sparkConf.getOption("spark.yarn.dist.files").map(p => Utils.resolveURIs(p)))
+      .orNull
+    archives = Option(archives)
+      .orElse(sys.env.get("SPARK_YARN_DIST_ARCHIVES"))
+      .orElse(sparkConf.getOption("spark.yarn.dist.archives").map(p => Utils.resolveURIs(p)))
+      .orNull
+  }
 
-  // spark.yarn.dist.archives/spark.yarn.dist.files defaults to use file:// if not specified,
-  // for both yarn-client and yarn-cluster
-  files = Option(files).getOrElse(sparkConf.getOption("spark.yarn.dist.files").
-    map(p => Utils.resolveURIs(p)).orNull)
-  archives = Option(archives).getOrElse(sparkConf.getOption("spark.yarn.dist.archives").
-    map(p => Utils.resolveURIs(p)).orNull)
+  /**
+   * Fail fast if any arguments provided are invalid.
+   * This is intended to be called only after the provided arguments have been parsed.
+   */
+  private def validateArgs(): Unit = {
+    // TODO: memory checks are outdated (SPARK-3476)
+    Map[Boolean, String](
+      (numExecutors <= 0) -> "You must specify at least 1 executor!",
+      (amMemory <= amMemoryOverhead) -> s"AM memory must be > $amMemoryOverhead MB",
+      (executorMemory <= executorMemoryOverhead) ->
+        s"Executor memory must be > $executorMemoryOverhead MB"
+    ).foreach { case (errorCondition, errorMessage) =>
+      if (errorCondition) {
+        throw new IllegalArgumentException(errorMessage + "\n" + getUsageMessage())
+      }
+    }
+  }
 
   private def parseArgs(inputArgs: List[String]): Unit = {
-    val userArgsBuffer: ArrayBuffer[String] = new ArrayBuffer[String]()
-  
+    val userArgsBuffer = new ArrayBuffer[String]()
     var args = inputArgs
 
     while (!args.isEmpty) {
@@ -138,16 +165,14 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
     userArgs = userArgsBuffer.readOnly
   }
 
-
-  def getUsageMessage(unknownParam: Any = null): String = {
+  private def getUsageMessage(unknownParam: List[String] = null): String = {
     val message = if (unknownParam != null) s"Unknown/unsupported param 
$unknownParam\n" else ""
-
     message +
       "Usage: org.apache.spark.deploy.yarn.Client [options] \n" +
       "Options:\n" +
       "  --jar JAR_PATH             Path to your application's JAR file 
(required in yarn-cluster mode)\n" +
       "  --class CLASS_NAME         Name of your application's main class 
(required)\n" +
-      "  --arg ARGS                 Argument to be passed to your 
application's main class.\n" +
+      "  --arg ARG                  Argument to be passed to your 
application's main class.\n" +
       "                             Multiple invocations are possible, each 
will be passed in order.\n" +
       "  --num-executors NUM        Number of executors to start (Default: 
2)\n" +
       "  --executor-cores NUM       Number of cores for the executors 
(Default: 1).\n" +

http://git-wip-us.apache.org/repos/asf/spark/blob/c4022dd5/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
index 6ae4d49..4870b0c 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.deploy.yarn
 
-import java.io.File
 import java.net.{InetAddress, UnknownHostException, URI, URISyntaxException}
 
 import scala.collection.JavaConversions._
@@ -37,154 +36,107 @@ import org.apache.hadoop.yarn.api.protocolrecords._
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.util.Records
+
 import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, 
SparkException}
 
 /**
- * The entry point (starting in Client#main() and Client#run()) for launching 
Spark on YARN. The
- * Client submits an application to the YARN ResourceManager.
+ * The entry point (starting in Client#main() and Client#run()) for launching 
Spark on YARN.
+ * The Client submits an application to the YARN ResourceManager.
  */
-trait ClientBase extends Logging {
-  val args: ClientArguments
-  val conf: Configuration
-  val sparkConf: SparkConf
-  val yarnConf: YarnConfiguration
-  val credentials = UserGroupInformation.getCurrentUser().getCredentials()
-  private val SPARK_STAGING: String = ".sparkStaging"
+private[spark] trait ClientBase extends Logging {
+  import ClientBase._
+
+  protected val args: ClientArguments
+  protected val hadoopConf: Configuration
+  protected val sparkConf: SparkConf
+  protected val yarnConf: YarnConfiguration
+  protected val credentials = 
UserGroupInformation.getCurrentUser.getCredentials
+  protected val amMemoryOverhead = args.amMemoryOverhead // MB
+  protected val executorMemoryOverhead = args.executorMemoryOverhead // MB
   private val distCacheMgr = new ClientDistributedCacheManager()
 
-  // Staging directory is private! -> rwx--------
-  val STAGING_DIR_PERMISSION: FsPermission =
-    FsPermission.createImmutable(Integer.parseInt("700", 8).toShort)
-  // App files are world-wide readable and owner writable -> rw-r--r--
-  val APP_FILE_PERMISSION: FsPermission =
-    FsPermission.createImmutable(Integer.parseInt("644", 8).toShort)
-
-  // Additional memory overhead - in mb.
-  protected def memoryOverhead: Int = 
sparkConf.getInt("spark.yarn.driver.memoryOverhead",
-    YarnSparkHadoopUtil.DEFAULT_MEMORY_OVERHEAD)
-
-  // TODO(harvey): This could just go in ClientArguments.
-  def validateArgs() = {
-    Map(
-      (args.numExecutors <= 0) -> "Error: You must specify at least 1 
executor!",
-      (args.amMemory <= memoryOverhead) -> ("Error: AM memory size must be" +
-        "greater than: " + memoryOverhead),
-      (args.executorMemory <= memoryOverhead) -> ("Error: Executor memory 
size" +
-        "must be greater than: " + memoryOverhead.toString)
-    ).foreach { case(cond, errStr) =>
-      if (cond) {
-        logError(errStr)
-        throw new IllegalArgumentException(args.getUsageMessage())
-      }
-    }
-  }
-
-  def getAppStagingDir(appId: ApplicationId): String = {
-    SPARK_STAGING + Path.SEPARATOR + appId.toString() + Path.SEPARATOR
-  }
-
-  def verifyClusterResources(app: GetNewApplicationResponse) = {
-    val maxMem = app.getMaximumResourceCapability().getMemory()
-    logInfo("Max mem capabililty of a single resource in this cluster " + 
maxMem)
-
-    // If we have requested more then the clusters max for a single resource 
then exit.
-    if (args.executorMemory > maxMem) {
-      val errorMessage =
-        "Required executor memory (%d MB), is above the max threshold (%d MB) 
of this cluster."
-          .format(args.executorMemory, maxMem)
-
-      logError(errorMessage)
-      throw new IllegalArgumentException(errorMessage)
-    }
-    val amMem = args.amMemory + memoryOverhead
+  /**
+   * Fail fast if we have requested more resources per container than is 
available in the cluster.
+   */
+  protected def verifyClusterResources(newAppResponse: 
GetNewApplicationResponse): Unit = {
+    val maxMem = newAppResponse.getMaximumResourceCapability().getMemory()
+    logInfo("Verifying our application has not requested more than the maximum 
" +
+      s"memory capability of the cluster ($maxMem MB per container)")
+    val executorMem = args.executorMemory + executorMemoryOverhead
+    if (executorMem > maxMem) {
+      throw new IllegalArgumentException(s"Required executor memory 
($executorMem MB) " +
+        s"is above the max threshold ($maxMem MB) of this cluster!")
+    }
+    val amMem = args.amMemory + amMemoryOverhead
     if (amMem > maxMem) {
-
-      val errorMessage = "Required AM memory (%d) is above the max threshold 
(%d) of this cluster."
-        .format(amMem, maxMem)
-      logError(errorMessage)
-      throw new IllegalArgumentException(errorMessage)
+      throw new IllegalArgumentException(s"Required AM memory ($amMem MB) " +
+        s"is above the max threshold ($maxMem MB) of this cluster!")
     }
-
     // We could add checks to make sure the entire cluster has enough 
resources but that involves
     // getting all the node reports and computing ourselves.
   }
 
-  /** See if two file systems are the same or not. */
-  private def compareFs(srcFs: FileSystem, destFs: FileSystem): Boolean = {
-    val srcUri = srcFs.getUri()
-    val dstUri = destFs.getUri()
-    if (srcUri.getScheme() == null) {
-      return false
-    }
-    if (!srcUri.getScheme().equals(dstUri.getScheme())) {
-      return false
-    }
-    var srcHost = srcUri.getHost()
-    var dstHost = dstUri.getHost()
-    if ((srcHost != null) && (dstHost != null)) {
-      try {
-        srcHost = InetAddress.getByName(srcHost).getCanonicalHostName()
-        dstHost = InetAddress.getByName(dstHost).getCanonicalHostName()
-      } catch {
-        case e: UnknownHostException =>
-          return false
-      }
-      if (!srcHost.equals(dstHost)) {
-        return false
-      }
-    } else if (srcHost == null && dstHost != null) {
-      return false
-    } else if (srcHost != null && dstHost == null) {
-      return false
-    }
-    if (srcUri.getPort() != dstUri.getPort()) {
-      false
-    } else {
-      true
-    }
-  }
-
-  /** Copy the file into HDFS if needed. */
-  private[yarn] def copyRemoteFile(
-      dstDir: Path,
-      originalPath: Path,
+  /**
+   * Copy the given file to a remote file system (e.g. HDFS) if needed.
+   * The file is only copied if the source and destination file systems are 
different. This is used
+   * for preparing resources for launching the ApplicationMaster container. 
Exposed for testing.
+   */
+  def copyFileToRemote(
+      destDir: Path,
+      srcPath: Path,
       replication: Short,
       setPerms: Boolean = false): Path = {
-    val fs = FileSystem.get(conf)
-    val remoteFs = originalPath.getFileSystem(conf)
-    var newPath = originalPath
-    if (!compareFs(remoteFs, fs)) {
-      newPath = new Path(dstDir, originalPath.getName())
-      logInfo("Uploading " + originalPath + " to " + newPath)
-      FileUtil.copy(remoteFs, originalPath, fs, newPath, false, conf)
-      fs.setReplication(newPath, replication)
-      if (setPerms) fs.setPermission(newPath, new 
FsPermission(APP_FILE_PERMISSION))
+    val destFs = destDir.getFileSystem(hadoopConf)
+    val srcFs = srcPath.getFileSystem(hadoopConf)
+    var destPath = srcPath
+    if (!compareFs(srcFs, destFs)) {
+      destPath = new Path(destDir, srcPath.getName())
+      logInfo(s"Uploading resource $srcPath -> $destPath")
+      FileUtil.copy(srcFs, srcPath, destFs, destPath, false, hadoopConf)
+      destFs.setReplication(destPath, replication)
+      if (setPerms) {
+        destFs.setPermission(destPath, new FsPermission(APP_FILE_PERMISSION))
+      }
+    } else {
+      logInfo(s"Source and destination file systems are the same. Not copying 
$srcPath")
     }
     // Resolve any symlinks in the URI path so using a "current" symlink to 
point to a specific
     // version shows the specific version in the distributed cache 
configuration
-    val qualPath = fs.makeQualified(newPath)
-    val fc = FileContext.getFileContext(qualPath.toUri(), conf)
-    val destPath = fc.resolvePath(qualPath)
-    destPath
+    val qualifiedDestPath = destFs.makeQualified(destPath)
+    val fc = FileContext.getFileContext(qualifiedDestPath.toUri(), hadoopConf)
+    fc.resolvePath(qualifiedDestPath)
   }
 
-  private def qualifyForLocal(localURI: URI): Path = {
-    var qualifiedURI = localURI
-    // If not specified, assume these are in the local filesystem to keep 
behavior like Hadoop
-    if (qualifiedURI.getScheme() == null) {
-      qualifiedURI = new URI(FileSystem.getLocal(conf).makeQualified(new 
Path(qualifiedURI)).toString)
-    }
+  /**
+   * Given a local URI, resolve it and return a qualified local path that 
corresponds to the URI.
+   * This is used for preparing local resources to be included in the 
container launch context.
+   */
+  private def getQualifiedLocalPath(localURI: URI): Path = {
+    val qualifiedURI =
+      if (localURI.getScheme == null) {
+        // If not specified, assume this is in the local filesystem to keep 
the behavior
+        // consistent with that of Hadoop
+        new URI(FileSystem.getLocal(hadoopConf).makeQualified(new 
Path(localURI)).toString)
+      } else {
+        localURI
+      }
     new Path(qualifiedURI)
   }
 
+  /**
+   * Upload any resources to the distributed cache if needed. If a resource is 
intended to be
+   * consumed locally, set up the appropriate config for downstream code to 
handle it properly.
+   * This is used for setting up a container launch context for our 
ApplicationMaster.
+   * Exposed for testing.
+   */
   def prepareLocalResources(appStagingDir: String): HashMap[String, 
LocalResource] = {
-    logInfo("Preparing Local resources")
-    // Upload Spark and the application JAR to the remote file system if 
necessary. Add them as
-    // local resources to the application master.
-    val fs = FileSystem.get(conf)
+    logInfo("Preparing resources for our AM container")
+    // Upload Spark and the application JAR to the remote file system if 
necessary,
+    // and add them as local resources to the application master.
+    val fs = FileSystem.get(hadoopConf)
     val dst = new Path(fs.getHomeDirectory(), appStagingDir)
-    val nns = ClientBase.getNameNodesToAccess(sparkConf) + dst
-    ClientBase.obtainTokensForNamenodes(nns, conf, credentials)
+    val nns = getNameNodesToAccess(sparkConf) + dst
+    obtainTokensForNamenodes(nns, hadoopConf, credentials)
 
     val replication = sparkConf.getInt("spark.yarn.submit.file.replication", 3).toShort
     val localResources = HashMap[String, LocalResource]()
@@ -200,73 +152,84 @@ trait ClientBase extends Logging {
         "for alternatives.")
     }
 
+    /**
+     * Copy the given main resource to the distributed cache if the scheme is not "local".
+     * Otherwise, set the corresponding key in our SparkConf to handle it downstream.
+     * Each resource is represented by a 4-tuple of:
+     *   (1) destination resource name,
+     *   (2) local path to the resource,
+     *   (3) Spark property key to set if the scheme is "local", and
+     *   (4) whether to set permissions for this resource
+     */
     List(
-      (ClientBase.SPARK_JAR, ClientBase.sparkJar(sparkConf), ClientBase.CONF_SPARK_JAR),
-      (ClientBase.APP_JAR, args.userJar, ClientBase.CONF_SPARK_USER_JAR),
-      ("log4j.properties", oldLog4jConf.getOrElse(null), null)
-    ).foreach { case(destName, _localPath, confKey) =>
+      (SPARK_JAR, sparkJar(sparkConf), CONF_SPARK_JAR, false),
+      (APP_JAR, args.userJar, CONF_SPARK_USER_JAR, true),
+      ("log4j.properties", oldLog4jConf.orNull, null, false)
+    ).foreach { case (destName, _localPath, confKey, setPermissions) =>
       val localPath: String = if (_localPath != null) _localPath.trim() else ""
-      if (! localPath.isEmpty()) {
+      if (!localPath.isEmpty()) {
         val localURI = new URI(localPath)
-        if (!ClientBase.LOCAL_SCHEME.equals(localURI.getScheme())) {
-          val setPermissions = destName.equals(ClientBase.APP_JAR)
-          val destPath = copyRemoteFile(dst, qualifyForLocal(localURI), replication, setPermissions)
-          val destFs = FileSystem.get(destPath.toUri(), conf)
-          distCacheMgr.addResource(destFs, conf, destPath, localResources, LocalResourceType.FILE,
-            destName, statCache)
+        if (localURI.getScheme != LOCAL_SCHEME) {
+          val src = getQualifiedLocalPath(localURI)
+          val destPath = copyFileToRemote(dst, src, replication, setPermissions)
+          val destFs = FileSystem.get(destPath.toUri(), hadoopConf)
+          distCacheMgr.addResource(destFs, hadoopConf, destPath,
+            localResources, LocalResourceType.FILE, destName, statCache)
         } else if (confKey != null) {
+          // If the resource is intended for local use only, handle this downstream
+          // by setting the appropriate property
           sparkConf.set(confKey, localPath)
         }
       }
     }
 
+    /**
+     * Do the same for any additional resources passed in through ClientArguments.
+     * Each resource category is represented by a 3-tuple of:
+     *   (1) comma separated list of resources in this category,
+     *   (2) resource type, and
+     *   (3) whether to add these resources to the classpath
+     */
     val cachedSecondaryJarLinks = ListBuffer.empty[String]
-    val fileLists = List( (args.addJars, LocalResourceType.FILE, true),
+    List(
+      (args.addJars, LocalResourceType.FILE, true),
       (args.files, LocalResourceType.FILE, false),
-      (args.archives, LocalResourceType.ARCHIVE, false) )
-    fileLists.foreach { case (flist, resType, addToClasspath) =>
+      (args.archives, LocalResourceType.ARCHIVE, false)
+    ).foreach { case (flist, resType, addToClasspath) =>
       if (flist != null && !flist.isEmpty()) {
-        flist.split(',').foreach { case file: String =>
+        flist.split(',').foreach { file =>
           val localURI = new URI(file.trim())
-          if (!ClientBase.LOCAL_SCHEME.equals(localURI.getScheme())) {
+          if (localURI.getScheme != LOCAL_SCHEME) {
             val localPath = new Path(localURI)
             val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
-            val destPath = copyRemoteFile(dst, localPath, replication)
-            distCacheMgr.addResource(fs, conf, destPath, localResources, resType,
-              linkname, statCache)
+            val destPath = copyFileToRemote(dst, localPath, replication)
+            distCacheMgr.addResource(
+              fs, hadoopConf, destPath, localResources, resType, linkname, statCache)
             if (addToClasspath) {
               cachedSecondaryJarLinks += linkname
             }
           } else if (addToClasspath) {
+            // Resource is intended for local use only and should be added to the class path
             cachedSecondaryJarLinks += file.trim()
           }
         }
       }
     }
-    logInfo("Prepared Local resources " + localResources)
-    sparkConf.set(ClientBase.CONF_SPARK_YARN_SECONDARY_JARS, cachedSecondaryJarLinks.mkString(","))
+    if (cachedSecondaryJarLinks.nonEmpty) {
+      sparkConf.set(CONF_SPARK_YARN_SECONDARY_JARS, cachedSecondaryJarLinks.mkString(","))
+    }
 
-    UserGroupInformation.getCurrentUser().addCredentials(credentials)
     localResources
   }
 
-  /** Get all application master environment variables set on this SparkConf */
-  def getAppMasterEnv: Seq[(String, String)] = {
-    val prefix = "spark.yarn.appMasterEnv."
-    sparkConf.getAll.filter{case (k, v) => k.startsWith(prefix)}
-      .map{case (k, v) => (k.substring(prefix.length), v)}
-  }
-
-
-  def setupLaunchEnv(
-      localResources: HashMap[String, LocalResource],
-      stagingDir: String): HashMap[String, String] = {
-    logInfo("Setting up the launch environment")
-
+  /**
+   * Set up the environment for launching our ApplicationMaster container.
+   */
+  private def setupLaunchEnv(stagingDir: String): HashMap[String, String] = {
+    logInfo("Setting up the launch environment for our AM container")
     val env = new HashMap[String, String]()
-
     val extraCp = sparkConf.getOption("spark.driver.extraClassPath")
-    ClientBase.populateClasspath(args, yarnConf, sparkConf, env, extraCp)
+    populateClasspath(args, yarnConf, sparkConf, env, extraCp)
     env("SPARK_YARN_MODE") = "true"
     env("SPARK_YARN_STAGING_DIR") = stagingDir
     env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName()
@@ -275,42 +238,20 @@ trait ClientBase extends Logging {
     distCacheMgr.setDistFilesEnv(env)
     distCacheMgr.setDistArchivesEnv(env)
 
-    getAppMasterEnv.foreach { case (key, value) =>
-      YarnSparkHadoopUtil.addToEnvironment(env, key, value, File.pathSeparator)
-    }
+    // Pick up any environment variables for the AM provided through spark.yarn.appMasterEnv.*
+    val amEnvPrefix = "spark.yarn.appMasterEnv."
+    sparkConf.getAll
+      .filter { case (k, v) => k.startsWith(amEnvPrefix) }
+      .map { case (k, v) => (k.substring(amEnvPrefix.length), v) }
+      .foreach { case (k, v) => YarnSparkHadoopUtil.addPathToEnvironment(env, k, v) }
 
     // Keep this for backwards compatibility but users should move to the config
     sys.env.get("SPARK_YARN_USER_ENV").foreach { userEnvs =>
       // Allow users to specify some environment variables.
-      YarnSparkHadoopUtil.setEnvFromInputString(env, userEnvs, File.pathSeparator)
-
+      YarnSparkHadoopUtil.setEnvFromInputString(env, userEnvs)
       // Pass SPARK_YARN_USER_ENV itself to the AM so it can use it to set up executor environments.
       env("SPARK_YARN_USER_ENV") = userEnvs
     }
-    env
-  }
-
-  def userArgsToString(clientArgs: ClientArguments): String = {
-    val prefix = " --arg "
-    val args = clientArgs.userArgs
-    val retval = new StringBuilder()
-    for (arg <- args) {
-      retval.append(prefix).append(" ").append(YarnSparkHadoopUtil.escapeForShell(arg))
-    }
-    retval.toString
-  }
-
-  def setupSecurityToken(amContainer: ContainerLaunchContext)
-
-  def createContainerLaunchContext(
-        newApp: GetNewApplicationResponse,
-        localResources: HashMap[String, LocalResource],
-        env: HashMap[String, String]): ContainerLaunchContext = {
-    logInfo("Setting up container launch context")
-    val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
-    amContainer.setLocalResources(localResources)
-
-    val isLaunchingDriver = args.userClass != null
 
     // In cluster mode, if the deprecated SPARK_JAVA_OPTS is set, we need to propagate it to
     // executors. But we can't just set spark.executor.extraJavaOptions, because the driver's
@@ -320,6 +261,7 @@ trait ClientBase extends Logging {
     // Note that to warn the user about the deprecation in cluster mode, some code from
     // SparkConf#validateSettings() is duplicated here (to avoid triggering the condition
     // described above).
+    val isLaunchingDriver = args.userClass != null
     if (isLaunchingDriver) {
       sys.env.get("SPARK_JAVA_OPTS").foreach { value =>
         val warning =
@@ -342,14 +284,30 @@ trait ClientBase extends Logging {
         env("SPARK_JAVA_OPTS") = value
       }
     }
-    amContainer.setEnvironment(env)
 
-    val amMemory = args.amMemory
+    env
+  }
+
+  /**
+   * Set up a ContainerLaunchContext to launch our ApplicationMaster container.
+   * This sets up the launch environment, java options, and the command for launching the AM.
+   */
+  protected def createContainerLaunchContext(newAppResponse: GetNewApplicationResponse)
+      : ContainerLaunchContext = {
+    logInfo("Setting up container launch context for our AM")
+
+    val appId = newAppResponse.getApplicationId
+    val appStagingDir = getAppStagingDir(appId)
+    val localResources = prepareLocalResources(appStagingDir)
+    val launchEnv = setupLaunchEnv(appStagingDir)
+    val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
+    amContainer.setLocalResources(localResources)
+    amContainer.setEnvironment(launchEnv)
 
     val javaOpts = ListBuffer[String]()
 
     // Add Xmx for AM memory
-    javaOpts += "-Xmx" + amMemory + "m"
+    javaOpts += "-Xmx" + args.amMemory + "m"
 
     val tmpDir = new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)
     javaOpts += "-Djava.io.tmpdir=" + tmpDir
@@ -361,8 +319,7 @@ trait ClientBase extends Logging {
     // Instead of using this, rely on cpusets by YARN to enforce "proper" Spark behavior in
     // multi-tenant environments. Not sure how default Java GC behaves if it is limited to subset
     // of cores on a node.
-    val useConcurrentAndIncrementalGC = env.isDefinedAt("SPARK_USE_CONC_INCR_GC") &&
-      java.lang.Boolean.parseBoolean(env("SPARK_USE_CONC_INCR_GC"))
+    val useConcurrentAndIncrementalGC = launchEnv.get("SPARK_USE_CONC_INCR_GC").exists(_.toBoolean)
     if (useConcurrentAndIncrementalGC) {
       // In our expts, using (default) throughput collector has severe perf ramifications in
       // multi-tenant machines
@@ -380,6 +337,8 @@ trait ClientBase extends Logging {
       javaOpts += YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v")
     }
 
+    // Include driver-specific java options if we are launching a driver
+    val isLaunchingDriver = args.userClass != null
     if (isLaunchingDriver) {
       sparkConf.getOption("spark.driver.extraJavaOptions")
         .orElse(sys.env.get("SPARK_JAVA_OPTS"))
@@ -397,19 +356,27 @@ trait ClientBase extends Logging {
       } else {
         Nil
       }
+    val userJar =
+      if (args.userJar != null) {
+        Seq("--jar", args.userJar)
+      } else {
+        Nil
+      }
     val amClass =
       if (isLaunchingDriver) {
-        classOf[ApplicationMaster].getName()
+        Class.forName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
       } else {
-        classOf[ApplicationMaster].getName().replace("ApplicationMaster", "ExecutorLauncher")
+        Class.forName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
       }
+    val userArgs = args.userArgs.flatMap { arg =>
+      Seq("--arg", YarnSparkHadoopUtil.escapeForShell(arg))
+    }
     val amArgs =
-      Seq(amClass) ++ userClass ++
-      (if (args.userJar != null) Seq("--jar", args.userJar) else Nil) ++
-      Seq("--executor-memory", args.executorMemory.toString,
+      Seq(amClass) ++ userClass ++ userJar ++ userArgs ++
+      Seq(
+        "--executor-memory", args.executorMemory.toString,
         "--executor-cores", args.executorCores.toString,
-        "--num-executors ", args.numExecutors.toString,
-        userArgsToString(args))
+        "--num-executors ", args.numExecutors.toString)
 
     // Command for the ApplicationMaster
     val commands = Seq(Environment.JAVA_HOME.$() + "/bin/java", "-server") ++
@@ -418,41 +385,153 @@ trait ClientBase extends Logging {
         "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
         "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
 
-    logInfo("Yarn AM launch context:")
-    logInfo(s"  user class: ${args.userClass}")
-    logInfo(s"  env:        $env")
-    logInfo(s"  command:    ${commands.mkString(" ")}")
-
     // TODO: it would be nicer to just make sure there are no null commands here
     val printableCommands = commands.map(s => if (s == null) "null" else s).toList
     amContainer.setCommands(printableCommands)
 
-    setupSecurityToken(amContainer)
+    logDebug("===============================================================================")
+    logDebug("Yarn AM launch context:")
+    logDebug(s"    user class: ${Option(args.userClass).getOrElse("N/A")}")
+    logDebug("    env:")
+    launchEnv.foreach { case (k, v) => logDebug(s"        $k -> $v") }
+    logDebug("    resources:")
+    localResources.foreach { case (k, v) => logDebug(s"        $k -> $v")}
+    logDebug("    command:")
+    logDebug(s"        ${printableCommands.mkString(" ")}")
+    logDebug("===============================================================================")
 
     // send the acl settings into YARN to control who has access via YARN interfaces
     val securityManager = new SecurityManager(sparkConf)
     amContainer.setApplicationACLs(YarnSparkHadoopUtil.getApplicationAclsForYarn(securityManager))
+    setupSecurityToken(amContainer)
+    UserGroupInformation.getCurrentUser().addCredentials(credentials)
 
     amContainer
   }
+
+  /**
+   * Report the state of an application until it has exited, either successfully or
+   * due to some failure, then return the application state.
+   *
+   * @param appId ID of the application to monitor.
+   * @param returnOnRunning Whether to also return the application state when it is RUNNING.
+   * @param logApplicationReport Whether to log details of the application report every iteration.
+   * @return state of the application, one of FINISHED, FAILED, KILLED, or RUNNING.
+   */
+  def monitorApplication(
+      appId: ApplicationId,
+      returnOnRunning: Boolean = false,
+      logApplicationReport: Boolean = true): YarnApplicationState = {
+    val interval = sparkConf.getLong("spark.yarn.report.interval", 1000)
+    var lastState: YarnApplicationState = null
+    while (true) {
+      Thread.sleep(interval)
+      val report = getApplicationReport(appId)
+      val state = report.getYarnApplicationState
+
+      if (logApplicationReport) {
+        logInfo(s"Application report for $appId (state: $state)")
+        val details = Seq[(String, String)](
+          ("client token", getClientToken(report)),
+          ("diagnostics", report.getDiagnostics),
+          ("ApplicationMaster host", report.getHost),
+          ("ApplicationMaster RPC port", report.getRpcPort.toString),
+          ("queue", report.getQueue),
+          ("start time", report.getStartTime.toString),
+          ("final status", report.getFinalApplicationStatus.toString),
+          ("tracking URL", report.getTrackingUrl),
+          ("user", report.getUser)
+        )
+
+        // Use more loggable format if value is null or empty
+        val formattedDetails = details
+          .map { case (k, v) =>
+            val newValue = Option(v).filter(_.nonEmpty).getOrElse("N/A")
+            s"\n\t $k: $newValue" }
+          .mkString("")
+
+        // If DEBUG is enabled, log report details every iteration
+        // Otherwise, log them every time the application changes state
+        if (log.isDebugEnabled) {
+          logDebug(formattedDetails)
+        } else if (lastState != state) {
+          logInfo(formattedDetails)
+        }
+      }
+
+      if (state == YarnApplicationState.FINISHED ||
+        state == YarnApplicationState.FAILED ||
+        state == YarnApplicationState.KILLED) {
+        return state
+      }
+
+      if (returnOnRunning && state == YarnApplicationState.RUNNING) {
+        return state
+      }
+
+      lastState = state
+    }
+
+    // Never reached, but keeps compiler happy
+    throw new SparkException("While loop is depleted! This should never happen...")
+  }
+
+  /**
+   * Submit an application to the ResourceManager and monitor its state.
+   * This continues until the application has exited for any reason.
+   */
+  def run(): Unit = monitorApplication(submitApplication())
+
+  /* --------------------------------------------------------------------------------------- *
+   |  Methods that cannot be implemented here due to API differences across hadoop versions  |
+   * --------------------------------------------------------------------------------------- */
+
+  /** Submit an application running our ApplicationMaster to the ResourceManager. */
+  def submitApplication(): ApplicationId
+
+  /** Set up security tokens for launching our ApplicationMaster container. */
+  protected def setupSecurityToken(containerContext: ContainerLaunchContext): Unit
+
+  /** Get the application report from the ResourceManager for an application we have submitted. */
+  protected def getApplicationReport(appId: ApplicationId): ApplicationReport
+
+  /**
+   * Return the security token used by this client to communicate with the ApplicationMaster.
+   * If no security is enabled, the token returned by the report is null.
+   */
+  protected def getClientToken(report: ApplicationReport): String
 }
 
-object ClientBase extends Logging {
+private[spark] object ClientBase extends Logging {
+
+  // Alias for the Spark assembly jar and the user jar
   val SPARK_JAR: String = "__spark__.jar"
   val APP_JAR: String = "__app__.jar"
+
+  // URI scheme that identifies local resources
   val LOCAL_SCHEME = "local"
+
+  // Staging directory for any temporary jars or files
+  val SPARK_STAGING: String = ".sparkStaging"
+
+  // Location of any user-defined Spark jars
   val CONF_SPARK_JAR = "spark.yarn.jar"
-  /**
-   * This is an internal config used to propagate the location of the user's jar file to the
-   * driver/executors.
-   */
+  val ENV_SPARK_JAR = "SPARK_JAR"
+
+  // Internal config to propagate the location of the user's jar to the driver/executors
   val CONF_SPARK_USER_JAR = "spark.yarn.user.jar"
-  /**
-   * This is an internal config used to propagate the list of extra jars to add to the classpath
-   * of executors.
-   */
+
+  // Internal config to propagate the locations of any extra jars to add to the classpath
+  // of the executors
   val CONF_SPARK_YARN_SECONDARY_JARS = "spark.yarn.secondary.jars"
-  val ENV_SPARK_JAR = "SPARK_JAR"
+
+  // Staging directory is private! -> rwx--------
+  val STAGING_DIR_PERMISSION: FsPermission =
+    FsPermission.createImmutable(Integer.parseInt("700", 8).toShort)
+
+  // App files are world readable and owner writable -> rw-r--r--
+  val APP_FILE_PERMISSION: FsPermission =
+    FsPermission.createImmutable(Integer.parseInt("644", 8).toShort)
 
   /**
    * Find the user-defined Spark jar if configured, or return the jar containing this
@@ -461,7 +540,7 @@ object ClientBase extends Logging {
    * This method first looks in the SparkConf object for the CONF_SPARK_JAR key, and in the
    * user environment if that is not found (for backwards compatibility).
    */
-  def sparkJar(conf: SparkConf) = {
+  private def sparkJar(conf: SparkConf): String = {
     if (conf.contains(CONF_SPARK_JAR)) {
       conf.get(CONF_SPARK_JAR)
     } else if (System.getenv(ENV_SPARK_JAR) != null) {
@@ -474,16 +553,22 @@ object ClientBase extends Logging {
     }
   }
 
-  def populateHadoopClasspath(conf: Configuration, env: HashMap[String, String]) = {
+  /**
+   * Return the path to the given application's staging directory.
+   */
+  private def getAppStagingDir(appId: ApplicationId): String = {
+    SPARK_STAGING + Path.SEPARATOR + appId.toString() + Path.SEPARATOR
+  }
+
+  /**
+   * Populate the classpath entry in the given environment map with any application
+   * classpath specified through the Hadoop and Yarn configurations.
+   */
+  def populateHadoopClasspath(conf: Configuration, env: HashMap[String, String]): Unit = {
     val classPathElementsToAdd = getYarnAppClasspath(conf) ++ getMRAppClasspath(conf)
     for (c <- classPathElementsToAdd.flatten) {
-      YarnSparkHadoopUtil.addToEnvironment(
-        env,
-        Environment.CLASSPATH.name,
-        c.trim,
-        File.pathSeparator)
+      YarnSparkHadoopUtil.addPathToEnvironment(env, Environment.CLASSPATH.name, c.trim)
     }
-    classPathElementsToAdd
   }
 
   private def getYarnAppClasspath(conf: Configuration): Option[Seq[String]] =
@@ -519,7 +604,7 @@ object ClientBase extends Logging {
 
   /**
    * In Hadoop 0.23, the MR application classpath comes with the YARN application
-   * classpath.  In Hadoop 2.0, it's an array of Strings, and in 2.2+ it's a String.
+   * classpath. In Hadoop 2.0, it's an array of Strings, and in 2.2+ it's a String.
    * So we need to use reflection to retrieve it.
    */
   def getDefaultMRApplicationClasspath: Option[Seq[String]] = {
@@ -545,8 +630,16 @@ object ClientBase extends Logging {
     triedDefault.toOption
   }
 
-  def populateClasspath(args: ClientArguments, conf: Configuration, sparkConf: SparkConf,
-      env: HashMap[String, String], extraClassPath: Option[String] = None) {
+  /**
+   * Populate the classpath entry in the given environment map.
+   * This includes the user jar, Spark jar, and any extra application jars.
+   */
+  def populateClasspath(
+      args: ClientArguments,
+      conf: Configuration,
+      sparkConf: SparkConf,
+      env: HashMap[String, String],
+      extraClassPath: Option[String] = None): Unit = {
     extraClassPath.foreach(addClasspathEntry(_, env))
     addClasspathEntry(Environment.PWD.$(), env)
 
@@ -554,36 +647,40 @@ object ClientBase extends Logging {
     if (sparkConf.get("spark.yarn.user.classpath.first", "false").toBoolean) {
       addUserClasspath(args, sparkConf, env)
       addFileToClasspath(sparkJar(sparkConf), SPARK_JAR, env)
-      ClientBase.populateHadoopClasspath(conf, env)
+      populateHadoopClasspath(conf, env)
     } else {
       addFileToClasspath(sparkJar(sparkConf), SPARK_JAR, env)
-      ClientBase.populateHadoopClasspath(conf, env)
+      populateHadoopClasspath(conf, env)
       addUserClasspath(args, sparkConf, env)
     }
 
     // Append all jar files under the working directory to the classpath.
-    addClasspathEntry(Environment.PWD.$() + Path.SEPARATOR + "*", env);
+    addClasspathEntry(Environment.PWD.$() + Path.SEPARATOR + "*", env)
   }
 
   /**
    * Adds the user jars which have local: URIs (or alternate names, such as APP_JAR) explicitly
    * to the classpath.
    */
-  private def addUserClasspath(args: ClientArguments, conf: SparkConf,
-      env: HashMap[String, String]) = {
-    if (args != null) {
-      addFileToClasspath(args.userJar, APP_JAR, env)
-      if (args.addJars != null) {
-        args.addJars.split(",").foreach { case file: String =>
-          addFileToClasspath(file, null, env)
-        }
+  private def addUserClasspath(
+      args: ClientArguments,
+      conf: SparkConf,
+      env: HashMap[String, String]): Unit = {
+
+    // If `args` is not null, we are launching an AM container.
+    // Otherwise, we are launching executor containers.
+    val (mainJar, secondaryJars) =
+      if (args != null) {
+        (args.userJar, args.addJars)
+      } else {
+        (conf.get(CONF_SPARK_USER_JAR, null), conf.get(CONF_SPARK_YARN_SECONDARY_JARS, null))
       }
-    } else {
-      val userJar = conf.get(CONF_SPARK_USER_JAR, null)
-      addFileToClasspath(userJar, APP_JAR, env)
 
-      val cachedSecondaryJarLinks = conf.get(CONF_SPARK_YARN_SECONDARY_JARS, "").split(",")
-      cachedSecondaryJarLinks.foreach(jar => addFileToClasspath(jar, null, env))
+    addFileToClasspath(mainJar, APP_JAR, env)
+    if (secondaryJars != null) {
+      secondaryJars.split(",").filter(_.nonEmpty).foreach { jar =>
+        addFileToClasspath(jar, null, env)
+      }
     }
   }
 
@@ -599,46 +696,44 @@ object ClientBase extends Logging {
    * @param fileName  Alternate name for the file (optional).
    * @param env       Map holding the environment variables.
    */
-  private def addFileToClasspath(path: String, fileName: String,
-      env: HashMap[String, String]) : Unit = {
+  private def addFileToClasspath(
+      path: String,
+      fileName: String,
+      env: HashMap[String, String]): Unit = {
     if (path != null) {
       scala.util.control.Exception.ignoring(classOf[URISyntaxException]) {
-        val localPath = getLocalPath(path)
-        if (localPath != null) {
-          addClasspathEntry(localPath, env)
+        val uri = new URI(path)
+        if (uri.getScheme == LOCAL_SCHEME) {
+          addClasspathEntry(uri.getPath, env)
           return
         }
       }
     }
     if (fileName != null) {
-      addClasspathEntry(Environment.PWD.$() + Path.SEPARATOR + fileName, env);
+      addClasspathEntry(Environment.PWD.$() + Path.SEPARATOR + fileName, env)
     }
   }
 
   /**
-   * Returns the local path if the URI is a "local:" URI, or null otherwise.
+   * Add the given path to the classpath entry of the given environment map.
+   * If the classpath is already set, this appends the new path to the existing classpath.
    */
-  private def getLocalPath(resource: String): String = {
-    val uri = new URI(resource)
-    if (LOCAL_SCHEME.equals(uri.getScheme())) {
-      return uri.getPath()
-    }
-    null
-  }
-
-  private def addClasspathEntry(path: String, env: HashMap[String, String]) =
-    YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, path,
-            File.pathSeparator)
+  private def addClasspathEntry(path: String, env: HashMap[String, String]): Unit =
+    YarnSparkHadoopUtil.addPathToEnvironment(env, Environment.CLASSPATH.name, path)
 
   /**
    * Get the list of namenodes the user may access.
    */
-  private[yarn] def getNameNodesToAccess(sparkConf: SparkConf): Set[Path] = {
-    sparkConf.get("spark.yarn.access.namenodes", "").split(",").map(_.trim()).filter(!_.isEmpty)
-      .map(new Path(_)).toSet
+  def getNameNodesToAccess(sparkConf: SparkConf): Set[Path] = {
+    sparkConf.get("spark.yarn.access.namenodes", "")
+      .split(",")
+      .map(_.trim())
+      .filter(!_.isEmpty)
+      .map(new Path(_))
+      .toSet
   }
 
-  private[yarn] def getTokenRenewer(conf: Configuration): String = {
+  def getTokenRenewer(conf: Configuration): String = {
     val delegTokenRenewer = Master.getMasterPrincipal(conf)
     logDebug("delegation token renewer is: " + delegTokenRenewer)
     if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) {
@@ -652,17 +747,54 @@ object ClientBase extends Logging {
   /**
    * Obtains tokens for the namenodes passed in and adds them to the credentials.
    */
-  private[yarn] def obtainTokensForNamenodes(paths: Set[Path], conf: Configuration,
-    creds: Credentials) {
+  def obtainTokensForNamenodes(
+      paths: Set[Path],
+      conf: Configuration,
+      creds: Credentials): Unit = {
     if (UserGroupInformation.isSecurityEnabled()) {
       val delegTokenRenewer = getTokenRenewer(conf)
+      paths.foreach { dst =>
+        val dstFs = dst.getFileSystem(conf)
+        logDebug("getting token for namenode: " + dst)
+        dstFs.addDelegationTokens(delegTokenRenewer, creds)
+      }
+    }
+  }
 
-      paths.foreach {
-        dst =>
-          val dstFs = dst.getFileSystem(conf)
-          logDebug("getting token for namenode: " + dst)
-          dstFs.addDelegationTokens(delegTokenRenewer, creds)
+  /**
+   * Return whether the two file systems are the same.
+   */
+  private def compareFs(srcFs: FileSystem, destFs: FileSystem): Boolean = {
+    val srcUri = srcFs.getUri()
+    val dstUri = destFs.getUri()
+    if (srcUri.getScheme() == null) {
+      return false
+    }
+    if (!srcUri.getScheme().equals(dstUri.getScheme())) {
+      return false
+    }
+    var srcHost = srcUri.getHost()
+    var dstHost = dstUri.getHost()
+    if ((srcHost != null) && (dstHost != null)) {
+      try {
+        srcHost = InetAddress.getByName(srcHost).getCanonicalHostName()
+        dstHost = InetAddress.getByName(dstHost).getCanonicalHostName()
+      } catch {
+        case e: UnknownHostException =>
+          return false
       }
+      if (!srcHost.equals(dstHost)) {
+        return false
+      }
+    } else if (srcHost == null && dstHost != null) {
+      return false
+    } else if (srcHost != null && dstHost == null) {
+      return false
+    }
+    if (srcUri.getPort() != dstUri.getPort()) {
+      false
+    } else {
+      true
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c4022dd5/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
index 9b7f1fc..c592ecf 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
@@ -19,29 +19,24 @@ package org.apache.spark.deploy.yarn
 
 import java.net.URI
 
+import scala.collection.mutable.{HashMap, LinkedHashMap, Map}
+
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.FileStatus
-import org.apache.hadoop.fs.FileSystem
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
 import org.apache.hadoop.fs.permission.FsAction
-import org.apache.hadoop.yarn.api.records.LocalResource
-import org.apache.hadoop.yarn.api.records.LocalResourceVisibility
-import org.apache.hadoop.yarn.api.records.LocalResourceType
+import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.util.{Records, ConverterUtils}
 
-import org.apache.spark.Logging 
-
-import scala.collection.mutable.HashMap
-import scala.collection.mutable.LinkedHashMap
-import scala.collection.mutable.Map
-
+import org.apache.spark.Logging
 
 /** Client side methods to setup the Hadoop distributed cache */
-class ClientDistributedCacheManager() extends Logging {
-  private val distCacheFiles: Map[String, Tuple3[String, String, String]] = 
-    LinkedHashMap[String, Tuple3[String, String, String]]()
-  private val distCacheArchives: Map[String, Tuple3[String, String, String]] = 
-    LinkedHashMap[String, Tuple3[String, String, String]]()
+private[spark] class ClientDistributedCacheManager() extends Logging {
+
+  // Mappings from remote URI to (file size, modification time, visibility)
+  private val distCacheFiles: Map[String, (String, String, String)] =
+    LinkedHashMap[String, (String, String, String)]()
+  private val distCacheArchives: Map[String, (String, String, String)] =
+    LinkedHashMap[String, (String, String, String)]()
 
 
   /**
@@ -68,9 +63,9 @@ class ClientDistributedCacheManager() extends Logging {
       resourceType: LocalResourceType,
       link: String,
       statCache: Map[URI, FileStatus],
-      appMasterOnly: Boolean = false) = {
+      appMasterOnly: Boolean = false): Unit = {
     val destStatus = fs.getFileStatus(destPath)
-    val amJarRsrc = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource]
+    val amJarRsrc = Records.newRecord(classOf[LocalResource])
     amJarRsrc.setType(resourceType)
     val visibility = getVisibility(conf, destPath.toUri(), statCache)
     amJarRsrc.setVisibility(visibility)
@@ -80,7 +75,7 @@ class ClientDistributedCacheManager() extends Logging {
     if (link == null || link.isEmpty()) throw new Exception("You must specify a valid link name")
     localResources(link) = amJarRsrc
     
-    if (appMasterOnly == false) {
+    if (!appMasterOnly) {
       val uri = destPath.toUri()
       val pathURI = new URI(uri.getScheme(), uri.getAuthority(), uri.getPath(), null, link)
       if (resourceType == LocalResourceType.FILE) {
@@ -95,12 +90,10 @@ class ClientDistributedCacheManager() extends Logging {
 
   /**
    * Adds the necessary cache file env variables to the env passed in
-   * @param env
    */
-  def setDistFilesEnv(env: Map[String, String]) = {
+  def setDistFilesEnv(env: Map[String, String]): Unit = {
     val (keys, tupleValues) = distCacheFiles.unzip
     val (sizes, timeStamps, visibilities) = tupleValues.unzip3
-
     if (keys.size > 0) {
       env("SPARK_YARN_CACHE_FILES") = keys.reduceLeft[String] { (acc,n) => acc + "," + n }
       env("SPARK_YARN_CACHE_FILES_TIME_STAMPS") = 
@@ -114,12 +107,10 @@ class ClientDistributedCacheManager() extends Logging {
 
   /**
    * Adds the necessary cache archive env variables to the env passed in
-   * @param env
    */
-  def setDistArchivesEnv(env: Map[String, String]) = {
+  def setDistArchivesEnv(env: Map[String, String]): Unit = {
     val (keys, tupleValues) = distCacheArchives.unzip
     val (sizes, timeStamps, visibilities) = tupleValues.unzip3
-
     if (keys.size > 0) {
       env("SPARK_YARN_CACHE_ARCHIVES") = keys.reduceLeft[String] { (acc,n) => acc + "," + n }
       env("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") = 
@@ -133,25 +124,21 @@ class ClientDistributedCacheManager() extends Logging {
 
   /**
    * Returns the local resource visibility depending on the cache file permissions
-   * @param conf
-   * @param uri
-   * @param statCache
    * @return LocalResourceVisibility
    */
-  def getVisibility(conf: Configuration, uri: URI, statCache: Map[URI, FileStatus]):
-      LocalResourceVisibility = {
+  def getVisibility(
+      conf: Configuration,
+      uri: URI,
+      statCache: Map[URI, FileStatus]): LocalResourceVisibility = {
     if (isPublic(conf, uri, statCache)) {
-      return LocalResourceVisibility.PUBLIC 
-    } 
-    LocalResourceVisibility.PRIVATE
+      LocalResourceVisibility.PUBLIC
+    } else {
+      LocalResourceVisibility.PRIVATE
+    }
   }
 
   /**
-   * Returns a boolean to denote whether a cache file is visible to all(public)
-   * or not
-   * @param conf
-   * @param uri
-   * @param statCache
+   * Returns a boolean to denote whether a cache file is visible to all (public)
    * @return true if the path in the uri is visible to all, false otherwise
    */
   def isPublic(conf: Configuration, uri: URI, statCache: Map[URI, FileStatus]): Boolean = {
@@ -167,13 +154,12 @@ class ClientDistributedCacheManager() extends Logging {
   /**
    * Returns true if all ancestors of the specified path have the 'execute'
    * permission set for all users (i.e. that other users can traverse
-   * the directory heirarchy to the given path)
-   * @param fs
-   * @param path
-   * @param statCache
+   * the directory hierarchy to the given path)
    * @return true if all ancestors have the 'execute' permission set for all users
    */
-  def ancestorsHaveExecutePermissions(fs: FileSystem, path: Path, 
+  def ancestorsHaveExecutePermissions(
+      fs: FileSystem,
+      path: Path,
       statCache: Map[URI, FileStatus]): Boolean =  {
     var current = path
     while (current != null) {
@@ -187,32 +173,25 @@ class ClientDistributedCacheManager() extends Logging {
   }
 
   /**
-   * Checks for a given path whether the Other permissions on it 
+   * Checks for a given path whether the Other permissions on it
    * imply the permission in the passed FsAction
-   * @param fs
-   * @param path
-   * @param action
-   * @param statCache
    * @return true if the path in the uri is visible to all, false otherwise
    */
-  def checkPermissionOfOther(fs: FileSystem, path: Path,
-      action: FsAction, statCache: Map[URI, FileStatus]): Boolean = {
+  def checkPermissionOfOther(
+      fs: FileSystem,
+      path: Path,
+      action: FsAction,
+      statCache: Map[URI, FileStatus]): Boolean = {
     val status = getFileStatus(fs, path.toUri(), statCache)
     val perms = status.getPermission()
     val otherAction = perms.getOtherAction()
-    if (otherAction.implies(action)) {
-      return true
-    }
-    false
+    otherAction.implies(action)
   }
 
   /**
-   * Checks to see if the given uri exists in the cache, if it does it 
+   * Checks to see if the given uri exists in the cache, if it does it
    * returns the existing FileStatus, otherwise it stats the uri, stores
    * it in the cache, and returns the FileStatus.
-   * @param fs
-   * @param uri
-   * @param statCache
    * @return FileStatus
    */
   def getFileStatus(fs: FileSystem, uri: URI, statCache: Map[URI, FileStatus]): FileStatus = {
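
A note on the `setDistFilesEnv` / `setDistArchivesEnv` hunks above: both unzip the cache map and flatten each column into a comma-separated environment variable. The following is only a minimal sketch of that step, not the code in this commit. `DistCacheEnvSketch` and `buildEnv` are hypothetical names, the map is assumed to go from URI to (size, timestamp, visibility), only the two variable names visible in the diff are written, and `mkString` is used in place of `reduceLeft`.

```scala
import scala.collection.mutable.{LinkedHashMap, Map}

object DistCacheEnvSketch {
  // Hypothetical helper: flattens URI -> (size, timestamp, visibility)
  // entries into comma-separated environment variables.
  def buildEnv(entries: Map[String, (String, String, String)]): Map[String, String] = {
    val env = LinkedHashMap[String, String]()
    // Split the map into its keys and its tuple values, then split the tuples.
    val (keys, tupleValues) = entries.unzip
    val (sizes, timeStamps, visibilities) = tupleValues.unzip3
    if (keys.nonEmpty) {
      env("SPARK_YARN_CACHE_FILES") = keys.mkString(",")
      env("SPARK_YARN_CACHE_FILES_TIME_STAMPS") = timeStamps.mkString(",")
      // sizes and visibilities would be written the same way; the variable
      // names for them are not shown in this part of the diff, so they are
      // left out of the sketch.
    }
    env
  }
}
```

Passing a `LinkedHashMap` keeps the entries in insertion order, so position i in one variable lines up with position i in the other.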

http://git-wip-us.apache.org/repos/asf/spark/blob/c4022dd5/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
index f56f72c..bbbf615 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.deploy.yarn
 
-import java.io.File
 import java.net.URI
 
 import scala.collection.JavaConversions._
@@ -128,9 +127,9 @@ trait ExecutorRunnableUtil extends Logging {
       localResources: HashMap[String, LocalResource],
       timestamp: String,
       size: String,
-      vis: String) = {
+      vis: String): Unit = {
     val uri = new URI(file)
-    val amJarRsrc = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource]
+    val amJarRsrc = Records.newRecord(classOf[LocalResource])
     amJarRsrc.setType(rtype)
     amJarRsrc.setVisibility(LocalResourceVisibility.valueOf(vis))
     amJarRsrc.setResource(ConverterUtils.getYarnUrlFromURI(uri))
@@ -175,14 +174,17 @@ trait ExecutorRunnableUtil extends Logging {
     ClientBase.populateClasspath(null, yarnConf, sparkConf, env, extraCp)
 
     sparkConf.getExecutorEnv.foreach { case (key, value) =>
-      YarnSparkHadoopUtil.addToEnvironment(env, key, value, File.pathSeparator)
+      // This assumes each executor environment variable set here is a path
+      // This is kept for backward compatibility and consistency with Hadoop
+      YarnSparkHadoopUtil.addPathToEnvironment(env, key, value)
     }
 
     // Keep this for backwards compatibility but users should move to the config
-    YarnSparkHadoopUtil.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV"),
-      File.pathSeparator)
+    sys.env.get("SPARK_YARN_USER_ENV").foreach { userEnvs =>
+      YarnSparkHadoopUtil.setEnvFromInputString(env, userEnvs)
+    }
 
-    System.getenv().filterKeys(_.startsWith("SPARK")).foreach { case (k,v) => env(k) = v }
+    System.getenv().filterKeys(_.startsWith("SPARK")).foreach { case (k, v) => env(k) = v }
     env
   }
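
For readers following the hunk above: the old code passed a possibly-null `System.getenv(...)` result down to the helper, while the new code looks the variable up as an `Option` and only calls the helper when it is set. Below is a small sketch of that pattern together with the `SPARK`-prefix forwarding. `ExecutorEnvSketch`, `prepare`, `systemEnv` and `applyUserEnv` are hypothetical names introduced so the example runs without a real process environment.

```scala
import scala.collection.mutable.HashMap

object ExecutorEnvSketch {
  // `systemEnv` stands in for the process environment and `applyUserEnv`
  // for the helper that parses the user-supplied string (both hypothetical).
  def prepare(
      systemEnv: Map[String, String],
      applyUserEnv: (HashMap[String, String], String) => Unit): HashMap[String, String] = {
    val env = new HashMap[String, String]()
    // Runs only when the variable is present, so no null check is needed.
    systemEnv.get("SPARK_YARN_USER_ENV").foreach { userEnvs =>
      applyUserEnv(env, userEnvs)
    }
    // Forward every variable whose name starts with "SPARK".
    systemEnv.filter { case (k, _) => k.startsWith("SPARK") }
      .foreach { case (k, v) => env(k) = v }
    env
  }
}
```

Note that `SPARK_YARN_USER_ENV` itself matches the prefix filter, so it is forwarded along with the variables it defines.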
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c4022dd5/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index 4a33e34..0b712c2 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.deploy.yarn
 
 import java.lang.{Boolean => JBoolean}
+import java.io.File
 import java.util.{Collections, Set => JSet}
 import java.util.regex.Matcher
 import java.util.regex.Pattern
@@ -29,14 +30,12 @@ import org.apache.hadoop.io.Text
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.security.Credentials
 import org.apache.hadoop.security.UserGroupInformation
-import org.apache.hadoop.util.StringInterner
 import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.api.ApplicationConstants
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType
 import org.apache.hadoop.yarn.util.RackResolver
 import org.apache.hadoop.conf.Configuration
 
-import org.apache.spark.{SecurityManager, SparkConf, SparkContext}
+import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.util.Utils
 
@@ -100,30 +99,26 @@ object YarnSparkHadoopUtil {
   private val hostToRack = new ConcurrentHashMap[String, String]()
   private val rackToHostSet = new ConcurrentHashMap[String, JSet[String]]()
 
-  def addToEnvironment(
-      env: HashMap[String, String],
-      variable: String,
-      value: String,
-      classPathSeparator: String) = {
-    var envVariable = ""
-    if (env.get(variable) == None) {
-      envVariable = value
-    } else {
-      envVariable = env.get(variable).get + classPathSeparator + value
-    }
-    env put (StringInterner.weakIntern(variable), StringInterner.weakIntern(envVariable))
+  /**
+   * Add a path variable to the given environment map.
+   * If the map already contains this key, append the value to the existing value instead.
+   */
+  def addPathToEnvironment(env: HashMap[String, String], key: String, value: String): Unit = {
+    val newValue = if (env.contains(key)) { env(key) + File.pathSeparator + value } else value
+    env.put(key, newValue)
   }
 
-  def setEnvFromInputString(
-      env: HashMap[String, String],
-      envString: String,
-      classPathSeparator: String) = {
-    if (envString != null && envString.length() > 0) {
-      var childEnvs = envString.split(",")
-      var p = Pattern.compile(getEnvironmentVariableRegex())
+  /**
+   * Set zero or more environment variables specified by the given input string.
+   * The input string is expected to take the form "KEY1=VAL1,KEY2=VAL2,KEY3=VAL3".
+   */
+  def setEnvFromInputString(env: HashMap[String, String], inputString: String): Unit = {
+    if (inputString != null && inputString.length() > 0) {
+      val childEnvs = inputString.split(",")
+      val p = Pattern.compile(environmentVariableRegex)
       for (cEnv <- childEnvs) {
-        var parts = cEnv.split("=") // split on '='
-        var m = p.matcher(parts(1))
+        val parts = cEnv.split("=") // split on '='
+        val m = p.matcher(parts(1))
         val sb = new StringBuffer
         while (m.find()) {
           val variable = m.group(1)
@@ -131,8 +126,7 @@ object YarnSparkHadoopUtil {
           if (env.get(variable) != None) {
             replace = env.get(variable).get
           } else {
-            // if this key is not configured for the child .. get it
-            // from the env
+            // if this key is not configured for the child .. get it from the env
             replace = System.getenv(variable)
             if (replace == null) {
             // the env key is not present anywhere .. simply set it
@@ -142,14 +136,15 @@ object YarnSparkHadoopUtil {
           m.appendReplacement(sb, Matcher.quoteReplacement(replace))
         }
         m.appendTail(sb)
-        addToEnvironment(env, parts(0), sb.toString(), classPathSeparator)
+        // This treats the environment variable as path variable delimited by `File.pathSeparator`
+        // This is kept for backward compatibility and consistency with Hadoop's behavior
+        addPathToEnvironment(env, parts(0), sb.toString)
       }
     }
   }
 
-  private def getEnvironmentVariableRegex() : String = {
-    val osName = System.getProperty("os.name")
-    if (osName startsWith "Windows") {
+  private val environmentVariableRegex: String = {
+    if (Utils.isWindows) {
       "%([A-Za-z_][A-Za-z0-9_]*?)%"
     } else {
       "\\$([A-Za-z_][A-Za-z0-9_]*)"
@@ -181,14 +176,14 @@ object YarnSparkHadoopUtil {
     }
   }
 
-  private[spark] def lookupRack(conf: Configuration, host: String): String = {
+  def lookupRack(conf: Configuration, host: String): String = {
     if (!hostToRack.contains(host)) {
       populateRackInfo(conf, host)
     }
     hostToRack.get(host)
   }
 
-  private[spark] def populateRackInfo(conf: Configuration, hostname: String) {
+  def populateRackInfo(conf: Configuration, hostname: String) {
     Utils.checkHost(hostname)
 
     if (!hostToRack.containsKey(hostname)) {
@@ -212,8 +207,8 @@ object YarnSparkHadoopUtil {
     }
   }
 
-  private[spark] def getApplicationAclsForYarn(securityMgr: SecurityManager):
-      Map[ApplicationAccessType, String] = {
+  def getApplicationAclsForYarn(securityMgr: SecurityManager)
+      : Map[ApplicationAccessType, String] = {
     Map[ApplicationAccessType, String] (
       ApplicationAccessType.VIEW_APP -> securityMgr.getViewAcls,
       ApplicationAccessType.MODIFY_APP -> securityMgr.getModifyAcls
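
To make the behaviour of `addPathToEnvironment` and `setEnvFromInputString` above easier to follow, here is a self-contained sketch of the same idea: parse `"KEY1=VAL1,KEY2=VAL2"`, expand `$VAR` references, and append to any existing value using `File.pathSeparator`. It is an illustration under several assumptions, not the code in this commit. `EnvStringSketch` and the `lookup` parameter are hypothetical, only the Unix-style pattern is used, entries are split on the first `=` only and skipped if malformed, and an unresolved reference is assumed to expand to the empty string (that branch is cut off in the hunk above).

```scala
import java.io.File
import java.util.regex.{Matcher, Pattern}
import scala.collection.mutable.HashMap

object EnvStringSketch {
  // Unix-style reference pattern, as in the non-Windows branch of the diff.
  private val varPattern = Pattern.compile("\\$([A-Za-z_][A-Za-z0-9_]*)")

  // Append `value` to an existing entry, or set it if the key is new.
  def addPathToEnvironment(env: HashMap[String, String], key: String, value: String): Unit = {
    val newValue = if (env.contains(key)) env(key) + File.pathSeparator + value else value
    env.put(key, newValue)
  }

  // `lookup` is a hypothetical stand-in for System.getenv, so tests can stub it.
  def setEnvFromInputString(
      env: HashMap[String, String],
      inputString: String,
      lookup: String => Option[String] = k => sys.env.get(k)): Unit = {
    if (inputString != null && inputString.nonEmpty) {
      for (entry <- inputString.split(",")) {
        val parts = entry.split("=", 2) // assumption: split on the first '=' only
        if (parts.length == 2) {        // assumption: skip malformed entries
          val m = varPattern.matcher(parts(1))
          val sb = new StringBuffer
          while (m.find()) {
            val name = m.group(1)
            // Prefer a value already in `env`, then the fallback lookup,
            // then (assumed) the empty string.
            val replacement = env.get(name).orElse(lookup(name)).getOrElse("")
            m.appendReplacement(sb, Matcher.quoteReplacement(replacement))
          }
          m.appendTail(sb)
          addPathToEnvironment(env, parts(0), sb.toString)
        }
      }
    }
  }
}
```

With `env` containing `BASE -> /opt`, the input `A=$BASE/lib,A=/extra` leaves `A` set to `/opt/lib` and `/extra` joined by the platform path separator, which is the "path variable" semantics the comments in the diff refer to.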

http://git-wip-us.apache.org/repos/asf/spark/blob/c4022dd5/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 6aa6475..200a308 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -19,7 +19,7 @@ package org.apache.spark.scheduler.cluster
 
 import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState}
 import org.apache.spark.{SparkException, Logging, SparkContext}
-import org.apache.spark.deploy.yarn.{Client, ClientArguments, YarnSparkHadoopUtil}
+import org.apache.spark.deploy.yarn.{Client, ClientArguments}
 import org.apache.spark.scheduler.TaskSchedulerImpl
 
 import scala.collection.mutable.ArrayBuffer
@@ -34,115 +34,120 @@ private[spark] class YarnClientSchedulerBackend(
     minRegisteredRatio = 0.8
   }
 
-  var client: Client = null
-  var appId: ApplicationId = null
-  var checkerThread: Thread = null
-  var stopping: Boolean = false
-  var totalExpectedExecutors = 0
-
-  private[spark] def addArg(optionName: String, envVar: String, sysProp: String,
-      arrayBuf: ArrayBuffer[String]) {
-    if (System.getenv(envVar) != null) {
-      arrayBuf += (optionName, System.getenv(envVar))
-    } else if (sc.getConf.contains(sysProp)) {
-      arrayBuf += (optionName, sc.getConf.get(sysProp))
-    }
-  }
+  private var client: Client = null
+  private var appId: ApplicationId = null
+  private var stopping: Boolean = false
+  private var totalExpectedExecutors = 0
 
+  /**
+   * Create a Yarn client to submit an application to the ResourceManager.
+   * This waits until the application is running.
+   */
   override def start() {
     super.start()
-
     val driverHost = conf.get("spark.driver.host")
     val driverPort = conf.get("spark.driver.port")
     val hostport = driverHost + ":" + driverPort
     sc.ui.foreach { ui => conf.set("spark.driver.appUIAddress", ui.appUIHostPort) }
 
     val argsArrayBuf = new ArrayBuffer[String]()
-    argsArrayBuf += (
-      "--args", hostport
-    )
-
-    // process any optional arguments, given either as environment variables
-    // or system properties. use the defaults already defined in ClientArguments
-    // if things aren't specified. system properties override environment
-    // variables.
-    List(("--driver-memory", "SPARK_MASTER_MEMORY", "spark.master.memory"),
-      ("--driver-memory", "SPARK_DRIVER_MEMORY", "spark.driver.memory"),
-      ("--num-executors", "SPARK_WORKER_INSTANCES", "spark.executor.instances"),
-      ("--num-executors", "SPARK_EXECUTOR_INSTANCES", "spark.executor.instances"),
-      ("--executor-memory", "SPARK_WORKER_MEMORY", "spark.executor.memory"),
-      ("--executor-memory", "SPARK_EXECUTOR_MEMORY", "spark.executor.memory"),
-      ("--executor-cores", "SPARK_WORKER_CORES", "spark.executor.cores"),
-      ("--executor-cores", "SPARK_EXECUTOR_CORES", "spark.executor.cores"),
-      ("--queue", "SPARK_YARN_QUEUE", "spark.yarn.queue"),
-      ("--name", "SPARK_YARN_APP_NAME", "spark.app.name"))
-    .foreach { case (optName, envVar, sysProp) => addArg(optName, envVar, sysProp, argsArrayBuf) }
-
-    logDebug("ClientArguments called with: " + argsArrayBuf)
+    argsArrayBuf += ("--arg", hostport)
+    argsArrayBuf ++= getExtraClientArguments
+
+    logDebug("ClientArguments called with: " + argsArrayBuf.mkString(" "))
     val args = new ClientArguments(argsArrayBuf.toArray, conf)
     totalExpectedExecutors = args.numExecutors
     client = new Client(args, conf)
-    appId = client.runApp()
-    waitForApp()
-    checkerThread = yarnApplicationStateCheckerThread()
+    appId = client.submitApplication()
+    waitForApplication()
+    asyncMonitorApplication()
   }
 
-  def waitForApp() {
-
-    // TODO : need a better way to find out whether the executors are ready or not
-    // maybe by resource usage report?
-    while(true) {
-      val report = client.getApplicationReport(appId)
-
-      logInfo("Application report from ASM: \n" +
-        "\t appMasterRpcPort: " + report.getRpcPort() + "\n" +
-        "\t appStartTime: " + report.getStartTime() + "\n" +
-        "\t yarnAppState: " + report.getYarnApplicationState() + "\n"
+  /**
+   * Return any extra command line arguments to be passed to Client provided in the form of
+   * environment variables or Spark properties.
+   */
+  private def getExtraClientArguments: Seq[String] = {
+    val extraArgs = new ArrayBuffer[String]
+    val optionTuples = // List of (target Client argument, environment variable, Spark property)
+      List(
+        ("--driver-memory", "SPARK_MASTER_MEMORY", "spark.master.memory"),
+        ("--driver-memory", "SPARK_DRIVER_MEMORY", "spark.driver.memory"),
+        ("--num-executors", "SPARK_WORKER_INSTANCES", "spark.executor.instances"),
+        ("--num-executors", "SPARK_EXECUTOR_INSTANCES", "spark.executor.instances"),
+        ("--executor-memory", "SPARK_WORKER_MEMORY", "spark.executor.memory"),
+        ("--executor-memory", "SPARK_EXECUTOR_MEMORY", "spark.executor.memory"),
+        ("--executor-cores", "SPARK_WORKER_CORES", "spark.executor.cores"),
+        ("--executor-cores", "SPARK_EXECUTOR_CORES", "spark.executor.cores"),
+        ("--queue", "SPARK_YARN_QUEUE", "spark.yarn.queue"),
+        ("--name", "SPARK_YARN_APP_NAME", "spark.app.name")
       )
-
-      // Ready to go, or already gone.
-      val state = report.getYarnApplicationState()
-      if (state == YarnApplicationState.RUNNING) {
-        return
-      } else if (state == YarnApplicationState.FINISHED ||
-        state == YarnApplicationState.FAILED ||
-        state == YarnApplicationState.KILLED) {
-        throw new SparkException("Yarn application already ended," +
-          "might be killed or not able to launch application master.")
+    optionTuples.foreach { case (optionName, envVar, sparkProp) =>
+      if (System.getenv(envVar) != null) {
+        extraArgs += (optionName, System.getenv(envVar))
+      } else if (sc.getConf.contains(sparkProp)) {
+        extraArgs += (optionName, sc.getConf.get(sparkProp))
       }
+    }
+    extraArgs
+  }
 
-      Thread.sleep(1000)
+  /**
+   * Report the state of the application until it is running.
+   * If the application has finished, failed or been killed in the process, throw an exception.
+   * This assumes both `client` and `appId` have already been set.
+   */
+  private def waitForApplication(): Unit = {
+    assert(client != null && appId != null, "Application has not been submitted yet!")
+    val state = client.monitorApplication(appId, returnOnRunning = true) // blocking
+    if (state == YarnApplicationState.FINISHED ||
+      state == YarnApplicationState.FAILED ||
+      state == YarnApplicationState.KILLED) {
+      throw new SparkException("Yarn application has already ended! " +
+        "It might have been killed or unable to launch application master.")
+    }
+    if (state == YarnApplicationState.RUNNING) {
+      logInfo(s"Application $appId has started running.")
     }
   }
 
-  private def yarnApplicationStateCheckerThread(): Thread = {
+  /**
+   * Monitor the application state in a separate thread.
+   * If the application has exited for any reason, stop the SparkContext.
+   * This assumes both `client` and `appId` have already been set.
+   */
+  private def asyncMonitorApplication(): Unit = {
+    assert(client != null && appId != null, "Application has not been submitted yet!")
     val t = new Thread {
       override def run() {
         while (!stopping) {
           val report = client.getApplicationReport(appId)
           val state = report.getYarnApplicationState()
-          if (state == YarnApplicationState.FINISHED || state == YarnApplicationState.KILLED
-            || state == YarnApplicationState.FAILED) {
-            logError(s"Yarn application already ended: $state")
+          if (state == YarnApplicationState.FINISHED ||
+            state == YarnApplicationState.KILLED ||
+            state == YarnApplicationState.FAILED) {
+            logError(s"Yarn application has already exited with state $state!")
             sc.stop()
             stopping = true
           }
           Thread.sleep(1000L)
         }
-        checkerThread = null
         Thread.currentThread().interrupt()
       }
     }
-    t.setName("Yarn Application State Checker")
+    t.setName("Yarn application state monitor")
     t.setDaemon(true)
     t.start()
-    t
   }
 
+  /**
+   * Stop the scheduler. This assumes `start()` has already been called.
+   */
   override def stop() {
+    assert(client != null, "Attempted to stop this scheduler before starting it!")
     stopping = true
     super.stop()
-    client.stop
+    client.stop()
     logInfo("Stopped")
   }
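
As written in `getExtraClientArguments` above, each (option, environment variable, Spark property) tuple is resolved by checking the environment variable first and falling back to the Spark property. The sketch below isolates that precedence rule as a pure function. `ExtraArgsSketch`, `extraArgs`, `envVars` and `sparkProps` are hypothetical names, with plain maps standing in for `System.getenv` and `sc.getConf`.

```scala
import scala.collection.mutable.ArrayBuffer

object ExtraArgsSketch {
  // Resolve each (option, env var, Spark property) tuple: the environment
  // variable wins when both are set; tuples with neither set add nothing.
  def extraArgs(
      options: Seq[(String, String, String)],
      envVars: Map[String, String],
      sparkProps: Map[String, String]): Seq[String] = {
    val buf = new ArrayBuffer[String]
    options.foreach { case (optionName, envVar, sparkProp) =>
      envVars.get(envVar).orElse(sparkProps.get(sparkProp)).foreach { value =>
        buf += optionName
        buf += value
      }
    }
    buf.toSeq
  }
}
```

Because the tuple list maps several legacy variables to the same option, an option can be emitted more than once; which occurrence takes effect then depends on how the argument parser handles repeats, which this sketch does not model.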
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c4022dd5/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala
index c3b7a2c..9bd9161 100644
--- a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala
+++ b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala
@@ -27,7 +27,7 @@ import org.apache.hadoop.mapreduce.MRJobConfig
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext
+import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.mockito.Matchers._
 import org.mockito.Mockito._
@@ -90,7 +90,7 @@ class ClientBaseSuite extends FunSuite with Matchers {
     val env = new MutableHashMap[String, String]()
     val args = new ClientArguments(Array("--jar", USER, "--addJars", ADDED), sparkConf)
 
-    ClientBase.populateClasspath(args, conf, sparkConf, env, None)
+    ClientBase.populateClasspath(args, conf, sparkConf, env)
 
     val cp = env("CLASSPATH").split(File.pathSeparator)
     s"$SPARK,$USER,$ADDED".split(",").foreach({ entry =>
@@ -114,10 +114,10 @@ class ClientBaseSuite extends FunSuite with Matchers {
     val args = new ClientArguments(Array("--jar", USER, "--addJars", ADDED), sparkConf)
 
     val client = spy(new DummyClient(args, conf, sparkConf, yarnConf))
-    doReturn(new Path("/")).when(client).copyRemoteFile(any(classOf[Path]),
+    doReturn(new Path("/")).when(client).copyFileToRemote(any(classOf[Path]),
       any(classOf[Path]), anyShort(), anyBoolean())
 
-    var tempDir = Files.createTempDir();
+    val tempDir = Files.createTempDir()
     try {
       client.prepareLocalResources(tempDir.getAbsolutePath())
       sparkConf.getOption(ClientBase.CONF_SPARK_USER_JAR) should be (Some(USER))
@@ -247,13 +247,13 @@ class ClientBaseSuite extends FunSuite with Matchers {
 
   private class DummyClient(
       val args: ClientArguments,
-      val conf: Configuration,
+      val hadoopConf: Configuration,
       val sparkConf: SparkConf,
       val yarnConf: YarnConfiguration) extends ClientBase {
-
-    override def setupSecurityToken(amContainer: ContainerLaunchContext): Unit =
-      throw new UnsupportedOperationException()
-
+    override def setupSecurityToken(amContainer: ContainerLaunchContext): Unit = ???
+    override def submitApplication(): ApplicationId = ???
+    override def getApplicationReport(appId: ApplicationId): ApplicationReport = ???
+    override def getClientToken(report: ApplicationReport): String = ???
   }
 
 }

