Repository: spark
Updated Branches:
  refs/heads/master 67fca189c -> bce0897bc


[SPARK-2051] In yarn.ClientBase, spark.yarn.dist.* do not work

Author: witgo <wi...@qq.com>

Closes #969 from witgo/yarn_ClientBase and squashes the following commits:

8117765 [witgo] review commit
3bdbc52 [witgo] Merge branch 'master' of https://github.com/apache/spark into yarn_ClientBase
5261b6c [witgo] fix sys.props.get("SPARK_YARN_DIST_FILES")
e3c1107 [witgo] update docs
b6a9aa1 [witgo] merge master
c8b4554 [witgo] review commit
2f48789 [witgo] Merge branch 'master' of https://github.com/apache/spark into yarn_ClientBase
8d7b82f [witgo] Merge branch 'master' of https://github.com/apache/spark into yarn_ClientBase
1048549 [witgo] remove Utils.resolveURIs
871f1db [witgo] add spark.yarn.dist.* documentation
41bce59 [witgo] review commit
35d6fa0 [witgo] move to ClientArguments
55d72fc [witgo] Merge branch 'master' of https://github.com/apache/spark into yarn_ClientBase
9cdff16 [witgo] review commit
8bc2f4b [witgo] review commit
20e667c [witgo] Merge branch 'master' into yarn_ClientBase
0961151 [witgo] merge master
ce609fc [witgo] Merge branch 'master' into yarn_ClientBase
8362489 [witgo] yarn.ClientBase spark.yarn.dist.* do not work


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bce0897b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bce0897b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bce0897b

Branch: refs/heads/master
Commit: bce0897bc6b0fc9bca5444dbe3a9e75523ad7481
Parents: 67fca18
Author: witgo <wi...@qq.com>
Authored: Thu Jun 19 12:11:26 2014 -0500
Committer: Thomas Graves <tgra...@apache.org>
Committed: Thu Jun 19 12:11:26 2014 -0500

----------------------------------------------------------------------
 docs/running-on-yarn.md                         | 20 +++++++++++++++++---
 .../spark/deploy/yarn/ClientArguments.scala     | 15 +++++++++++++--
 .../apache/spark/deploy/yarn/ClientBase.scala   |  3 ++-
 .../cluster/YarnClientSchedulerBackend.scala    |  4 +---
 4 files changed, 33 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
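For context, spark.yarn.dist.files and spark.yarn.dist.archives are the properties this patch makes work: they name local files and archives to be shipped into the working directory of each executor on YARN. A minimal sketch of an application setting them through SparkConf (the app name and paths here are hypothetical):

    import org.apache.spark.{SparkConf, SparkContext}

    // Hypothetical resources to distribute; before this patch the two
    // spark.yarn.dist.* properties below were ignored by yarn.ClientBase.
    val conf = new SparkConf()
      .setAppName("DistExample")
      .set("spark.yarn.dist.files", "/local/path/lookup.csv")
      .set("spark.yarn.dist.archives", "/local/path/deps.zip")
    val sc = new SparkContext(conf)
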


http://git-wip-us.apache.org/repos/asf/spark/blob/bce0897b/docs/running-on-yarn.md
----------------------------------------------------------------------
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 4243ef4..fecd8f2 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -68,15 +68,29 @@ Most of the configs are the same for Spark on YARN as for other deployment modes
   </td>
 </tr>
 <tr>
-  <td><code>spark.yarn.executor.memoryOverhead</code></td>
-  <td>384</code></td>
+  <td><code>spark.yarn.dist.archives</code></td>
+  <td>(none)</td>
+  <td>
+    Comma-separated list of archives to be extracted into the working directory of each executor.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.yarn.dist.files</code></td>
+  <td>(none)</td>
+  <td>
+    Comma-separated list of files to be placed in the working directory of each executor.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.yarn.executor.memoryOverhead</code></td>
+  <td>384</td>
   <td>
    The amount of off heap memory (in megabytes) to be allocated per executor. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc.
   </td>
 </tr>
 <tr>
   <td><code>spark.yarn.driver.memoryOverhead</code></td>
-  <td>384</code></td>
+  <td>384</td>
   <td>
    The amount of off heap memory (in megabytes) to be allocated per driver. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc.
   </td>
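The patch resolves scheme-less entries in these two properties to file:// URIs via Utils.resolveURIs before handing them to YARN. Utils is private[spark], so the sketch below re-implements the idea as a standalone function rather than calling the real one; the sample paths are hypothetical:

    import java.net.URI

    // Standalone approximation of the resolution applied to spark.yarn.dist.*:
    // entries without a scheme are treated as local file:// paths, while
    // already-qualified entries pass through untouched.
    def resolveAsFileURIs(paths: String): String =
      paths.split(",").map { p =>
        val uri = new URI(p)
        if (uri.getScheme != null) uri.toString
        else new java.io.File(p).getAbsoluteFile.toURI.toString
      }.mkString(",")

    // resolveAsFileURIs("/tmp/a.txt,hdfs:///data/b.txt")
    //   -> "file:/tmp/a.txt,hdfs:///data/b.txt"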

http://git-wip-us.apache.org/repos/asf/spark/blob/bce0897b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index fd3ef9e..62f9b3cf 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -21,8 +21,7 @@ import scala.collection.mutable.{ArrayBuffer, HashMap}
 
 import org.apache.spark.SparkConf
 import org.apache.spark.scheduler.InputFormatInfo
-import org.apache.spark.util.IntParam
-import org.apache.spark.util.MemoryParam
+import org.apache.spark.util.{Utils, IntParam, MemoryParam}
 
 
 // TODO: Add code and support for ensuring that yarn resource 'tasks' are location aware !
@@ -45,6 +44,18 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
 
   parseArgs(args.toList)
 
+  // If the env variables SPARK_YARN_DIST_ARCHIVES/SPARK_YARN_DIST_FILES are set in
+  // yarn-client mode, they should default to hdfs://
+  files = Option(files).getOrElse(sys.env.get("SPARK_YARN_DIST_FILES").orNull)
+  archives = Option(archives).getOrElse(sys.env.get("SPARK_YARN_DIST_ARCHIVES").orNull)
+
+  // spark.yarn.dist.archives/spark.yarn.dist.files default to the file:// scheme if
+  // not specified, for both yarn-client and yarn-cluster
+  files = Option(files).getOrElse(sparkConf.getOption("spark.yarn.dist.files").
+    map(p => Utils.resolveURIs(p)).orNull)
+  archives = Option(archives).getOrElse(sparkConf.getOption("spark.yarn.dist.archives").
+    map(p => Utils.resolveURIs(p)).orNull)
+
   private def parseArgs(inputArgs: List[String]): Unit = {
     val userArgsBuffer: ArrayBuffer[String] = new ArrayBuffer[String]()
    val inputFormatMap: HashMap[String, InputFormatInfo] = new HashMap[String, InputFormatInfo]()
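The chained Option(...).getOrElse(...) assignments above encode a priority order: an explicit --files/--archives argument wins, then the SPARK_YARN_DIST_* environment variables, then the spark.yarn.dist.* properties. A self-contained sketch of that precedence, with all three candidate values hypothetical:

    // Sketch of the precedence chain used above:
    // CLI flag > SPARK_YARN_DIST_FILES env var > spark.yarn.dist.files conf.
    def firstDefined(candidates: Option[String]*): String =
      candidates.flatten.headOption.orNull

    val fromFlag: Option[String] = None                         // no --files given
    val fromEnv: Option[String] = Some("hdfs:///user/data.txt") // env var set
    val fromConf: Option[String] = Some("file:/tmp/data.txt")   // conf also set

    val files = firstDefined(fromFlag, fromEnv, fromConf)
    // files == "hdfs:///user/data.txt": the env var beats the conf entry.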

http://git-wip-us.apache.org/repos/asf/spark/blob/bce0897b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
index 858bcaa..8f22675 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
@@ -162,7 +162,7 @@ trait ClientBase extends Logging {
     val fs = FileSystem.get(conf)
     val remoteFs = originalPath.getFileSystem(conf)
     var newPath = originalPath
-    if (! compareFs(remoteFs, fs)) {
+    if (!compareFs(remoteFs, fs)) {
       newPath = new Path(dstDir, originalPath.getName())
       logInfo("Uploading " + originalPath + " to " + newPath)
       FileUtil.copy(remoteFs, originalPath, fs, newPath, false, conf)
@@ -250,6 +250,7 @@ trait ClientBase extends Logging {
         }
       }
     }
+    logInfo("Prepared Local resources " + localResources)
    sparkConf.set(ClientBase.CONF_SPARK_YARN_SECONDARY_JARS, cachedSecondaryJarLinks.mkString(","))
 
     UserGroupInformation.getCurrentUser().addCredentials(credentials)
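The hunks above sit in ClientBase's resource-upload path, which copies a resource to the destination filesystem only when it lives on a different filesystem (the compareFs check). A condensed sketch of that shape against the Hadoop FileSystem API; the URI comparison is a simplified stand-in for compareFs, and error handling is omitted:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

    // Copy `original` into `dstDir` only when its filesystem differs from
    // the default (destination) filesystem; otherwise reuse the path as-is.
    def copyIfRemote(original: Path, dstDir: Path, conf: Configuration): Path = {
      val dstFs = FileSystem.get(conf)          // destination filesystem
      val srcFs = original.getFileSystem(conf)  // where the resource lives
      if (srcFs.getUri != dstFs.getUri) {       // simplified stand-in for compareFs
        val newPath = new Path(dstDir, original.getName())
        FileUtil.copy(srcFs, original, dstFs, newPath, false, conf)
        newPath
      } else {
        original
      }
    }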

http://git-wip-us.apache.org/repos/asf/spark/blob/bce0897b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 039cf4f..412dfe3 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -70,9 +70,7 @@ private[spark] class YarnClientSchedulerBackend(
       ("--executor-cores", "SPARK_WORKER_CORES", "spark.executor.cores"),
       ("--executor-cores", "SPARK_EXECUTOR_CORES", "spark.executor.cores"),
       ("--queue", "SPARK_YARN_QUEUE", "spark.yarn.queue"),
-      ("--name", "SPARK_YARN_APP_NAME", "spark.app.name"),
-      ("--files", "SPARK_YARN_DIST_FILES", "spark.yarn.dist.files"),
-      ("--archives", "SPARK_YARN_DIST_ARCHIVES", "spark.yarn.dist.archives"))
+      ("--name", "SPARK_YARN_APP_NAME", "spark.app.name"))
    .foreach { case (optName, envVar, sysProp) => addArg(optName, envVar, sysProp, argsArrayBuf) }
 
     logDebug("ClientArguments called with: " + argsArrayBuf)
