Updated Branches:
  refs/heads/master 7b58f116e -> 7cef8435d

yarn-client addJar fix and misc other


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/c617083e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/c617083e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/c617083e

Branch: refs/heads/master
Commit: c617083e478e3cfbddc4232060aa7b7a0c5812d4
Parents: 365cac9
Author: Thomas Graves <tgra...@apache.org>
Authored: Thu Jan 9 09:53:51 2014 -0600
Committer: Thomas Graves <tgra...@apache.org>
Committed: Thu Jan 9 10:24:35 2014 -0600

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   |  8 ++--
 docs/running-on-yarn.md                         | 15 +++++-
 .../spark/deploy/yarn/WorkerLauncher.scala      | 29 ++++++++++--
 .../cluster/YarnClientSchedulerBackend.scala    | 50 +++++++++++---------
 .../spark/deploy/yarn/WorkerLauncher.scala      | 29 ++++++++++--
 5 files changed, 94 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/c617083e/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index fce8f2d..f1695c9 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -669,10 +669,10 @@ class SparkContext(
         key = uri.getScheme match {
           // A JAR file which exists only on the driver node
           case null | "file" =>
-            if (SparkHadoopUtil.get.isYarnMode()) {
-              // In order for this to work on yarn the user must specify the 
--addjars option to
-              // the client to upload the file into the distributed cache to 
make it show up in the
-              // current working directory.
+            if (SparkHadoopUtil.get.isYarnMode() && master == 
"yarn-standalone") {
+              // In order for this to work in yarn standalone mode the user 
must specify the 
+              // --addjars option to the client to upload the file into the 
distributed cache 
+              // of the AM to make it show up in the current working directory.
               val fileName = new Path(uri.getPath).getName()
               try {
                 env.httpFileServer.addJar(new File(fileName))

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/c617083e/docs/running-on-yarn.md
----------------------------------------------------------------------
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index b206270..3bd6264 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -101,7 +101,19 @@ With this mode, your application is actually run on the 
remote machine where the
 
 With yarn-client mode, the application will be launched locally. Just like 
running application or spark-shell on Local / Mesos / Standalone mode. The 
launch method is also the similar with them, just make sure that when you need 
to specify a master url, use "yarn-client" instead. And you also need to export 
the env value for SPARK_JAR and SPARK_YARN_APP_JAR
 
-In order to tune worker core/number/memory etc. You need to export 
SPARK_WORKER_CORES, SPARK_WORKER_MEMORY, SPARK_WORKER_INSTANCES e.g. by 
./conf/spark-env.sh
+Configuration in yarn-client mode:
+
+In order to tune worker cores/number/memory etc., you need to export environment 
variables or add them to the Spark configuration file (./conf/spark-env.sh). 
The following is the list of options.
+
+* `SPARK_YARN_APP_JAR`, Path to your application's JAR file (required)
+* `SPARK_WORKER_INSTANCES`, Number of workers to start (Default: 2)
+* `SPARK_WORKER_CORES`, Number of cores for the workers (Default: 1).
+* `SPARK_WORKER_MEMORY`, Memory per Worker (e.g. 1000M, 2G) (Default: 1G)
+* `SPARK_MASTER_MEMORY`, Memory for Master (e.g. 1000M, 2G) (Default: 512 MB)
+* `SPARK_YARN_APP_NAME`, The name of your application (Default: Spark)
+* `SPARK_YARN_QUEUE`, The hadoop queue to use for allocation requests 
(Default: 'default')
+* `SPARK_YARN_DIST_FILES`, Comma separated list of files to be distributed 
with the job.
+* `SPARK_YARN_DIST_ARCHIVES`, Comma separated list of archives to be 
distributed with the job.
 
 For example:
 
@@ -114,7 +126,6 @@ For example:
     
SPARK_YARN_APP_JAR=examples/target/scala-{{site.SCALA_VERSION}}/spark-examples-assembly-{{site.SPARK_VERSION}}.jar
 \
     MASTER=yarn-client ./bin/spark-shell
 
-You can also send extra files to yarn cluster for worker to use by exporting 
SPARK_YARN_DIST_FILES=file1,file2... etc.
 
 # Building Spark for Hadoop/YARN 2.2.x
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/c617083e/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
----------------------------------------------------------------------
diff --git 
a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala 
b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
index ddfec1a..66e38ee 100644
--- 
a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
+++ 
b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
@@ -76,6 +76,10 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: 
Configuration, spar
 
   def run() {
 
+    // Setup the directories so things go to yarn approved directories rather
+    // than user specified and /tmp.
+    System.setProperty("spark.local.dir", getLocalDirs())
+
     appAttemptId = getApplicationAttemptId()
     resourceManager = registerWithResourceManager()
     val appMasterResponse: RegisterApplicationMasterResponse = 
registerApplicationMaster()
@@ -103,10 +107,12 @@ class WorkerLauncher(args: ApplicationMasterArguments, 
conf: Configuration, spar
     // ensure that progress is sent before 
YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse.
 
     val timeoutInterval = 
yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
-    // must be <= timeoutInterval/ 2.
-    // On other hand, also ensure that we are reasonably responsive without 
causing too many requests to RM.
-    // so atleast 1 minute or timeoutInterval / 10 - whichever is higher.
-    val interval = math.min(timeoutInterval / 2, math.max(timeoutInterval/ 10, 
60000L))
+    // we want to be reasonably responsive without causing too many requests 
to RM.
+    val schedulerInterval =
+      System.getProperty("spark.yarn.scheduler.heartbeat.interval-ms", 
"5000").toLong
+    // must be <= timeoutInterval / 2.
+    val interval = math.min(timeoutInterval / 2, schedulerInterval)
+
     reporterThread = launchReporterThread(interval)
 
     // Wait for the reporter thread to Finish.
@@ -119,6 +125,21 @@ class WorkerLauncher(args: ApplicationMasterArguments, 
conf: Configuration, spar
     System.exit(0)
   }
 
+  /** Get the Yarn approved local directories. */
+  private def getLocalDirs(): String = {
+    // Hadoop 0.23 and 2.x have different Environment variable names for the
+    // local dirs, so let's check both. We assume one of the 2 is set.
+    // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
+    val localDirs = Option(System.getenv("YARN_LOCAL_DIRS"))
+      .getOrElse(Option(System.getenv("LOCAL_DIRS"))
+      .getOrElse(""))
+ 
+    if (localDirs.isEmpty()) {
+      throw new Exception("Yarn Local dirs can't be empty")
+    }
+    localDirs
+  }
+
   private def getApplicationAttemptId(): ApplicationAttemptId = {
     val envs = System.getenv()
     val containerIdString = envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/c617083e/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
----------------------------------------------------------------------
diff --git 
a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
 
b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 4b1b5da..22e55e0 100644
--- 
a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ 
b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -22,6 +22,8 @@ import org.apache.spark.{SparkException, Logging, 
SparkContext}
 import org.apache.spark.deploy.yarn.{Client, ClientArguments}
 import org.apache.spark.scheduler.TaskSchedulerImpl
 
+import scala.collection.mutable.ArrayBuffer
+
 private[spark] class YarnClientSchedulerBackend(
     scheduler: TaskSchedulerImpl,
     sc: SparkContext)
@@ -31,45 +33,47 @@ private[spark] class YarnClientSchedulerBackend(
   var client: Client = null
   var appId: ApplicationId = null
 
+  private[spark] def addArg(optionName: String, optionalParam: String, 
arrayBuf: ArrayBuffer[String]) {
+    Option(System.getenv(optionalParam)) foreach {
+      optParam => {
+        arrayBuf += (optionName, optParam)
+      }
+    }
+  }
+
   override def start() {
     super.start()
 
-    val defalutWorkerCores = "2"
-    val defalutWorkerMemory = "512m"
-    val defaultWorkerNumber = "1"
-
     val userJar = System.getenv("SPARK_YARN_APP_JAR")
-    val distFiles = System.getenv("SPARK_YARN_DIST_FILES")
-    var workerCores = System.getenv("SPARK_WORKER_CORES")
-    var workerMemory = System.getenv("SPARK_WORKER_MEMORY")
-    var workerNumber = System.getenv("SPARK_WORKER_INSTANCES")
-
     if (userJar == null)
       throw new SparkException("env SPARK_YARN_APP_JAR is not set")
 
-    if (workerCores == null)
-      workerCores = defalutWorkerCores
-    if (workerMemory == null)
-      workerMemory = defalutWorkerMemory
-    if (workerNumber == null)
-      workerNumber = defaultWorkerNumber
-
     val driverHost = conf.get("spark.driver.host")
     val driverPort = conf.get("spark.driver.port")
     val hostport = driverHost + ":" + driverPort
 
-    val argsArray = Array[String](
+    val argsArrayBuf = new ArrayBuffer[String]()
+    argsArrayBuf += (
       "--class", "notused",
       "--jar", userJar,
       "--args", hostport,
-      "--worker-memory", workerMemory,
-      "--worker-cores", workerCores,
-      "--num-workers", workerNumber,
-      "--master-class", "org.apache.spark.deploy.yarn.WorkerLauncher",
-      "--files", distFiles
+      "--master-class", "org.apache.spark.deploy.yarn.WorkerLauncher"
     )
 
-    val args = new ClientArguments(argsArray, conf)
+    // process any optional arguments, use the defaults already defined in 
ClientArguments 
+    // if things aren't specified
+    Map("--master-memory" -> "SPARK_MASTER_MEMORY",
+      "--num-workers" -> "SPARK_WORKER_INSTANCES",
+      "--worker-memory" -> "SPARK_WORKER_MEMORY",
+      "--worker-cores" -> "SPARK_WORKER_CORES",
+      "--queue" -> "SPARK_YARN_QUEUE",
+      "--name" -> "SPARK_YARN_APP_NAME",
+      "--files" -> "SPARK_YARN_DIST_FILES",
+      "--archives" -> "SPARK_YARN_DIST_ARCHIVES")
+    .foreach { case (optName, optParam) => addArg(optName, optParam, 
argsArrayBuf) }
+      
+    logDebug("ClientArguments called with: " + argsArrayBuf)
+    val args = new ClientArguments(argsArrayBuf.toArray, conf)
     client = new Client(args, conf)
     appId = client.runApp()
     waitForApp()

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/c617083e/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
----------------------------------------------------------------------
diff --git 
a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala 
b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
index 49248a8..3e3a467 100644
--- 
a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
+++ 
b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
@@ -78,6 +78,10 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: 
Configuration, spar
 
   def run() {
 
+    // Setup the directories so things go to yarn approved directories rather
+    // than user specified and /tmp.
+    System.setProperty("spark.local.dir", getLocalDirs())
+
     amClient = AMRMClient.createAMRMClient()
     amClient.init(yarnConf)
     amClient.start()
@@ -94,10 +98,12 @@ class WorkerLauncher(args: ApplicationMasterArguments, 
conf: Configuration, spar
     // ensure that progress is sent before 
YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse.
 
     val timeoutInterval = 
yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
-    // must be <= timeoutInterval/ 2.
-    // On other hand, also ensure that we are reasonably responsive without 
causing too many requests to RM.
-    // so atleast 1 minute or timeoutInterval / 10 - whichever is higher.
-    val interval = math.min(timeoutInterval / 2, math.max(timeoutInterval / 
10, 60000L))
+    // we want to be reasonably responsive without causing too many requests 
to RM.
+    val schedulerInterval =
+      System.getProperty("spark.yarn.scheduler.heartbeat.interval-ms", 
"5000").toLong
+    // must be <= timeoutInterval / 2.
+    val interval = math.min(timeoutInterval / 2, schedulerInterval)
+
     reporterThread = launchReporterThread(interval)
 
     // Wait for the reporter thread to Finish.
@@ -110,6 +116,21 @@ class WorkerLauncher(args: ApplicationMasterArguments, 
conf: Configuration, spar
     System.exit(0)
   }
 
+  /** Get the Yarn approved local directories. */
+  private def getLocalDirs(): String = { 
+    // Hadoop 0.23 and 2.x have different Environment variable names for the
+    // local dirs, so let's check both. We assume one of the 2 is set.
+    // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
+    val localDirs = Option(System.getenv("YARN_LOCAL_DIRS"))
+      .getOrElse(Option(System.getenv("LOCAL_DIRS"))
+      .getOrElse(""))
+ 
+    if (localDirs.isEmpty()) { 
+      throw new Exception("Yarn Local dirs can't be empty")
+    } 
+    localDirs
+  } 
+
   private def getApplicationAttemptId(): ApplicationAttemptId = {
     val envs = System.getenv()
     val containerIdString = 
envs.get(ApplicationConstants.Environment.CONTAINER_ID.name())

Reply via email to