Repository: spark
Updated Branches:
  refs/heads/master 6e1193031 -> 2a4225dd9


SPARK-1639. Tidy up some Spark on YARN code

This contains a bunch of small tidyings of the Spark on YARN code.

I focused on the yarn stable code. @tgravescs, let me know if you'd like me to make these changes for the alpha code as well.

Author: Sandy Ryza <sa...@cloudera.com>

Closes #561 from sryza/sandy-spark-1639 and squashes the following commits:

72b6a02 [Sandy Ryza] Fix comment and set name on driver thread
c2190b2 [Sandy Ryza] SPARK-1639. Tidy up some Spark on YARN code


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2a4225dd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2a4225dd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2a4225dd

Branch: refs/heads/master
Commit: 2a4225dd944441d3f735625bb6bae6fca8fd06ca
Parents: 6e11930
Author: Sandy Ryza <sa...@cloudera.com>
Authored: Wed Jun 11 07:57:28 2014 -0500
Committer: Thomas Graves <tgra...@apache.org>
Committed: Wed Jun 11 07:57:28 2014 -0500

----------------------------------------------------------------------
 .../spark/deploy/yarn/ApplicationMaster.scala   |  16 +-
 .../apache/spark/deploy/yarn/ClientBase.scala   |  38 ++--
 .../deploy/yarn/ExecutorRunnableUtil.scala      |  28 +--
 .../cluster/YarnClusterScheduler.scala          |  10 +-
 .../spark/deploy/yarn/ApplicationMaster.scala   | 197 +++++++++----------
 .../org/apache/spark/deploy/yarn/Client.scala   |  10 +-
 .../spark/deploy/yarn/ExecutorLauncher.scala    |  40 ++--
 7 files changed, 161 insertions(+), 178 deletions(-)
----------------------------------------------------------------------
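
The core of the patch replaces the counter-based yarnAllocatorLoop handshake with a monitor object plus a volatile flag in the stable ApplicationMaster. A minimal standalone sketch of that pattern, condensed from the hunks below (the member names match the diff; the wrapper object and comments are illustrative only, not part of the commit):

object InitialAllocationSignal {
  // Monitor used to notify YarnClusterScheduler that it should stop waiting
  // for the initial set of executors and get on with its business.
  val doneWithInitialAllocationsMonitor = new Object()
  @volatile var isDoneWithInitialAllocations = false

  // Called by the allocation loop once ALLOCATOR_LOOP_WAIT_COUNT heartbeats
  // have passed, all executors are running, or an error forces us to stop.
  def doneWithInitialAllocations() {
    isDoneWithInitialAllocations = true
    doneWithInitialAllocationsMonitor.synchronized {
      doneWithInitialAllocationsMonitor.notifyAll()
    }
  }

  // Called from YarnClusterScheduler.postStartHook(); blocks until signalled,
  // re-checking roughly once per second.
  def waitForInitialAllocations() {
    doneWithInitialAllocationsMonitor.synchronized {
      while (!isDoneWithInitialAllocations) {
        doneWithInitialAllocationsMonitor.wait(1000L)
      }
    }
  }
}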


http://git-wip-us.apache.org/repos/asf/spark/blob/2a4225dd/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 8f0ecb8..1cc9c33 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -277,7 +277,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
         yarnAllocator.allocateContainers(
           math.max(args.numExecutors - yarnAllocator.getNumExecutorsRunning, 0))
         ApplicationMaster.incrementAllocatorLoop(1)
-        Thread.sleep(100)
+        Thread.sleep(ApplicationMaster.ALLOCATE_HEARTBEAT_INTERVAL)
       }
     } finally {
       // In case of exceptions, etc - ensure that count is at least ALLOCATOR_LOOP_WAIT_COUNT,
@@ -416,6 +416,7 @@ object ApplicationMaster {
   // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
   // optimal as more containers are available. Might need to handle this better.
   private val ALLOCATOR_LOOP_WAIT_COUNT = 30
+  private val ALLOCATE_HEARTBEAT_INTERVAL = 100
 
   def incrementAllocatorLoop(by: Int) {
     val count = yarnAllocatorLoop.getAndAdd(by)
@@ -467,13 +468,22 @@ object ApplicationMaster {
       })
     }
 
-    // Wait for initialization to complete and atleast 'some' nodes can get allocated.
+    modified
+  }
+
+
+  /**
+   * Returns when we've either
+   *  1) received all the requested executors,
+   *  2) waited ALLOCATOR_LOOP_WAIT_COUNT * ALLOCATE_HEARTBEAT_INTERVAL ms,
+   *  3) hit an error that causes us to terminate trying to get containers.
+   */
+  def waitForInitialAllocations() {
     yarnAllocatorLoop.synchronized {
       while (yarnAllocatorLoop.get() <= ALLOCATOR_LOOP_WAIT_COUNT) {
         yarnAllocatorLoop.wait(1000L)
       }
     }
-    modified
   }
 
   def main(argStrings: Array[String]) {

http://git-wip-us.apache.org/repos/asf/spark/blob/2a4225dd/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
index 801e8b3..29a3568 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
@@ -19,7 +19,6 @@ package org.apache.spark.deploy.yarn
 
 import java.io.File
 import java.net.{InetAddress, UnknownHostException, URI, URISyntaxException}
-import java.nio.ByteBuffer
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable.{HashMap, ListBuffer, Map}
@@ -37,7 +36,7 @@ import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
 import org.apache.hadoop.yarn.api.protocolrecords._
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.util.{Apps, Records}
+import org.apache.hadoop.yarn.util.Records
 import org.apache.spark.{Logging, SparkConf, SparkContext}
 
 /**
@@ -169,14 +168,13 @@ trait ClientBase extends Logging {
     destPath
   }
 
-  def qualifyForLocal(localURI: URI): Path = {
+  private def qualifyForLocal(localURI: URI): Path = {
     var qualifiedURI = localURI
-    // If not specified assume these are in the local filesystem to keep behavior like Hadoop
+    // If not specified, assume these are in the local filesystem to keep behavior like Hadoop
     if (qualifiedURI.getScheme() == null) {
       qualifiedURI = new URI(FileSystem.getLocal(conf).makeQualified(new Path(qualifiedURI)).toString)
     }
-    val qualPath = new Path(qualifiedURI)
-    qualPath
+    new Path(qualifiedURI)
   }
 
   def prepareLocalResources(appStagingDir: String): HashMap[String, LocalResource] = {
@@ -305,13 +303,13 @@ trait ClientBase extends Logging {
 
     val amMemory = calculateAMMemory(newApp)
 
-    val JAVA_OPTS = ListBuffer[String]()
+    val javaOpts = ListBuffer[String]()
 
     // Add Xmx for AM memory
-    JAVA_OPTS += "-Xmx" + amMemory + "m"
+    javaOpts += "-Xmx" + amMemory + "m"
 
     val tmpDir = new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)
-    JAVA_OPTS += "-Djava.io.tmpdir=" + tmpDir
+    javaOpts += "-Djava.io.tmpdir=" + tmpDir
 
     // TODO: Remove once cpuset version is pushed out.
     // The context is, default gc for server class machines ends up using all cores to do gc -
@@ -325,11 +323,11 @@ trait ClientBase extends Logging {
     if (useConcurrentAndIncrementalGC) {
       // In our expts, using (default) throughput collector has severe perf ramifications in
       // multi-tenant machines
-      JAVA_OPTS += "-XX:+UseConcMarkSweepGC"
-      JAVA_OPTS += "-XX:+CMSIncrementalMode"
-      JAVA_OPTS += "-XX:+CMSIncrementalPacing"
-      JAVA_OPTS += "-XX:CMSIncrementalDutyCycleMin=0"
-      JAVA_OPTS += "-XX:CMSIncrementalDutyCycle=10"
+      javaOpts += "-XX:+UseConcMarkSweepGC"
+      javaOpts += "-XX:+CMSIncrementalMode"
+      javaOpts += "-XX:+CMSIncrementalPacing"
+      javaOpts += "-XX:CMSIncrementalDutyCycleMin=0"
+      javaOpts += "-XX:CMSIncrementalDutyCycle=10"
     }
 
     // SPARK_JAVA_OPTS is deprecated, but for backwards compatibility:
@@ -344,22 +342,22 @@ trait ClientBase extends Logging {
       // If we are being launched in client mode, forward the spark-conf options
       // onto the executor launcher
       for ((k, v) <- sparkConf.getAll) {
-        JAVA_OPTS += "-D" + k + "=" + "\\\"" + v + "\\\""
+        javaOpts += "-D" + k + "=" + "\\\"" + v + "\\\""
       }
     } else {
       // If we are being launched in standalone mode, capture and forward any spark
       // system properties (e.g. set by spark-class).
       for ((k, v) <- sys.props.filterKeys(_.startsWith("spark"))) {
-        JAVA_OPTS += "-D" + k + "=" + "\\\"" + v + "\\\""
+        javaOpts += "-D" + k + "=" + "\\\"" + v + "\\\""
       }
-      sys.props.get("spark.driver.extraJavaOptions").foreach(opts => JAVA_OPTS += opts)
-      sys.props.get("spark.driver.libraryPath").foreach(p => JAVA_OPTS += s"-Djava.library.path=$p")
+      sys.props.get("spark.driver.extraJavaOptions").foreach(opts => javaOpts += opts)
+      sys.props.get("spark.driver.libraryPath").foreach(p => javaOpts += s"-Djava.library.path=$p")
     }
-    JAVA_OPTS += ClientBase.getLog4jConfiguration(localResources)
+    javaOpts += ClientBase.getLog4jConfiguration(localResources)
 
     // Command for the ApplicationMaster
     val commands = Seq(Environment.JAVA_HOME.$() + "/bin/java", "-server") ++
-      JAVA_OPTS ++
+      javaOpts ++
       Seq(args.amClass, "--class", args.userClass, "--jar ", args.userJar,
         userArgsToString(args),
         "--executor-memory", args.executorMemory.toString,

http://git-wip-us.apache.org/repos/asf/spark/blob/2a4225dd/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
index 32f8861..43dbb24 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
@@ -28,7 +28,7 @@ import org.apache.hadoop.yarn.api._
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records}
+import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
 
 import org.apache.spark.{Logging, SparkConf}
 
@@ -46,19 +46,19 @@ trait ExecutorRunnableUtil extends Logging {
       executorCores: Int,
       localResources: HashMap[String, LocalResource]): List[String] = {
     // Extra options for the JVM
-    val JAVA_OPTS = ListBuffer[String]()
+    val javaOpts = ListBuffer[String]()
     // Set the JVM memory
     val executorMemoryString = executorMemory + "m"
-    JAVA_OPTS += "-Xms" + executorMemoryString + " -Xmx" + 
executorMemoryString + " "
+    javaOpts += "-Xms" + executorMemoryString + " -Xmx" + executorMemoryString 
+ " "
 
     // Set extra Java options for the executor, if defined
     sys.props.get("spark.executor.extraJavaOptions").foreach { opts =>
-      JAVA_OPTS += opts
+      javaOpts += opts
     }
 
-    JAVA_OPTS += "-Djava.io.tmpdir=" +
+    javaOpts += "-Djava.io.tmpdir=" +
       new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)
-    JAVA_OPTS += ClientBase.getLog4jConfiguration(localResources)
+    javaOpts += ClientBase.getLog4jConfiguration(localResources)
 
     // Certain configs need to be passed here because they are needed before the Executor
     // registers with the Scheduler and transfers the spark configs. Since the Executor backend
@@ -66,10 +66,10 @@ trait ExecutorRunnableUtil extends Logging {
     // authentication settings.
     sparkConf.getAll.
       filter { case (k, v) => k.startsWith("spark.auth") || k.startsWith("spark.akka") }.
-      foreach { case (k, v) => JAVA_OPTS += "-D" + k + "=" + "\\\"" + v + "\\\"" }
+      foreach { case (k, v) => javaOpts += "-D" + k + "=" + "\\\"" + v + "\\\"" }
 
     sparkConf.getAkkaConf.
-      foreach { case (k, v) => JAVA_OPTS += "-D" + k + "=" + "\\\"" + v + "\\\"" }
+      foreach { case (k, v) => javaOpts += "-D" + k + "=" + "\\\"" + v + "\\\"" }
 
     // Commenting it out for now - so that people can refer to the properties if required. Remove
     // it once cpuset version is pushed out.
@@ -88,11 +88,11 @@ trait ExecutorRunnableUtil extends Logging {
           // multi-tennent machines
           // The options are based on
           // http://www.oracle.com/technetwork/java/gc-tuning-5-138395.html#0.0.0.%20When%20to%20Use%20the%20Concurrent%20Low%20Pause%20Collector|outline
-          JAVA_OPTS += " -XX:+UseConcMarkSweepGC "
-          JAVA_OPTS += " -XX:+CMSIncrementalMode "
-          JAVA_OPTS += " -XX:+CMSIncrementalPacing "
-          JAVA_OPTS += " -XX:CMSIncrementalDutyCycleMin=0 "
-          JAVA_OPTS += " -XX:CMSIncrementalDutyCycle=10 "
+          javaOpts += " -XX:+UseConcMarkSweepGC "
+          javaOpts += " -XX:+CMSIncrementalMode "
+          javaOpts += " -XX:+CMSIncrementalPacing "
+          javaOpts += " -XX:CMSIncrementalDutyCycleMin=0 "
+          javaOpts += " -XX:CMSIncrementalDutyCycle=10 "
         }
     */
 
@@ -104,7 +104,7 @@ trait ExecutorRunnableUtil extends Logging {
       // TODO: If the OOM is not recoverable by rescheduling it on different node, then do
       // 'something' to fail job ... akin to blacklisting trackers in mapred ?
       "-XX:OnOutOfMemoryError='kill %p'") ++
-      JAVA_OPTS ++
+      javaOpts ++
       Seq("org.apache.spark.executor.CoarseGrainedExecutorBackend",
       masterAddress.toString,
       slaveId.toString,

http://git-wip-us.apache.org/repos/asf/spark/blob/2a4225dd/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
index a4638cc..39cdd2e 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
@@ -33,10 +33,11 @@ private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration)
 
   def this(sc: SparkContext) = this(sc, new Configuration())
 
-  // Nothing else for now ... initialize application master : which needs sparkContext to determine how to allocate
-  // Note that only the first creation of SparkContext influences (and ideally, there must be only one SparkContext, right ?)
-  // Subsequent creations are ignored - since nodes are already allocated by then.
-
+  // Nothing else for now ... initialize application master : which needs a SparkContext to
+  // determine how to allocate.
+  // Note that only the first creation of a SparkContext influences (and ideally, there must be
+  // only one SparkContext, right ?). Subsequent creations are ignored since executors are already
+  // allocated by then.
 
   // By default, rack is unknown
   override def getRackForHost(hostPort: String): Option[String] = {
@@ -48,6 +49,7 @@ private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration)
   override def postStartHook() {
     val sparkContextInitialized = ApplicationMaster.sparkContextInitialized(sc)
     if (sparkContextInitialized){
+      ApplicationMaster.waitForInitialAllocations()
       // Wait for a few seconds for the slaves to bootstrap and register with master - best case attempt
       Thread.sleep(3000L)
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/2a4225dd/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 33a60d9..6244332 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -19,13 +19,12 @@ package org.apache.spark.deploy.yarn
 
 import java.io.IOException
 import java.util.concurrent.CopyOnWriteArrayList
-import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
+import java.util.concurrent.atomic.AtomicReference
 
 import scala.collection.JavaConversions._
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.net.NetUtils
 import org.apache.hadoop.util.ShutdownHookManager
 import org.apache.hadoop.yarn.api._
 import org.apache.hadoop.yarn.api.protocolrecords._
@@ -33,8 +32,7 @@ import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.client.api.AMRMClient
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
 import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.ipc.YarnRPC
-import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
+import org.apache.hadoop.yarn.util.ConverterUtils
 import org.apache.hadoop.yarn.webapp.util.WebAppUtils
 
 import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
@@ -77,17 +75,18 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
     // than user specified and /tmp.
     System.setProperty("spark.local.dir", getLocalDirs())
 
-    // set the web ui port to be ephemeral for yarn so we don't conflict with
+    // Set the web ui port to be ephemeral for yarn so we don't conflict with
     // other spark processes running on the same box
     System.setProperty("spark.ui.port", "0")
 
-    // when running the AM, the Spark master is always "yarn-cluster"
+    // When running the AM, the Spark master is always "yarn-cluster"
     System.setProperty("spark.master", "yarn-cluster")
 
-    // Use priority 30 as it's higher then HDFS. It's same priority as MapReduce is using.
+    // Use priority 30 as it's higher than HDFS. It's the same priority MapReduce is using.
     ShutdownHookManager.get().addShutdownHook(new AppMasterShutdownHook(this), 30)
 
-    appAttemptId = getApplicationAttemptId()
+    appAttemptId = ApplicationMaster.getApplicationAttemptId()
+    logInfo("ApplicationAttemptId: " + appAttemptId)
     isLastAMRetry = appAttemptId.getAttemptId() >= maxAppAttempts
     amClient = AMRMClient.createAMRMClient()
     amClient.init(yarnConf)
@@ -99,7 +98,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
     ApplicationMaster.register(this)
 
     // Call this to force generation of secret so it gets populated into the
-    // hadoop UGI. This has to happen before the startUserClass which does a
+    // Hadoop UGI. This has to happen before the startUserClass which does a
     // doAs in order for the credentials to be passed on to the executor containers.
     val securityMgr = new SecurityManager(sparkConf)
 
@@ -121,7 +120,10 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
     // Allocate all containers
     allocateExecutors()
 
-    // Wait for the user class to Finish
+    // Launch thread that will heartbeat to the RM so it won't think the app has died.
+    launchReporterThread()
+
+    // Wait for the user class to finish
     userThread.join()
 
     System.exit(0)
@@ -141,7 +143,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
       "spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.params", params)
   }
 
-  /** Get the Yarn approved local directories. */
+  // Get the Yarn approved local directories.
   private def getLocalDirs(): String = {
     // Hadoop 0.23 and 2.x have different Environment variable names for the
     // local dirs, so lets check both. We assume one of the 2 is set.
@@ -150,18 +152,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
       .orElse(Option(System.getenv("LOCAL_DIRS")))
  
     localDirs match {
-      case None => throw new Exception("Yarn Local dirs can't be empty")
+      case None => throw new Exception("Yarn local dirs can't be empty")
       case Some(l) => l
     }
-  } 
-
-  private def getApplicationAttemptId(): ApplicationAttemptId = {
-    val envs = System.getenv()
-    val containerIdString = envs.get(ApplicationConstants.Environment.CONTAINER_ID.name())
-    val containerId = ConverterUtils.toContainerId(containerIdString)
-    val appAttemptId = containerId.getApplicationAttemptId()
-    logInfo("ApplicationAttemptId: " + appAttemptId)
-    appAttemptId
   }
 
   private def registerApplicationMaster(): RegisterApplicationMasterResponse = {
@@ -173,25 +166,23 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
     logInfo("Starting the user JAR in a separate Thread")
     val mainMethod = Class.forName(
       args.userClass,
-      false /* initialize */ ,
+      false,
       Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
     val t = new Thread {
       override def run() {
-
-      var successed = false
+        var succeeded = false
         try {
           // Copy
-          var mainArgs: Array[String] = new Array[String](args.userArgs.size)
+          val mainArgs = new Array[String](args.userArgs.size)
           args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size)
           mainMethod.invoke(null, mainArgs)
-          // some job script has "System.exit(0)" at the end, for example SparkPi, SparkLR
-          // userThread will stop here unless it has uncaught exception thrown out
-          // It need shutdown hook to set SUCCEEDED
-          successed = true
+          // Some apps have "System.exit(0)" at the end.  The user thread will stop here unless
+          // it has an uncaught exception thrown out.  It needs a shutdown hook to set SUCCEEDED.
+          succeeded = true
         } finally {
-          logDebug("finishing main")
+          logDebug("Finishing main")
           isLastAMRetry = true
-          if (successed) {
+          if (succeeded) {
             ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
           } else {
             ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.FAILED)
@@ -199,11 +190,12 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
         }
       }
     }
+    t.setName("Driver")
     t.start()
     t
   }
 
-  // This need to happen before allocateExecutors()
+  // This needs to happen before allocateExecutors()
   private def waitForSparkContextInitialized() {
     logInfo("Waiting for Spark context initialization")
     try {
@@ -231,7 +223,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
             sparkContext.preferredNodeLocationData,
             sparkContext.getConf)
         } else {
-          logWarning("Unable to retrieve SparkContext inspite of waiting for 
%d, maxNumTries = %d".
+          logWarning("Unable to retrieve SparkContext in spite of waiting for 
%d, maxNumTries = %d".
             format(numTries * waitTime, maxNumTries))
           this.yarnAllocator = YarnAllocationHandler.newAllocator(
             yarnConf,
@@ -242,48 +234,37 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
         }
       }
     } finally {
-      // In case of exceptions, etc - ensure that count is at least ALLOCATOR_LOOP_WAIT_COUNT :
-      // so that the loop (in ApplicationMaster.sparkContextInitialized) breaks.
-      ApplicationMaster.incrementAllocatorLoop(ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT)
+      // In case of exceptions, etc - ensure that the loop in
+      // ApplicationMaster#sparkContextInitialized() breaks.
+      ApplicationMaster.doneWithInitialAllocations()
     }
   }
 
   private def allocateExecutors() {
     try {
-      logInfo("Allocating " + args.numExecutors + " executors.")
-      // Wait until all containers have finished
+      logInfo("Requesting" + args.numExecutors + " executors.")
+      // Wait until all containers have launched
       yarnAllocator.addResourceRequests(args.numExecutors)
       yarnAllocator.allocateResources()
       // Exits the loop if the user thread exits.
+
+      var iters = 0
       while (yarnAllocator.getNumExecutorsRunning < args.numExecutors && userThread.isAlive) {
         checkNumExecutorsFailed()
         allocateMissingExecutor()
         yarnAllocator.allocateResources()
-        ApplicationMaster.incrementAllocatorLoop(1)
-        Thread.sleep(100)
+        if (iters == ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT) {
+          ApplicationMaster.doneWithInitialAllocations()
+        }
+        Thread.sleep(ApplicationMaster.ALLOCATE_HEARTBEAT_INTERVAL)
+        iters += 1
       }
     } finally {
-      // In case of exceptions, etc - ensure that count is at least ALLOCATOR_LOOP_WAIT_COUNT,
-      // so that the loop in ApplicationMaster#sparkContextInitialized() breaks.
-      ApplicationMaster.incrementAllocatorLoop(ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT)
+      // In case of exceptions, etc - ensure that the loop in
+      // ApplicationMaster#sparkContextInitialized() breaks.
+      ApplicationMaster.doneWithInitialAllocations()
     }
     logInfo("All executors have launched.")
-
-    // Launch a progress reporter thread, else the app will get killed after expiration
-    // (def: 10mins) timeout.
-    if (userThread.isAlive) {
-      // Ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapses.
-      val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
-
-      // we want to be reasonably responsive without causing too many requests to RM.
-      val schedulerInterval =
-        sparkConf.getLong("spark.yarn.scheduler.heartbeat.interval-ms", 5000)
-
-      // must be <= timeoutInterval / 2.
-      val interval = math.min(timeoutInterval / 2, schedulerInterval)
-
-      launchReporterThread(interval)
-    }
   }
 
   private def allocateMissingExecutor() {
@@ -303,47 +284,35 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
     }
   }
 
-  private def launchReporterThread(_sleepTime: Long): Thread = {
-    val sleepTime = if (_sleepTime <= 0) 0 else _sleepTime
+  private def launchReporterThread(): Thread = {
+    // Ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapses.
+    val expiryInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
+
+    // we want to be reasonably responsive without causing too many requests to RM.
+    val schedulerInterval =
+      sparkConf.getLong("spark.yarn.scheduler.heartbeat.interval-ms", 5000)
+
+    // must be <= expiryInterval / 2.
+    val interval = math.max(0, math.min(expiryInterval / 2, schedulerInterval))
 
     val t = new Thread {
       override def run() {
         while (userThread.isAlive) {
           checkNumExecutorsFailed()
           allocateMissingExecutor()
-          sendProgress()
-          Thread.sleep(sleepTime)
+          logDebug("Sending progress")
+          yarnAllocator.allocateResources()
+          Thread.sleep(interval)
         }
       }
     }
     // Setting to daemon status, though this is usually not a good idea.
     t.setDaemon(true)
     t.start()
-    logInfo("Started progress reporter thread - sleep time : " + sleepTime)
+    logInfo("Started progress reporter thread - heartbeat interval : " + 
interval)
     t
   }
 
-  private def sendProgress() {
-    logDebug("Sending progress")
-    // Simulated with an allocate request with no nodes requested.
-    yarnAllocator.allocateResources()
-  }
-
-  /*
-  def printContainers(containers: List[Container]) = {
-    for (container <- containers) {
-      logInfo("Launching shell command on a new container."
-        + ", containerId=" + container.getId()
-        + ", containerNode=" + container.getNodeId().getHost()
-        + ":" + container.getNodeId().getPort()
-        + ", containerNodeURI=" + container.getNodeHttpAddress()
-        + ", containerState" + container.getState()
-        + ", containerResourceMemory"
-        + container.getResource().getMemory())
-    }
-  }
-  */
-
   def finishApplicationMaster(status: FinalApplicationStatus, diagnostics: String = "") {
     synchronized {
       if (isFinished) {
@@ -351,7 +320,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
       }
       isFinished = true
 
-      logInfo("finishApplicationMaster with " + status)
+      logInfo("Unregistering ApplicationMaster with " + status)
       if (registered) {
         val trackingUrl = sparkConf.get("spark.yarn.historyServer.address", "")
         amClient.unregisterApplicationMaster(status, diagnostics, trackingUrl)
@@ -386,7 +355,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
 
     def run() {
       logInfo("AppMaster received a signal.")
-      // we need to clean up staging dir before HDFS is shut down
+      // We need to clean up staging dir before HDFS is shut down
       // make sure we don't delete it until this is the last AM
       if (appMaster.isLastAMRetry) appMaster.cleanupStagingDir()
     }
@@ -401,21 +370,24 @@ object ApplicationMaster {
   // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
   // optimal as more containers are available. Might need to handle this better.
   private val ALLOCATOR_LOOP_WAIT_COUNT = 30
+  private val ALLOCATE_HEARTBEAT_INTERVAL = 100
 
   private val applicationMasters = new CopyOnWriteArrayList[ApplicationMaster]()
 
   val sparkContextRef: AtomicReference[SparkContext] =
-    new AtomicReference[SparkContext](null /* initialValue */)
+    new AtomicReference[SparkContext](null)
 
-  val yarnAllocatorLoop: AtomicInteger = new AtomicInteger(0)
+  // Variable used to notify the YarnClusterScheduler that it should stop waiting
+  // for the initial set of executors to be started and get on with its business.
+  val doneWithInitialAllocationsMonitor = new Object()
 
-  def incrementAllocatorLoop(by: Int) {
-    val count = yarnAllocatorLoop.getAndAdd(by)
-    if (count >= ALLOCATOR_LOOP_WAIT_COUNT) {
-      yarnAllocatorLoop.synchronized {
-        // to wake threads off wait ...
-        yarnAllocatorLoop.notifyAll()
-      }
+  @volatile var isDoneWithInitialAllocations = false
+
+  def doneWithInitialAllocations() {
+    isDoneWithInitialAllocations = true
+    doneWithInitialAllocationsMonitor.synchronized {
+      // to wake threads off wait ...
+      doneWithInitialAllocationsMonitor.notifyAll()
     }
   }
 
@@ -423,7 +395,10 @@ object ApplicationMaster {
     applicationMasters.add(master)
   }
 
-  // TODO(harvey): See whether this should be discarded - it isn't used anywhere atm...
+  /**
+   * Called from YarnClusterScheduler to notify the AM code that a SparkContext has been
+   * initialized in the user code.
+   */
   def sparkContextInitialized(sc: SparkContext): Boolean = {
     var modified = false
     sparkContextRef.synchronized {
@@ -431,7 +406,7 @@ object ApplicationMaster {
       sparkContextRef.notifyAll()
     }
 
-    // Add a shutdown hook - as a best case effort in case users do not call sc.stop or do
+    // Add a shutdown hook - as a best effort in case users do not call sc.stop or do
     // System.exit.
     // Should not really have to do this, but it helps YARN to evict resources earlier.
     // Not to mention, prevent the Client from declaring failure even though we exited properly.
@@ -454,13 +429,29 @@ object ApplicationMaster {
       })
     }
 
-    // Wait for initialization to complete and atleast 'some' nodes can get allocated.
-    yarnAllocatorLoop.synchronized {
-      while (yarnAllocatorLoop.get() <= ALLOCATOR_LOOP_WAIT_COUNT) {
-        yarnAllocatorLoop.wait(1000L)
+    // Wait for initialization to complete and at least 'some' nodes to get allocated.
+    modified
+  }
+
+  /**
+   * Returns when we've either
+   *  1) received all the requested executors,
+   *  2) waited ALLOCATOR_LOOP_WAIT_COUNT * ALLOCATE_HEARTBEAT_INTERVAL ms,
+   *  3) hit an error that causes us to terminate trying to get containers.
+   */
+  def waitForInitialAllocations() {
+    doneWithInitialAllocationsMonitor.synchronized {
+      while (!isDoneWithInitialAllocations) {
+        doneWithInitialAllocationsMonitor.wait(1000L)
       }
     }
-    modified
+  }
+
+  def getApplicationAttemptId(): ApplicationAttemptId = {
+    val containerIdString = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name())
+    val containerId = ConverterUtils.toContainerId(containerIdString)
+    val appAttemptId = containerId.getApplicationAttemptId()
+    appAttemptId
   }
 
   def main(argStrings: Array[String]) {

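As a quick sanity check on the reporter interval computed in the stable ApplicationMaster above, assuming the defaults that appear in the diff (RM_AM_EXPIRY_INTERVAL_MS = 120000 and spark.yarn.scheduler.heartbeat.interval-ms = 5000), the clamping works out as follows; this is an illustrative snippet for a Scala REPL, not part of the commit:

val expiryInterval = 120000L     // RM_AM_EXPIRY_INTERVAL_MS default used in the diff
val schedulerInterval = 5000L    // spark.yarn.scheduler.heartbeat.interval-ms default
// Clamp to at most half the expiry interval, and never below zero.
val interval = math.max(0, math.min(expiryInterval / 2, schedulerInterval))
// interval == 5000: heartbeat every 5 seconds, well under the 60 second bound.
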
http://git-wip-us.apache.org/repos/asf/spark/blob/2a4225dd/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 393edd1..2402761 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -21,14 +21,12 @@ import java.nio.ByteBuffer
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io.DataOutputBuffer
-import org.apache.hadoop.yarn.api._
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
 import org.apache.hadoop.yarn.api.protocolrecords._
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.client.api.YarnClient
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.ipc.YarnRPC
-import org.apache.hadoop.yarn.util.{Apps, Records}
+import org.apache.hadoop.yarn.util.Records
 
 import org.apache.spark.{Logging, SparkConf}
 
@@ -102,7 +100,7 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa
 
   def logClusterResourceDetails() {
     val clusterMetrics: YarnClusterMetrics = yarnClient.getYarnClusterMetrics
-    logInfo("Got Cluster metric info from ApplicationsManager (ASM), number of 
NodeManagers: " +
+    logInfo("Got Cluster metric info from ResourceManager, number of 
NodeManagers: " +
       clusterMetrics.getNumNodeManagers)
 
     val queueInfo: QueueInfo = yarnClient.getQueueInfo(args.amQueue)
@@ -133,7 +131,7 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa
 
   def submitApp(appContext: ApplicationSubmissionContext) = {
     // Submit the application to the applications manager.
-    logInfo("Submitting application to ASM")
+    logInfo("Submitting application to ResourceManager")
     yarnClient.submitApplication(appContext)
   }
 
@@ -149,7 +147,7 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa
       Thread.sleep(interval)
       val report = yarnClient.getApplicationReport(appId)
 
-      logInfo("Application report from ASM: \n" +
+      logInfo("Application report from ResourceManager: \n" +
         "\t application identifier: " + appId.toString() + "\n" +
         "\t appId: " + appId.getId() + "\n" +
         "\t clientToAMToken: " + report.getClientToAMToken() + "\n" +

http://git-wip-us.apache.org/repos/asf/spark/blob/2a4225dd/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
----------------------------------------------------------------------
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index d93e5bb..4f8854a 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -72,8 +72,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     override def preStart() {
       logInfo("Listen to driver: " + driverUrl)
       driver = context.actorSelection(driverUrl)
-      // Send a hello message thus the connection is actually established,
-      // thus we can monitor Lifecycle Events.
+      // Send a hello message to establish the connection, after which
+      // we can monitor Lifecycle Events.
       driver ! "Hello"
       context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
     }
@@ -95,7 +95,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     amClient.init(yarnConf)
     amClient.start()
 
-    appAttemptId = getApplicationAttemptId()
+    appAttemptId = ApplicationMaster.getApplicationAttemptId()
     registerApplicationMaster()
 
     waitForSparkMaster()
@@ -141,18 +141,9 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     }
   } 
 
-  private def getApplicationAttemptId(): ApplicationAttemptId = {
-    val envs = System.getenv()
-    val containerIdString = envs.get(ApplicationConstants.Environment.CONTAINER_ID.name())
-    val containerId = ConverterUtils.toContainerId(containerIdString)
-    val appAttemptId = containerId.getApplicationAttemptId()
-    logInfo("ApplicationAttemptId: " + appAttemptId)
-    appAttemptId
-  }
-
   private def registerApplicationMaster(): RegisterApplicationMasterResponse = {
     logInfo("Registering the ApplicationMaster")
-    // TODO:(Raymond) Find out Spark UI address and fill in here?
+    // TODO: Find out client's Spark UI address and fill in here?
     amClient.registerApplicationMaster(Utils.localHostName(), 0, "")
   }
 
@@ -185,8 +176,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
 
 
   private def allocateExecutors() {
-
-    // Fixme: should get preferredNodeLocationData from SparkContext, just fake a empty one for now.
+    // TODO: should get preferredNodeLocationData from SparkContext, just fake an empty one for now.
     val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] =
       scala.collection.immutable.Map()
 
@@ -198,8 +188,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
       preferredNodeLocationData,
       sparkConf)
 
-    logInfo("Allocating " + args.numExecutors + " executors.")
-    // Wait until all containers have finished
+    logInfo("Requesting " + args.numExecutors + " executors.")
+    // Wait until all containers have launched
     yarnAllocator.addResourceRequests(args.numExecutors)
     yarnAllocator.allocateResources()
     while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed)) {
@@ -221,7 +211,6 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     }
   }
 
-  // TODO: We might want to extend this to allocate more containers in case they die !
   private def launchReporterThread(_sleepTime: Long): Thread = {
     val sleepTime = if (_sleepTime <= 0) 0 else _sleepTime
 
@@ -229,7 +218,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
       override def run() {
         while (!driverClosed) {
           allocateMissingExecutor()
-          sendProgress()
+          logDebug("Sending progress")
+          yarnAllocator.allocateResources()
           Thread.sleep(sleepTime)
         }
       }
@@ -241,20 +231,14 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     t
   }
 
-  private def sendProgress() {
-    logDebug("Sending progress")
-    // simulated with an allocate request with no nodes requested ...
-    yarnAllocator.allocateResources()
-  }
-
   def finishApplicationMaster(status: FinalApplicationStatus) {
-    logInfo("finish ApplicationMaster with " + status)
-    amClient.unregisterApplicationMaster(status, "" /* appMessage */ , "" /* appTrackingUrl */)
+    logInfo("Unregistering ApplicationMaster with " + status)
+    val trackingUrl = sparkConf.get("spark.yarn.historyServer.address", "")
+    amClient.unregisterApplicationMaster(status, "" /* appMessage */ , trackingUrl)
   }
 
 }
 
-
 object ExecutorLauncher {
   def main(argStrings: Array[String]) {
     val args = new ApplicationMasterArguments(argStrings)
