Repository: spark
Updated Branches:
  refs/heads/master c2f0821aa -> ebff7327a


[SPARK-6869] [PYSPARK] Add pyspark archives path to PYTHONPATH

Based on https://github.com/apache/spark/pull/5478, which provides a
PYSPARK_ARCHIVES_PATH env variable. With this PR, we only need to export
PYSPARK_ARCHIVES_PATH=/user/spark/pyspark.zip,/user/spark/python/lib/py4j-0.8.2.1-src.zip
in conf/spark-env.sh when PySpark is not installed on each YARN node. I ran a
Python application successfully in both yarn-client and yarn-cluster mode with this PR.
andrewor14 sryza Sephiroth-Lin Can you take a look at this? Thanks.
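
For readers not familiar with the patch, here is a minimal Scala sketch of the
resolution order it implements (illustrative only; the object and method names
are made up, and the error handling and "local:" URI handling of the real
SparkSubmit change are simplified): prefer PYSPARK_ARCHIVES_PATH if it is set,
otherwise fall back to the two zips under $SPARK_HOME/python/lib.

import java.io.File

// Sketch of the archive-resolution order; names here are hypothetical.
object PySparkArchivesSketch {
  def resolvePySparkArchives(): Seq[String] =
    sys.env.get("PYSPARK_ARCHIVES_PATH") match {
      // If the user exported PYSPARK_ARCHIVES_PATH (e.g. in conf/spark-env.sh), use it as-is.
      case Some(paths) => paths.split(",").toSeq
      // Otherwise look under $SPARK_HOME/python/lib for the two archives.
      case None =>
        val sparkHome = sys.env.getOrElse("SPARK_HOME",
          sys.error("SPARK_HOME must be set for Python apps on YARN"))
        val pyLibPath = Seq(sparkHome, "python", "lib").mkString(File.separator)
        Seq("pyspark.zip", "py4j-0.8.2.1-src.zip")
          .map(name => new File(pyLibPath, name).getAbsolutePath)
    }

  def main(args: Array[String]): Unit =
    // The resolved entries are shipped with the job and appended to PYTHONPATH.
    println(resolvePySparkArchives().mkString(File.pathSeparator))
}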

Author: Lianhui Wang <lianhuiwan...@gmail.com>

Closes #5580 from lianhuiwang/SPARK-6869 and squashes the following commits:

66ffa43 [Lianhui Wang] Update Client.scala
c2ad0f9 [Lianhui Wang] Update Client.scala
1c8f664 [Lianhui Wang] Merge remote-tracking branch 'remotes/apache/master' into SPARK-6869
008850a [Lianhui Wang] Merge remote-tracking branch 'remotes/apache/master' into SPARK-6869
f0b4ed8 [Lianhui Wang] Merge remote-tracking branch 'remotes/apache/master' into SPARK-6869
150907b [Lianhui Wang] Merge remote-tracking branch 'remotes/apache/master' into SPARK-6869
20402cd [Lianhui Wang] use ZipEntry
9d87c3f [Lianhui Wang] update scala style
e7bd971 [Lianhui Wang] address vanzin's comments
4b8a3ed [Lianhui Wang] use pyArchivesEnvOpt
e6b573b [Lianhui Wang] address vanzin's comments
f11f84a [Lianhui Wang] zip pyspark archives
5192cca [Lianhui Wang] update import path
3b1e4c8 [Lianhui Wang] address tgravescs's comments
9396346 [Lianhui Wang] put zip to make-distribution.sh
0d2baf7 [Lianhui Wang] update import paths
e0179be [Lianhui Wang] add zip pyspark archives in build or sparksubmit
31e8e06 [Lianhui Wang] update code style
9f31dac [Lianhui Wang] update code and add comments
f72987c [Lianhui Wang] add archives path to PYTHONPATH


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ebff7327
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ebff7327
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ebff7327

Branch: refs/heads/master
Commit: ebff7327af5efa9f57c605284de4fae6b050ae0f
Parents: c2f0821
Author: Lianhui Wang <lianhuiwan...@gmail.com>
Authored: Fri May 8 08:44:46 2015 -0500
Committer: Thomas Graves <tgra...@apache.org>
Committed: Fri May 8 08:44:46 2015 -0500

----------------------------------------------------------------------
 assembly/pom.xml                                | 21 ++++++++++
 .../org/apache/spark/deploy/SparkSubmit.scala   | 41 ++++++++++++++++++++
 project/SparkBuild.scala                        | 37 +++++++++++++++++-
 .../org/apache/spark/deploy/yarn/Client.scala   | 23 ++++++++---
 4 files changed, 114 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ebff7327/assembly/pom.xml
----------------------------------------------------------------------
diff --git a/assembly/pom.xml b/assembly/pom.xml
index 2b4d0a9..626c857 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -92,6 +92,27 @@
           <skip>true</skip>
         </configuration>
       </plugin>
+        <!-- zip pyspark archives to run python application on yarn mode -->
+        <plugin>
+          <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-antrun-plugin</artifactId>
+            <executions>
+              <execution>
+                <phase>package</phase>
+                  <goals>
+                    <goal>run</goal>
+                  </goals>
+              </execution>
+            </executions>
+            <configuration>
+              <target>
+                <delete dir="${basedir}/../python/lib/pyspark.zip"/>
+                <zip destfile="${basedir}/../python/lib/pyspark.zip">
+                  <fileset dir="${basedir}/../python/" includes="pyspark/**/*"/>
+                </zip>
+              </target>
+            </configuration>
+        </plugin>
       <!-- Use the shade plugin to create a big JAR with all the dependencies -->
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>

http://git-wip-us.apache.org/repos/asf/spark/blob/ebff7327/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 8a03279..329fa06 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -332,6 +332,47 @@ object SparkSubmit {
       }
     }
 
+    // In yarn mode for a python app, add pyspark archives to files
+    // that can be distributed with the job
+    if (args.isPython && clusterManager == YARN) {
+      var pyArchives: String = null
+      val pyArchivesEnvOpt = sys.env.get("PYSPARK_ARCHIVES_PATH")
+      if (pyArchivesEnvOpt.isDefined) {
+        pyArchives = pyArchivesEnvOpt.get
+      } else {
+        if (!sys.env.contains("SPARK_HOME")) {
+          printErrorAndExit("SPARK_HOME does not exist for python application in yarn mode.")
+        }
+        val pythonPath = new ArrayBuffer[String]
+        for (sparkHome <- sys.env.get("SPARK_HOME")) {
+          val pyLibPath = Seq(sparkHome, "python", "lib").mkString(File.separator)
+          val pyArchivesFile = new File(pyLibPath, "pyspark.zip")
+          if (!pyArchivesFile.exists()) {
+            printErrorAndExit("pyspark.zip does not exist for python application in yarn mode.")
+          }
+          val py4jFile = new File(pyLibPath, "py4j-0.8.2.1-src.zip")
+          if (!py4jFile.exists()) {
+            printErrorAndExit("py4j-0.8.2.1-src.zip does not exist for python application " +
+              "in yarn mode.")
+          }
+          pythonPath += pyArchivesFile.getAbsolutePath()
+          pythonPath += py4jFile.getAbsolutePath()
+        }
+        pyArchives = pythonPath.mkString(",")
+      }
+
+      pyArchives = pyArchives.split(",").map { localPath=>
+        val localURI = Utils.resolveURI(localPath)
+        if (localURI.getScheme != "local") {
+          args.files = mergeFileLists(args.files, localURI.toString)
+          new Path(localPath).getName
+        } else {
+          localURI.getPath
+        }
+      }.mkString(File.pathSeparator)
+      sysProps("spark.submit.pyArchives") = pyArchives
+    }
+
     // If we're running a R app, set the main class to our specific R runner
     if (args.isR && deployMode == CLIENT) {
       if (args.primaryResource == SPARKR_SHELL) {

http://git-wip-us.apache.org/repos/asf/spark/blob/ebff7327/project/SparkBuild.scala
----------------------------------------------------------------------
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 026855f..186345a 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -370,6 +370,7 @@ object Assembly {
 object PySparkAssembly {
   import sbtassembly.Plugin._
   import AssemblyKeys._
+  import java.util.zip.{ZipOutputStream, ZipEntry}
 
   lazy val settings = Seq(
     unmanagedJars in Compile += { BuildCommons.sparkHome / "python/lib/py4j-0.8.2.1-src.zip" },
@@ -377,16 +378,48 @@ object PySparkAssembly {
     // to be included in the assembly. We can't just add "python/" to the assembly's resource dir
     // list since that will copy unneeded / unwanted files.
     resourceGenerators in Compile <+= resourceManaged in Compile map { outDir: File =>
+      val src = new File(BuildCommons.sparkHome, "python/pyspark")
+
+      val zipFile = new File(BuildCommons.sparkHome , "python/lib/pyspark.zip")
+      zipFile.delete()
+      zipRecursive(src, zipFile)
+
       val dst = new File(outDir, "pyspark")
       if (!dst.isDirectory()) {
         require(dst.mkdirs())
       }
-
-      val src = new File(BuildCommons.sparkHome, "python/pyspark")
       copy(src, dst)
     }
   )
 
+  private def zipRecursive(source: File, destZipFile: File) = {
+    val destOutput = new ZipOutputStream(new FileOutputStream(destZipFile))
+    addFilesToZipStream("", source, destOutput)
+    destOutput.flush()
+    destOutput.close()
+  }
+
+  private def addFilesToZipStream(parent: String, source: File, output: ZipOutputStream): Unit = {
+    if (source.isDirectory()) {
+      output.putNextEntry(new ZipEntry(parent + source.getName()))
+      for (file <- source.listFiles()) {
+        addFilesToZipStream(parent + source.getName() + File.separator, file, output)
+      }
+    } else {
+      val in = new FileInputStream(source)
+      output.putNextEntry(new ZipEntry(parent + source.getName()))
+      val buf = new Array[Byte](8192)
+      var n = 0
+      while (n != -1) {
+        n = in.read(buf)
+        if (n != -1) {
+          output.write(buf, 0, n)
+        }
+      }
+      in.close()
+    }
+  }
+
   private def copy(src: File, dst: File): Seq[File] = {
     src.listFiles().flatMap { f =>
       val child = new File(dst, f.getName())

http://git-wip-us.apache.org/repos/asf/spark/blob/ebff7327/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 20ecaf0..d21a739 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -468,6 +468,17 @@ private[spark] class Client(
       env("SPARK_YARN_USER_ENV") = userEnvs
     }
 
+    // if spark.submit.pyArchives is in sparkConf, append pyArchives to PYTHONPATH
+    // that can be passed on to the ApplicationMaster and the executors.
+    if (sparkConf.contains("spark.submit.pyArchives")) {
+      var pythonPath = sparkConf.get("spark.submit.pyArchives")
+      if (env.contains("PYTHONPATH")) {
+        pythonPath = Seq(env.get("PYTHONPATH"), pythonPath).mkString(File.pathSeparator)
+      }
+      env("PYTHONPATH") = pythonPath
+      sparkConf.setExecutorEnv("PYTHONPATH", pythonPath)
+    }
+
     // In cluster mode, if the deprecated SPARK_JAVA_OPTS is set, we need to propagate it to
     // executors. But we can't just set spark.executor.extraJavaOptions, because the driver's
     // SparkContext will not let that set spark* system properties, which is expected behavior for
@@ -1074,7 +1085,7 @@ object Client extends Logging {
         val hiveConf = hiveClass.getMethod("getConf").invoke(hive)
         val hiveConfClass = mirror.classLoader.loadClass("org.apache.hadoop.hive.conf.HiveConf")
 
-        val hiveConfGet = (param:String) => Option(hiveConfClass
+        val hiveConfGet = (param: String) => Option(hiveConfClass
           .getMethod("get", classOf[java.lang.String])
           .invoke(hiveConf, param))
 
@@ -1096,7 +1107,7 @@ object Client extends Logging {
 
             val hive2Token = new Token[DelegationTokenIdentifier]()
             hive2Token.decodeFromUrlString(tokenStr)
-            credentials.addToken(new Text("hive.server2.delegation.token"),hive2Token)
+            credentials.addToken(new Text("hive.server2.delegation.token"), hive2Token)
             logDebug("Added hive.Server2.delegation.token to conf.")
             hiveClass.getMethod("closeCurrent").invoke(null)
           } else {
@@ -1141,13 +1152,13 @@ object Client extends Logging {
 
         logInfo("Added HBase security token to credentials.")
       } catch {
-        case e:java.lang.NoSuchMethodException =>
+        case e: java.lang.NoSuchMethodException =>
           logInfo("HBase Method not found: " + e)
-        case e:java.lang.ClassNotFoundException =>
+        case e: java.lang.ClassNotFoundException =>
           logDebug("HBase Class not found: " + e)
-        case e:java.lang.NoClassDefFoundError =>
+        case e: java.lang.NoClassDefFoundError =>
           logDebug("HBase Class not found: " + e)
-        case e:Exception =>
+        case e: Exception =>
           logError("Exception when obtaining HBase security token: " + e)
       }
     }

