[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-07-28 Thread mstreuhofer
Github user mstreuhofer commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r205951170
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/VirtualEnvFactory.scala ---
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import java.io.File
+import java.util.{Map => JMap}
+import java.util.Arrays
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.JavaConverters._
+
+import com.google.common.io.Files
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+
+
+class VirtualEnvFactory(pythonExec: String, conf: SparkConf, isDriver: 
Boolean)
+  extends Logging {
+
+  private val virtualEnvType = conf.get("spark.pyspark.virtualenv.type", 
"native")
+  private val virtualEnvBinPath = 
conf.get("spark.pyspark.virtualenv.bin.path", "")
+  private val initPythonPackages = 
conf.getOption("spark.pyspark.virtualenv.packages")
+  private var virtualEnvName: String = _
+  private var virtualPythonExec: String = _
+  private val VIRTUALENV_ID = new AtomicInteger()
+  private var isLauncher: Boolean = false
+
+  // used by launcher when user want to use virtualenv in pyspark shell. 
Launcher need this class
+  // to create virtualenv for driver.
+  def this(pythonExec: String, properties: JMap[String, String], isDriver: 
java.lang.Boolean) {
+this(pythonExec, new SparkConf().setAll(properties.asScala), isDriver)
+this.isLauncher = true
+  }
+
+  /*
+   * Create virtualenv using native virtualenv or conda
+   *
+   */
+  def setupVirtualEnv(): String = {
+/*
+ *
+ * Native Virtualenv:
+ *   -  Execute command: virtualenv -p  --no-site-packages 

+ *   -  Execute command: python -m pip --cache-dir  install 
-r 
+ *
+ * Conda
+ *   -  Execute command: conda create --prefix  --file 
 -y
+ *
+ */
+logInfo("Start to setup virtualenv...")
+logDebug("user.dir=" + System.getProperty("user.dir"))
+logDebug("user.home=" + System.getProperty("user.home"))
+
+require(virtualEnvType == "native" || virtualEnvType == "conda",
+  s"VirtualEnvType: $virtualEnvType is not supported." )
+require(new File(virtualEnvBinPath).exists(),
+  s"VirtualEnvBinPath: $virtualEnvBinPath is not defined or doesn't 
exist.")
+// Two scenarios of creating virtualenv:
+// 1. created in yarn container. Yarn will clean it up after container 
is exited
+// 2. created outside yarn container. Spark need to create temp 
directory and clean it after app
+//finish.
+//  - driver of PySpark shell
+//  - driver of yarn-client mode
+if (isLauncher ||
+  (isDriver && conf.get("spark.submit.deployMode") == "client")) {
+  val virtualenvBasedir = Files.createTempDir()
+  virtualenvBasedir.deleteOnExit()
+  virtualEnvName = virtualenvBasedir.getAbsolutePath
+} else if (isDriver && conf.get("spark.submit.deployMode") == 
"cluster") {
+  virtualEnvName = "virtualenv_driver"
+} else {
+  // use the working directory of Executor
+  virtualEnvName = "virtualenv_" + conf.getAppId + "_" + 
VIRTUALENV_ID.getAndIncrement()
+}
+
+// Use the absolute path of requirement file in the following cases
+// 1. driver of pyspark shell
+// 2. driver of yarn-client mode
+// otherwise just use filename as it would be downloaded to the 
working directory of Executor
+val pysparkRequirements =
+  if (isLauncher ||
+(isDriver && conf.get("spark.submit.deployMode") == "client")) {
+conf.getOption("spark.pyspark.virtualenv.requirements")
+  } else {
+
conf.getOption("spark.pyspark.virtualenv.requirements").map(_.split("/").last)
+  }
+
+val 

[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-07-28 Thread mstreuhofer
Github user mstreuhofer commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r205950127
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/VirtualEnvFactory.scala ---
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import java.io.File
+import java.util.{Map => JMap}
+import java.util.Arrays
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.JavaConverters._
+
+import com.google.common.io.Files
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+
+
+class VirtualEnvFactory(pythonExec: String, conf: SparkConf, isDriver: 
Boolean)
+  extends Logging {
+
+  private val virtualEnvType = conf.get("spark.pyspark.virtualenv.type", 
"native")
+  private val virtualEnvBinPath = 
conf.get("spark.pyspark.virtualenv.bin.path", "")
+  private val initPythonPackages = 
conf.getOption("spark.pyspark.virtualenv.packages")
+  private var virtualEnvName: String = _
+  private var virtualPythonExec: String = _
+  private val VIRTUALENV_ID = new AtomicInteger()
+  private var isLauncher: Boolean = false
+
+  // used by launcher when user want to use virtualenv in pyspark shell. 
Launcher need this class
+  // to create virtualenv for driver.
+  def this(pythonExec: String, properties: JMap[String, String], isDriver: 
java.lang.Boolean) {
+this(pythonExec, new SparkConf().setAll(properties.asScala), isDriver)
+this.isLauncher = true
+  }
+
+  /*
+   * Create virtualenv using native virtualenv or conda
+   *
+   */
+  def setupVirtualEnv(): String = {
+/*
+ *
+ * Native Virtualenv:
+ *   -  Execute command: virtualenv -p  --no-site-packages 

+ *   -  Execute command: python -m pip --cache-dir  install 
-r 
+ *
+ * Conda
+ *   -  Execute command: conda create --prefix  --file 
 -y
+ *
+ */
+logInfo("Start to setup virtualenv...")
+logDebug("user.dir=" + System.getProperty("user.dir"))
+logDebug("user.home=" + System.getProperty("user.home"))
+
+require(virtualEnvType == "native" || virtualEnvType == "conda",
+  s"VirtualEnvType: $virtualEnvType is not supported." )
+require(new File(virtualEnvBinPath).exists(),
+  s"VirtualEnvBinPath: $virtualEnvBinPath is not defined or doesn't 
exist.")
+// Two scenarios of creating virtualenv:
+// 1. created in yarn container. Yarn will clean it up after container 
is exited
+// 2. created outside yarn container. Spark need to create temp 
directory and clean it after app
+//finish.
+//  - driver of PySpark shell
+//  - driver of yarn-client mode
+if (isLauncher ||
+  (isDriver && conf.get("spark.submit.deployMode") == "client")) {
+  val virtualenvBasedir = Files.createTempDir()
+  virtualenvBasedir.deleteOnExit()
--- End diff --

the temporary directory is not actually deleted on exit: `deleteOnExit()` only removes empty directories, and this one will contain the whole virtualenv.
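
A minimal sketch of one possible fix, assuming a plain JVM shutdown hook and a hand-rolled recursive delete are acceptable here (the helper names are illustrative, not part of the PR):

```scala
import java.io.File

import com.google.common.io.Files

// File.deleteOnExit() cannot remove a non-empty directory, so register a
// shutdown hook that walks the virtualenv tree and deletes it bottom-up.
object TempDirCleanup {

  private def deleteRecursively(f: File): Unit = {
    if (f.isDirectory) {
      Option(f.listFiles()).getOrElse(Array.empty[File]).foreach(deleteRecursively)
    }
    f.delete() // best-effort; ignore the return value at JVM exit
  }

  def createTempDirWithCleanup(): File = {
    val dir = Files.createTempDir()
    sys.addShutdownHook(deleteRecursively(dir))
    dir
  }
}
```

Spark's own `org.apache.spark.util.Utils.createTempDir` already registers this kind of recursive cleanup, so reusing it may be simpler than calling Guava directly.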


---




[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-06-17 Thread ifilonenko
Github user ifilonenko commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r195919967
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/VirtualEnvFactory.scala ---
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import java.io.File
+import java.util.{Map => JMap}
+import java.util.Arrays
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.JavaConverters._
+
+import com.google.common.io.Files
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+
+
+class VirtualEnvFactory(pythonExec: String, conf: SparkConf, isDriver: 
Boolean)
+  extends Logging {
+
+  private val virtualEnvType = conf.get("spark.pyspark.virtualenv.type", 
"native")
+  private val virtualEnvBinPath = 
conf.get("spark.pyspark.virtualenv.bin.path", "")
+  private val initPythonPackages = 
conf.getOption("spark.pyspark.virtualenv.packages")
+  private var virtualEnvName: String = _
+  private var virtualPythonExec: String = _
+  private val VIRTUALENV_ID = new AtomicInteger()
+  private var isLauncher: Boolean = false
+
+  // used by launcher when user want to use virtualenv in pyspark shell. 
Launcher need this class
+  // to create virtualenv for driver.
+  def this(pythonExec: String, properties: JMap[String, String], isDriver: 
java.lang.Boolean) {
+this(pythonExec, new SparkConf().setAll(properties.asScala), isDriver)
+this.isLauncher = true
+  }
+
+  /*
+   * Create virtualenv using native virtualenv or conda
+   *
+   */
+  def setupVirtualEnv(): String = {
+/*
+ *
+ * Native Virtualenv:
+ *   -  Execute command: virtualenv -p  --no-site-packages 

+ *   -  Execute command: python -m pip --cache-dir  install 
-r 
+ *
+ * Conda
+ *   -  Execute command: conda create --prefix  --file 
 -y
+ *
+ */
+logInfo("Start to setup virtualenv...")
+logDebug("user.dir=" + System.getProperty("user.dir"))
+logDebug("user.home=" + System.getProperty("user.home"))
+
+require(virtualEnvType == "native" || virtualEnvType == "conda",
+  s"VirtualEnvType: $virtualEnvType is not supported." )
+require(new File(virtualEnvBinPath).exists(),
+  s"VirtualEnvBinPath: $virtualEnvBinPath is not defined or doesn't 
exist.")
+// Two scenarios of creating virtualenv:
+// 1. created in yarn container. Yarn will clean it up after container 
is exited
+// 2. created outside yarn container. Spark need to create temp 
directory and clean it after app
+//finish.
+//  - driver of PySpark shell
+//  - driver of yarn-client mode
+if (isLauncher ||
+  (isDriver && conf.get("spark.submit.deployMode") == "client")) {
+  val virtualenvBasedir = Files.createTempDir()
+  virtualenvBasedir.deleteOnExit()
+  virtualEnvName = virtualenvBasedir.getAbsolutePath
+} else if (isDriver && conf.get("spark.submit.deployMode") == 
"cluster") {
+  virtualEnvName = "virtualenv_driver"
+} else {
+  // use the working directory of Executor
+  virtualEnvName = "virtualenv_" + conf.getAppId + "_" + 
VIRTUALENV_ID.getAndIncrement()
+}
+
+// Use the absolute path of requirement file in the following cases
+// 1. driver of pyspark shell
+// 2. driver of yarn-client mode
+// otherwise just use filename as it would be downloaded to the 
working directory of Executor
+val pysparkRequirements =
+  if (isLauncher ||
+(isDriver && conf.get("spark.submit.deployMode") == "client")) {
+conf.getOption("spark.pyspark.virtualenv.requirements")
+  } else {
+
conf.getOption("spark.pyspark.virtualenv.requirements").map(_.split("/").last)
+  }
+
+val createEnvCommand 

[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-06-17 Thread ifilonenko
Github user ifilonenko commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r195920245
  
--- Diff: python/pyspark/context.py ---
@@ -1035,6 +1044,46 @@ def getConf(self):
 conf.setAll(self._conf.getAll())
 return conf
 
+@since(2.4)
+def install_packages(self, packages):
+"""
+Install python packages on all executors and driver through pip. 
pip will be installed
+by default no matter using native virtualenv or conda. So it is 
guaranteed that pip is
+available if virtualenv is enabled.
--- End diff --

This will only be the case in Kubernetes if you specify the `spark-py` image. So this will need to be expanded per cluster manager.
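
A minimal sketch of the kind of guard this implies, assuming the env's interpreter path is known; purely illustrative, not the PR's actual code:

```scala
import scala.sys.process._

// pip availability cannot be assumed for every cluster manager / base image,
// so probe for it explicitly and fail with a clear message if it is missing.
def requirePip(virtualPythonExec: String): Unit = {
  val exitCode = Seq(virtualPythonExec, "-m", "pip", "--version").!
  require(exitCode == 0,
    s"pip is not available for $virtualPythonExec; " +
      "install pip in the image or disable spark.pyspark.virtualenv.enabled")
}
```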


---




[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-06-17 Thread ifilonenko
Github user ifilonenko commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r195919580
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/VirtualEnvFactory.scala ---
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import java.io.File
+import java.util.{Map => JMap}
+import java.util.Arrays
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.JavaConverters._
+
+import com.google.common.io.Files
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+
+
+class VirtualEnvFactory(pythonExec: String, conf: SparkConf, isDriver: 
Boolean)
+  extends Logging {
+
+  private val virtualEnvType = conf.get("spark.pyspark.virtualenv.type", 
"native")
+  private val virtualEnvBinPath = 
conf.get("spark.pyspark.virtualenv.bin.path", "")
+  private val initPythonPackages = 
conf.getOption("spark.pyspark.virtualenv.packages")
+  private var virtualEnvName: String = _
+  private var virtualPythonExec: String = _
+  private val VIRTUALENV_ID = new AtomicInteger()
+  private var isLauncher: Boolean = false
+
+  // used by launcher when user want to use virtualenv in pyspark shell. 
Launcher need this class
+  // to create virtualenv for driver.
+  def this(pythonExec: String, properties: JMap[String, String], isDriver: 
java.lang.Boolean) {
+this(pythonExec, new SparkConf().setAll(properties.asScala), isDriver)
+this.isLauncher = true
+  }
+
+  /*
+   * Create virtualenv using native virtualenv or conda
+   *
+   */
+  def setupVirtualEnv(): String = {
+/*
+ *
+ * Native Virtualenv:
+ *   -  Execute command: virtualenv -p  --no-site-packages 

+ *   -  Execute command: python -m pip --cache-dir  install 
-r 
+ *
+ * Conda
+ *   -  Execute command: conda create --prefix  --file 
 -y
+ *
+ */
+logInfo("Start to setup virtualenv...")
+logDebug("user.dir=" + System.getProperty("user.dir"))
+logDebug("user.home=" + System.getProperty("user.home"))
+
+require(virtualEnvType == "native" || virtualEnvType == "conda",
+  s"VirtualEnvType: $virtualEnvType is not supported." )
+require(new File(virtualEnvBinPath).exists(),
+  s"VirtualEnvBinPath: $virtualEnvBinPath is not defined or doesn't 
exist.")
--- End diff --

In addition, how are we handling the case of an already existing 
`s"$virtualEnvBinPath/$virtualEnvName"`? 
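
A minimal sketch of one way such a collision could be handled, assuming an env directory path and a reuse flag like the ones below (both names are illustrative, not the PR's API):

```scala
import java.io.File

// If the target env directory already exists, either reuse it (when allowed)
// or fail fast with an actionable message instead of silently recreating it.
def resolveEnvDir(envPath: String, reuseExisting: Boolean): File = {
  val envDir = new File(envPath)
  if (envDir.exists()) {
    require(reuseExisting,
      s"Virtualenv directory ${envDir.getAbsolutePath} already exists; " +
        "remove it or enable reuse before setting up a new environment.")
  }
  envDir
}
```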


---




[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-06-17 Thread ifilonenko
Github user ifilonenko commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r195919982
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/VirtualEnvFactory.scala ---
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import java.io.File
+import java.util.{Map => JMap}
+import java.util.Arrays
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.JavaConverters._
+
+import com.google.common.io.Files
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+
+
+class VirtualEnvFactory(pythonExec: String, conf: SparkConf, isDriver: 
Boolean)
+  extends Logging {
+
+  private val virtualEnvType = conf.get("spark.pyspark.virtualenv.type", 
"native")
+  private val virtualEnvBinPath = 
conf.get("spark.pyspark.virtualenv.bin.path", "")
+  private val initPythonPackages = 
conf.getOption("spark.pyspark.virtualenv.packages")
+  private var virtualEnvName: String = _
+  private var virtualPythonExec: String = _
+  private val VIRTUALENV_ID = new AtomicInteger()
+  private var isLauncher: Boolean = false
+
+  // used by launcher when user want to use virtualenv in pyspark shell. 
Launcher need this class
+  // to create virtualenv for driver.
+  def this(pythonExec: String, properties: JMap[String, String], isDriver: 
java.lang.Boolean) {
+this(pythonExec, new SparkConf().setAll(properties.asScala), isDriver)
+this.isLauncher = true
+  }
+
+  /*
+   * Create virtualenv using native virtualenv or conda
+   *
+   */
+  def setupVirtualEnv(): String = {
+/*
+ *
+ * Native Virtualenv:
+ *   -  Execute command: virtualenv -p  --no-site-packages 

+ *   -  Execute command: python -m pip --cache-dir  install 
-r 
+ *
+ * Conda
+ *   -  Execute command: conda create --prefix  --file 
 -y
+ *
+ */
+logInfo("Start to setup virtualenv...")
+logDebug("user.dir=" + System.getProperty("user.dir"))
+logDebug("user.home=" + System.getProperty("user.home"))
+
+require(virtualEnvType == "native" || virtualEnvType == "conda",
+  s"VirtualEnvType: $virtualEnvType is not supported." )
+require(new File(virtualEnvBinPath).exists(),
+  s"VirtualEnvBinPath: $virtualEnvBinPath is not defined or doesn't 
exist.")
+// Two scenarios of creating virtualenv:
+// 1. created in yarn container. Yarn will clean it up after container 
is exited
+// 2. created outside yarn container. Spark need to create temp 
directory and clean it after app
+//finish.
+//  - driver of PySpark shell
+//  - driver of yarn-client mode
+if (isLauncher ||
+  (isDriver && conf.get("spark.submit.deployMode") == "client")) {
+  val virtualenvBasedir = Files.createTempDir()
+  virtualenvBasedir.deleteOnExit()
+  virtualEnvName = virtualenvBasedir.getAbsolutePath
+} else if (isDriver && conf.get("spark.submit.deployMode") == 
"cluster") {
+  virtualEnvName = "virtualenv_driver"
+} else {
+  // use the working directory of Executor
+  virtualEnvName = "virtualenv_" + conf.getAppId + "_" + 
VIRTUALENV_ID.getAndIncrement()
+}
+
+// Use the absolute path of requirement file in the following cases
+// 1. driver of pyspark shell
+// 2. driver of yarn-client mode
+// otherwise just use filename as it would be downloaded to the 
working directory of Executor
+val pysparkRequirements =
+  if (isLauncher ||
+(isDriver && conf.get("spark.submit.deployMode") == "client")) {
+conf.getOption("spark.pyspark.virtualenv.requirements")
+  } else {
+
conf.getOption("spark.pyspark.virtualenv.requirements").map(_.split("/").last)
+  }
+
+val createEnvCommand 

[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-06-17 Thread ifilonenko
Github user ifilonenko commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r195919958
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/VirtualEnvFactory.scala ---
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import java.io.File
+import java.util.{Map => JMap}
+import java.util.Arrays
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.JavaConverters._
+
+import com.google.common.io.Files
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+
+
+class VirtualEnvFactory(pythonExec: String, conf: SparkConf, isDriver: 
Boolean)
+  extends Logging {
+
+  private val virtualEnvType = conf.get("spark.pyspark.virtualenv.type", 
"native")
+  private val virtualEnvBinPath = 
conf.get("spark.pyspark.virtualenv.bin.path", "")
+  private val initPythonPackages = 
conf.getOption("spark.pyspark.virtualenv.packages")
+  private var virtualEnvName: String = _
+  private var virtualPythonExec: String = _
+  private val VIRTUALENV_ID = new AtomicInteger()
+  private var isLauncher: Boolean = false
+
+  // used by launcher when user want to use virtualenv in pyspark shell. 
Launcher need this class
+  // to create virtualenv for driver.
+  def this(pythonExec: String, properties: JMap[String, String], isDriver: 
java.lang.Boolean) {
+this(pythonExec, new SparkConf().setAll(properties.asScala), isDriver)
+this.isLauncher = true
+  }
+
+  /*
+   * Create virtualenv using native virtualenv or conda
+   *
+   */
+  def setupVirtualEnv(): String = {
+/*
+ *
+ * Native Virtualenv:
+ *   -  Execute command: virtualenv -p  --no-site-packages 

+ *   -  Execute command: python -m pip --cache-dir  install 
-r 
+ *
+ * Conda
+ *   -  Execute command: conda create --prefix  --file 
 -y
+ *
+ */
+logInfo("Start to setup virtualenv...")
+logDebug("user.dir=" + System.getProperty("user.dir"))
+logDebug("user.home=" + System.getProperty("user.home"))
+
+require(virtualEnvType == "native" || virtualEnvType == "conda",
+  s"VirtualEnvType: $virtualEnvType is not supported." )
+require(new File(virtualEnvBinPath).exists(),
+  s"VirtualEnvBinPath: $virtualEnvBinPath is not defined or doesn't 
exist.")
+// Two scenarios of creating virtualenv:
+// 1. created in yarn container. Yarn will clean it up after container 
is exited
+// 2. created outside yarn container. Spark need to create temp 
directory and clean it after app
+//finish.
+//  - driver of PySpark shell
+//  - driver of yarn-client mode
+if (isLauncher ||
+  (isDriver && conf.get("spark.submit.deployMode") == "client")) {
+  val virtualenvBasedir = Files.createTempDir()
+  virtualenvBasedir.deleteOnExit()
+  virtualEnvName = virtualenvBasedir.getAbsolutePath
+} else if (isDriver && conf.get("spark.submit.deployMode") == 
"cluster") {
+  virtualEnvName = "virtualenv_driver"
+} else {
+  // use the working directory of Executor
+  virtualEnvName = "virtualenv_" + conf.getAppId + "_" + 
VIRTUALENV_ID.getAndIncrement()
+}
+
+// Use the absolute path of requirement file in the following cases
+// 1. driver of pyspark shell
+// 2. driver of yarn-client mode
+// otherwise just use filename as it would be downloaded to the 
working directory of Executor
+val pysparkRequirements =
+  if (isLauncher ||
+(isDriver && conf.get("spark.submit.deployMode") == "client")) {
+conf.getOption("spark.pyspark.virtualenv.requirements")
+  } else {
+
conf.getOption("spark.pyspark.virtualenv.requirements").map(_.split("/").last)
+  }
+
+val createEnvCommand 

[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-06-17 Thread ifilonenko
Github user ifilonenko commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r195919848
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/VirtualEnvFactory.scala ---
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import java.io.File
+import java.util.{Map => JMap}
+import java.util.Arrays
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.JavaConverters._
+
+import com.google.common.io.Files
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+
+
+class VirtualEnvFactory(pythonExec: String, conf: SparkConf, isDriver: 
Boolean)
+  extends Logging {
+
+  private val virtualEnvType = conf.get("spark.pyspark.virtualenv.type", 
"native")
+  private val virtualEnvBinPath = 
conf.get("spark.pyspark.virtualenv.bin.path", "")
+  private val initPythonPackages = 
conf.getOption("spark.pyspark.virtualenv.packages")
+  private var virtualEnvName: String = _
+  private var virtualPythonExec: String = _
+  private val VIRTUALENV_ID = new AtomicInteger()
+  private var isLauncher: Boolean = false
+
+  // used by launcher when user want to use virtualenv in pyspark shell. 
Launcher need this class
+  // to create virtualenv for driver.
+  def this(pythonExec: String, properties: JMap[String, String], isDriver: 
java.lang.Boolean) {
+this(pythonExec, new SparkConf().setAll(properties.asScala), isDriver)
+this.isLauncher = true
+  }
+
+  /*
+   * Create virtualenv using native virtualenv or conda
+   *
+   */
+  def setupVirtualEnv(): String = {
+/*
+ *
+ * Native Virtualenv:
+ *   -  Execute command: virtualenv -p  --no-site-packages 

+ *   -  Execute command: python -m pip --cache-dir  install 
-r 
+ *
+ * Conda
+ *   -  Execute command: conda create --prefix  --file 
 -y
+ *
+ */
+logInfo("Start to setup virtualenv...")
+logDebug("user.dir=" + System.getProperty("user.dir"))
+logDebug("user.home=" + System.getProperty("user.home"))
+
+require(virtualEnvType == "native" || virtualEnvType == "conda",
+  s"VirtualEnvType: $virtualEnvType is not supported." )
+require(new File(virtualEnvBinPath).exists(),
+  s"VirtualEnvBinPath: $virtualEnvBinPath is not defined or doesn't 
exist.")
+// Two scenarios of creating virtualenv:
+// 1. created in yarn container. Yarn will clean it up after container 
is exited
+// 2. created outside yarn container. Spark need to create temp 
directory and clean it after app
+//finish.
+//  - driver of PySpark shell
+//  - driver of yarn-client mode
+if (isLauncher ||
+  (isDriver && conf.get("spark.submit.deployMode") == "client")) {
+  val virtualenvBasedir = Files.createTempDir()
+  virtualenvBasedir.deleteOnExit()
+  virtualEnvName = virtualenvBasedir.getAbsolutePath
+} else if (isDriver && conf.get("spark.submit.deployMode") == 
"cluster") {
+  virtualEnvName = "virtualenv_driver"
+} else {
+  // use the working directory of Executor
+  virtualEnvName = "virtualenv_" + conf.getAppId + "_" + 
VIRTUALENV_ID.getAndIncrement()
+}
+
+// Use the absolute path of requirement file in the following cases
+// 1. driver of pyspark shell
+// 2. driver of yarn-client mode
+// otherwise just use filename as it would be downloaded to the 
working directory of Executor
--- End diff --

In the Kubernetes world, I might want to use a `requirements.txt` file that is 
stored locally in the base Docker image, regardless of client or cluster mode. 
Is that something that you think should be supported? Maybe a config variable 
`spark.pyspark.virtualenv.kubernetes.localrequirements` that points to a file 
stored as 

[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-06-17 Thread ifilonenko
Github user ifilonenko commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r195920107
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala ---
@@ -39,12 +39,17 @@ object PythonRunner {
 val pyFiles = args(1)
 val otherArgs = args.slice(2, args.length)
 val sparkConf = new SparkConf()
-val pythonExec = sparkConf.get(PYSPARK_DRIVER_PYTHON)
+var pythonExec = sparkConf.get(PYSPARK_DRIVER_PYTHON)
   .orElse(sparkConf.get(PYSPARK_PYTHON))
   .orElse(sys.env.get("PYSPARK_DRIVER_PYTHON"))
   .orElse(sys.env.get("PYSPARK_PYTHON"))
   .getOrElse("python")
 
+if (sparkConf.getBoolean("spark.pyspark.virtualenv.enabled", false)) {
+  val virtualEnvFactory = new VirtualEnvFactory(pythonExec, sparkConf, 
true)
+  pythonExec = virtualEnvFactory.setupVirtualEnv()
--- End diff --

+1


---




[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-06-17 Thread ifilonenko
Github user ifilonenko commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r195920034
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/VirtualEnvFactory.scala ---
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import java.io.File
+import java.util.{Map => JMap}
+import java.util.Arrays
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.JavaConverters._
+
+import com.google.common.io.Files
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+
+
+class VirtualEnvFactory(pythonExec: String, conf: SparkConf, isDriver: 
Boolean)
+  extends Logging {
+
+  private val virtualEnvType = conf.get("spark.pyspark.virtualenv.type", 
"native")
+  private val virtualEnvBinPath = 
conf.get("spark.pyspark.virtualenv.bin.path", "")
+  private val initPythonPackages = 
conf.getOption("spark.pyspark.virtualenv.packages")
+  private var virtualEnvName: String = _
+  private var virtualPythonExec: String = _
+  private val VIRTUALENV_ID = new AtomicInteger()
+  private var isLauncher: Boolean = false
+
+  // used by launcher when user want to use virtualenv in pyspark shell. 
Launcher need this class
+  // to create virtualenv for driver.
+  def this(pythonExec: String, properties: JMap[String, String], isDriver: 
java.lang.Boolean) {
+this(pythonExec, new SparkConf().setAll(properties.asScala), isDriver)
+this.isLauncher = true
+  }
+
+  /*
+   * Create virtualenv using native virtualenv or conda
+   *
+   */
+  def setupVirtualEnv(): String = {
+/*
+ *
+ * Native Virtualenv:
+ *   -  Execute command: virtualenv -p  --no-site-packages 

+ *   -  Execute command: python -m pip --cache-dir  install 
-r 
+ *
+ * Conda
+ *   -  Execute command: conda create --prefix  --file 
 -y
+ *
+ */
+logInfo("Start to setup virtualenv...")
+logDebug("user.dir=" + System.getProperty("user.dir"))
+logDebug("user.home=" + System.getProperty("user.home"))
+
+require(virtualEnvType == "native" || virtualEnvType == "conda",
+  s"VirtualEnvType: $virtualEnvType is not supported." )
+require(new File(virtualEnvBinPath).exists(),
+  s"VirtualEnvBinPath: $virtualEnvBinPath is not defined or doesn't 
exist.")
+// Two scenarios of creating virtualenv:
+// 1. created in yarn container. Yarn will clean it up after container 
is exited
+// 2. created outside yarn container. Spark need to create temp 
directory and clean it after app
+//finish.
+//  - driver of PySpark shell
+//  - driver of yarn-client mode
+if (isLauncher ||
+  (isDriver && conf.get("spark.submit.deployMode") == "client")) {
+  val virtualenvBasedir = Files.createTempDir()
+  virtualenvBasedir.deleteOnExit()
+  virtualEnvName = virtualenvBasedir.getAbsolutePath
+} else if (isDriver && conf.get("spark.submit.deployMode") == 
"cluster") {
+  virtualEnvName = "virtualenv_driver"
+} else {
+  // use the working directory of Executor
+  virtualEnvName = "virtualenv_" + conf.getAppId + "_" + 
VIRTUALENV_ID.getAndIncrement()
+}
+
+// Use the absolute path of requirement file in the following cases
+// 1. driver of pyspark shell
+// 2. driver of yarn-client mode
+// otherwise just use filename as it would be downloaded to the 
working directory of Executor
+val pysparkRequirements =
+  if (isLauncher ||
+(isDriver && conf.get("spark.submit.deployMode") == "client")) {
+conf.getOption("spark.pyspark.virtualenv.requirements")
+  } else {
+
conf.getOption("spark.pyspark.virtualenv.requirements").map(_.split("/").last)
+  }
+
+val createEnvCommand 

[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-06-17 Thread ifilonenko
Github user ifilonenko commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r195919641
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/VirtualEnvFactory.scala ---
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import java.io.File
+import java.util.{Map => JMap}
+import java.util.Arrays
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.JavaConverters._
+
+import com.google.common.io.Files
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+
+
+class VirtualEnvFactory(pythonExec: String, conf: SparkConf, isDriver: 
Boolean)
+  extends Logging {
+
+  private val virtualEnvType = conf.get("spark.pyspark.virtualenv.type", 
"native")
+  private val virtualEnvBinPath = 
conf.get("spark.pyspark.virtualenv.bin.path", "")
+  private val initPythonPackages = 
conf.getOption("spark.pyspark.virtualenv.packages")
+  private var virtualEnvName: String = _
+  private var virtualPythonExec: String = _
+  private val VIRTUALENV_ID = new AtomicInteger()
+  private var isLauncher: Boolean = false
+
+  // used by launcher when user want to use virtualenv in pyspark shell. 
Launcher need this class
+  // to create virtualenv for driver.
+  def this(pythonExec: String, properties: JMap[String, String], isDriver: 
java.lang.Boolean) {
+this(pythonExec, new SparkConf().setAll(properties.asScala), isDriver)
+this.isLauncher = true
+  }
+
+  /*
+   * Create virtualenv using native virtualenv or conda
+   *
+   */
+  def setupVirtualEnv(): String = {
+/*
+ *
+ * Native Virtualenv:
+ *   -  Execute command: virtualenv -p  --no-site-packages 

+ *   -  Execute command: python -m pip --cache-dir  install 
-r 
+ *
+ * Conda
+ *   -  Execute command: conda create --prefix  --file 
 -y
+ *
+ */
+logInfo("Start to setup virtualenv...")
+logDebug("user.dir=" + System.getProperty("user.dir"))
+logDebug("user.home=" + System.getProperty("user.home"))
+
+require(virtualEnvType == "native" || virtualEnvType == "conda",
+  s"VirtualEnvType: $virtualEnvType is not supported." )
+require(new File(virtualEnvBinPath).exists(),
+  s"VirtualEnvBinPath: $virtualEnvBinPath is not defined or doesn't 
exist.")
+// Two scenarios of creating virtualenv:
+// 1. created in yarn container. Yarn will clean it up after container 
is exited
+// 2. created outside yarn container. Spark need to create temp 
directory and clean it after app
+//finish.
+//  - driver of PySpark shell
+//  - driver of yarn-client mode
--- End diff --

In the case of Kubernetes:

This will be created in the base spark-py Docker image, which is shared 
between the driver and executors, and the containers will be cleaned up upon 
termination of the job via owner labels (for the executors) and the k8s 
API server (for the driver).

As such (hopefully with client-mode support being completed soon), the 
logic below should hold as well. 

Is this work going to be cluster-manager agnostic, or is this supposed to 
support only YARN? I would like to see this be applicable to all first-class 
cluster managers. 

I can help with extending this PR with k8s support and the appropriate 
integration tests. 


---




[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-06-17 Thread ifilonenko
Github user ifilonenko commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r195920098
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/VirtualEnvFactory.scala ---
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import java.io.File
+import java.util.{Map => JMap}
+import java.util.Arrays
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.JavaConverters._
+
+import com.google.common.io.Files
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+
+
+class VirtualEnvFactory(pythonExec: String, conf: SparkConf, isDriver: 
Boolean)
+  extends Logging {
+
+  private val virtualEnvType = conf.get("spark.pyspark.virtualenv.type", 
"native")
+  private val virtualEnvBinPath = 
conf.get("spark.pyspark.virtualenv.bin.path", "")
+  private val initPythonPackages = 
conf.getOption("spark.pyspark.virtualenv.packages")
+  private var virtualEnvName: String = _
+  private var virtualPythonExec: String = _
+  private val VIRTUALENV_ID = new AtomicInteger()
+  private var isLauncher: Boolean = false
+
+  // used by launcher when user want to use virtualenv in pyspark shell. 
Launcher need this class
+  // to create virtualenv for driver.
+  def this(pythonExec: String, properties: JMap[String, String], isDriver: 
java.lang.Boolean) {
+this(pythonExec, new SparkConf().setAll(properties.asScala), isDriver)
+this.isLauncher = true
+  }
+
+  /*
+   * Create virtualenv using native virtualenv or conda
+   *
+   */
+  def setupVirtualEnv(): String = {
--- End diff --

Might be better to pass args into this function so that it could be 
properly unit-tested. It seems that there are no unit tests for this class, so 
adding some seems to be a necessary addition. 
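
A minimal sketch of the kind of shape this suggests, assuming an injectable command runner so a unit test can observe the generated commands without shelling out (class name, signature, and defaults below are illustrative, not the PR's actual code):

```scala
import org.apache.spark.SparkConf

// The external-process step is injected so tests can stub it out and assert
// on the commands, while production code keeps using ProcessBuilder.
class TestableVirtualEnvFactory(
    pythonExec: String,
    conf: SparkConf,
    isDriver: Boolean,
    runCommand: Seq[String] => Int =
      cmd => new ProcessBuilder(cmd: _*).inheritIO().start().waitFor()) {

  def setupVirtualEnv(envDir: String, requirements: Option[String]): String = {
    require(runCommand(Seq("virtualenv", "-p", pythonExec, envDir)) == 0,
      s"Failed to create virtualenv at $envDir")
    requirements.foreach { req =>
      require(runCommand(Seq(s"$envDir/bin/python", "-m", "pip", "install", "-r", req)) == 0,
        s"Failed to install requirements from $req")
    }
    s"$envDir/bin/python"  // returned exec, mirroring what PythonRunner expects
  }
}
```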


---




[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-06-17 Thread ifilonenko
Github user ifilonenko commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r195920112
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -475,6 +475,19 @@ object SparkSubmit extends CommandLineUtils with 
Logging {
   }
 }
 
+// for pyspark virtualenv
+if (args.isPython) {
+  if (clusterManager != YARN &&
+args.sparkProperties.getOrElse("spark.pyspark.virtualenv.enabled", 
"false") == "true") {
+printErrorAndExit("virtualenv is only supported in yarn mode")
--- End diff --

+1 for Kubernetes


---




[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-06-07 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r193674158
  
--- Diff: python/pyspark/context.py ---
@@ -1035,6 +1044,41 @@ def getConf(self):
 conf.setAll(self._conf.getAll())
 return conf
 
+def install_packages(self, packages):
+"""
+Install python packages on all executors and driver through pip. 
pip will be installed
+by default no matter using native virtualenv or conda. So it is 
guaranteed that pip is
+available if virtualenv is enabled.
+:param packages: string for single package or a list of string for 
multiple packages
--- End diff --

Shall we add:

```
.. versionadded:: 2.3.0
.. note:: Experimental
```


---




[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-06-07 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r193673500
  
--- Diff: docs/submitting-applications.md ---
@@ -218,6 +218,115 @@ These commands can be used with `pyspark`, 
`spark-shell`, and `spark-submit` to
 For Python, the equivalent `--py-files` option can be used to distribute 
`.egg`, `.zip` and `.py` libraries
 to executors.
 
+# VirtualEnv for PySpark (This is an experimental feature and may evolve 
in future version)
+For simple PySpark application, we can use `--py-files` to add its 
dependencies. While for a large PySpark application,
+usually you will have many dependencies which may also have transitive 
dependencies and even some dependencies need to be compiled
+first to be installed. In this case `--py-files` is not so convenient. 
Luckily, in python world we have virtualenv/conda to help create isolated
+python work environment. We also implement virtualenv in PySpark (It is 
only supported in yarn mode for now). User can use this feature
+in 2 scenarios:
+* Batch mode (submit spark app via spark-submit)
+* Interactive mode (PySpark shell or other third party Spark Notebook)
+
--- End diff --

Ah, maybe we can leave a note at the end instead of adding it in the title.

```
Note that this is an experimental feature added from Spark 2.4.0 and may 
evolve in the future version.
```


---




[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-06-07 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r193674619
  
--- Diff: python/pyspark/context.py ---
@@ -1035,6 +1044,41 @@ def getConf(self):
 conf.setAll(self._conf.getAll())
 return conf
 
+def install_packages(self, packages):
+"""
+Install python packages on all executors and driver through pip. 
pip will be installed
+by default no matter using native virtualenv or conda. So it is 
guaranteed that pip is
+available if virtualenv is enabled.
+:param packages: string for single package or a list of string for 
multiple packages
+"""
+if self._conf.get("spark.pyspark.virtualenv.enabled") != "true":
+raise RuntimeError("install_packages can only use called when "
+   "spark.pyspark.virtualenv.enabled is set as 
true")
+if isinstance(packages, basestring):
+packages = [packages]
+# seems statusTracker.getExecutorInfos() will return driver + 
exeuctors, so -1 here.
+num_executors = 
len(self._jsc.sc().statusTracker().getExecutorInfos()) - 1
+dummyRDD = self.parallelize(range(num_executors), num_executors)
+
+def _run_pip(packages, iterator):
+import pip
+return pip.main(["install"] + packages)
+
+# install package on driver first. if installation succeeded, 
continue the installation
+# on executors, otherwise return directly.
+if _run_pip(packages, None) != 0:
+return
+
+virtualenvPackages = 
self._conf.get("spark.pyspark.virtualenv.packages")
+if virtualenvPackages:
+self._conf.set("spark.pyspark.virtualenv.packages", 
virtualenvPackages + ":" +
+   ":".join(packages))
+else:
+self._conf.set("spark.pyspark.virtualenv.packages", 
":".join(packages))
+
+import functools
--- End diff --

Can we move this up within this function?


---




[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-06-07 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r193672797
  
--- Diff: docs/submitting-applications.md ---
@@ -218,6 +218,115 @@ These commands can be used with `pyspark`, 
`spark-shell`, and `spark-submit` to
 For Python, the equivalent `--py-files` option can be used to distribute 
`.egg`, `.zip` and `.py` libraries
 to executors.
 
+# VirtualEnv for PySpark (This is an experimental feature and may evolve 
in future version)
+For simple PySpark application, we can use `--py-files` to add its 
dependencies. While for a large PySpark application,
+usually you will have many dependencies which may also have transitive 
dependencies and even some dependencies need to be compiled
+first to be installed. In this case `--py-files` is not so convenient. 
Luckily, in python world we have virtualenv/conda to help create isolated
+python work environment. We also implement virtualenv in PySpark (It is 
only supported in yarn mode for now). User can use this feature
+in 2 scenarios:
+* Batch mode (submit spark app via spark-submit)
+* Interactive mode (PySpark shell or other third party Spark Notebook)
+
+## Prerequisites
+- Each node have virtualenv/conda, python-devel installed
+- Each node is internet accessible (for downloading packages)
+
+## Batch Mode
+
+In batch mode, user need to specify the additional python packages before 
launching spark app. There're 2 approaches to specify that:
+* Provide a requirement file which contains all the packages for the 
virtualenv.  
+* Specify packages via spark configuration 
`spark.pyspark.virtualenv.packages`.
+
+Here're several examples:
+
+{% highlight bash %}
+### Setup virtualenv using native virtualenv on yarn-client mode
+bin/spark-submit \
+--master yarn \
+--deploy-mode client \
+--conf "spark.pyspark.virtualenv.enabled=true" \
+--conf "spark.pyspark.virtualenv.type=native" \
+--conf 
"spark.pyspark.virtualenv.requirements=" \
+--conf "spark.pyspark.virtualenv.bin.path=" \
+
+
+### Setup virtualenv using conda on yarn-client mode
+bin/spark-submit \
+--master yarn \
+--deploy-mode client \
+--conf "spark.pyspark.virtualenv.enabled=true" \
+--conf "spark.pyspark.virtualenv.type=conda" \
+--conf 
"spark.pyspark.virtualenv.requirements=" \
+--conf "spark.pyspark.virtualenv.bin.path=" \
+
+
+### Setup virtualenv using conda on yarn-client mode and specify packages 
via `spark.pyspark.virtualenv.packages`
+bin/spark-submit \
+--master yarn \
+--deploy-mode client \
+--conf "spark.pyspark.virtualenv.enabled=true" \
+--conf "spark.pyspark.virtualenv.type=conda" \
+--conf "spark.pyspark.virtualenv.packages=numpy,pandas" \
+--conf "spark.pyspark.virtualenv.bin.path=" \
+
+{% endhighlight %}
+
+### How to create requirement file ?
+Usually before running distributed PySpark job, you need first to run it 
in local environment. It is encouraged to first create your own virtualenv for 
your project, so you know what packages you need. After you are confident with 
your work and want to move it to cluster, you can run the following command to 
generate the requirement file for virtualenv and conda.
+- pip freeze > requirements.txt
+- conda list --export  > requirements.txt
+
+## Interactive Mode
+In interactive mode,user can install python packages at runtime instead 
of specifying them in requirement file when submitting spark app.
+Here are several ways to install packages
+
+{% highlight python %}
+sc.install_packages("numpy")   # install the latest numpy
--- End diff --

Seems there are tabs here. Shall we replace them with spaces?


---




[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-06-07 Thread zjffdu
Github user zjffdu commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r193664416
  
--- Diff: docs/submitting-applications.md ---
@@ -218,6 +218,115 @@ These commands can be used with `pyspark`, 
`spark-shell`, and `spark-submit` to
 For Python, the equivalent `--py-files` option can be used to distribute 
`.egg`, `.zip` and `.py` libraries
 to executors.
 
+# VirtualEnv for PySpark
--- End diff --

Thanks @HyukjinKwon, the doc is updated.


---




[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-06-07 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r193659778
  
--- Diff: docs/submitting-applications.md ---
@@ -218,6 +218,115 @@ These commands can be used with `pyspark`, 
`spark-shell`, and `spark-submit` to
 For Python, the equivalent `--py-files` option can be used to distribute 
`.egg`, `.zip` and `.py` libraries
 to executors.
 
+# VirtualEnv for PySpark
--- End diff --

@zjffdu, mind if I ask you to describe that this is an experimental feature, that it's 
very likely to be unstable, and that it's still evolving?


---




[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-03-20 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r175670974
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala ---
@@ -39,12 +39,17 @@ object PythonRunner {
 val pyFiles = args(1)
 val otherArgs = args.slice(2, args.length)
 val sparkConf = new SparkConf()
-val pythonExec = sparkConf.get(PYSPARK_DRIVER_PYTHON)
+var pythonExec = sparkConf.get(PYSPARK_DRIVER_PYTHON)
   .orElse(sparkConf.get(PYSPARK_PYTHON))
   .orElse(sys.env.get("PYSPARK_DRIVER_PYTHON"))
   .orElse(sys.env.get("PYSPARK_PYTHON"))
   .getOrElse("python")
 
+if (sparkConf.getBoolean("spark.pyspark.virtualenv.enabled", false)) {
+  val virtualEnvFactory = new VirtualEnvFactory(pythonExec, sparkConf, 
true)
+  pythonExec = virtualEnvFactory.setupVirtualEnv()
--- End diff --

Correct me if I misunderstood, but couldn't we have some tests to check that 
`setupVirtualEnv` at least returns a proper string?
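
A minimal sketch of such a check, assuming `virtualenv` is installed in the test environment (so this is closer to an integration test) and that the factory is invoked the same way `PythonRunner` does; the paths and conf values are illustrative only:

```scala
import java.io.File

import org.scalatest.FunSuite

import org.apache.spark.SparkConf
import org.apache.spark.api.python.VirtualEnvFactory

class VirtualEnvFactorySuite extends FunSuite {

  test("setupVirtualEnv returns a usable python executable path") {
    val conf = new SparkConf()
      .set("spark.pyspark.virtualenv.enabled", "true")
      .set("spark.pyspark.virtualenv.type", "native")
      .set("spark.pyspark.virtualenv.bin.path", "/usr/bin/virtualenv")
      .set("spark.submit.deployMode", "client")

    val exec = new VirtualEnvFactory("python3", conf, isDriver = true).setupVirtualEnv()

    // "a proper string": non-empty and pointing at an interpreter that exists
    assert(exec.nonEmpty)
    assert(new File(exec).exists(), s"returned python exec does not exist: $exec")
  }
}
```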


---




[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-01-29 Thread zjffdu
Github user zjffdu commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r164646157
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/VirtualEnvFactory.scala ---
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import java.io.File
+import java.util.{Map => JMap}
+import java.util.Arrays
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.JavaConverters._
+
+import com.google.common.io.Files
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+
+
+private[spark] class VirtualEnvFactory(pythonExec: String, conf: 
SparkConf, isDriver: Boolean)
+  extends Logging {
+
+  private var virtualEnvType = conf.get("spark.pyspark.virtualenv.type", 
"native")
+  private var virtualEnvPath = 
conf.get("spark.pyspark.virtualenv.bin.path", "")
+  private var virtualEnvName: String = _
+  private var virtualPythonExec: String = _
+  private val VIRTUALENV_ID = new AtomicInteger()
+  private var isLauncher: Boolean = false
+
+  // used by launcher when user want to use virtualenv in pyspark shell. 
Launcher need this class
+  // to create virtualenv for driver.
+  def this(pythonExec: String, properties: JMap[String, String], isDriver: 
java.lang.Boolean) {
--- End diff --

It is used by the launcher module, which doesn't depend on Scala.





[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-01-26 Thread zjffdu
Github user zjffdu commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r164069473
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala ---
@@ -39,12 +39,17 @@ object PythonRunner {
 val pyFiles = args(1)
 val otherArgs = args.slice(2, args.length)
 val sparkConf = new SparkConf()
-val pythonExec = sparkConf.get(PYSPARK_DRIVER_PYTHON)
+var pythonExec = sparkConf.get(PYSPARK_DRIVER_PYTHON)
--- End diff --

I am afraid I have to use a var, because `VirtualEnvFactory` also needs
`pythonExec` as a constructor arg.





[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-01-26 Thread zjffdu
Github user zjffdu commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r164068172
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/VirtualEnvFactory.scala ---
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import java.io.File
+import java.util.{Map => JMap}
+import java.util.Arrays
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.JavaConverters._
+
+import com.google.common.io.Files
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+
+
+class VirtualEnvFactory(pythonExec: String, conf: SparkConf, isDriver: 
Boolean)
+  extends Logging {
+
+  private val virtualEnvType = conf.get("spark.pyspark.virtualenv.type", 
"native")
+  private val virtualEnvBinPath = 
conf.get("spark.pyspark.virtualenv.bin.path", "")
+  private val initPythonPackages = 
conf.getOption("spark.pyspark.virtualenv.packages")
--- End diff --

For existing running executors, the only way to install additional packages is via
`sc.install_packages`. `spark.pyspark.virtualenv.packages` will be updated on the
driver side first when `sc.install_packages` is called; newly allocated executors will
then fetch this property and install all the additional packages properly.
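
A minimal usage sketch of that flow (assuming spark.pyspark.virtualenv.enabled=true
and an existing SparkContext `sc`; the packages named here are only examples):

    sc.install_packages("pandas")                   # single package as a string
    sc.install_packages(["scipy", "scikit-learn"])  # or a list of packages

    # The driver-side conf now carries the merged package list; executors
    # allocated after this point read it and install these packages on startup.
    print(sc._conf.get("spark.pyspark.virtualenv.packages"))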





[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-01-25 Thread zjffdu
Github user zjffdu commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r164037516
  
--- Diff: python/pyspark/context.py ---
@@ -1023,6 +1032,41 @@ def getConf(self):
 conf.setAll(self._conf.getAll())
 return conf
 
+def install_packages(self, packages):
+"""
+Install python packages on all executors and driver through pip. 
pip will be installed
+by default no matter using native virtualenv or conda. So it is 
guaranteed that pip is
+available if virtualenv is enabled.
+:param packages: string for single package or a list of string for 
multiple packages
+"""
+if self._conf.get("spark.pyspark.virtualenv.enabled") != "true":
+raise RuntimeError("install_packages can only use called when "
+   "spark.pyspark.virtualenv.enabled is set as 
true")
+if isinstance(packages, basestring):
+packages = [packages]
+# seems statusTracker.getExecutorInfos() will return driver + 
exeuctors, so -1 here.
+num_executors = 
len(self._jsc.sc().statusTracker().getExecutorInfos()) - 1
+dummyRDD = self.parallelize(range(num_executors), num_executors)
+
+def _run_pip(packages, iterator):
+import pip
+return pip.main(["install"] + packages)
+
+# install package on driver first. if installation succeeded, 
continue the installation
+# on executors, otherwise return directly.
+if _run_pip(packages, None) != 0:
+return
+
+virtualenvPackages = 
self._conf.get("spark.pyspark.virtualenv.packages")
+if virtualenvPackages:
+self._conf.set("spark.pyspark.virtualenv.packages", 
virtualenvPackages + "," +
+   ",".join(packages))
+else:
+self._conf.set("spark.pyspark.virtualenv.packages", 
",".join(packages))
--- End diff --

Good catch, I will use another separator.





[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-01-25 Thread zjffdu
Github user zjffdu commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r164037239
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -98,7 +98,7 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
   private val reviveThread =
 
ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-revive-thread")
 
-  class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: 
Seq[(String, String)])
+  class DriverEndpoint(override val rpcEnv: RpcEnv)
--- End diff --

Without this change, the following scenario won't work:
1. Launch a Spark app
2. Call `sc.install_packages("numpy")`
3. Run `sc.range(3).map(lambda x: np.__version__).collect()`
4. Restart an executor (by killing it; the scheduler will schedule a replacement executor)
5. Run `sc.range(3).map(lambda x: np.__version__).collect()` again. This time it would
fail, because the newly scheduled executor cannot set up the virtualenv correctly, as
it cannot get the updated `spark.pyspark.virtualenv.packages`.

That's why I made this change in core. Now executors always get the updated SparkConf
instead of the SparkConf created when the Spark app was started.

There's some overhead, but I believe it is very trivial, and it could be improved later.
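
For reference, a PySpark sketch of the scenario above (assuming
spark.pyspark.virtualenv.enabled=true and a cluster manager that replaces killed
executors; the numpy_version helper is only for illustration):

    from pyspark import SparkContext

    sc = SparkContext(appName="virtualenv-install-packages-demo")

    # Step 2: install numpy on the driver and on all currently running executors.
    sc.install_packages("numpy")

    def numpy_version(x):
        import numpy as np
        return np.__version__

    # Step 3: works, every live executor now has numpy inside its virtualenv.
    print(sc.range(3).map(numpy_version).collect())

    # Steps 4-5: after an executor is killed and replaced, the replacement can
    # only set up its virtualenv with numpy if it sees the updated
    # spark.pyspark.virtualenv.packages value from the driver.
    print(sc.range(3).map(numpy_version).collect())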





[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-01-25 Thread zjffdu
Github user zjffdu commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r164037055
  
--- Diff: 
launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java 
---
@@ -299,20 +300,34 @@
 // 4. environment variable PYSPARK_PYTHON
 // 5. python
 List pyargs = new ArrayList<>();
-pyargs.add(firstNonEmpty(conf.get(SparkLauncher.PYSPARK_DRIVER_PYTHON),
+String pythonExec = 
firstNonEmpty(conf.get(SparkLauncher.PYSPARK_DRIVER_PYTHON),
   conf.get(SparkLauncher.PYSPARK_PYTHON),
   System.getenv("PYSPARK_DRIVER_PYTHON"),
   System.getenv("PYSPARK_PYTHON"),
-  "python"));
-String pyOpts = System.getenv("PYSPARK_DRIVER_PYTHON_OPTS");
-if (conf.containsKey(SparkLauncher.PYSPARK_PYTHON)) {
-  // pass conf spark.pyspark.python to python by environment variable.
-  env.put("PYSPARK_PYTHON", conf.get(SparkLauncher.PYSPARK_PYTHON));
+  "python");
+if (conf.getOrDefault("spark.pyspark.virtualenv.enabled", 
"false").equals("true")) {
+  try {
+// setup virtualenv in launcher when virtualenv is enabled in 
pyspark shell
+Class virtualEnvClazz = 
Class.forName("org.apache.spark.api.python.VirtualEnvFactory");
--- End diff --

Because the launcher module does not explicitly depend on the core module in pom.xml.





[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-01-25 Thread zjffdu
Github user zjffdu commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r164036488
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/VirtualEnvFactory.scala ---
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import java.io.File
+import java.util.{Map => JMap}
+import java.util.Arrays
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.JavaConverters._
+
+import com.google.common.io.Files
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+
+
+class VirtualEnvFactory(pythonExec: String, conf: SparkConf, isDriver: 
Boolean)
+  extends Logging {
+
+  private val virtualEnvType = conf.get("spark.pyspark.virtualenv.type", 
"native")
+  private val virtualEnvBinPath = 
conf.get("spark.pyspark.virtualenv.bin.path", "")
+  private val initPythonPackages = 
conf.getOption("spark.pyspark.virtualenv.packages")
+  private var virtualEnvName: String = _
+  private var virtualPythonExec: String = _
+  private val VIRTUALENV_ID = new AtomicInteger()
+  private var isLauncher: Boolean = false
+
+  // used by launcher when user want to use virtualenv in pyspark shell. 
Launcher need this class
+  // to create virtualenv for driver.
+  def this(pythonExec: String, properties: JMap[String, String], isDriver: 
java.lang.Boolean) {
+this(pythonExec, new SparkConf().setAll(properties.asScala), isDriver)
+this.isLauncher = true
+  }
+
+  /*
+   * Create virtualenv using native virtualenv or conda
+   *
+   */
+  def setupVirtualEnv(): String = {
+/*
+ *
+ * Native Virtualenv:
+ *   -  Execute command: virtualenv -p  --no-site-packages 

+ *   -  Execute command: python -m pip --cache-dir  install 
-r 
+ *
+ * Conda
+ *   -  Execute command: conda create --prefix  --file 
 -y
+ *
+ */
+logInfo("Start to setup virtualenv...")
+logDebug("user.dir=" + System.getProperty("user.dir"))
+logDebug("user.home=" + System.getProperty("user.home"))
+
+require(virtualEnvType == "native" || virtualEnvType == "conda",
+  s"VirtualEnvType: $virtualEnvType is not supported." )
+require(new File(virtualEnvBinPath).exists(),
+  s"VirtualEnvBinPath: $virtualEnvBinPath is not defined or doesn't 
exist.")
+// Two scenarios of creating virtualenv:
+// 1. created in yarn container. Yarn will clean it up after container 
is exited
+// 2. created outside yarn container. Spark need to create temp 
directory and clean it after app
+//finish.
+//  - driver of PySpark shell
+//  - driver of yarn-client mode
+if (isLauncher ||
+  (isDriver && conf.get("spark.submit.deployMode") == "client")) {
+  val virtualenvBasedir = Files.createTempDir()
+  virtualenvBasedir.deleteOnExit()
+  virtualEnvName = virtualenvBasedir.getAbsolutePath
+} else if (isDriver && conf.get("spark.submit.deployMode") == 
"cluster") {
+  virtualEnvName = "virtualenv_driver"
+} else {
+  // use the working directory of Executor
+  virtualEnvName = "virtualenv_" + conf.getAppId + "_" + 
VIRTUALENV_ID.getAndIncrement()
+}
+
+// Use the absolute path of requirement file in the following cases
+// 1. driver of pyspark shell
+// 2. driver of yarn-client mode
+// otherwise just use filename as it would be downloaded to the 
working directory of Executor
+val pysparkRequirements =
+  if (isLauncher ||
+(isDriver && conf.get("spark.submit.deployMode") == "client")) {
+conf.getOption("spark.pyspark.virtualenv.requirements")
+  } else {
+
conf.getOption("spark.pyspark.virtualenv.requirements").map(_.split("/").last)
+  }
+
+val createEnvCommand =
  

[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-01-25 Thread zjffdu
Github user zjffdu commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r164034980
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/VirtualEnvFactory.scala ---
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import java.io.File
+import java.util.{Map => JMap}
+import java.util.Arrays
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.JavaConverters._
+
+import com.google.common.io.Files
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+
+
+class VirtualEnvFactory(pythonExec: String, conf: SparkConf, isDriver: 
Boolean)
+  extends Logging {
+
+  private val virtualEnvType = conf.get("spark.pyspark.virtualenv.type", 
"native")
+  private val virtualEnvBinPath = 
conf.get("spark.pyspark.virtualenv.bin.path", "")
+  private val initPythonPackages = 
conf.getOption("spark.pyspark.virtualenv.packages")
+  private var virtualEnvName: String = _
+  private var virtualPythonExec: String = _
+  private val VIRTUALENV_ID = new AtomicInteger()
+  private var isLauncher: Boolean = false
+
+  // used by launcher when user want to use virtualenv in pyspark shell. 
Launcher need this class
+  // to create virtualenv for driver.
+  def this(pythonExec: String, properties: JMap[String, String], isDriver: 
java.lang.Boolean) {
+this(pythonExec, new SparkConf().setAll(properties.asScala), isDriver)
+this.isLauncher = true
+  }
+
+  /*
+   * Create virtualenv using native virtualenv or conda
+   *
+   */
+  def setupVirtualEnv(): String = {
+/*
+ *
+ * Native Virtualenv:
+ *   -  Execute command: virtualenv -p  --no-site-packages 

+ *   -  Execute command: python -m pip --cache-dir  install 
-r 
+ *
+ * Conda
+ *   -  Execute command: conda create --prefix  --file 
 -y
+ *
+ */
+logInfo("Start to setup virtualenv...")
+logDebug("user.dir=" + System.getProperty("user.dir"))
+logDebug("user.home=" + System.getProperty("user.home"))
+
+require(virtualEnvType == "native" || virtualEnvType == "conda",
+  s"VirtualEnvType: $virtualEnvType is not supported." )
+require(new File(virtualEnvBinPath).exists(),
+  s"VirtualEnvBinPath: $virtualEnvBinPath is not defined or doesn't 
exist.")
+// Two scenarios of creating virtualenv:
+// 1. created in yarn container. Yarn will clean it up after container 
is exited
+// 2. created outside yarn container. Spark need to create temp 
directory and clean it after app
+//finish.
+//  - driver of PySpark shell
+//  - driver of yarn-client mode
+if (isLauncher ||
+  (isDriver && conf.get("spark.submit.deployMode") == "client")) {
+  val virtualenvBasedir = Files.createTempDir()
+  virtualenvBasedir.deleteOnExit()
+  virtualEnvName = virtualenvBasedir.getAbsolutePath
+} else if (isDriver && conf.get("spark.submit.deployMode") == 
"cluster") {
+  virtualEnvName = "virtualenv_driver"
+} else {
+  // use the working directory of Executor
+  virtualEnvName = "virtualenv_" + conf.getAppId + "_" + 
VIRTUALENV_ID.getAndIncrement()
+}
+
+// Use the absolute path of requirement file in the following cases
+// 1. driver of pyspark shell
+// 2. driver of yarn-client mode
+// otherwise just use filename as it would be downloaded to the 
working directory of Executor
+val pysparkRequirements =
+  if (isLauncher ||
+(isDriver && conf.get("spark.submit.deployMode") == "client")) {
+conf.getOption("spark.pyspark.virtualenv.requirements")
+  } else {
+
conf.getOption("spark.pyspark.virtualenv.requirements").map(_.split("/").last)
+  }
+
+val createEnvCommand =
  

[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-01-25 Thread zjffdu
Github user zjffdu commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r164034871
  
--- Diff: python/pyspark/context.py ---
@@ -1023,6 +1032,41 @@ def getConf(self):
 conf.setAll(self._conf.getAll())
 return conf
 
+def install_packages(self, packages):
+"""
+Install python packages on all executors and driver through pip. 
pip will be installed
+by default no matter using native virtualenv or conda. So it is 
guaranteed that pip is
+available if virtualenv is enabled.
+:param packages: string for single package or a list of string for 
multiple packages
+"""
+if self._conf.get("spark.pyspark.virtualenv.enabled") != "true":
+raise RuntimeError("install_packages can only use called when "
+   "spark.pyspark.virtualenv.enabled is set as 
true")
+if isinstance(packages, basestring):
+packages = [packages]
+# seems statusTracker.getExecutorInfos() will return driver + 
exeuctors, so -1 here.
+num_executors = 
len(self._jsc.sc().statusTracker().getExecutorInfos()) - 1
+dummyRDD = self.parallelize(range(num_executors), num_executors)
+
+def _run_pip(packages, iterator):
+import pip
+return pip.main(["install"] + packages)
+
+# install package on driver first. if installation succeeded, 
continue the installation
+# on executors, otherwise return directly.
+if _run_pip(packages, None) != 0:
+return
--- End diff --

The pip install output will be displayed.







[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-01-25 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r164032549
  
--- Diff: 
launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java 
---
@@ -299,20 +300,34 @@
 // 4. environment variable PYSPARK_PYTHON
 // 5. python
 List pyargs = new ArrayList<>();
-pyargs.add(firstNonEmpty(conf.get(SparkLauncher.PYSPARK_DRIVER_PYTHON),
+String pythonExec = 
firstNonEmpty(conf.get(SparkLauncher.PYSPARK_DRIVER_PYTHON),
   conf.get(SparkLauncher.PYSPARK_PYTHON),
   System.getenv("PYSPARK_DRIVER_PYTHON"),
   System.getenv("PYSPARK_PYTHON"),
-  "python"));
-String pyOpts = System.getenv("PYSPARK_DRIVER_PYTHON_OPTS");
-if (conf.containsKey(SparkLauncher.PYSPARK_PYTHON)) {
-  // pass conf spark.pyspark.python to python by environment variable.
-  env.put("PYSPARK_PYTHON", conf.get(SparkLauncher.PYSPARK_PYTHON));
+  "python");
+if (conf.getOrDefault("spark.pyspark.virtualenv.enabled", 
"false").equals("true")) {
+  try {
+// setup virtualenv in launcher when virtualenv is enabled in 
pyspark shell
+Class virtualEnvClazz = 
Class.forName("org.apache.spark.api.python.VirtualEnvFactory");
--- End diff --

Why are we using reflection here?





[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-01-25 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r164030979
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/VirtualEnvFactory.scala ---
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import java.io.File
+import java.util.{Map => JMap}
+import java.util.Arrays
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.JavaConverters._
+
+import com.google.common.io.Files
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+
+
+class VirtualEnvFactory(pythonExec: String, conf: SparkConf, isDriver: 
Boolean)
+  extends Logging {
+
+  private val virtualEnvType = conf.get("spark.pyspark.virtualenv.type", 
"native")
+  private val virtualEnvBinPath = 
conf.get("spark.pyspark.virtualenv.bin.path", "")
+  private val initPythonPackages = 
conf.getOption("spark.pyspark.virtualenv.packages")
--- End diff --

So if the factory is made once, then how will these get updated?





[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-01-25 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r164031483
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/VirtualEnvFactory.scala ---
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import java.io.File
+import java.util.{Map => JMap}
+import java.util.Arrays
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.JavaConverters._
+
+import com.google.common.io.Files
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+
+
+class VirtualEnvFactory(pythonExec: String, conf: SparkConf, isDriver: 
Boolean)
+  extends Logging {
+
+  private val virtualEnvType = conf.get("spark.pyspark.virtualenv.type", 
"native")
+  private val virtualEnvBinPath = 
conf.get("spark.pyspark.virtualenv.bin.path", "")
+  private val initPythonPackages = 
conf.getOption("spark.pyspark.virtualenv.packages")
+  private var virtualEnvName: String = _
+  private var virtualPythonExec: String = _
+  private val VIRTUALENV_ID = new AtomicInteger()
+  private var isLauncher: Boolean = false
+
+  // used by launcher when user want to use virtualenv in pyspark shell. 
Launcher need this class
+  // to create virtualenv for driver.
+  def this(pythonExec: String, properties: JMap[String, String], isDriver: 
java.lang.Boolean) {
+this(pythonExec, new SparkConf().setAll(properties.asScala), isDriver)
+this.isLauncher = true
+  }
+
+  /*
+   * Create virtualenv using native virtualenv or conda
+   *
+   */
+  def setupVirtualEnv(): String = {
+/*
+ *
+ * Native Virtualenv:
+ *   -  Execute command: virtualenv -p  --no-site-packages 

+ *   -  Execute command: python -m pip --cache-dir  install 
-r 
+ *
+ * Conda
+ *   -  Execute command: conda create --prefix  --file 
 -y
+ *
+ */
+logInfo("Start to setup virtualenv...")
+logDebug("user.dir=" + System.getProperty("user.dir"))
+logDebug("user.home=" + System.getProperty("user.home"))
+
+require(virtualEnvType == "native" || virtualEnvType == "conda",
+  s"VirtualEnvType: $virtualEnvType is not supported." )
+require(new File(virtualEnvBinPath).exists(),
+  s"VirtualEnvBinPath: $virtualEnvBinPath is not defined or doesn't 
exist.")
+// Two scenarios of creating virtualenv:
+// 1. created in yarn container. Yarn will clean it up after container 
is exited
+// 2. created outside yarn container. Spark need to create temp 
directory and clean it after app
+//finish.
+//  - driver of PySpark shell
+//  - driver of yarn-client mode
+if (isLauncher ||
+  (isDriver && conf.get("spark.submit.deployMode") == "client")) {
+  val virtualenvBasedir = Files.createTempDir()
+  virtualenvBasedir.deleteOnExit()
+  virtualEnvName = virtualenvBasedir.getAbsolutePath
+} else if (isDriver && conf.get("spark.submit.deployMode") == 
"cluster") {
+  virtualEnvName = "virtualenv_driver"
+} else {
+  // use the working directory of Executor
+  virtualEnvName = "virtualenv_" + conf.getAppId + "_" + 
VIRTUALENV_ID.getAndIncrement()
+}
+
+// Use the absolute path of requirement file in the following cases
+// 1. driver of pyspark shell
+// 2. driver of yarn-client mode
+// otherwise just use filename as it would be downloaded to the 
working directory of Executor
+val pysparkRequirements =
+  if (isLauncher ||
+(isDriver && conf.get("spark.submit.deployMode") == "client")) {
+conf.getOption("spark.pyspark.virtualenv.requirements")
+  } else {
+
conf.getOption("spark.pyspark.virtualenv.requirements").map(_.split("/").last)
+  }
+
+val createEnvCommand =
 

[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-01-25 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r164031905
  
--- Diff: python/pyspark/context.py ---
@@ -1023,6 +1032,42 @@ def getConf(self):
 conf.setAll(self._conf.getAll())
 return conf
 
+def install_packages(self, packages):
+"""
+install python packages on all executors and driver through pip. 
pip will be installed
+by default no matter using native virtualenv or conda. So it is 
guaranteed that pip is
+available if virtualenv is enabled.
+:param packages: string for single package or a list of string for 
multiple packages
+:param install_driver: whether to install packages in client
+"""
+if self._conf.get("spark.pyspark.virtualenv.enabled") != "true":
+raise RuntimeError("install_packages can only use called when "
+   "spark.pyspark.virtualenv.enabled is set as 
true")
+if isinstance(packages, basestring):
+packages = [packages]
+# seems statusTracker.getExecutorInfos() will return driver + 
exeuctors, so -1 here.
+num_executors = 
len(self._jsc.sc().statusTracker().getExecutorInfos()) - 1
+dummyRDD = self.parallelize(range(num_executors), num_executors)
+
+def _run_pip(packages, iterator):
+import pip
+return pip.main(["install"] + packages)
+
+# install package on driver first. if installation succeeded, 
continue the installation
+# on executors, otherwise return directly.
+if _run_pip(packages, None) != 0:
+return
+
+virtualenvPackages = 
self._conf.get("spark.pyspark.virtualenv.packages")
+if virtualenvPackages:
+self._conf.set("spark.pyspark.virtualenv.packages", 
virtualenvPackages + "," +
+   ",".join(packages))
+else:
+self._conf.set("spark.pyspark.virtualenv.packages", 
",".join(packages))
+
+import functools
+dummyRDD.foreachPartition(functools.partial(_run_pip, packages))
--- End diff --

When have you seen this fail? Just trying to follow along.





[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-01-25 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r164031985
  
--- Diff: python/pyspark/context.py ---
@@ -1023,6 +1032,41 @@ def getConf(self):
 conf.setAll(self._conf.getAll())
 return conf
 
+def install_packages(self, packages):
+"""
+Install python packages on all executors and driver through pip. 
pip will be installed
+by default no matter using native virtualenv or conda. So it is 
guaranteed that pip is
+available if virtualenv is enabled.
+:param packages: string for single package or a list of string for 
multiple packages
+"""
+if self._conf.get("spark.pyspark.virtualenv.enabled") != "true":
+raise RuntimeError("install_packages can only use called when "
+   "spark.pyspark.virtualenv.enabled is set as 
true")
+if isinstance(packages, basestring):
+packages = [packages]
+# seems statusTracker.getExecutorInfos() will return driver + 
exeuctors, so -1 here.
+num_executors = 
len(self._jsc.sc().statusTracker().getExecutorInfos()) - 1
+dummyRDD = self.parallelize(range(num_executors), num_executors)
+
+def _run_pip(packages, iterator):
+import pip
+return pip.main(["install"] + packages)
+
+# install package on driver first. if installation succeeded, 
continue the installation
+# on executors, otherwise return directly.
+if _run_pip(packages, None) != 0:
+return
--- End diff --

So wait, will this fail silently, or how will the user know it failed?
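
For illustration only (not the PR's code), one way the driver-side failure could be
surfaced to the user instead of returning silently:

    # _run_pip as defined in the diff above; raise so the caller is told that the
    # driver-side installation failed rather than silently skipping the executors.
    status = _run_pip(packages, None)
    if status != 0:
        raise RuntimeError(
            "pip exited with code %d while installing %s on the driver; "
            "skipping installation on executors" % (status, packages))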





[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-01-25 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r164032400
  
--- Diff: python/pyspark/context.py ---
@@ -1023,6 +1032,41 @@ def getConf(self):
 conf.setAll(self._conf.getAll())
 return conf
 
+def install_packages(self, packages):
+"""
+Install python packages on all executors and driver through pip. 
pip will be installed
+by default no matter using native virtualenv or conda. So it is 
guaranteed that pip is
+available if virtualenv is enabled.
+:param packages: string for single package or a list of string for 
multiple packages
+"""
+if self._conf.get("spark.pyspark.virtualenv.enabled") != "true":
+raise RuntimeError("install_packages can only use called when "
+   "spark.pyspark.virtualenv.enabled is set as 
true")
+if isinstance(packages, basestring):
+packages = [packages]
+# seems statusTracker.getExecutorInfos() will return driver + 
exeuctors, so -1 here.
+num_executors = 
len(self._jsc.sc().statusTracker().getExecutorInfos()) - 1
+dummyRDD = self.parallelize(range(num_executors), num_executors)
+
+def _run_pip(packages, iterator):
+import pip
+return pip.main(["install"] + packages)
+
+# install package on driver first. if installation succeeded, 
continue the installation
+# on executors, otherwise return directly.
+if _run_pip(packages, None) != 0:
+return
+
+virtualenvPackages = 
self._conf.get("spark.pyspark.virtualenv.packages")
+if virtualenvPackages:
+self._conf.set("spark.pyspark.virtualenv.packages", 
virtualenvPackages + "," +
+   ",".join(packages))
+else:
+self._conf.set("spark.pyspark.virtualenv.packages", 
",".join(packages))
--- End diff --

So here and in other places you split on ",", which isn't great for people who
want to specify version ranges.
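
For example, a requirement with a version range already contains a comma, so a naive
comma split mangles it (illustrative snippet):

    packages = ["numpy>=1.14,<1.16", "pandas"]
    joined = ",".join(packages)   # "numpy>=1.14,<1.16,pandas"
    print(joined.split(","))      # ['numpy>=1.14', '<1.16', 'pandas'] -- wrong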





[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-01-25 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r164032982
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -98,7 +98,7 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
   private val reviveThread =
 
ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-revive-thread")
 
-  class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: 
Seq[(String, String)])
+  class DriverEndpoint(override val rpcEnv: RpcEnv)
--- End diff --

My question here is: why is this change needed? Changing the scheduler backend seems
like an odd place for this change.





[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-01-25 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r164031719
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/VirtualEnvFactory.scala ---
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import java.io.File
+import java.util.{Map => JMap}
+import java.util.Arrays
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.JavaConverters._
+
+import com.google.common.io.Files
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+
+
+class VirtualEnvFactory(pythonExec: String, conf: SparkConf, isDriver: 
Boolean)
+  extends Logging {
+
+  private val virtualEnvType = conf.get("spark.pyspark.virtualenv.type", 
"native")
+  private val virtualEnvBinPath = 
conf.get("spark.pyspark.virtualenv.bin.path", "")
+  private val initPythonPackages = 
conf.getOption("spark.pyspark.virtualenv.packages")
+  private var virtualEnvName: String = _
+  private var virtualPythonExec: String = _
+  private val VIRTUALENV_ID = new AtomicInteger()
+  private var isLauncher: Boolean = false
+
+  // used by launcher when user want to use virtualenv in pyspark shell. 
Launcher need this class
+  // to create virtualenv for driver.
+  def this(pythonExec: String, properties: JMap[String, String], isDriver: 
java.lang.Boolean) {
+this(pythonExec, new SparkConf().setAll(properties.asScala), isDriver)
+this.isLauncher = true
+  }
+
+  /*
+   * Create virtualenv using native virtualenv or conda
+   *
+   */
+  def setupVirtualEnv(): String = {
+/*
+ *
+ * Native Virtualenv:
+ *   -  Execute command: virtualenv -p  --no-site-packages 

+ *   -  Execute command: python -m pip --cache-dir  install 
-r 
+ *
+ * Conda
+ *   -  Execute command: conda create --prefix  --file 
 -y
+ *
+ */
+logInfo("Start to setup virtualenv...")
+logDebug("user.dir=" + System.getProperty("user.dir"))
+logDebug("user.home=" + System.getProperty("user.home"))
+
+require(virtualEnvType == "native" || virtualEnvType == "conda",
+  s"VirtualEnvType: $virtualEnvType is not supported." )
+require(new File(virtualEnvBinPath).exists(),
+  s"VirtualEnvBinPath: $virtualEnvBinPath is not defined or doesn't 
exist.")
+// Two scenarios of creating virtualenv:
+// 1. created in yarn container. Yarn will clean it up after container 
is exited
+// 2. created outside yarn container. Spark need to create temp 
directory and clean it after app
+//finish.
+//  - driver of PySpark shell
+//  - driver of yarn-client mode
+if (isLauncher ||
+  (isDriver && conf.get("spark.submit.deployMode") == "client")) {
+  val virtualenvBasedir = Files.createTempDir()
+  virtualenvBasedir.deleteOnExit()
+  virtualEnvName = virtualenvBasedir.getAbsolutePath
+} else if (isDriver && conf.get("spark.submit.deployMode") == 
"cluster") {
+  virtualEnvName = "virtualenv_driver"
+} else {
+  // use the working directory of Executor
+  virtualEnvName = "virtualenv_" + conf.getAppId + "_" + 
VIRTUALENV_ID.getAndIncrement()
+}
+
+// Use the absolute path of requirement file in the following cases
+// 1. driver of pyspark shell
+// 2. driver of yarn-client mode
+// otherwise just use filename as it would be downloaded to the 
working directory of Executor
+val pysparkRequirements =
+  if (isLauncher ||
+(isDriver && conf.get("spark.submit.deployMode") == "client")) {
+conf.getOption("spark.pyspark.virtualenv.requirements")
+  } else {
+
conf.getOption("spark.pyspark.virtualenv.requirements").map(_.split("/").last)
+  }
+
+val createEnvCommand =
 

[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-01-25 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r164032919
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala ---
@@ -39,12 +39,17 @@ object PythonRunner {
 val pyFiles = args(1)
 val otherArgs = args.slice(2, args.length)
 val sparkConf = new SparkConf()
-val pythonExec = sparkConf.get(PYSPARK_DRIVER_PYTHON)
+var pythonExec = sparkConf.get(PYSPARK_DRIVER_PYTHON)
--- End diff --

Why is this made into a var? You could keep this as a val with a bit of 
refactoring.





[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-01-23 Thread zjffdu
Github user zjffdu commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r163427975
  
--- Diff: python/pyspark/context.py ---
@@ -1023,6 +1032,42 @@ def getConf(self):
 conf.setAll(self._conf.getAll())
 return conf
 
+def install_packages(self, packages):
+"""
+install python packages on all executors and driver through pip. 
pip will be installed
+by default no matter using native virtualenv or conda. So it is 
guaranteed that pip is
+available if virtualenv is enabled.
+:param packages: string for single package or a list of string for 
multiple packages
+:param install_driver: whether to install packages in client
+"""
+if self._conf.get("spark.pyspark.virtualenv.enabled") != "true":
+raise RuntimeError("install_packages can only use called when "
+   "spark.pyspark.virtualenv.enabled is set as 
true")
+if isinstance(packages, basestring):
+packages = [packages]
+# seems statusTracker.getExecutorInfos() will return driver + 
exeuctors, so -1 here.
+num_executors = 
len(self._jsc.sc().statusTracker().getExecutorInfos()) - 1
+dummyRDD = self.parallelize(range(num_executors), num_executors)
+
+def _run_pip(packages, iterator):
+import pip
+return pip.main(["install"] + packages)
+
+# install package on driver first. if installation succeeded, 
continue the installation
+# on executors, otherwise return directly.
+if _run_pip(packages, None) != 0:
+return
+
+virtualenvPackages = 
self._conf.get("spark.pyspark.virtualenv.packages")
+if virtualenvPackages:
+self._conf.set("spark.pyspark.virtualenv.packages", 
virtualenvPackages + "," +
+   ",".join(packages))
+else:
+self._conf.set("spark.pyspark.virtualenv.packages", 
",".join(packages))
+
+import functools
+dummyRDD.foreachPartition(functools.partial(_run_pip, packages))
--- End diff --

You are right, it is not guaranteed. From my experiments, it works pretty well most of
the time. And even if it is not executed on all executors in this RDD operation, the
packages will still be installed later, when the Python daemon is started on that
executor. So in any case, the Python packages will be installed in all Python daemons.





[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-01-23 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r163206632
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/VirtualEnvFactory.scala ---
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import java.io.File
+import java.util.{Map => JMap}
+import java.util.Arrays
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.JavaConverters._
+
+import com.google.common.io.Files
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+
+
+private[spark] class VirtualEnvFactory(pythonExec: String, conf: 
SparkConf, isDriver: Boolean)
+  extends Logging {
+
+  private var virtualEnvType = conf.get("spark.pyspark.virtualenv.type", 
"native")
+  private var virtualEnvPath = 
conf.get("spark.pyspark.virtualenv.bin.path", "")
+  private var virtualEnvName: String = _
+  private var virtualPythonExec: String = _
+  private val VIRTUALENV_ID = new AtomicInteger()
+  private var isLauncher: Boolean = false
+
+  // used by launcher when user want to use virtualenv in pyspark shell. 
Launcher need this class
+  // to create virtualenv for driver.
+  def this(pythonExec: String, properties: JMap[String, String], isDriver: 
java.lang.Boolean) {
--- End diff --

I guess we can use `boolean.class` in Java reflection.





[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-01-23 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r163203430
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/VirtualEnvFactory.scala ---
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import java.io.File
+import java.util.{Map => JMap}
+import java.util.Arrays
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.JavaConverters._
+
+import com.google.common.io.Files
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+
+
+class VirtualEnvFactory(pythonExec: String, conf: SparkConf, isDriver: 
Boolean)
+  extends Logging {
+
+  private var virtualEnvType = conf.get("spark.pyspark.virtualenv.type", 
"native")
+  private var virtualEnvBinPath = 
conf.get("spark.pyspark.virtualenv.bin.path", "")
+  private var initPythonPackages = 
conf.getOption("spark.pyspark.virtualenv.packages")
+  private var virtualEnvName: String = _
+  private var virtualPythonExec: String = _
+  private val VIRTUALENV_ID = new AtomicInteger()
+  private var isLauncher: Boolean = false
+
+  // used by launcher when user want to use virtualenv in pyspark shell. 
Launcher need this class
+  // to create virtualenv for driver.
+  def this(pythonExec: String, properties: JMap[String, String], isDriver: 
java.lang.Boolean) {
+this(pythonExec, new SparkConf().setAll(properties.asScala), isDriver)
+this.isLauncher = true
+  }
+
+  /*
+   * Create virtualenv using native virtualenv or conda
+   *
+   */
+  def setupVirtualEnv(): String = {
+/*
+ *
+ * Native Virtualenv:
+ *   -  Execute command: virtualenv -p  --no-site-packages 

+ *   -  Execute command: python -m pip --cache-dir  install 
-r 
+ *
+ * Conda
+ *   -  Execute command: conda create --prefix  --file 
 -y
+ *
+ */
+logInfo("Start to setup virtualenv...")
+logDebug("user.dir=" + System.getProperty("user.dir"))
+logDebug("user.home=" + System.getProperty("user.home"))
+
+require(virtualEnvType == "native" || virtualEnvType == "conda",
+  s"VirtualEnvType: $virtualEnvType is not supported." )
+require(new File(virtualEnvBinPath).exists(),
+  s"VirtualEnvBinPath: $virtualEnvBinPath is not defined or doesn't 
exist.")
+// Two scenarios of creating virtualenv:
+// 1. created in yarn container. Yarn will clean it up after container 
is exited
+// 2. created outside yarn container. Spark need to create temp 
directory and clean it after app
+//finish.
+//  - driver of PySpark shell
+//  - driver of yarn-client mode
+if (isLauncher ||
+  (isDriver && conf.get("spark.submit.deployMode") == "client")) {
+  val virtualenv_basedir = Files.createTempDir()
--- End diff --

nit: `virtualenvBasedir`?





[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-01-23 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r163202935
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/VirtualEnvFactory.scala ---
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import java.io.File
+import java.util.{Map => JMap}
+import java.util.Arrays
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.JavaConverters._
+
+import com.google.common.io.Files
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+
+
+class VirtualEnvFactory(pythonExec: String, conf: SparkConf, isDriver: 
Boolean)
+  extends Logging {
+
+  private var virtualEnvType = conf.get("spark.pyspark.virtualenv.type", 
"native")
+  private var virtualEnvBinPath = 
conf.get("spark.pyspark.virtualenv.bin.path", "")
+  private var initPythonPackages = 
conf.getOption("spark.pyspark.virtualenv.packages")
+  private var virtualEnvName: String = _
+  private var virtualPythonExec: String = _
--- End diff --

Do we need to have these as instance variables?





[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-01-23 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r163203798
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/VirtualEnvFactory.scala ---
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import java.io.File
+import java.util.{Map => JMap}
+import java.util.Arrays
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.JavaConverters._
+
+import com.google.common.io.Files
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+
+
+class VirtualEnvFactory(pythonExec: String, conf: SparkConf, isDriver: 
Boolean)
+  extends Logging {
+
+  private var virtualEnvType = conf.get("spark.pyspark.virtualenv.type", 
"native")
+  private var virtualEnvBinPath = 
conf.get("spark.pyspark.virtualenv.bin.path", "")
+  private var initPythonPackages = 
conf.getOption("spark.pyspark.virtualenv.packages")
+  private var virtualEnvName: String = _
+  private var virtualPythonExec: String = _
+  private val VIRTUALENV_ID = new AtomicInteger()
+  private var isLauncher: Boolean = false
+
+  // used by launcher when user want to use virtualenv in pyspark shell. 
Launcher need this class
+  // to create virtualenv for driver.
+  def this(pythonExec: String, properties: JMap[String, String], isDriver: 
java.lang.Boolean) {
+this(pythonExec, new SparkConf().setAll(properties.asScala), isDriver)
+this.isLauncher = true
+  }
+
+  /*
+   * Create virtualenv using native virtualenv or conda
+   *
+   */
+  def setupVirtualEnv(): String = {
+/*
+ *
+ * Native Virtualenv:
+ *   -  Execute command: virtualenv -p  --no-site-packages 

+ *   -  Execute command: python -m pip --cache-dir  install 
-r 
+ *
+ * Conda
+ *   -  Execute command: conda create --prefix  --file 
 -y
+ *
+ */
+logInfo("Start to setup virtualenv...")
+logDebug("user.dir=" + System.getProperty("user.dir"))
+logDebug("user.home=" + System.getProperty("user.home"))
+
+require(virtualEnvType == "native" || virtualEnvType == "conda",
+  s"VirtualEnvType: $virtualEnvType is not supported." )
+require(new File(virtualEnvBinPath).exists(),
+  s"VirtualEnvBinPath: $virtualEnvBinPath is not defined or doesn't 
exist.")
+// Two scenarios of creating virtualenv:
+// 1. created in yarn container. Yarn will clean it up after container 
is exited
+// 2. created outside yarn container. Spark need to create temp 
directory and clean it after app
+//finish.
+//  - driver of PySpark shell
+//  - driver of yarn-client mode
+if (isLauncher ||
+  (isDriver && conf.get("spark.submit.deployMode") == "client")) {
+  val virtualenv_basedir = Files.createTempDir()
+  virtualenv_basedir.deleteOnExit()
+  virtualEnvName = virtualenv_basedir.getAbsolutePath
+} else if (isDriver && conf.get("spark.submit.deployMode") == 
"cluster") {
+  virtualEnvName = "virtualenv_driver"
+} else {
+  // use the working directory of Executor
+  virtualEnvName = "virtualenv_" + conf.getAppId + "_" + 
VIRTUALENV_ID.getAndIncrement()
+}
+
+// Use the absolute path of requirement file in the following cases
+// 1. driver of pyspark shell
+// 2. driver of yarn-client mode
+// otherwise just use filename as it would be downloaded to the 
working directory of Executor
+val pysparkRequirements =
+  if (isLauncher ||
+(isDriver && conf.get("spark.submit.deployMode") == "client")) {
+conf.getOption("spark.pyspark.virtualenv.requirements")
+  } else {
+
conf.getOption("spark.pyspark.virtualenv.requirements").map(_.split("/").last)
+  }
+
+val createEnvCommand 

[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-01-23 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r163201794
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/VirtualEnvFactory.scala ---
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import java.io.File
+import java.util.{Map => JMap}
+import java.util.Arrays
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.JavaConverters._
+
+import com.google.common.io.Files
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+
+
+class VirtualEnvFactory(pythonExec: String, conf: SparkConf, isDriver: 
Boolean)
+  extends Logging {
+
+  private var virtualEnvType = conf.get("spark.pyspark.virtualenv.type", 
"native")
+  private var virtualEnvBinPath = 
conf.get("spark.pyspark.virtualenv.bin.path", "")
+  private var initPythonPackages = 
conf.getOption("spark.pyspark.virtualenv.packages")
--- End diff --

Use `val`s for these three variables.


---




[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-01-23 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r163198358
  
--- Diff: python/pyspark/context.py ---
@@ -1023,6 +1032,42 @@ def getConf(self):
 conf.setAll(self._conf.getAll())
 return conf
 
+def install_packages(self, packages):
+"""
+install python packages on all executors and driver through pip. 
pip will be installed
--- End diff --

nit: `Install` instead of `install` at the beginning of the line.


---




[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-01-23 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r163199689
  
--- Diff: 
launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java 
---
@@ -299,20 +300,38 @@
 // 4. environment variable PYSPARK_PYTHON
 // 5. python
 List pyargs = new ArrayList<>();
-pyargs.add(firstNonEmpty(conf.get(SparkLauncher.PYSPARK_DRIVER_PYTHON),
+String pythonExec = 
firstNonEmpty(conf.get(SparkLauncher.PYSPARK_DRIVER_PYTHON),
   conf.get(SparkLauncher.PYSPARK_PYTHON),
   System.getenv("PYSPARK_DRIVER_PYTHON"),
   System.getenv("PYSPARK_PYTHON"),
-  "python"));
-String pyOpts = System.getenv("PYSPARK_DRIVER_PYTHON_OPTS");
-if (conf.containsKey(SparkLauncher.PYSPARK_PYTHON)) {
-  // pass conf spark.pyspark.python to python by environment variable.
-  env.put("PYSPARK_PYTHON", conf.get(SparkLauncher.PYSPARK_PYTHON));
+  "python");
+if (conf.getOrDefault("spark.pyspark.virtualenv.enabled", 
"false").equals("true")) {
+  try {
+// setup virtualenv in launcher when virtualenv is enabled in 
pyspark shell
+Class virtualEnvClazz = 
getClass().forName("org.apache.spark.api.python.VirtualEnvFactory");
+Object virtualEnv = virtualEnvClazz.getConstructor(String.class, 
Map.class, Boolean.class)
+  .newInstance(pythonExec, conf, true);
+Method virtualEnvMethod = 
virtualEnvClazz.getMethod("setupVirtualEnv");
+pythonExec = (String) virtualEnvMethod.invoke(virtualEnv);
+pyargs.add(pythonExec);
+  } catch (Exception e) {
+throw new IOException(e);
+  }
+} else {
+  
pyargs.add(firstNonEmpty(conf.get(SparkLauncher.PYSPARK_DRIVER_PYTHON),
+conf.get(SparkLauncher.PYSPARK_PYTHON),
+System.getenv("PYSPARK_DRIVER_PYTHON"),
+System.getenv("PYSPARK_PYTHON"),
+"python"));
--- End diff --

We can simplify as `pyargs.add(pythonExec);`?


---




[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-01-23 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r163199169
  
--- Diff: python/pyspark/context.py ---
@@ -1023,6 +1032,42 @@ def getConf(self):
 conf.setAll(self._conf.getAll())
 return conf
 
+def install_packages(self, packages):
+"""
+install python packages on all executors and driver through pip. 
pip will be installed
+by default no matter using native virtualenv or conda. So it is 
guaranteed that pip is
+available if virtualenv is enabled.
+:param packages: string for single package or a list of string for 
multiple packages
+:param install_driver: whether to install packages in client
--- End diff --

What's the `install_driver` parameter?


---




[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-01-23 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r163199771
  
--- Diff: 
launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java 
---
@@ -299,20 +300,38 @@
 // 4. environment variable PYSPARK_PYTHON
 // 5. python
 List pyargs = new ArrayList<>();
-pyargs.add(firstNonEmpty(conf.get(SparkLauncher.PYSPARK_DRIVER_PYTHON),
+String pythonExec = 
firstNonEmpty(conf.get(SparkLauncher.PYSPARK_DRIVER_PYTHON),
   conf.get(SparkLauncher.PYSPARK_PYTHON),
   System.getenv("PYSPARK_DRIVER_PYTHON"),
   System.getenv("PYSPARK_PYTHON"),
-  "python"));
-String pyOpts = System.getenv("PYSPARK_DRIVER_PYTHON_OPTS");
-if (conf.containsKey(SparkLauncher.PYSPARK_PYTHON)) {
-  // pass conf spark.pyspark.python to python by environment variable.
-  env.put("PYSPARK_PYTHON", conf.get(SparkLauncher.PYSPARK_PYTHON));
+  "python");
+if (conf.getOrDefault("spark.pyspark.virtualenv.enabled", 
"false").equals("true")) {
+  try {
+// setup virtualenv in launcher when virtualenv is enabled in 
pyspark shell
+Class virtualEnvClazz = 
getClass().forName("org.apache.spark.api.python.VirtualEnvFactory");
--- End diff --

`Class.forName` instead of `getClass().forName`.


---




[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-01-23 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r163194199
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala ---
@@ -82,6 +90,12 @@ private[spark] class PythonWorkerFactory(pythonExec: 
String, envVars: Map[String
 envVars.getOrElse("PYTHONPATH", ""),
 sys.env.getOrElse("PYTHONPATH", ""))
 
+
+  if (virtualEnvEnabled) {
+val virtualEnvFactory = new VirtualEnvFactory(pythonExec, conf, false)
+virtualenvPythonExec = Some(virtualEnvFactory.setupVirtualEnv())
+  }
--- End diff --

I guess we don't prefer unnecessary `var`s.

How about the following with the diff above:

```scala
val virtualEnvEnabled = conf.getBoolean("spark.pyspark.virtualenv.enabled", 
false)
val virtualenvPythonExec = if (virtualEnvEnabled) {
  val virtualEnvFactory = new VirtualEnvFactory(pythonExec, conf, false)
  Some(virtualEnvFactory.setupVirtualEnv())
} else {
  None
}
```

Or maybe we can:

```scala
val virtualEnvEnabled = conf.getBoolean("spark.pyspark.virtualenv.enabled", 
false)
val virtualenvPythonExec = if (virtualEnvEnabled) {
  val virtualEnvFactory = new VirtualEnvFactory(pythonExec, conf, false)
  virtualEnvFactory.setupVirtualEnv()
} else {
  pythonExec
}
```

and use this directly.



---




[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-01-23 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r163197957
  
--- Diff: python/pyspark/context.py ---
@@ -1023,6 +1032,42 @@ def getConf(self):
 conf.setAll(self._conf.getAll())
 return conf
 
+def install_packages(self, packages):
+"""
+install python packages on all executors and driver through pip. 
pip will be installed
+by default no matter using native virtualenv or conda. So it is 
guaranteed that pip is
+available if virtualenv is enabled.
+:param packages: string for single package or a list of string for 
multiple packages
+:param install_driver: whether to install packages in client
+"""
+if self._conf.get("spark.pyspark.virtualenv.enabled") != "true":
+raise RuntimeError("install_packages can only use called when "
+   "spark.pyspark.virtualenv.enabled is set as 
true")
+if isinstance(packages, basestring):
+packages = [packages]
+# seems statusTracker.getExecutorInfos() will return driver + 
exeuctors, so -1 here.
+num_executors = 
len(self._jsc.sc().statusTracker().getExecutorInfos()) - 1
+dummyRDD = self.parallelize(range(num_executors), num_executors)
+
+def _run_pip(packages, iterator):
+import pip
+return pip.main(["install"] + packages)
+
+# install package on driver first. if installation succeeded, 
continue the installation
+# on executors, otherwise return directly.
+if _run_pip(packages, None) != 0:
+return
+
+virtualenvPackages = 
self._conf.get("spark.pyspark.virtualenv.packages")
+if virtualenvPackages:
+self._conf.set("spark.pyspark.virtualenv.packages", 
virtualenvPackages + "," +
+   ",".join(packages))
+else:
+self._conf.set("spark.pyspark.virtualenv.packages", 
",".join(packages))
+
+import functools
+dummyRDD.foreachPartition(functools.partial(_run_pip, packages))
--- End diff --

I guess this does not guarantee that `_run_pip` is executed on all executors.
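
(For illustration only: a minimal pyspark-shell sketch, not part of this PR, that records which host each partition of such a dummy RDD actually ran on. It assumes a running shell with `sc` available; since nothing pins one partition to each executor, the resulting set can be smaller than the set of live executors, which is the gap being pointed out here.)

```python
import socket

def _report_host(_iterator):
    # each task reports the host it ran on; the scheduler is free to
    # place several of these partitions on the same executor/host
    yield socket.gethostname()

num_executors = len(sc._jsc.sc().statusTracker().getExecutorInfos()) - 1
dummy = sc.parallelize(range(num_executors), num_executors)
hosts = set(dummy.mapPartitions(_report_host).collect())
print(hosts)  # may not cover every executor, so neither would the pip install
```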


---




[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-01-21 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r162856254
  
--- Diff: python/pyspark/context.py ---
@@ -1023,6 +1032,35 @@ def getConf(self):
 conf.setAll(self._conf.getAll())
 return conf
 
+def install_packages(self, packages, install_driver=True):
+"""
+install python packages on all executors and driver through pip. 
pip will be installed
+by default no matter using native virtualenv or conda. So it is 
guaranteed that pip is
+available if virtualenv is enabled.
+:param packages: string for single package or a list of string for 
multiple packages
+:param install_driver: whether to install packages in client
+"""
+if self._conf.get("spark.pyspark.virtualenv.enabled") != "true":
+raise RuntimeError("install_packages can only use called when "
+   "spark.pyspark.virtualenv.enabled set as 
true")
+if isinstance(packages, basestring):
+packages = [packages]
+# seems statusTracker.getExecutorInfos() will return driver + 
exeuctors, so -1 here.
+num_executors = 
len(self._jsc.sc().statusTracker().getExecutorInfos()) - 1
+dummyRDD = self.parallelize(range(num_executors), num_executors)
+
+def _run_pip(packages, iterator):
+import pip
+pip.main(["install"] + packages)
+
+# run it in the main thread. Will do it in a separated thread after
+# https://github.com/pypa/pip/issues/2553 is fixed
+if install_driver:
+_run_pip(packages, None)
+
+import functools
+dummyRDD.foreachPartition(functools.partial(_run_pip, packages))
--- End diff --

@zjffdu No it's not, hard -1.


---




[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-01-09 Thread zjffdu
Github user zjffdu commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r160572606
  
--- Diff: python/pyspark/context.py ---
@@ -1023,6 +1032,35 @@ def getConf(self):
 conf.setAll(self._conf.getAll())
 return conf
 
+def install_packages(self, packages, install_driver=True):
+"""
+install python packages on all executors and driver through pip. 
pip will be installed
+by default no matter using native virtualenv or conda. So it is 
guaranteed that pip is
+available if virtualenv is enabled.
+:param packages: string for single package or a list of string for 
multiple packages
+:param install_driver: whether to install packages in client
+"""
+if self._conf.get("spark.pyspark.virtualenv.enabled") != "true":
+raise RuntimeError("install_packages can only use called when "
+   "spark.pyspark.virtualenv.enabled set as 
true")
+if isinstance(packages, basestring):
+packages = [packages]
+# seems statusTracker.getExecutorInfos() will return driver + 
exeuctors, so -1 here.
+num_executors = 
len(self._jsc.sc().statusTracker().getExecutorInfos()) - 1
+dummyRDD = self.parallelize(range(num_executors), num_executors)
+
+def _run_pip(packages, iterator):
+import pip
+pip.main(["install"] + packages)
+
+# run it in the main thread. Will do it in a separated thread after
+# https://github.com/pypa/pip/issues/2553 is fixed
+if install_driver:
+_run_pip(packages, None)
+
+import functools
+dummyRDD.foreachPartition(functools.partial(_run_pip, packages))
--- End diff --

It makes sense to mark this feature as experimental. Although it is not reliable in some cases, it is still pretty useful in interactive mode: in a notebook, for example, it is not possible to pin down all the dependent packages before launching the Spark app, so installing packages at runtime is very useful. And since a notebook is usually an experimental phase rather than a production phase, these corner cases should be acceptable IMHO as long as we document them and make users aware.


---




[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-01-09 Thread Stibbons
Github user Stibbons commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r160549913
  
--- Diff: python/pyspark/context.py ---
@@ -1023,6 +1032,35 @@ def getConf(self):
 conf.setAll(self._conf.getAll())
 return conf
 
+def install_packages(self, packages, install_driver=True):
+"""
+install python packages on all executors and driver through pip. 
pip will be installed
+by default no matter using native virtualenv or conda. So it is 
guaranteed that pip is
+available if virtualenv is enabled.
+:param packages: string for single package or a list of string for 
multiple packages
+:param install_driver: whether to install packages in client
+"""
+if self._conf.get("spark.pyspark.virtualenv.enabled") != "true":
+raise RuntimeError("install_packages can only use called when "
+   "spark.pyspark.virtualenv.enabled set as 
true")
+if isinstance(packages, basestring):
+packages = [packages]
+# seems statusTracker.getExecutorInfos() will return driver + 
exeuctors, so -1 here.
+num_executors = 
len(self._jsc.sc().statusTracker().getExecutorInfos()) - 1
+dummyRDD = self.parallelize(range(num_executors), num_executors)
+
+def _run_pip(packages, iterator):
+import pip
+pip.main(["install"] + packages)
+
+# run it in the main thread. Will do it in a separated thread after
+# https://github.com/pypa/pip/issues/2553 is fixed
+if install_driver:
+_run_pip(packages, None)
+
+import functools
+dummyRDD.foreachPartition(functools.partial(_run_pip, packages))
--- End diff --

What about making this feature experimental and improving it gradually?


---




[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-01-09 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r160521328
  
--- Diff: python/pyspark/context.py ---
@@ -1023,6 +1032,35 @@ def getConf(self):
 conf.setAll(self._conf.getAll())
 return conf
 
+def install_packages(self, packages, install_driver=True):
+"""
+install python packages on all executors and driver through pip. 
pip will be installed
+by default no matter using native virtualenv or conda. So it is 
guaranteed that pip is
+available if virtualenv is enabled.
+:param packages: string for single package or a list of string for 
multiple packages
+:param install_driver: whether to install packages in client
+"""
+if self._conf.get("spark.pyspark.virtualenv.enabled") != "true":
+raise RuntimeError("install_packages can only use called when "
+   "spark.pyspark.virtualenv.enabled set as 
true")
+if isinstance(packages, basestring):
+packages = [packages]
+# seems statusTracker.getExecutorInfos() will return driver + 
exeuctors, so -1 here.
+num_executors = 
len(self._jsc.sc().statusTracker().getExecutorInfos()) - 1
+dummyRDD = self.parallelize(range(num_executors), num_executors)
+
+def _run_pip(packages, iterator):
+import pip
+pip.main(["install"] + packages)
+
+# run it in the main thread. Will do it in a separated thread after
+# https://github.com/pypa/pip/issues/2553 is fixed
+if install_driver:
+_run_pip(packages, None)
+
+import functools
+dummyRDD.foreachPartition(functools.partial(_run_pip, packages))
--- End diff --

This approach is not resilient to executor failure/restart, dynamic allocation, and other possible changes. I'm not comfortable merging something which depends on this.


---




[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-01-08 Thread zjffdu
Github user zjffdu commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r160310618
  
--- Diff: python/pyspark/context.py ---
@@ -1023,6 +1039,33 @@ def getConf(self):
 conf.setAll(self._conf.getAll())
 return conf
 
+def install_packages(self, packages, install_driver=True):
+"""
+install python packages on all executors and driver through pip
+:param packages: string for single package or a list of string for 
multiple packages
+:param install_driver: whether to install packages in client
--- End diff --

@HyukjinKwon Could you guide me on how to skip such doctests? Thanks
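
(Side note, hedged: one standard way to keep such an example out of the doctest run is the `# doctest: +SKIP` directive on the example line. The snippet below only illustrates that directive; it is not the actual docstring wording from this PR.)

```python
def install_packages(self, packages, install_driver=True):
    """
    Install python packages on all executors and the driver through pip.

    >>> sc.install_packages("numpy")  # doctest: +SKIP
    """
```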


---




[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-01-08 Thread zjffdu
Github user zjffdu commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r160308321
  
--- Diff: 
launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java 
---
@@ -299,20 +301,39 @@
 // 4. environment variable PYSPARK_PYTHON
 // 5. python
 List pyargs = new ArrayList<>();
-pyargs.add(firstNonEmpty(conf.get(SparkLauncher.PYSPARK_DRIVER_PYTHON),
-  conf.get(SparkLauncher.PYSPARK_PYTHON),
-  System.getenv("PYSPARK_DRIVER_PYTHON"),
-  System.getenv("PYSPARK_PYTHON"),
-  "python"));
-String pyOpts = System.getenv("PYSPARK_DRIVER_PYTHON_OPTS");
-if (conf.containsKey(SparkLauncher.PYSPARK_PYTHON)) {
-  // pass conf spark.pyspark.python to python by environment variable.
-  env.put("PYSPARK_PYTHON", conf.get(SparkLauncher.PYSPARK_PYTHON));
-}
-if (!isEmpty(pyOpts)) {
-  pyargs.addAll(parseOptionString(pyOpts));
-}
+String pythonExec = 
firstNonEmpty(conf.get(SparkLauncher.PYSPARK_DRIVER_PYTHON),
+conf.get(SparkLauncher.PYSPARK_PYTHON),
+System.getenv("PYSPARK_DRIVER_PYTHON"),
+System.getenv("PYSPARK_PYTHON"),
+"python");
+if (conf.getOrDefault("spark.pyspark.virtualenv.enabled", 
"false").equals("true")) {
+  try {
+// setup virtualenv in launcher when virtualenv is enabled in 
pyspark shell
+Class virtualEnvClazz = 
getClass().forName("org.apache.spark.api.python.VirtualEnvFactory");
+Object virtualEnv = virtualEnvClazz.getConstructor(String.class, 
Map.class, Boolean.class)
+.newInstance(pythonExec, conf, true);
+Method virtualEnvMethod = 
virtualEnv.getClass().getMethod("setupVirtualEnv");
+pythonExec = (String) virtualEnvMethod.invoke(virtualEnv);
+pyargs.add(pythonExec);
+  } catch (Exception e) {
+throw new IOException(e);
+  }
+} else {
+  
pyargs.add(firstNonEmpty(conf.get(SparkLauncher.PYSPARK_DRIVER_PYTHON),
+  conf.get(SparkLauncher.PYSPARK_PYTHON),
+  System.getenv("PYSPARK_DRIVER_PYTHON"),
+  System.getenv("PYSPARK_PYTHON"),
+  "python"));
+  String pyOpts = System.getenv("PYSPARK_DRIVER_PYTHON_OPTS");
+  if (conf.containsKey(SparkLauncher.PYSPARK_PYTHON)) {
+// pass conf spark.pyspark.python to python by environment 
variable.
+env.put("PYSPARK_PYTHON", conf.get(SparkLauncher.PYSPARK_PYTHON));
+  }
 
+  if (!isEmpty(pyOpts)) {
+pyargs.addAll(parseOptionString(pyOpts));
+  }
--- End diff --

Good point, fixed


---




[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-01-08 Thread zjffdu
Github user zjffdu commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r160285377
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/VirtualEnvFactory.scala ---
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import java.io.File
+import java.util.{Map => JMap}
+import java.util.Arrays
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.JavaConverters._
+
+import com.google.common.io.Files
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+
+
+private[spark] class VirtualEnvFactory(pythonExec: String, conf: 
SparkConf, isDriver: Boolean)
+  extends Logging {
+
+  private var virtualEnvType = conf.get("spark.pyspark.virtualenv.type", 
"native")
+  private var virtualEnvPath = 
conf.get("spark.pyspark.virtualenv.bin.path", "")
+  private var virtualEnvName: String = _
+  private var virtualPythonExec: String = _
+  private val VIRTUALENV_ID = new AtomicInteger()
+  private var isLauncher: Boolean = false
+
+  // used by launcher when user want to use virtualenv in pyspark shell. 
Launcher need this class
+  // to create virtualenv for driver.
+  def this(pythonExec: String, properties: JMap[String, String], isDriver: 
java.lang.Boolean) {
+this(pythonExec, new SparkConf(), isDriver)
--- End diff --

I am afraid not, because this method is used by the launcher module, where the SparkConf is unknown.


---




[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-01-08 Thread zjffdu
Github user zjffdu commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r160141363
  
--- Diff: python/pyspark/context.py ---
@@ -1023,6 +1039,33 @@ def getConf(self):
 conf.setAll(self._conf.getAll())
 return conf
 
+def install_packages(self, packages, install_driver=True):
--- End diff --

Users can call this method to install packages at runtime, e.g. they can install a package from the pyspark shell, as in the sketch below.
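
(A minimal sketch of that interactive flow, assuming a pyspark shell started with `spark.pyspark.virtualenv.enabled=true` and the other virtualenv confs from this PR; `numpy`, `pandas` and `scipy` are just example package names.)

```python
# inside the pyspark shell, after the virtualenv has been set up
sc.install_packages("numpy")              # a single package as a string
sc.install_packages(["pandas", "scipy"])  # or a list of packages

import numpy as np
# the freshly installed package is now importable in executor-side code too
print(sc.parallelize(range(4), 4).map(lambda x: float(np.sqrt(x))).collect())
```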


---




[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-01-08 Thread zjffdu
Github user zjffdu commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r160140451
  
--- Diff: docs/submitting-applications.md ---
@@ -218,6 +218,73 @@ These commands can be used with `pyspark`, 
`spark-shell`, and `spark-submit` to
 For Python, the equivalent `--py-files` option can be used to distribute 
`.egg`, `.zip` and `.py` libraries
 to executors.
 
+# VirtualEnv for Pyspark
+For simple PySpark application, we can use `--py-files` to add its 
dependencies. While for a large PySpark application,
+usually you will have many dependencies which may also have transitive 
dependencies and even some dependencies need to be compiled
+to be installed. In this case `--py-files` is not so convenient. Luckily, 
in python world we have virtualenv/conda to help create isolated
+python work environment. We also implement virtualenv in PySpark (It is 
only supported in yarn mode for now). 
+
+# Prerequisites
+- Each node have virtualenv/conda, python-devel installed
+- Each node is internet accessible (for downloading packages)
+
+{% highlight bash %}
+# Setup virtualenv using native virtualenv on yarn-client mode
+bin/spark-submit \
+--master yarn \
+--deploy-mode client \
+--conf "spark.pyspark.virtualenv.enabled=true" \
+--conf "spark.pyspark.virtualenv.type=native" \
+--conf 
"spark.pyspark.virtualenv.requirements=" \
+--conf "spark.pyspark.virtualenv.bin.path=" \
+
+
+# Setup virtualenv using conda on yarn-client mode
+bin/spark-submit \
+--master yarn \
+--deploy-mode client \
+--conf "spark.pyspark.virtualenv.enabled=true" \
+--conf "spark.pyspark.virtualenv.type=conda" \
+--conf 
"spark.pyspark.virtualenv.requirements=<" \
+--conf "spark.pyspark.virtualenv.bin.path=" \
+
+{% endhighlight %}
+
+## PySpark VirtualEnv Configurations
+
+Property NameDefaultMeaning
+
+  spark.pyspark.virtualenv.enabled
+  false
+  Whether to enable virtualenv
+
+
+  Spark.pyspark.virtualenv.type
+  virtualenv
--- End diff --

I don't have a strong preference for `native`. The reason I use `native` is that `virtualenv` already appears in all the virtualenv-related configuration names; I just don't want to introduce `virtualenv` here again, to avoid confusion.


---




[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-01-08 Thread zjffdu
Github user zjffdu commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r160139161
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/VirtualEnvFactory.scala ---
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import java.io.File
+import java.util.{Map => JMap}
+import java.util.Arrays
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.JavaConverters._
+
+import com.google.common.io.Files
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+
+
+private[spark] class VirtualEnvFactory(pythonExec: String, conf: 
SparkConf, isDriver: Boolean)
+  extends Logging {
+
+  private var virtualEnvType = conf.get("spark.pyspark.virtualenv.type", 
"native")
+  private var virtualEnvPath = 
conf.get("spark.pyspark.virtualenv.bin.path", "")
+  private var virtualEnvName: String = _
+  private var virtualPythonExec: String = _
+  private val VIRTUALENV_ID = new AtomicInteger()
+  private var isLauncher: Boolean = false
+
+  // used by launcher when user want to use virtualenv in pyspark shell. 
Launcher need this class
+  // to create virtualenv for driver.
+  def this(pythonExec: String, properties: JMap[String, String], isDriver: 
java.lang.Boolean) {
+this(pythonExec, new SparkConf(), isDriver)
+properties.asScala.foreach(entry => this.conf.set(entry._1, entry._2))
+virtualEnvType = conf.get("spark.pyspark.virtualenv.type", "native")
+virtualEnvPath = conf.get("spark.pyspark.virtualenv.bin.path")
+this.isLauncher = true
+  }
+
+  /*
+   * Create virtualenv using native virtualenv or conda
+   *
+   * Native Virtualenv:
+   *   -  Execute command: virtualenv -p  --no-site-packages 

+   *   -  Execute command: python -m pip --cache-dir  install 
-r 
+   *
+   * Conda
+   *   -  Execute command: conda create --prefix  --file 
 -y
+   *
+   */
+  def setupVirtualEnv(): String = {
+logInfo("Start to setup virtualenv...")
+logDebug("user.dir=" + System.getProperty("user.dir"))
+logDebug("user.home=" + System.getProperty("user.home"))
+
+require(virtualEnvType == "native" || virtualEnvType == "conda",
+  s"VirtualEnvType: ${virtualEnvType} is not supported." )
+require(new File(virtualEnvPath).exists(),
+  s"VirtualEnvPath: ${virtualEnvPath} is not defined or doesn't 
exist.")
+// Use a temp directory for virtualenv in the following cases:
+// 1. driver of pyspark shell
+// 2. driver of yarn-client mode
+// otherwise create the virtualenv folder under the executor working 
directory.
+if (isLauncher ||
+  (isDriver && conf.get("spark.submit.deployMode") == "client")) {
+  val virtualenv_basedir = Files.createTempDir()
+  virtualenv_basedir.deleteOnExit()
+  virtualEnvName = virtualenv_basedir.getAbsolutePath
+} else if (isDriver && conf.get("spark.submit.deployMode") == 
"cluster") {
+  virtualEnvName = "virtualenv_driver"
+} else {
+  // use the working directory of Executor
+  virtualEnvName = "virtualenv_" + conf.getAppId + "_" + 
VIRTUALENV_ID.getAndIncrement()
+}
+
+// Use the absolute path of requirement file in the following cases
+// 1. driver of pyspark shell
+// 2. driver of yarn-client mode
+// otherwise just use filename as it would be downloaded to the 
working directory of Executor
+val pyspark_requirements =
--- End diff --

Fixed


---




[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-01-08 Thread zjffdu
Github user zjffdu commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r160138782
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/VirtualEnvFactory.scala ---
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import java.io.File
+import java.util.{Map => JMap}
+import java.util.Arrays
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.JavaConverters._
+
+import com.google.common.io.Files
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+
+
+private[spark] class VirtualEnvFactory(pythonExec: String, conf: 
SparkConf, isDriver: Boolean)
+  extends Logging {
+
+  private var virtualEnvType = conf.get("spark.pyspark.virtualenv.type", 
"native")
+  private var virtualEnvPath = 
conf.get("spark.pyspark.virtualenv.bin.path", "")
+  private var virtualEnvName: String = _
+  private var virtualPythonExec: String = _
+  private val VIRTUALENV_ID = new AtomicInteger()
+  private var isLauncher: Boolean = false
+
+  // used by launcher when user want to use virtualenv in pyspark shell. 
Launcher need this class
+  // to create virtualenv for driver.
+  def this(pythonExec: String, properties: JMap[String, String], isDriver: 
java.lang.Boolean) {
--- End diff --

This is because it will also be called from the Java side via reflection.


---




[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-01-08 Thread zjffdu
Github user zjffdu commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r160138391
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala ---
@@ -60,6 +66,12 @@ private[spark] class PythonWorkerFactory(pythonExec: 
String, envVars: Map[String
 envVars.getOrElse("PYTHONPATH", ""),
 sys.env.getOrElse("PYTHONPATH", ""))
 
+
+  if (virtualEnvEnabled) {
+val virtualEnvFactory = new VirtualEnvFactory(pythonExec, conf, false)
+pythonExec = virtualEnvFactory.setupVirtualEnv()
--- End diff --

Fixed


---




[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-01-08 Thread zjffdu
Github user zjffdu commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r160138349
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala ---
@@ -29,7 +30,10 @@ import org.apache.spark._
 import org.apache.spark.internal.Logging
 import org.apache.spark.util.{RedirectThread, Utils}
 
-private[spark] class PythonWorkerFactory(pythonExec: String, envVars: 
Map[String, String])
+
+private[spark] class PythonWorkerFactory(var pythonExec: String,
--- End diff --

Fixed


---




[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-01-08 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r160085499
  
--- Diff: docs/submitting-applications.md ---
@@ -218,6 +218,73 @@ These commands can be used with `pyspark`, 
`spark-shell`, and `spark-submit` to
 For Python, the equivalent `--py-files` option can be used to distribute 
`.egg`, `.zip` and `.py` libraries
 to executors.
 
+# VirtualEnv for Pyspark
+For simple PySpark application, we can use `--py-files` to add its 
dependencies. While for a large PySpark application,
+usually you will have many dependencies which may also have transitive 
dependencies and even some dependencies need to be compiled
+to be installed. In this case `--py-files` is not so convenient. Luckily, 
in python world we have virtualenv/conda to help create isolated
+python work environment. We also implement virtualenv in PySpark (It is 
only supported in yarn mode for now). 
+
+# Prerequisites
+- Each node have virtualenv/conda, python-devel installed
+- Each node is internet accessible (for downloading packages)
+
+{% highlight bash %}
+# Setup virtualenv using native virtualenv on yarn-client mode
+bin/spark-submit \
+--master yarn \
+--deploy-mode client \
+--conf "spark.pyspark.virtualenv.enabled=true" \
+--conf "spark.pyspark.virtualenv.type=native" \
+--conf 
"spark.pyspark.virtualenv.requirements=" \
+--conf "spark.pyspark.virtualenv.bin.path=" \
+
+
+# Setup virtualenv using conda on yarn-client mode
+bin/spark-submit \
+--master yarn \
+--deploy-mode client \
+--conf "spark.pyspark.virtualenv.enabled=true" \
+--conf "spark.pyspark.virtualenv.type=conda" \
+--conf 
"spark.pyspark.virtualenv.requirements=<" \
--- End diff --

nit: remove an extra `<`.


---




[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-01-08 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r160083009
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/VirtualEnvFactory.scala ---
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import java.io.File
+import java.util.{Map => JMap}
+import java.util.Arrays
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.JavaConverters._
+
+import com.google.common.io.Files
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+
+
+private[spark] class VirtualEnvFactory(pythonExec: String, conf: 
SparkConf, isDriver: Boolean)
+  extends Logging {
+
+  private var virtualEnvType = conf.get("spark.pyspark.virtualenv.type", 
"native")
+  private var virtualEnvPath = 
conf.get("spark.pyspark.virtualenv.bin.path", "")
+  private var virtualEnvName: String = _
+  private var virtualPythonExec: String = _
+  private val VIRTUALENV_ID = new AtomicInteger()
+  private var isLauncher: Boolean = false
+
+  // used by launcher when user want to use virtualenv in pyspark shell. 
Launcher need this class
+  // to create virtualenv for driver.
+  def this(pythonExec: String, properties: JMap[String, String], isDriver: 
java.lang.Boolean) {
+this(pythonExec, new SparkConf(), isDriver)
+properties.asScala.foreach(entry => this.conf.set(entry._1, entry._2))
+virtualEnvType = conf.get("spark.pyspark.virtualenv.type", "native")
+virtualEnvPath = conf.get("spark.pyspark.virtualenv.bin.path")
+this.isLauncher = true
+  }
+
+  /*
+   * Create virtualenv using native virtualenv or conda
+   *
+   * Native Virtualenv:
+   *   -  Execute command: virtualenv -p  --no-site-packages 

+   *   -  Execute command: python -m pip --cache-dir  install 
-r 
+   *
+   * Conda
+   *   -  Execute command: conda create --prefix  --file 
 -y
+   *
+   */
+  def setupVirtualEnv(): String = {
+logInfo("Start to setup virtualenv...")
+logDebug("user.dir=" + System.getProperty("user.dir"))
+logDebug("user.home=" + System.getProperty("user.home"))
+
+require(virtualEnvType == "native" || virtualEnvType == "conda",
+  s"VirtualEnvType: ${virtualEnvType} is not supported." )
+require(new File(virtualEnvPath).exists(),
+  s"VirtualEnvPath: ${virtualEnvPath} is not defined or doesn't 
exist.")
+// Use a temp directory for virtualenv in the following cases:
+// 1. driver of pyspark shell
+// 2. driver of yarn-client mode
+// otherwise create the virtualenv folder under the executor working 
directory.
+if (isLauncher ||
+  (isDriver && conf.get("spark.submit.deployMode") == "client")) {
+  val virtualenv_basedir = Files.createTempDir()
--- End diff --

nit: `virtualenvBasedir`?


---




[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-01-08 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r160081109
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/VirtualEnvFactory.scala ---
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import java.io.File
+import java.util.{Map => JMap}
+import java.util.Arrays
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.JavaConverters._
+
+import com.google.common.io.Files
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+
+
+private[spark] class VirtualEnvFactory(pythonExec: String, conf: 
SparkConf, isDriver: Boolean)
+  extends Logging {
+
+  private var virtualEnvType = conf.get("spark.pyspark.virtualenv.type", 
"native")
+  private var virtualEnvPath = 
conf.get("spark.pyspark.virtualenv.bin.path", "")
+  private var virtualEnvName: String = _
+  private var virtualPythonExec: String = _
+  private val VIRTUALENV_ID = new AtomicInteger()
+  private var isLauncher: Boolean = false
+
+  // used by launcher when user want to use virtualenv in pyspark shell. 
Launcher need this class
+  // to create virtualenv for driver.
+  def this(pythonExec: String, properties: JMap[String, String], isDriver: 
java.lang.Boolean) {
+this(pythonExec, new SparkConf(), isDriver)
--- End diff --

How about `this(pythonExec, new SparkConf().setAll(properties.asScala), isDriver)` and removing the following 3 lines, so we can make `virtualEnvType` and `virtualEnvPath` `val`s?


---




[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-01-08 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r160088202
  
--- Diff: 
launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java 
---
@@ -299,20 +301,39 @@
 // 4. environment variable PYSPARK_PYTHON
 // 5. python
 List pyargs = new ArrayList<>();
-pyargs.add(firstNonEmpty(conf.get(SparkLauncher.PYSPARK_DRIVER_PYTHON),
-  conf.get(SparkLauncher.PYSPARK_PYTHON),
-  System.getenv("PYSPARK_DRIVER_PYTHON"),
-  System.getenv("PYSPARK_PYTHON"),
-  "python"));
-String pyOpts = System.getenv("PYSPARK_DRIVER_PYTHON_OPTS");
-if (conf.containsKey(SparkLauncher.PYSPARK_PYTHON)) {
-  // pass conf spark.pyspark.python to python by environment variable.
-  env.put("PYSPARK_PYTHON", conf.get(SparkLauncher.PYSPARK_PYTHON));
-}
-if (!isEmpty(pyOpts)) {
-  pyargs.addAll(parseOptionString(pyOpts));
-}
+String pythonExec = 
firstNonEmpty(conf.get(SparkLauncher.PYSPARK_DRIVER_PYTHON),
+conf.get(SparkLauncher.PYSPARK_PYTHON),
+System.getenv("PYSPARK_DRIVER_PYTHON"),
+System.getenv("PYSPARK_PYTHON"),
+"python");
+if (conf.getOrDefault("spark.pyspark.virtualenv.enabled", 
"false").equals("true")) {
+  try {
+// setup virtualenv in launcher when virtualenv is enabled in 
pyspark shell
+Class virtualEnvClazz = 
getClass().forName("org.apache.spark.api.python.VirtualEnvFactory");
+Object virtualEnv = virtualEnvClazz.getConstructor(String.class, 
Map.class, Boolean.class)
+.newInstance(pythonExec, conf, true);
+Method virtualEnvMethod = 
virtualEnv.getClass().getMethod("setupVirtualEnv");
+pythonExec = (String) virtualEnvMethod.invoke(virtualEnv);
+pyargs.add(pythonExec);
+  } catch (Exception e) {
+throw new IOException(e);
+  }
+} else {
+  
pyargs.add(firstNonEmpty(conf.get(SparkLauncher.PYSPARK_DRIVER_PYTHON),
+  conf.get(SparkLauncher.PYSPARK_PYTHON),
+  System.getenv("PYSPARK_DRIVER_PYTHON"),
+  System.getenv("PYSPARK_PYTHON"),
+  "python"));
--- End diff --

We can simplify as `pyargs.add(pythonExec);`?


---




[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-01-08 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r160093249
  
--- Diff: 
launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java 
---
@@ -299,20 +301,39 @@
 // 4. environment variable PYSPARK_PYTHON
 // 5. python
 List pyargs = new ArrayList<>();
-pyargs.add(firstNonEmpty(conf.get(SparkLauncher.PYSPARK_DRIVER_PYTHON),
-  conf.get(SparkLauncher.PYSPARK_PYTHON),
-  System.getenv("PYSPARK_DRIVER_PYTHON"),
-  System.getenv("PYSPARK_PYTHON"),
-  "python"));
-String pyOpts = System.getenv("PYSPARK_DRIVER_PYTHON_OPTS");
-if (conf.containsKey(SparkLauncher.PYSPARK_PYTHON)) {
-  // pass conf spark.pyspark.python to python by environment variable.
-  env.put("PYSPARK_PYTHON", conf.get(SparkLauncher.PYSPARK_PYTHON));
-}
-if (!isEmpty(pyOpts)) {
-  pyargs.addAll(parseOptionString(pyOpts));
-}
+String pythonExec = 
firstNonEmpty(conf.get(SparkLauncher.PYSPARK_DRIVER_PYTHON),
+conf.get(SparkLauncher.PYSPARK_PYTHON),
+System.getenv("PYSPARK_DRIVER_PYTHON"),
+System.getenv("PYSPARK_PYTHON"),
+"python");
+if (conf.getOrDefault("spark.pyspark.virtualenv.enabled", 
"false").equals("true")) {
+  try {
+// setup virtualenv in launcher when virtualenv is enabled in 
pyspark shell
+Class virtualEnvClazz = 
getClass().forName("org.apache.spark.api.python.VirtualEnvFactory");
--- End diff --

nit: `Class.forName(...);`.


---




[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-01-08 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r160087899
  
--- Diff: 
launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java 
---
@@ -299,20 +301,39 @@
 // 4. environment variable PYSPARK_PYTHON
 // 5. python
 List pyargs = new ArrayList<>();
-pyargs.add(firstNonEmpty(conf.get(SparkLauncher.PYSPARK_DRIVER_PYTHON),
-  conf.get(SparkLauncher.PYSPARK_PYTHON),
-  System.getenv("PYSPARK_DRIVER_PYTHON"),
-  System.getenv("PYSPARK_PYTHON"),
-  "python"));
-String pyOpts = System.getenv("PYSPARK_DRIVER_PYTHON_OPTS");
-if (conf.containsKey(SparkLauncher.PYSPARK_PYTHON)) {
-  // pass conf spark.pyspark.python to python by environment variable.
-  env.put("PYSPARK_PYTHON", conf.get(SparkLauncher.PYSPARK_PYTHON));
-}
-if (!isEmpty(pyOpts)) {
-  pyargs.addAll(parseOptionString(pyOpts));
-}
+String pythonExec = 
firstNonEmpty(conf.get(SparkLauncher.PYSPARK_DRIVER_PYTHON),
+conf.get(SparkLauncher.PYSPARK_PYTHON),
--- End diff --

nit: indent


---




[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-01-08 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r160091411
  
--- Diff: docs/submitting-applications.md ---
@@ -218,6 +218,73 @@ These commands can be used with `pyspark`, 
`spark-shell`, and `spark-submit` to
 For Python, the equivalent `--py-files` option can be used to distribute 
`.egg`, `.zip` and `.py` libraries
 to executors.
 
+# VirtualEnv for Pyspark
+For simple PySpark application, we can use `--py-files` to add its 
dependencies. While for a large PySpark application,
+usually you will have many dependencies which may also have transitive 
dependencies and even some dependencies need to be compiled
+to be installed. In this case `--py-files` is not so convenient. Luckily, 
in python world we have virtualenv/conda to help create isolated
+python work environment. We also implement virtualenv in PySpark (It is 
only supported in yarn mode for now). 
+
+# Prerequisites
+- Each node have virtualenv/conda, python-devel installed
+- Each node is internet accessible (for downloading packages)
+
+{% highlight bash %}
+# Setup virtualenv using native virtualenv on yarn-client mode
+bin/spark-submit \
+--master yarn \
+--deploy-mode client \
+--conf "spark.pyspark.virtualenv.enabled=true" \
+--conf "spark.pyspark.virtualenv.type=native" \
+--conf 
"spark.pyspark.virtualenv.requirements=" \
+--conf "spark.pyspark.virtualenv.bin.path=" \
+
+
+# Setup virtualenv using conda on yarn-client mode
+bin/spark-submit \
+--master yarn \
+--deploy-mode client \
+--conf "spark.pyspark.virtualenv.enabled=true" \
+--conf "spark.pyspark.virtualenv.type=conda" \
+--conf 
"spark.pyspark.virtualenv.requirements=<" \
+--conf "spark.pyspark.virtualenv.bin.path=" \
+
+{% endhighlight %}
+
+## PySpark VirtualEnv Configurations
+
+Property NameDefaultMeaning
+
+  spark.pyspark.virtualenv.enabled
+  false
+  Whether to enable virtualenv
+
+
+  Spark.pyspark.virtualenv.type
+  virtualenv
--- End diff --

`native` instead of `virtualenv`?

Btw, should we use `native` for the config value to indicate virtualenv? 
I'd prefer `virtualenv` instead.
  


---




[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-01-08 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r160090598
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/VirtualEnvFactory.scala ---
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import java.io.File
+import java.util.{Map => JMap}
+import java.util.Arrays
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.JavaConverters._
+
+import com.google.common.io.Files
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+
+
+private[spark] class VirtualEnvFactory(pythonExec: String, conf: 
SparkConf, isDriver: Boolean)
+  extends Logging {
+
+  private var virtualEnvType = conf.get("spark.pyspark.virtualenv.type", 
"native")
+  private var virtualEnvPath = 
conf.get("spark.pyspark.virtualenv.bin.path", "")
+  private var virtualEnvName: String = _
+  private var virtualPythonExec: String = _
+  private val VIRTUALENV_ID = new AtomicInteger()
+  private var isLauncher: Boolean = false
+
+  // used by launcher when user want to use virtualenv in pyspark shell. 
Launcher need this class
+  // to create virtualenv for driver.
+  def this(pythonExec: String, properties: JMap[String, String], isDriver: 
java.lang.Boolean) {
+this(pythonExec, new SparkConf(), isDriver)
+properties.asScala.foreach(entry => this.conf.set(entry._1, entry._2))
+virtualEnvType = conf.get("spark.pyspark.virtualenv.type", "native")
+virtualEnvPath = conf.get("spark.pyspark.virtualenv.bin.path")
+this.isLauncher = true
+  }
+
+  /*
+   * Create virtualenv using native virtualenv or conda
+   *
+   * Native Virtualenv:
+   *   -  Execute command: virtualenv -p  --no-site-packages 

+   *   -  Execute command: python -m pip --cache-dir  install 
-r 
+   *
+   * Conda
+   *   -  Execute command: conda create --prefix  --file 
 -y
+   *
+   */
+  def setupVirtualEnv(): String = {
+logInfo("Start to setup virtualenv...")
+logDebug("user.dir=" + System.getProperty("user.dir"))
+logDebug("user.home=" + System.getProperty("user.home"))
+
+require(virtualEnvType == "native" || virtualEnvType == "conda",
+  s"VirtualEnvType: ${virtualEnvType} is not supported." )
+require(new File(virtualEnvPath).exists(),
+  s"VirtualEnvPath: ${virtualEnvPath} is not defined or doesn't 
exist.")
+// Use a temp directory for virtualenv in the following cases:
+// 1. driver of pyspark shell
+// 2. driver of yarn-client mode
+// otherwise create the virtualenv folder under the executor working 
directory.
+if (isLauncher ||
+  (isDriver && conf.get("spark.submit.deployMode") == "client")) {
+  val virtualenv_basedir = Files.createTempDir()
+  virtualenv_basedir.deleteOnExit()
+  virtualEnvName = virtualenv_basedir.getAbsolutePath
+} else if (isDriver && conf.get("spark.submit.deployMode") == 
"cluster") {
+  virtualEnvName = "virtualenv_driver"
+} else {
+  // use the working directory of Executor
+  virtualEnvName = "virtualenv_" + conf.getAppId + "_" + 
VIRTUALENV_ID.getAndIncrement()
+}
+
+// Use the absolute path of requirement file in the following cases
+// 1. driver of pyspark shell
+// 2. driver of yarn-client mode
+// otherwise just use filename as it would be downloaded to the 
working directory of Executor
+val pyspark_requirements =
+  if (isLauncher ||
+(isDriver && conf.get("spark.submit.deployMode") == "client")) {
+conf.getOption("spark.pyspark.virtualenv.requirements")
+  } else {
+
conf.getOption("spark.pyspark.virtualenv.requirements").map(_.split("/").last)
+  }
+
+val createEnvCommand =
+  if (virtualEnvType == "native") {

[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-01-08 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r160079462
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/VirtualEnvFactory.scala ---
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import java.io.File
+import java.util.{Map => JMap}
+import java.util.Arrays
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.JavaConverters._
+
+import com.google.common.io.Files
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+
+
+private[spark] class VirtualEnvFactory(pythonExec: String, conf: 
SparkConf, isDriver: Boolean)
+  extends Logging {
+
+  private var virtualEnvType = conf.get("spark.pyspark.virtualenv.type", 
"native")
+  private var virtualEnvPath = 
conf.get("spark.pyspark.virtualenv.bin.path", "")
+  private var virtualEnvName: String = _
+  private var virtualPythonExec: String = _
+  private val VIRTUALENV_ID = new AtomicInteger()
+  private var isLauncher: Boolean = false
+
+  // used by launcher when user want to use virtualenv in pyspark shell. 
Launcher need this class
+  // to create virtualenv for driver.
+  def this(pythonExec: String, properties: JMap[String, String], isDriver: 
java.lang.Boolean) {
--- End diff --

Do we need to use `java.lang.Boolean` instead of `Boolean`?
  


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-01-08 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r160083172
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/VirtualEnvFactory.scala ---
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import java.io.File
+import java.util.{Map => JMap}
+import java.util.Arrays
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.JavaConverters._
+
+import com.google.common.io.Files
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+
+
+private[spark] class VirtualEnvFactory(pythonExec: String, conf: 
SparkConf, isDriver: Boolean)
+  extends Logging {
+
+  private var virtualEnvType = conf.get("spark.pyspark.virtualenv.type", 
"native")
+  private var virtualEnvPath = 
conf.get("spark.pyspark.virtualenv.bin.path", "")
+  private var virtualEnvName: String = _
+  private var virtualPythonExec: String = _
+  private val VIRTUALENV_ID = new AtomicInteger()
+  private var isLauncher: Boolean = false
+
+  // used by launcher when user want to use virtualenv in pyspark shell. 
Launcher need this class
+  // to create virtualenv for driver.
+  def this(pythonExec: String, properties: JMap[String, String], isDriver: 
java.lang.Boolean) {
+this(pythonExec, new SparkConf(), isDriver)
+properties.asScala.foreach(entry => this.conf.set(entry._1, entry._2))
+virtualEnvType = conf.get("spark.pyspark.virtualenv.type", "native")
+virtualEnvPath = conf.get("spark.pyspark.virtualenv.bin.path")
+this.isLauncher = true
+  }
+
+  /*
+   * Create virtualenv using native virtualenv or conda
+   *
+   * Native Virtualenv:
+   *   -  Execute command: virtualenv -p  --no-site-packages 

+   *   -  Execute command: python -m pip --cache-dir  install 
-r 
+   *
+   * Conda
+   *   -  Execute command: conda create --prefix  --file 
 -y
+   *
+   */
+  def setupVirtualEnv(): String = {
+logInfo("Start to setup virtualenv...")
+logDebug("user.dir=" + System.getProperty("user.dir"))
+logDebug("user.home=" + System.getProperty("user.home"))
+
+require(virtualEnvType == "native" || virtualEnvType == "conda",
+  s"VirtualEnvType: ${virtualEnvType} is not supported." )
+require(new File(virtualEnvPath).exists(),
+  s"VirtualEnvPath: ${virtualEnvPath} is not defined or doesn't 
exist.")
+// Use a temp directory for virtualenv in the following cases:
+// 1. driver of pyspark shell
+// 2. driver of yarn-client mode
+// otherwise create the virtualenv folder under the executor working 
directory.
+if (isLauncher ||
+  (isDriver && conf.get("spark.submit.deployMode") == "client")) {
+  val virtualenv_basedir = Files.createTempDir()
+  virtualenv_basedir.deleteOnExit()
+  virtualEnvName = virtualenv_basedir.getAbsolutePath
+} else if (isDriver && conf.get("spark.submit.deployMode") == 
"cluster") {
+  virtualEnvName = "virtualenv_driver"
+} else {
+  // use the working directory of Executor
+  virtualEnvName = "virtualenv_" + conf.getAppId + "_" + 
VIRTUALENV_ID.getAndIncrement()
+}
+
+// Use the absolute path of requirement file in the following cases
+// 1. driver of pyspark shell
+// 2. driver of yarn-client mode
+// otherwise just use filename as it would be downloaded to the 
working directory of Executor
+val pyspark_requirements =
--- End diff --

nit: `pysparkRequirements`?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-01-08 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r160081357
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/VirtualEnvFactory.scala ---
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import java.io.File
+import java.util.{Map => JMap}
+import java.util.Arrays
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.JavaConverters._
+
+import com.google.common.io.Files
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+
+
+private[spark] class VirtualEnvFactory(pythonExec: String, conf: 
SparkConf, isDriver: Boolean)
+  extends Logging {
+
+  private var virtualEnvType = conf.get("spark.pyspark.virtualenv.type", 
"native")
+  private var virtualEnvPath = 
conf.get("spark.pyspark.virtualenv.bin.path", "")
+  private var virtualEnvName: String = _
+  private var virtualPythonExec: String = _
+  private val VIRTUALENV_ID = new AtomicInteger()
+  private var isLauncher: Boolean = false
+
+  // used by launcher when user want to use virtualenv in pyspark shell. 
Launcher need this class
+  // to create virtualenv for driver.
+  def this(pythonExec: String, properties: JMap[String, String], isDriver: 
java.lang.Boolean) {
+this(pythonExec, new SparkConf(), isDriver)
+properties.asScala.foreach(entry => this.conf.set(entry._1, entry._2))
+virtualEnvType = conf.get("spark.pyspark.virtualenv.type", "native")
+virtualEnvPath = conf.get("spark.pyspark.virtualenv.bin.path")
+this.isLauncher = true
+  }
+
+  /*
+   * Create virtualenv using native virtualenv or conda
+   *
+   * Native Virtualenv:
+   *   -  Execute command: virtualenv -p  --no-site-packages 

+   *   -  Execute command: python -m pip --cache-dir  install 
-r 
+   *
+   * Conda
+   *   -  Execute command: conda create --prefix  --file 
 -y
+   *
+   */
+  def setupVirtualEnv(): String = {
+logInfo("Start to setup virtualenv...")
+logDebug("user.dir=" + System.getProperty("user.dir"))
+logDebug("user.home=" + System.getProperty("user.home"))
+
+require(virtualEnvType == "native" || virtualEnvType == "conda",
+  s"VirtualEnvType: ${virtualEnvType} is not supported." )
+require(new File(virtualEnvPath).exists(),
+  s"VirtualEnvPath: ${virtualEnvPath} is not defined or doesn't 
exist.")
--- End diff --

nit: `$virtualEnvPath` instead of `${virtualEnvPath}`.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-01-08 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r160081322
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/VirtualEnvFactory.scala ---
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import java.io.File
+import java.util.{Map => JMap}
+import java.util.Arrays
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.JavaConverters._
+
+import com.google.common.io.Files
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+
+
+private[spark] class VirtualEnvFactory(pythonExec: String, conf: 
SparkConf, isDriver: Boolean)
+  extends Logging {
+
+  private var virtualEnvType = conf.get("spark.pyspark.virtualenv.type", 
"native")
+  private var virtualEnvPath = 
conf.get("spark.pyspark.virtualenv.bin.path", "")
+  private var virtualEnvName: String = _
+  private var virtualPythonExec: String = _
+  private val VIRTUALENV_ID = new AtomicInteger()
+  private var isLauncher: Boolean = false
+
+  // used by launcher when user want to use virtualenv in pyspark shell. 
Launcher need this class
+  // to create virtualenv for driver.
+  def this(pythonExec: String, properties: JMap[String, String], isDriver: 
java.lang.Boolean) {
+this(pythonExec, new SparkConf(), isDriver)
+properties.asScala.foreach(entry => this.conf.set(entry._1, entry._2))
+virtualEnvType = conf.get("spark.pyspark.virtualenv.type", "native")
+virtualEnvPath = conf.get("spark.pyspark.virtualenv.bin.path")
+this.isLauncher = true
+  }
+
+  /*
+   * Create virtualenv using native virtualenv or conda
+   *
+   * Native Virtualenv:
+   *   -  Execute command: virtualenv -p  --no-site-packages 

+   *   -  Execute command: python -m pip --cache-dir  install 
-r 
+   *
+   * Conda
+   *   -  Execute command: conda create --prefix  --file 
 -y
+   *
+   */
+  def setupVirtualEnv(): String = {
+logInfo("Start to setup virtualenv...")
+logDebug("user.dir=" + System.getProperty("user.dir"))
+logDebug("user.home=" + System.getProperty("user.home"))
+
+require(virtualEnvType == "native" || virtualEnvType == "conda",
+  s"VirtualEnvType: ${virtualEnvType} is not supported." )
--- End diff --

nit: `$virtualEnvType` instead of `${virtualEnvType}`.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-01-08 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r160088373
  
--- Diff: python/pyspark/context.py ---
@@ -1023,6 +1039,33 @@ def getConf(self):
 conf.setAll(self._conf.getAll())
 return conf
 
+def install_packages(self, packages, install_driver=True):
--- End diff --

Who calls this method?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-01-08 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r160087986
  
--- Diff: 
launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java 
---
@@ -299,20 +301,39 @@
 // 4. environment variable PYSPARK_PYTHON
 // 5. python
 List pyargs = new ArrayList<>();
-pyargs.add(firstNonEmpty(conf.get(SparkLauncher.PYSPARK_DRIVER_PYTHON),
-  conf.get(SparkLauncher.PYSPARK_PYTHON),
-  System.getenv("PYSPARK_DRIVER_PYTHON"),
-  System.getenv("PYSPARK_PYTHON"),
-  "python"));
-String pyOpts = System.getenv("PYSPARK_DRIVER_PYTHON_OPTS");
-if (conf.containsKey(SparkLauncher.PYSPARK_PYTHON)) {
-  // pass conf spark.pyspark.python to python by environment variable.
-  env.put("PYSPARK_PYTHON", conf.get(SparkLauncher.PYSPARK_PYTHON));
-}
-if (!isEmpty(pyOpts)) {
-  pyargs.addAll(parseOptionString(pyOpts));
-}
+String pythonExec = 
firstNonEmpty(conf.get(SparkLauncher.PYSPARK_DRIVER_PYTHON),
+conf.get(SparkLauncher.PYSPARK_PYTHON),
+System.getenv("PYSPARK_DRIVER_PYTHON"),
+System.getenv("PYSPARK_PYTHON"),
+"python");
+if (conf.getOrDefault("spark.pyspark.virtualenv.enabled", 
"false").equals("true")) {
+  try {
+// setup virtualenv in launcher when virtualenv is enabled in 
pyspark shell
+Class virtualEnvClazz = 
getClass().forName("org.apache.spark.api.python.VirtualEnvFactory");
+Object virtualEnv = virtualEnvClazz.getConstructor(String.class, 
Map.class, Boolean.class)
+.newInstance(pythonExec, conf, true);
--- End diff --

nit: indent


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-01-08 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r160088297
  
--- Diff: 
launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java 
---
@@ -299,20 +301,39 @@
 // 4. environment variable PYSPARK_PYTHON
 // 5. python
 List pyargs = new ArrayList<>();
-pyargs.add(firstNonEmpty(conf.get(SparkLauncher.PYSPARK_DRIVER_PYTHON),
-  conf.get(SparkLauncher.PYSPARK_PYTHON),
-  System.getenv("PYSPARK_DRIVER_PYTHON"),
-  System.getenv("PYSPARK_PYTHON"),
-  "python"));
-String pyOpts = System.getenv("PYSPARK_DRIVER_PYTHON_OPTS");
-if (conf.containsKey(SparkLauncher.PYSPARK_PYTHON)) {
-  // pass conf spark.pyspark.python to python by environment variable.
-  env.put("PYSPARK_PYTHON", conf.get(SparkLauncher.PYSPARK_PYTHON));
-}
-if (!isEmpty(pyOpts)) {
-  pyargs.addAll(parseOptionString(pyOpts));
-}
+String pythonExec = 
firstNonEmpty(conf.get(SparkLauncher.PYSPARK_DRIVER_PYTHON),
+conf.get(SparkLauncher.PYSPARK_PYTHON),
+System.getenv("PYSPARK_DRIVER_PYTHON"),
+System.getenv("PYSPARK_PYTHON"),
+"python");
+if (conf.getOrDefault("spark.pyspark.virtualenv.enabled", 
"false").equals("true")) {
+  try {
+// setup virtualenv in launcher when virtualenv is enabled in 
pyspark shell
+Class virtualEnvClazz = 
getClass().forName("org.apache.spark.api.python.VirtualEnvFactory");
+Object virtualEnv = virtualEnvClazz.getConstructor(String.class, 
Map.class, Boolean.class)
+.newInstance(pythonExec, conf, true);
+Method virtualEnvMethod = 
virtualEnv.getClass().getMethod("setupVirtualEnv");
+pythonExec = (String) virtualEnvMethod.invoke(virtualEnv);
+pyargs.add(pythonExec);
+  } catch (Exception e) {
+throw new IOException(e);
+  }
+} else {
+  
pyargs.add(firstNonEmpty(conf.get(SparkLauncher.PYSPARK_DRIVER_PYTHON),
+  conf.get(SparkLauncher.PYSPARK_PYTHON),
+  System.getenv("PYSPARK_DRIVER_PYTHON"),
+  System.getenv("PYSPARK_PYTHON"),
+  "python"));
+  String pyOpts = System.getenv("PYSPARK_DRIVER_PYTHON_OPTS");
+  if (conf.containsKey(SparkLauncher.PYSPARK_PYTHON)) {
+// pass conf spark.pyspark.python to python by environment 
variable.
+env.put("PYSPARK_PYTHON", conf.get(SparkLauncher.PYSPARK_PYTHON));
+  }
 
+  if (!isEmpty(pyOpts)) {
+pyargs.addAll(parseOptionString(pyOpts));
+  }
--- End diff --

Don't we need `pyOpts` when virtualenv is enabled?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-01-08 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r160093389
  
--- Diff: 
launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java 
---
@@ -299,20 +301,39 @@
 // 4. environment variable PYSPARK_PYTHON
 // 5. python
 List pyargs = new ArrayList<>();
-pyargs.add(firstNonEmpty(conf.get(SparkLauncher.PYSPARK_DRIVER_PYTHON),
-  conf.get(SparkLauncher.PYSPARK_PYTHON),
-  System.getenv("PYSPARK_DRIVER_PYTHON"),
-  System.getenv("PYSPARK_PYTHON"),
-  "python"));
-String pyOpts = System.getenv("PYSPARK_DRIVER_PYTHON_OPTS");
-if (conf.containsKey(SparkLauncher.PYSPARK_PYTHON)) {
-  // pass conf spark.pyspark.python to python by environment variable.
-  env.put("PYSPARK_PYTHON", conf.get(SparkLauncher.PYSPARK_PYTHON));
-}
-if (!isEmpty(pyOpts)) {
-  pyargs.addAll(parseOptionString(pyOpts));
-}
+String pythonExec = 
firstNonEmpty(conf.get(SparkLauncher.PYSPARK_DRIVER_PYTHON),
+conf.get(SparkLauncher.PYSPARK_PYTHON),
+System.getenv("PYSPARK_DRIVER_PYTHON"),
+System.getenv("PYSPARK_PYTHON"),
+"python");
+if (conf.getOrDefault("spark.pyspark.virtualenv.enabled", 
"false").equals("true")) {
+  try {
+// setup virtualenv in launcher when virtualenv is enabled in 
pyspark shell
+Class virtualEnvClazz = 
getClass().forName("org.apache.spark.api.python.VirtualEnvFactory");
+Object virtualEnv = virtualEnvClazz.getConstructor(String.class, 
Map.class, Boolean.class)
+.newInstance(pythonExec, conf, true);
+Method virtualEnvMethod = 
virtualEnv.getClass().getMethod("setupVirtualEnv");
--- End diff --

nit: `virtualEnvClazz.getMethod(...)`.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-01-08 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r160085587
  
--- Diff: docs/submitting-applications.md ---
@@ -218,6 +218,73 @@ These commands can be used with `pyspark`, 
`spark-shell`, and `spark-submit` to
 For Python, the equivalent `--py-files` option can be used to distribute 
`.egg`, `.zip` and `.py` libraries
 to executors.
 
+# VirtualEnv for Pyspark
+For simple PySpark application, we can use `--py-files` to add its 
dependencies. While for a large PySpark application,
+usually you will have many dependencies which may also have transitive 
dependencies and even some dependencies need to be compiled
+to be installed. In this case `--py-files` is not so convenient. Luckily, 
in python world we have virtualenv/conda to help create isolated
+python work environment. We also implement virtualenv in PySpark (It is 
only supported in yarn mode for now). 
+
+# Prerequisites
+- Each node have virtualenv/conda, python-devel installed
+- Each node is internet accessible (for downloading packages)
+
+{% highlight bash %}
+# Setup virtualenv using native virtualenv on yarn-client mode
+bin/spark-submit \
+--master yarn \
+--deploy-mode client \
+--conf "spark.pyspark.virtualenv.enabled=true" \
+--conf "spark.pyspark.virtualenv.type=native" \
+--conf 
"spark.pyspark.virtualenv.requirements=" \
+--conf "spark.pyspark.virtualenv.bin.path=" \
+
+
+# Setup virtualenv using conda on yarn-client mode
+bin/spark-submit \
+--master yarn \
+--deploy-mode client \
+--conf "spark.pyspark.virtualenv.enabled=true" \
+--conf "spark.pyspark.virtualenv.type=conda" \
+--conf 
"spark.pyspark.virtualenv.requirements=<" \
+--conf "spark.pyspark.virtualenv.bin.path=" \
+
+{% endhighlight %}
+
+## PySpark VirtualEnv Configurations
+
+Property NameDefaultMeaning
+
+  spark.pyspark.virtualenv.enabled
+  false
+  Whether to enable virtualenv
+
+
+  Spark.pyspark.virtualenv.type
--- End diff --

nit: `spark.pyspark...` instead of `Spark.pyspark...`.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-01-07 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r160072831
  
--- Diff: python/pyspark/context.py ---
@@ -1023,6 +1039,33 @@ def getConf(self):
 conf.setAll(self._conf.getAll())
 return conf
 
+def install_packages(self, packages, install_driver=True):
+"""
+install python packages on all executors and driver through pip
+:param packages: string for single package or a list of string for 
multiple packages
+:param install_driver: whether to install packages in client
+"""
+if self._conf.get("spark.pyspark.virtualenv.enabled") != "true":
+raise Exception("install_packages can only use called when "
+"spark.pyspark.virtualenv.enabled set as true")
+if isinstance(packages, basestring):
+packages = [packages]
+num_executors = int(self._conf.get("spark.executor.instances"))
+dummyRDD = self.parallelize(range(num_executors), num_executors)
+
+def _run_pip(packages, iterator):
+import pip
+pip.main(["install"] + packages)
+
+# run it in the main thread. Will do it in a separated thread after
+# https://github.com/pypa/pip/issues/2553 is fixed
+if install_driver:
+import threading
--- End diff --

Sorry, maybe I missed something. Do we need to import this?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-01-07 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r160072723
  
--- Diff: python/pyspark/context.py ---
@@ -1023,6 +1039,33 @@ def getConf(self):
 conf.setAll(self._conf.getAll())
 return conf
 
+def install_packages(self, packages, install_driver=True):
+"""
+install python packages on all executors and driver through pip
+:param packages: string for single package or a list of string for 
multiple packages
+:param install_driver: whether to install packages in client
--- End diff --

Can we have examples as doctests (and skip them) to show how we use it?
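
For example, something along these lines (just a sketch of skipped doctests, not taken from the patch):

```python
def install_packages(self, packages, install_driver=True):
    """
    Install Python packages on all executors and the driver through pip.

    >>> sc.install_packages("numpy")                                    # doctest: +SKIP
    >>> sc.install_packages(["numpy", "pandas"], install_driver=False)  # doctest: +SKIP

    :param packages: a single package name or a list of package names
    :param install_driver: whether to also install the packages on the driver
    """
```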


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-01-07 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r160072647
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/VirtualEnvFactory.scala ---
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import java.io.File
+import java.util.{Map => JMap}
+import java.util.Arrays
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.JavaConverters._
+
+import com.google.common.io.Files
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+
+
+private[spark] class VirtualEnvFactory(pythonExec: String, conf: 
SparkConf, isDriver: Boolean)
+  extends Logging {
+
+  private var virtualEnvType = conf.get("spark.pyspark.virtualenv.type", 
"native")
+  private var virtualEnvPath = 
conf.get("spark.pyspark.virtualenv.bin.path", "")
+  private var virtualEnvName: String = _
+  private var virtualPythonExec: String = _
+  private val VIRTUALENV_ID = new AtomicInteger()
+  private var isLauncher: Boolean = false
+
+  // used by launcher when user want to use virtualenv in pyspark shell. 
Launcher need this class
+  // to create virtualenv for driver.
+  def this(pythonExec: String, properties: JMap[String, String], isDriver: 
java.lang.Boolean) {
+this(pythonExec, new SparkConf(), isDriver)
+properties.asScala.foreach(entry => this.conf.set(entry._1, entry._2))
+virtualEnvType = conf.get("spark.pyspark.virtualenv.type", "native")
+virtualEnvPath = conf.get("spark.pyspark.virtualenv.bin.path")
+this.isLauncher = true
+  }
+
+  /*
+   * Create virtualenv using native virtualenv or conda
+   *
+   * Native Virtualenv:
+   *   -  Execute command: virtualenv -p  --no-site-packages 

+   *   -  Execute command: python -m pip --cache-dir  install 
-r 
+   *
+   * Conda
+   *   -  Execute command: conda create --prefix  --file 
 -y
+   *
+   */
+  def setupVirtualEnv(): String = {
--- End diff --

Hey @zjffdu, couldn't we actually test this by checking whether the generated 
command strings are as expected?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-01-07 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r160072684
  
--- Diff: python/pyspark/context.py ---
@@ -1023,6 +1039,33 @@ def getConf(self):
 conf.setAll(self._conf.getAll())
 return conf
 
+def install_packages(self, packages, install_driver=True):
+"""
+install python packages on all executors and driver through pip
+:param packages: string for single package or a list of string for 
multiple packages
+:param install_driver: whether to install packages in client
+"""
+if self._conf.get("spark.pyspark.virtualenv.enabled") != "true":
+raise Exception("install_packages can only use called when "
--- End diff --

How about `Exception` -> `RuntimeError`?
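
i.e. something like this (only illustrating the suggested exception type):

```python
if self._conf.get("spark.pyspark.virtualenv.enabled") != "true":
    raise RuntimeError("install_packages can only be called when "
                       "spark.pyspark.virtualenv.enabled is set to true")
```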


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-01-07 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r160071888
  
--- Diff: python/pyspark/context.py ---
@@ -980,6 +996,33 @@ def getConf(self):
 conf.setAll(self._conf.getAll())
 return conf
 
+def install_packages(self, packages, install_driver=True):
+"""
+install python packages on all executors and driver through pip
+:param packages: string for single package or a list of string for 
multiple packages
+:param install_driver: whether to install packages in client
+"""
+if self._conf.get("spark.pyspark.virtualenv.enabled") != "true":
+raise Exception("install_packages can only use called when "
+"spark.pyspark.virtualenv.enabled set as true")
+if isinstance(packages, basestring):
+packages = [packages]
+num_executors = int(self._conf.get("spark.executor.instances"))
+dummyRDD = self.parallelize(range(num_executors), num_executors)
--- End diff --

Right, even without dynamic execution this depends on us continuing to do 
uniform distribution of data with parallelize, which I don't think is guaranteed 
(and we have no test which would catch this breaking).
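
One way to at least weaken that assumption (a sketch only; the oversubscription 
factor is arbitrary, and pip may then run more than once per executor, which 
should be harmless since re-installing an already present package is a no-op):

```python
def _run_pip(iterator):
    # runs once per partition, on whichever executor happens to get it
    import pip
    pip.main(["install"] + packages)

num_executors = int(self._conf.get("spark.executor.instances"))
num_slices = num_executors * 4  # oversubscribe so every executor very likely gets work
self.parallelize(range(num_slices), num_slices).foreachPartition(_run_pip)
```

This still gives no hard guarantee, it only makes the scheduling assumption less brittle.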


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-01-07 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r160071773
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -475,6 +475,19 @@ object SparkSubmit extends CommandLineUtils with 
Logging {
   }
 }
 
+// for pyspark virtualenv
+if (args.isPython) {
+  if (clusterManager != YARN &&
+args.sparkProperties.getOrElse("spark.pyspark.virtualenv.enabled", 
"false") == "true") {
+printErrorAndExit("virtualenv is only supported in yarn mode")
--- End diff --

Ah please make a JIRA to track this then.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-01-07 Thread zjffdu
Github user zjffdu commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r160070613
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/VirtualEnvFactory.scala ---
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import java.io.File
+import java.util.{Map => JMap}
+import java.util.Arrays
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.JavaConverters._
+
+import com.google.common.io.Files
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+
+
+private[spark] class VirtualEnvFactory(pythonExec: String, conf: 
SparkConf, isDriver: Boolean)
+  extends Logging {
+
+  private var virtualEnvType = conf.get("spark.pyspark.virtualenv.type", 
"native")
+  private var virtualEnvPath = 
conf.get("spark.pyspark.virtualenv.bin.path", "")
+  private var virtualEnvName: String = _
+  private var virtualPythonExec: String = _
+  private val VIRTUALENV_ID = new AtomicInteger()
+  private var isLauncher: Boolean = false
+
+  // used by launcher when user want to use virtualenv in pyspark shell. 
Launcher need this class
+  // to create virtualenv for driver.
+  def this(pythonExec: String, properties: JMap[String, String], isDriver: 
java.lang.Boolean) {
+this(pythonExec, new SparkConf(), isDriver)
+properties.asScala.foreach(entry => this.conf.set(entry._1, entry._2))
+virtualEnvType = conf.get("spark.pyspark.virtualenv.type", "native")
+virtualEnvPath = conf.get("spark.pyspark.virtualenv.bin.path")
+this.isLauncher = true
+  }
+
+  /*
+   * Create virtualenv using native virtualenv or conda
+   *
+   * Native Virtualenv:
+   *   -  Execute command: virtualenv -p  --no-site-packages 

+   *   -  Execute command: python -m pip --cache-dir  install 
-r 
+   *
+   * Conda
+   *   -  Execute command: conda create --prefix  --file 
 -y
+   *
+   */
+  def setupVirtualEnv(): String = {
+logInfo("Start to setup virtualenv...")
+logDebug("user.dir=" + System.getProperty("user.dir"))
+logDebug("user.home=" + System.getProperty("user.home"))
+
+require(virtualEnvType == "native" || virtualEnvType == "conda",
+  s"VirtualEnvType: ${virtualEnvType} is not supported." )
+require(new File(virtualEnvPath).exists(),
--- End diff --

Sorry, the variable name is a little misleading; it should be 
`virtualEnvBinPath`, which corresponds to `spark.pyspark.virtualenv.bin.path`.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-01-07 Thread zjffdu
Github user zjffdu commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r160070518
  
--- Diff: python/pyspark/context.py ---
@@ -980,6 +996,33 @@ def getConf(self):
 conf.setAll(self._conf.getAll())
 return conf
 
+def install_packages(self, packages, install_driver=True):
+"""
+install python packages on all executors and driver through pip
+:param packages: string for single package or a list of string for 
multiple packages
+:param install_driver: whether to install packages in client
+"""
+if self._conf.get("spark.pyspark.virtualenv.enabled") != "true":
+raise Exception("install_packages can only use called when "
+"spark.pyspark.virtualenv.enabled set as true")
+if isinstance(packages, basestring):
+packages = [packages]
+num_executors = int(self._conf.get("spark.executor.instances"))
+dummyRDD = self.parallelize(range(num_executors), num_executors)
--- End diff --

Right, it would not work when dynamic allocation is enabled. I will add a 
condition for that.
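
For example (a sketch of the guard, not the final wording):

```python
# Executors added later by dynamic allocation would never run the pip step,
# so refuse to install in that case for now.
if self._conf.get("spark.dynamicAllocation.enabled", "false") == "true":
    raise RuntimeError("install_packages does not support dynamic allocation yet")
```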


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-01-07 Thread zjffdu
Github user zjffdu commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r160070457
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -475,6 +475,19 @@ object SparkSubmit extends CommandLineUtils with 
Logging {
   }
 }
 
+// for pyspark virtualenv
+if (args.isPython) {
+  if (clusterManager != YARN &&
+args.sparkProperties.getOrElse("spark.pyspark.virtualenv.enabled", 
"false") == "true") {
+printErrorAndExit("virtualenv is only supported in yarn mode")
--- End diff --

I haven't tested it in standalone mode, so it is not guaranteed to work there; 
supporting standalone mode is on my plan for later.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-01-07 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r160069581
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/VirtualEnvFactory.scala ---
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import java.io.File
+import java.util.{Map => JMap}
+import java.util.Arrays
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.JavaConverters._
+
+import com.google.common.io.Files
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+
+
+private[spark] class VirtualEnvFactory(pythonExec: String, conf: 
SparkConf, isDriver: Boolean)
+  extends Logging {
+
+  private var virtualEnvType = conf.get("spark.pyspark.virtualenv.type", 
"native")
+  private var virtualEnvPath = 
conf.get("spark.pyspark.virtualenv.bin.path", "")
+  private var virtualEnvName: String = _
+  private var virtualPythonExec: String = _
+  private val VIRTUALENV_ID = new AtomicInteger()
+  private var isLauncher: Boolean = false
+
+  // used by launcher when user want to use virtualenv in pyspark shell. 
Launcher need this class
+  // to create virtualenv for driver.
+  def this(pythonExec: String, properties: JMap[String, String], isDriver: 
java.lang.Boolean) {
+this(pythonExec, new SparkConf(), isDriver)
+properties.asScala.foreach(entry => this.conf.set(entry._1, entry._2))
+virtualEnvType = conf.get("spark.pyspark.virtualenv.type", "native")
+virtualEnvPath = conf.get("spark.pyspark.virtualenv.bin.path")
--- End diff --

This `virtualEnvPath` variable is confusing, since you append a dynamically 
generated name to it to determine the actual path (if I read this code 
correctly).
  


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-01-07 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r160069346
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/VirtualEnvFactory.scala ---
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import java.io.File
+import java.util.{Map => JMap}
+import java.util.Arrays
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.JavaConverters._
+
+import com.google.common.io.Files
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+
+
+private[spark] class VirtualEnvFactory(pythonExec: String, conf: 
SparkConf, isDriver: Boolean)
+  extends Logging {
+
+  private var virtualEnvType = conf.get("spark.pyspark.virtualenv.type", 
"native")
+  private var virtualEnvPath = 
conf.get("spark.pyspark.virtualenv.bin.path", "")
+  private var virtualEnvName: String = _
+  private var virtualPythonExec: String = _
+  private val VIRTUALENV_ID = new AtomicInteger()
+  private var isLauncher: Boolean = false
+
+  // used by launcher when user want to use virtualenv in pyspark shell. 
Launcher need this class
+  // to create virtualenv for driver.
+  def this(pythonExec: String, properties: JMap[String, String], isDriver: 
java.lang.Boolean) {
+this(pythonExec, new SparkConf(), isDriver)
+properties.asScala.foreach(entry => this.conf.set(entry._1, entry._2))
+virtualEnvType = conf.get("spark.pyspark.virtualenv.type", "native")
+virtualEnvPath = conf.get("spark.pyspark.virtualenv.bin.path")
+this.isLauncher = true
+  }
+
+  /*
+   * Create virtualenv using native virtualenv or conda
--- End diff --

I wouldn't document the commands called in the function doc string.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-01-07 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r160069731
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/VirtualEnvFactory.scala ---
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import java.io.File
+import java.util.{Map => JMap}
+import java.util.Arrays
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.JavaConverters._
+
+import com.google.common.io.Files
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+
+
+private[spark] class VirtualEnvFactory(pythonExec: String, conf: 
SparkConf, isDriver: Boolean)
+  extends Logging {
+
+  private var virtualEnvType = conf.get("spark.pyspark.virtualenv.type", 
"native")
+  private var virtualEnvPath = 
conf.get("spark.pyspark.virtualenv.bin.path", "")
+  private var virtualEnvName: String = _
+  private var virtualPythonExec: String = _
+  private val VIRTUALENV_ID = new AtomicInteger()
+  private var isLauncher: Boolean = false
+
+  // used by launcher when user want to use virtualenv in pyspark shell. 
Launcher need this class
+  // to create virtualenv for driver.
+  def this(pythonExec: String, properties: JMap[String, String], isDriver: 
java.lang.Boolean) {
+this(pythonExec, new SparkConf(), isDriver)
+properties.asScala.foreach(entry => this.conf.set(entry._1, entry._2))
+virtualEnvType = conf.get("spark.pyspark.virtualenv.type", "native")
+virtualEnvPath = conf.get("spark.pyspark.virtualenv.bin.path")
+this.isLauncher = true
+  }
+
+  /*
+   * Create virtualenv using native virtualenv or conda
+   *
+   * Native Virtualenv:
+   *   -  Execute command: virtualenv -p  --no-site-packages 

+   *   -  Execute command: python -m pip --cache-dir  install 
-r 
+   *
+   * Conda
+   *   -  Execute command: conda create --prefix  --file 
 -y
+   *
+   */
+  def setupVirtualEnv(): String = {
+logInfo("Start to setup virtualenv...")
+logDebug("user.dir=" + System.getProperty("user.dir"))
+logDebug("user.home=" + System.getProperty("user.home"))
+
+require(virtualEnvType == "native" || virtualEnvType == "conda",
+  s"VirtualEnvType: ${virtualEnvType} is not supported." )
+require(new File(virtualEnvPath).exists(),
--- End diff --

I'm confused why we're testing that this path exists instead of trying to create 
it if it doesn't already exist. It also appears to be ignored in client mode.
  


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-01-07 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r160069473
  
--- Diff: python/pyspark/context.py ---
@@ -189,6 +190,21 @@ def _do_init(self, master, appName, sparkHome, 
pyFiles, environment, batchSize,
 self._jsc.sc().register(self._javaAccumulator)
 
 self.pythonExec = os.environ.get("PYSPARK_PYTHON", 'python')
+if self._conf.get("spark.pyspark.virtualenv.enabled") == "true":
+self.pythonExec = self._conf.get("spark.pyspark.python", 
self.pythonExec)
+requirements = 
self._conf.get("spark.pyspark.virtualenv.requirements")
+virtualEnvBinPath = 
self._conf.get("spark.pyspark.virtualenv.bin.path")
+virtualEnvType = 
self._conf.get("spark.pyspark.virtualenv.type", "native")
+python_version = 
self._conf.get("spark.pyspark.virtualenv.python_version")
+
+if virtualEnvType == "conda" and (requirements is None) and 
python_version is None:
--- End diff --

Could we just assume it's the same Python version as the one running 
context.py?
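
e.g. (a sketch, assuming the interpreter running context.py is an acceptable default):

```python
import platform

python_version = self._conf.get("spark.pyspark.virtualenv.python_version")
if python_version is None:
    # fall back to the version of the interpreter running context.py, e.g. "2.7.12"
    python_version = platform.python_version()
```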


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-01-07 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r160069334
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/VirtualEnvFactory.scala ---
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import java.io.File
+import java.util.{Map => JMap}
+import java.util.Arrays
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.JavaConverters._
+
+import com.google.common.io.Files
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+
+
+private[spark] class VirtualEnvFactory(pythonExec: String, conf: 
SparkConf, isDriver: Boolean)
+  extends Logging {
+
+  private var virtualEnvType = conf.get("spark.pyspark.virtualenv.type", 
"native")
+  private var virtualEnvPath = 
conf.get("spark.pyspark.virtualenv.bin.path", "")
+  private var virtualEnvName: String = _
+  private var virtualPythonExec: String = _
+  private val VIRTUALENV_ID = new AtomicInteger()
+  private var isLauncher: Boolean = false
+
+  // used by launcher when user want to use virtualenv in pyspark shell. 
Launcher need this class
+  // to create virtualenv for driver.
+  def this(pythonExec: String, properties: JMap[String, String], isDriver: 
java.lang.Boolean) {
+this(pythonExec, new SparkConf(), isDriver)
+properties.asScala.foreach(entry => this.conf.set(entry._1, entry._2))
+virtualEnvType = conf.get("spark.pyspark.virtualenv.type", "native")
+virtualEnvPath = conf.get("spark.pyspark.virtualenv.bin.path")
+this.isLauncher = true
+  }
+
+  /*
+   * Create virtualenv using native virtualenv or conda
+   *
+   * Native Virtualenv:
+   *   -  Execute command: virtualenv -p  --no-site-packages 

+   *   -  Execute command: python -m pip --cache-dir  install 
-r 
+   *
+   * Conda
+   *   -  Execute command: conda create --prefix  --file 
 -y
+   *
+   */
+  def setupVirtualEnv(): String = {
+logInfo("Start to setup virtualenv...")
+logDebug("user.dir=" + System.getProperty("user.dir"))
+logDebug("user.home=" + System.getProperty("user.home"))
+
+require(virtualEnvType == "native" || virtualEnvType == "conda",
+  s"VirtualEnvType: ${virtualEnvType} is not supported." )
+require(new File(virtualEnvPath).exists(),
+  s"VirtualEnvPath: ${virtualEnvPath} is not defined or doesn't 
exist.")
+// Use a temp directory for virtualenv in the following cases:
+// 1. driver of pyspark shell
+// 2. driver of yarn-client mode
+// otherwise create the virtualenv folder under the executor working 
directory.
--- End diff --

Maybe we can clarify why we do the logic this way?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-01-07 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r160069409
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -475,6 +475,19 @@ object SparkSubmit extends CommandLineUtils with 
Logging {
   }
 }
 
+// for pyspark virtualenv
+if (args.isPython) {
+  if (clusterManager != YARN &&
+args.sparkProperties.getOrElse("spark.pyspark.virtualenv.enabled", 
"false") == "true") {
+printErrorAndExit("virtualenv is only supported in yarn mode")
--- End diff --

Why doesn't this work in standalone?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-01-07 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r160069131
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala ---
@@ -29,7 +30,10 @@ import org.apache.spark._
 import org.apache.spark.internal.Logging
 import org.apache.spark.util.{RedirectThread, Utils}
 
-private[spark] class PythonWorkerFactory(pythonExec: String, envVars: 
Map[String, String])
+
+private[spark] class PythonWorkerFactory(var pythonExec: String,
--- End diff --

Making pythonExec a var is maybe not ideal.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-01-07 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r160069167
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala ---
@@ -60,6 +66,12 @@ private[spark] class PythonWorkerFactory(pythonExec: 
String, envVars: Map[String
 envVars.getOrElse("PYTHONPATH", ""),
 sys.env.getOrElse("PYTHONPATH", ""))
 
+
+  if (virtualEnvEnabled) {
+val virtualEnvFactory = new VirtualEnvFactory(pythonExec, conf, false)
+pythonExec = virtualEnvFactory.setupVirtualEnv()
--- End diff --

Rather than updating a var, what about creating a new val that is set to the 
value passed in when virtualEnvEnabled is false, or to this new value when it's 
true?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-01-07 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r122558038
  
--- Diff: python/pyspark/context.py ---
@@ -980,6 +996,33 @@ def getConf(self):
 conf.setAll(self._conf.getAll())
 return conf
 
+def install_packages(self, packages, install_driver=True):
+"""
+install python packages on all executors and driver through pip
+:param packages: string for single package or a list of string for 
multiple packages
+:param install_driver: whether to install packages in client
+"""
+if self._conf.get("spark.pyspark.virtualenv.enabled") != "true":
+raise Exception("install_packages can only use called when "
+"spark.pyspark.virtualenv.enabled set as true")
+if isinstance(packages, basestring):
+packages = [packages]
+num_executors = int(self._conf.get("spark.executor.instances"))
+dummyRDD = self.parallelize(range(num_executors), num_executors)
--- End diff --

This is not guaranteed to work, and overlooks the situation of having 
executors added.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2016-11-07 Thread vogxn
Github user vogxn commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r86756833
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala ---
@@ -69,6 +84,66 @@ private[spark] class PythonWorkerFactory(pythonExec: 
String, envVars: Map[String
   }
 
   /**
+   * Create virtualenv using native virtualenv or conda
+   *
+   * Native Virtualenv:
+   *   -  Execute command: virtualenv -p pythonExec --no-site-packages 
virtualenvName
+   *   -  Execute command: python -m pip --cache-dir cache-dir install -r 
requirement_file
+   *
+   * Conda
+   *   -  Execute command: conda create --prefix prefix --file 
requirement_file -y
+   *
+   */
+  def setupVirtualEnv(): Unit = {
+logDebug("Start to setup virtualenv...")
+logDebug("user.dir=" + System.getProperty("user.dir"))
+logDebug("user.home=" + System.getProperty("user.home"))
+
+require(virtualEnvType == "native" || virtualEnvType == "conda",
+  s"VirtualEnvType: ${virtualEnvType} is not supported" )
+virtualEnvName = "virtualenv_" + conf.getAppId + "_" + 
VIRTUALENV_ID.getAndIncrement()
+// use the absolute path when it is local mode otherwise just use 
filename as it would be
+// fetched from FileServer
+val pyspark_requirements =
+  if (Utils.isLocalMaster(conf)) {
+conf.get("spark.pyspark.virtualenv.requirements")
+  } else {
+conf.get("spark.pyspark.virtualenv.requirements").split("/").last
+  }
+
+val createEnvCommand =
+  if (virtualEnvType == "native") {
+Arrays.asList(virtualEnvPath,
+  "-p", pythonExec,
+  "--no-site-packages", virtualEnvName)
+  } else {
+Arrays.asList(virtualEnvPath,
+  "create", "--prefix", System.getProperty("user.dir") + "/" + 
virtualEnvName,
--- End diff --

Started writing this comment and then had to recompile my cluster. I figured out 
I had made a mistake with the permissions. Apologies for the false alarm. The patch 
works fine and I'm able to run executors with the conda environment. I'll do 
some more testing from my end.

= Following was my setup =
Apache Spark (with this patch) is compiled with Apache Hadoop 2.6.0. I've 
installed `anaconda2-4.1.1` on all my nodes in the cluster under 
`/usr/lib/anaconda2`. I can create conda environments using the command `conda 
create --prefix test-env numpy -y` fine.

The following shell script is used to submit my pyspark programs:

```
$ cat run.sh
/usr/lib/spark/bin/spark-submit  --master yarn --deploy-mode client \
--conf "spark.pyspark.virtualenv.enabled=true" \
--conf "spark.pyspark.virtualenv.type=conda" \
--conf "spark.pyspark.virtualenv.requirements=/home/tsp/conda.txt" \
--conf "spark.pyspark.virtualenv.bin.path=/usr/lib/anaconda2/bin/conda" 
"$@"
```

This is the program I've submitted to see if the anaconda environment is 
detected in the executors

```
$ cat execinfo.py
from pyspark import SparkContext
import sys

if __name__ == '__main__':
  sc = SparkContext()
  print sys.version
  print sc.parallelize(range(1,2)).map(lambda x: sys.version).collect()
```

This is what is seen in the debug logs
```
Caused by: java.lang.RuntimeException: Fail to run command: 
/usr/lib/anaconda2/bin/conda create --prefix 
/media/ebs2/yarn/local/usercache/tsp/appcache/application_1478497303110_0005/container_1478497303110_0005_01_03/virtualenv_application_1478497303110_0005_3 --file conda.txt -y
at 
org.apache.spark.api.python.PythonWorkerFactory.execCommand(PythonWorkerFactory.scala:142)
at 
org.apache.spark.api.python.PythonWorkerFactory.setupVirtualEnv(PythonWorkerFactory.scala:124)
at 
org.apache.spark.api.python.PythonWorkerFactory.<init>(PythonWorkerFactory.scala:70)
```

`/media/ebs2/yarn` is owned by `yarn (id): hadoop (gid)`




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2016-11-07 Thread vogxn
Github user vogxn commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r86720667
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala ---
@@ -69,6 +84,66 @@ private[spark] class PythonWorkerFactory(pythonExec: 
String, envVars: Map[String
   }
 
   /**
+   * Create virtualenv using native virtualenv or conda
+   *
+   * Native Virtualenv:
+   *   -  Execute command: virtualenv -p pythonExec --no-site-packages 
virtualenvName
+   *   -  Execute command: python -m pip --cache-dir cache-dir install -r 
requirement_file
+   *
+   * Conda
+   *   -  Execute command: conda create --prefix prefix --file 
requirement_file -y
+   *
+   */
+  def setupVirtualEnv(): Unit = {
+logDebug("Start to setup virtualenv...")
+logDebug("user.dir=" + System.getProperty("user.dir"))
+logDebug("user.home=" + System.getProperty("user.home"))
+
+require(virtualEnvType == "native" || virtualEnvType == "conda",
+  s"VirtualEnvType: ${virtualEnvType} is not supported" )
+virtualEnvName = "virtualenv_" + conf.getAppId + "_" + 
VIRTUALENV_ID.getAndIncrement()
+// use the absolute path when it is local mode otherwise just use 
filename as it would be
+// fetched from FileServer
+val pyspark_requirements =
+  if (Utils.isLocalMaster(conf)) {
+conf.get("spark.pyspark.virtualenv.requirements")
+  } else {
+conf.get("spark.pyspark.virtualenv.requirements").split("/").last
+  }
+
+val createEnvCommand =
+  if (virtualEnvType == "native") {
+Arrays.asList(virtualEnvPath,
+  "-p", pythonExec,
+  "--no-site-packages", virtualEnvName)
+  } else {
+Arrays.asList(virtualEnvPath,
+  "create", "--prefix", System.getProperty("user.dir") + "/" + 
virtualEnvName,
--- End diff --

I was trying to test this patch with `conda` and ended up with permission 
errors on this directory when the virtualenv gets created. This location is 
owned by yarn, and my spark-submit client does not have permission to write to 
the directory. Is it possible to use something more standard like the Java tmpdir, 
or maybe provide an option to override this location?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


