[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user mstreuhofer commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r205951170 --- Diff: core/src/main/scala/org/apache/spark/api/python/VirtualEnvFactory.scala --- @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.api.python + +import java.io.File +import java.util.{Map => JMap} +import java.util.Arrays +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.JavaConverters._ + +import com.google.common.io.Files + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging + + +class VirtualEnvFactory(pythonExec: String, conf: SparkConf, isDriver: Boolean) + extends Logging { + + private val virtualEnvType = conf.get("spark.pyspark.virtualenv.type", "native") + private val virtualEnvBinPath = conf.get("spark.pyspark.virtualenv.bin.path", "") + private val initPythonPackages = conf.getOption("spark.pyspark.virtualenv.packages") + private var virtualEnvName: String = _ + private var virtualPythonExec: String = _ + private val VIRTUALENV_ID = new AtomicInteger() + private var isLauncher: Boolean = false + + // used by launcher when user want to use virtualenv in pyspark shell. 
Launcher need this class + // to create virtualenv for driver. + def this(pythonExec: String, properties: JMap[String, String], isDriver: java.lang.Boolean) { +this(pythonExec, new SparkConf().setAll(properties.asScala), isDriver) +this.isLauncher = true + } + + /* + * Create virtualenv using native virtualenv or conda + * + */ + def setupVirtualEnv(): String = { +/* + * + * Native Virtualenv: + * - Execute command: virtualenv -p --no-site-packages + * - Execute command: python -m pip --cache-dir install -r + * + * Conda + * - Execute command: conda create --prefix --file -y + * + */ +logInfo("Start to setup virtualenv...") +logDebug("user.dir=" + System.getProperty("user.dir")) +logDebug("user.home=" + System.getProperty("user.home")) + +require(virtualEnvType == "native" || virtualEnvType == "conda", + s"VirtualEnvType: $virtualEnvType is not supported." ) +require(new File(virtualEnvBinPath).exists(), + s"VirtualEnvBinPath: $virtualEnvBinPath is not defined or doesn't exist.") +// Two scenarios of creating virtualenv: +// 1. created in yarn container. Yarn will clean it up after container is exited +// 2. created outside yarn container. Spark need to create temp directory and clean it after app +//finish. +// - driver of PySpark shell +// - driver of yarn-client mode +if (isLauncher || + (isDriver && conf.get("spark.submit.deployMode") == "client")) { + val virtualenvBasedir = Files.createTempDir() + virtualenvBasedir.deleteOnExit() + virtualEnvName = virtualenvBasedir.getAbsolutePath +} else if (isDriver && conf.get("spark.submit.deployMode") == "cluster") { + virtualEnvName = "virtualenv_driver" +} else { + // use the working directory of Executor + virtualEnvName = "virtualenv_" + conf.getAppId + "_" + VIRTUALENV_ID.getAndIncrement() +} + +// Use the absolute path of requirement file in the following cases +// 1. driver of pyspark shell +// 2. 
driver of yarn-client mode +// otherwise just use filename as it would be downloaded to the working directory of Executor +val pysparkRequirements = + if (isLauncher || +(isDriver && conf.get("spark.submit.deployMode") == "client")) { +conf.getOption("spark.pyspark.virtualenv.requirements") + } else { + conf.getOption("spark.pyspark.virtualenv.requirements").map(_.split("/").last) + } + +val
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user mstreuhofer commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r205950127 --- Diff: core/src/main/scala/org/apache/spark/api/python/VirtualEnvFactory.scala --- @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.api.python + +import java.io.File +import java.util.{Map => JMap} +import java.util.Arrays +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.JavaConverters._ + +import com.google.common.io.Files + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging + + +class VirtualEnvFactory(pythonExec: String, conf: SparkConf, isDriver: Boolean) + extends Logging { + + private val virtualEnvType = conf.get("spark.pyspark.virtualenv.type", "native") + private val virtualEnvBinPath = conf.get("spark.pyspark.virtualenv.bin.path", "") + private val initPythonPackages = conf.getOption("spark.pyspark.virtualenv.packages") + private var virtualEnvName: String = _ + private var virtualPythonExec: String = _ + private val VIRTUALENV_ID = new AtomicInteger() + private var isLauncher: Boolean = false + + // used by launcher when user want to use virtualenv in pyspark shell. 
Launcher need this class + // to create virtualenv for driver. + def this(pythonExec: String, properties: JMap[String, String], isDriver: java.lang.Boolean) { +this(pythonExec, new SparkConf().setAll(properties.asScala), isDriver) +this.isLauncher = true + } + + /* + * Create virtualenv using native virtualenv or conda + * + */ + def setupVirtualEnv(): String = { +/* + * + * Native Virtualenv: + * - Execute command: virtualenv -p --no-site-packages + * - Execute command: python -m pip --cache-dir install -r + * + * Conda + * - Execute command: conda create --prefix --file -y + * + */ +logInfo("Start to setup virtualenv...") +logDebug("user.dir=" + System.getProperty("user.dir")) +logDebug("user.home=" + System.getProperty("user.home")) + +require(virtualEnvType == "native" || virtualEnvType == "conda", + s"VirtualEnvType: $virtualEnvType is not supported." ) +require(new File(virtualEnvBinPath).exists(), + s"VirtualEnvBinPath: $virtualEnvBinPath is not defined or doesn't exist.") +// Two scenarios of creating virtualenv: +// 1. created in yarn container. Yarn will clean it up after container is exited +// 2. created outside yarn container. Spark need to create temp directory and clean it after app +//finish. +// - driver of PySpark shell +// - driver of yarn-client mode +if (isLauncher || + (isDriver && conf.get("spark.submit.deployMode") == "client")) { + val virtualenvBasedir = Files.createTempDir() + virtualenvBasedir.deleteOnExit() --- End diff -- the temporary directory is not being deleted on exit. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user ifilonenko commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r195919967 --- Diff: core/src/main/scala/org/apache/spark/api/python/VirtualEnvFactory.scala --- @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.api.python + +import java.io.File +import java.util.{Map => JMap} +import java.util.Arrays +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.JavaConverters._ + +import com.google.common.io.Files + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging + + +class VirtualEnvFactory(pythonExec: String, conf: SparkConf, isDriver: Boolean) + extends Logging { + + private val virtualEnvType = conf.get("spark.pyspark.virtualenv.type", "native") + private val virtualEnvBinPath = conf.get("spark.pyspark.virtualenv.bin.path", "") + private val initPythonPackages = conf.getOption("spark.pyspark.virtualenv.packages") + private var virtualEnvName: String = _ + private var virtualPythonExec: String = _ + private val VIRTUALENV_ID = new AtomicInteger() + private var isLauncher: Boolean = false + + // used by launcher when user want to use virtualenv in pyspark shell. 
Launcher need this class + // to create virtualenv for driver. + def this(pythonExec: String, properties: JMap[String, String], isDriver: java.lang.Boolean) { +this(pythonExec, new SparkConf().setAll(properties.asScala), isDriver) +this.isLauncher = true + } + + /* + * Create virtualenv using native virtualenv or conda + * + */ + def setupVirtualEnv(): String = { +/* + * + * Native Virtualenv: + * - Execute command: virtualenv -p --no-site-packages + * - Execute command: python -m pip --cache-dir install -r + * + * Conda + * - Execute command: conda create --prefix --file -y + * + */ +logInfo("Start to setup virtualenv...") +logDebug("user.dir=" + System.getProperty("user.dir")) +logDebug("user.home=" + System.getProperty("user.home")) + +require(virtualEnvType == "native" || virtualEnvType == "conda", + s"VirtualEnvType: $virtualEnvType is not supported." ) +require(new File(virtualEnvBinPath).exists(), + s"VirtualEnvBinPath: $virtualEnvBinPath is not defined or doesn't exist.") +// Two scenarios of creating virtualenv: +// 1. created in yarn container. Yarn will clean it up after container is exited +// 2. created outside yarn container. Spark need to create temp directory and clean it after app +//finish. +// - driver of PySpark shell +// - driver of yarn-client mode +if (isLauncher || + (isDriver && conf.get("spark.submit.deployMode") == "client")) { + val virtualenvBasedir = Files.createTempDir() + virtualenvBasedir.deleteOnExit() + virtualEnvName = virtualenvBasedir.getAbsolutePath +} else if (isDriver && conf.get("spark.submit.deployMode") == "cluster") { + virtualEnvName = "virtualenv_driver" +} else { + // use the working directory of Executor + virtualEnvName = "virtualenv_" + conf.getAppId + "_" + VIRTUALENV_ID.getAndIncrement() +} + +// Use the absolute path of requirement file in the following cases +// 1. driver of pyspark shell +// 2. 
driver of yarn-client mode +// otherwise just use filename as it would be downloaded to the working directory of Executor +val pysparkRequirements = + if (isLauncher || +(isDriver && conf.get("spark.submit.deployMode") == "client")) { +conf.getOption("spark.pyspark.virtualenv.requirements") + } else { + conf.getOption("spark.pyspark.virtualenv.requirements").map(_.split("/").last) + } + +val createEnvCommand
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user ifilonenko commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r195920245 --- Diff: python/pyspark/context.py --- @@ -1035,6 +1044,46 @@ def getConf(self): conf.setAll(self._conf.getAll()) return conf +@since(2.4) +def install_packages(self, packages): +""" +Install python packages on all executors and driver through pip. pip will be installed +by default no matter using native virtualenv or conda. So it is guaranteed that pip is +available if virtualenv is enabled. --- End diff -- This will only be the case if in Kubernetes you specify the `spark-py` image. So this will need to be expanded per cluster-manager. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user ifilonenko commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r195919580 --- Diff: core/src/main/scala/org/apache/spark/api/python/VirtualEnvFactory.scala --- @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.api.python + +import java.io.File +import java.util.{Map => JMap} +import java.util.Arrays +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.JavaConverters._ + +import com.google.common.io.Files + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging + + +class VirtualEnvFactory(pythonExec: String, conf: SparkConf, isDriver: Boolean) + extends Logging { + + private val virtualEnvType = conf.get("spark.pyspark.virtualenv.type", "native") + private val virtualEnvBinPath = conf.get("spark.pyspark.virtualenv.bin.path", "") + private val initPythonPackages = conf.getOption("spark.pyspark.virtualenv.packages") + private var virtualEnvName: String = _ + private var virtualPythonExec: String = _ + private val VIRTUALENV_ID = new AtomicInteger() + private var isLauncher: Boolean = false + + // used by launcher when user want to use virtualenv in pyspark shell. 
Launcher need this class + // to create virtualenv for driver. + def this(pythonExec: String, properties: JMap[String, String], isDriver: java.lang.Boolean) { +this(pythonExec, new SparkConf().setAll(properties.asScala), isDriver) +this.isLauncher = true + } + + /* + * Create virtualenv using native virtualenv or conda + * + */ + def setupVirtualEnv(): String = { +/* + * + * Native Virtualenv: + * - Execute command: virtualenv -p --no-site-packages + * - Execute command: python -m pip --cache-dir install -r + * + * Conda + * - Execute command: conda create --prefix --file -y + * + */ +logInfo("Start to setup virtualenv...") +logDebug("user.dir=" + System.getProperty("user.dir")) +logDebug("user.home=" + System.getProperty("user.home")) + +require(virtualEnvType == "native" || virtualEnvType == "conda", + s"VirtualEnvType: $virtualEnvType is not supported." ) +require(new File(virtualEnvBinPath).exists(), + s"VirtualEnvBinPath: $virtualEnvBinPath is not defined or doesn't exist.") --- End diff -- In addition how are we handling the case for an existing `s"$virtualEnvBinPath/$virtualEnvName"`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user ifilonenko commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r195919982 --- Diff: core/src/main/scala/org/apache/spark/api/python/VirtualEnvFactory.scala --- @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.api.python + +import java.io.File +import java.util.{Map => JMap} +import java.util.Arrays +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.JavaConverters._ + +import com.google.common.io.Files + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging + + +class VirtualEnvFactory(pythonExec: String, conf: SparkConf, isDriver: Boolean) + extends Logging { + + private val virtualEnvType = conf.get("spark.pyspark.virtualenv.type", "native") + private val virtualEnvBinPath = conf.get("spark.pyspark.virtualenv.bin.path", "") + private val initPythonPackages = conf.getOption("spark.pyspark.virtualenv.packages") + private var virtualEnvName: String = _ + private var virtualPythonExec: String = _ + private val VIRTUALENV_ID = new AtomicInteger() + private var isLauncher: Boolean = false + + // used by launcher when user want to use virtualenv in pyspark shell. 
Launcher need this class + // to create virtualenv for driver. + def this(pythonExec: String, properties: JMap[String, String], isDriver: java.lang.Boolean) { +this(pythonExec, new SparkConf().setAll(properties.asScala), isDriver) +this.isLauncher = true + } + + /* + * Create virtualenv using native virtualenv or conda + * + */ + def setupVirtualEnv(): String = { +/* + * + * Native Virtualenv: + * - Execute command: virtualenv -p --no-site-packages + * - Execute command: python -m pip --cache-dir install -r + * + * Conda + * - Execute command: conda create --prefix --file -y + * + */ +logInfo("Start to setup virtualenv...") +logDebug("user.dir=" + System.getProperty("user.dir")) +logDebug("user.home=" + System.getProperty("user.home")) + +require(virtualEnvType == "native" || virtualEnvType == "conda", + s"VirtualEnvType: $virtualEnvType is not supported." ) +require(new File(virtualEnvBinPath).exists(), + s"VirtualEnvBinPath: $virtualEnvBinPath is not defined or doesn't exist.") +// Two scenarios of creating virtualenv: +// 1. created in yarn container. Yarn will clean it up after container is exited +// 2. created outside yarn container. Spark need to create temp directory and clean it after app +//finish. +// - driver of PySpark shell +// - driver of yarn-client mode +if (isLauncher || + (isDriver && conf.get("spark.submit.deployMode") == "client")) { + val virtualenvBasedir = Files.createTempDir() + virtualenvBasedir.deleteOnExit() + virtualEnvName = virtualenvBasedir.getAbsolutePath +} else if (isDriver && conf.get("spark.submit.deployMode") == "cluster") { + virtualEnvName = "virtualenv_driver" +} else { + // use the working directory of Executor + virtualEnvName = "virtualenv_" + conf.getAppId + "_" + VIRTUALENV_ID.getAndIncrement() +} + +// Use the absolute path of requirement file in the following cases +// 1. driver of pyspark shell +// 2. 
driver of yarn-client mode +// otherwise just use filename as it would be downloaded to the working directory of Executor +val pysparkRequirements = + if (isLauncher || +(isDriver && conf.get("spark.submit.deployMode") == "client")) { +conf.getOption("spark.pyspark.virtualenv.requirements") + } else { + conf.getOption("spark.pyspark.virtualenv.requirements").map(_.split("/").last) + } + +val createEnvCommand
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user ifilonenko commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r195919958 --- Diff: core/src/main/scala/org/apache/spark/api/python/VirtualEnvFactory.scala --- @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.api.python + +import java.io.File +import java.util.{Map => JMap} +import java.util.Arrays +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.JavaConverters._ + +import com.google.common.io.Files + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging + + +class VirtualEnvFactory(pythonExec: String, conf: SparkConf, isDriver: Boolean) + extends Logging { + + private val virtualEnvType = conf.get("spark.pyspark.virtualenv.type", "native") + private val virtualEnvBinPath = conf.get("spark.pyspark.virtualenv.bin.path", "") + private val initPythonPackages = conf.getOption("spark.pyspark.virtualenv.packages") + private var virtualEnvName: String = _ + private var virtualPythonExec: String = _ + private val VIRTUALENV_ID = new AtomicInteger() + private var isLauncher: Boolean = false + + // used by launcher when user want to use virtualenv in pyspark shell. 
Launcher need this class + // to create virtualenv for driver. + def this(pythonExec: String, properties: JMap[String, String], isDriver: java.lang.Boolean) { +this(pythonExec, new SparkConf().setAll(properties.asScala), isDriver) +this.isLauncher = true + } + + /* + * Create virtualenv using native virtualenv or conda + * + */ + def setupVirtualEnv(): String = { +/* + * + * Native Virtualenv: + * - Execute command: virtualenv -p --no-site-packages + * - Execute command: python -m pip --cache-dir install -r + * + * Conda + * - Execute command: conda create --prefix --file -y + * + */ +logInfo("Start to setup virtualenv...") +logDebug("user.dir=" + System.getProperty("user.dir")) +logDebug("user.home=" + System.getProperty("user.home")) + +require(virtualEnvType == "native" || virtualEnvType == "conda", + s"VirtualEnvType: $virtualEnvType is not supported." ) +require(new File(virtualEnvBinPath).exists(), + s"VirtualEnvBinPath: $virtualEnvBinPath is not defined or doesn't exist.") +// Two scenarios of creating virtualenv: +// 1. created in yarn container. Yarn will clean it up after container is exited +// 2. created outside yarn container. Spark need to create temp directory and clean it after app +//finish. +// - driver of PySpark shell +// - driver of yarn-client mode +if (isLauncher || + (isDriver && conf.get("spark.submit.deployMode") == "client")) { + val virtualenvBasedir = Files.createTempDir() + virtualenvBasedir.deleteOnExit() + virtualEnvName = virtualenvBasedir.getAbsolutePath +} else if (isDriver && conf.get("spark.submit.deployMode") == "cluster") { + virtualEnvName = "virtualenv_driver" +} else { + // use the working directory of Executor + virtualEnvName = "virtualenv_" + conf.getAppId + "_" + VIRTUALENV_ID.getAndIncrement() +} + +// Use the absolute path of requirement file in the following cases +// 1. driver of pyspark shell +// 2. 
driver of yarn-client mode +// otherwise just use filename as it would be downloaded to the working directory of Executor +val pysparkRequirements = + if (isLauncher || +(isDriver && conf.get("spark.submit.deployMode") == "client")) { +conf.getOption("spark.pyspark.virtualenv.requirements") + } else { + conf.getOption("spark.pyspark.virtualenv.requirements").map(_.split("/").last) + } + +val createEnvCommand
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user ifilonenko commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r195919848 --- Diff: core/src/main/scala/org/apache/spark/api/python/VirtualEnvFactory.scala --- @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.api.python + +import java.io.File +import java.util.{Map => JMap} +import java.util.Arrays +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.JavaConverters._ + +import com.google.common.io.Files + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging + + +class VirtualEnvFactory(pythonExec: String, conf: SparkConf, isDriver: Boolean) + extends Logging { + + private val virtualEnvType = conf.get("spark.pyspark.virtualenv.type", "native") + private val virtualEnvBinPath = conf.get("spark.pyspark.virtualenv.bin.path", "") + private val initPythonPackages = conf.getOption("spark.pyspark.virtualenv.packages") + private var virtualEnvName: String = _ + private var virtualPythonExec: String = _ + private val VIRTUALENV_ID = new AtomicInteger() + private var isLauncher: Boolean = false + + // used by launcher when user want to use virtualenv in pyspark shell. 
Launcher need this class + // to create virtualenv for driver. + def this(pythonExec: String, properties: JMap[String, String], isDriver: java.lang.Boolean) { +this(pythonExec, new SparkConf().setAll(properties.asScala), isDriver) +this.isLauncher = true + } + + /* + * Create virtualenv using native virtualenv or conda + * + */ + def setupVirtualEnv(): String = { +/* + * + * Native Virtualenv: + * - Execute command: virtualenv -p --no-site-packages + * - Execute command: python -m pip --cache-dir install -r + * + * Conda + * - Execute command: conda create --prefix --file -y + * + */ +logInfo("Start to setup virtualenv...") +logDebug("user.dir=" + System.getProperty("user.dir")) +logDebug("user.home=" + System.getProperty("user.home")) + +require(virtualEnvType == "native" || virtualEnvType == "conda", + s"VirtualEnvType: $virtualEnvType is not supported." ) +require(new File(virtualEnvBinPath).exists(), + s"VirtualEnvBinPath: $virtualEnvBinPath is not defined or doesn't exist.") +// Two scenarios of creating virtualenv: +// 1. created in yarn container. Yarn will clean it up after container is exited +// 2. created outside yarn container. Spark need to create temp directory and clean it after app +//finish. +// - driver of PySpark shell +// - driver of yarn-client mode +if (isLauncher || + (isDriver && conf.get("spark.submit.deployMode") == "client")) { + val virtualenvBasedir = Files.createTempDir() + virtualenvBasedir.deleteOnExit() + virtualEnvName = virtualenvBasedir.getAbsolutePath +} else if (isDriver && conf.get("spark.submit.deployMode") == "cluster") { + virtualEnvName = "virtualenv_driver" +} else { + // use the working directory of Executor + virtualEnvName = "virtualenv_" + conf.getAppId + "_" + VIRTUALENV_ID.getAndIncrement() +} + +// Use the absolute path of requirement file in the following cases +// 1. driver of pyspark shell +// 2. 
driver of yarn-client mode +// otherwise just use filename as it would be downloaded to the working directory of Executor --- End diff -- In Kubernetes world, I might want to use a `requirements.txt` file that is stored locally in the base docker image, regardless of client or cluster mode. Is that something that you think should be supported? Maybe a config variable `spark.pyspark.virtualenv.kubernetes.localrequirements` that points to a file stored as
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user ifilonenko commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r195920107 --- Diff: core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala --- @@ -39,12 +39,17 @@ object PythonRunner { val pyFiles = args(1) val otherArgs = args.slice(2, args.length) val sparkConf = new SparkConf() -val pythonExec = sparkConf.get(PYSPARK_DRIVER_PYTHON) +var pythonExec = sparkConf.get(PYSPARK_DRIVER_PYTHON) .orElse(sparkConf.get(PYSPARK_PYTHON)) .orElse(sys.env.get("PYSPARK_DRIVER_PYTHON")) .orElse(sys.env.get("PYSPARK_PYTHON")) .getOrElse("python") +if (sparkConf.getBoolean("spark.pyspark.virtualenv.enabled", false)) { + val virtualEnvFactory = new VirtualEnvFactory(pythonExec, sparkConf, true) + pythonExec = virtualEnvFactory.setupVirtualEnv() --- End diff -- +1 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user ifilonenko commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r195920034 --- Diff: core/src/main/scala/org/apache/spark/api/python/VirtualEnvFactory.scala --- @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.api.python + +import java.io.File +import java.util.{Map => JMap} +import java.util.Arrays +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.JavaConverters._ + +import com.google.common.io.Files + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging + + +class VirtualEnvFactory(pythonExec: String, conf: SparkConf, isDriver: Boolean) + extends Logging { + + private val virtualEnvType = conf.get("spark.pyspark.virtualenv.type", "native") + private val virtualEnvBinPath = conf.get("spark.pyspark.virtualenv.bin.path", "") + private val initPythonPackages = conf.getOption("spark.pyspark.virtualenv.packages") + private var virtualEnvName: String = _ + private var virtualPythonExec: String = _ + private val VIRTUALENV_ID = new AtomicInteger() + private var isLauncher: Boolean = false + + // used by launcher when user want to use virtualenv in pyspark shell. 
Launcher need this class + // to create virtualenv for driver. + def this(pythonExec: String, properties: JMap[String, String], isDriver: java.lang.Boolean) { +this(pythonExec, new SparkConf().setAll(properties.asScala), isDriver) +this.isLauncher = true + } + + /* + * Create virtualenv using native virtualenv or conda + * + */ + def setupVirtualEnv(): String = { +/* + * + * Native Virtualenv: + * - Execute command: virtualenv -p --no-site-packages + * - Execute command: python -m pip --cache-dir install -r + * + * Conda + * - Execute command: conda create --prefix --file -y + * + */ +logInfo("Start to setup virtualenv...") +logDebug("user.dir=" + System.getProperty("user.dir")) +logDebug("user.home=" + System.getProperty("user.home")) + +require(virtualEnvType == "native" || virtualEnvType == "conda", + s"VirtualEnvType: $virtualEnvType is not supported." ) +require(new File(virtualEnvBinPath).exists(), + s"VirtualEnvBinPath: $virtualEnvBinPath is not defined or doesn't exist.") +// Two scenarios of creating virtualenv: +// 1. created in yarn container. Yarn will clean it up after container is exited +// 2. created outside yarn container. Spark need to create temp directory and clean it after app +//finish. +// - driver of PySpark shell +// - driver of yarn-client mode +if (isLauncher || + (isDriver && conf.get("spark.submit.deployMode") == "client")) { + val virtualenvBasedir = Files.createTempDir() + virtualenvBasedir.deleteOnExit() + virtualEnvName = virtualenvBasedir.getAbsolutePath +} else if (isDriver && conf.get("spark.submit.deployMode") == "cluster") { + virtualEnvName = "virtualenv_driver" +} else { + // use the working directory of Executor + virtualEnvName = "virtualenv_" + conf.getAppId + "_" + VIRTUALENV_ID.getAndIncrement() +} + +// Use the absolute path of requirement file in the following cases +// 1. driver of pyspark shell +// 2. 
driver of yarn-client mode +// otherwise just use filename as it would be downloaded to the working directory of Executor +val pysparkRequirements = + if (isLauncher || +(isDriver && conf.get("spark.submit.deployMode") == "client")) { +conf.getOption("spark.pyspark.virtualenv.requirements") + } else { + conf.getOption("spark.pyspark.virtualenv.requirements").map(_.split("/").last) + } + +val createEnvCommand
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user ifilonenko commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r195919641 --- Diff: core/src/main/scala/org/apache/spark/api/python/VirtualEnvFactory.scala --- @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.api.python + +import java.io.File +import java.util.{Map => JMap} +import java.util.Arrays +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.JavaConverters._ + +import com.google.common.io.Files + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging + + +class VirtualEnvFactory(pythonExec: String, conf: SparkConf, isDriver: Boolean) + extends Logging { + + private val virtualEnvType = conf.get("spark.pyspark.virtualenv.type", "native") + private val virtualEnvBinPath = conf.get("spark.pyspark.virtualenv.bin.path", "") + private val initPythonPackages = conf.getOption("spark.pyspark.virtualenv.packages") + private var virtualEnvName: String = _ + private var virtualPythonExec: String = _ + private val VIRTUALENV_ID = new AtomicInteger() + private var isLauncher: Boolean = false + + // used by launcher when user want to use virtualenv in pyspark shell. 
Launcher need this class + // to create virtualenv for driver. + def this(pythonExec: String, properties: JMap[String, String], isDriver: java.lang.Boolean) { +this(pythonExec, new SparkConf().setAll(properties.asScala), isDriver) +this.isLauncher = true + } + + /* + * Create virtualenv using native virtualenv or conda + * + */ + def setupVirtualEnv(): String = { +/* + * + * Native Virtualenv: + * - Execute command: virtualenv -p --no-site-packages + * - Execute command: python -m pip --cache-dir install -r + * + * Conda + * - Execute command: conda create --prefix --file -y + * + */ +logInfo("Start to setup virtualenv...") +logDebug("user.dir=" + System.getProperty("user.dir")) +logDebug("user.home=" + System.getProperty("user.home")) + +require(virtualEnvType == "native" || virtualEnvType == "conda", + s"VirtualEnvType: $virtualEnvType is not supported." ) +require(new File(virtualEnvBinPath).exists(), + s"VirtualEnvBinPath: $virtualEnvBinPath is not defined or doesn't exist.") +// Two scenarios of creating virtualenv: +// 1. created in yarn container. Yarn will clean it up after container is exited +// 2. created outside yarn container. Spark need to create temp directory and clean it after app +//finish. +// - driver of PySpark shell +// - driver of yarn-client mode --- End diff -- In the case of Kubernetes: This will be created in the base spark-py Docker image, which is shared between the driver and executors and the containers will be cleaned up upon termination of the job via owner-labels (for the executor) and the k8s API-Server (for the driver). As such, (hopefully with client-mode support being completed soon), the below logic should hold as well. Is this work going to be cluster-manager agnostic? Or is this supposed to only support Yarn? I would like to see this be applicable to all first-class cluster-management systems. I can help with appending to this PR: k8s Support and the appropriate integration tests. 
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user ifilonenko commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r195920098 --- Diff: core/src/main/scala/org/apache/spark/api/python/VirtualEnvFactory.scala --- @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.api.python + +import java.io.File +import java.util.{Map => JMap} +import java.util.Arrays +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.JavaConverters._ + +import com.google.common.io.Files + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging + + +class VirtualEnvFactory(pythonExec: String, conf: SparkConf, isDriver: Boolean) + extends Logging { + + private val virtualEnvType = conf.get("spark.pyspark.virtualenv.type", "native") + private val virtualEnvBinPath = conf.get("spark.pyspark.virtualenv.bin.path", "") + private val initPythonPackages = conf.getOption("spark.pyspark.virtualenv.packages") + private var virtualEnvName: String = _ + private var virtualPythonExec: String = _ + private val VIRTUALENV_ID = new AtomicInteger() + private var isLauncher: Boolean = false + + // used by launcher when user want to use virtualenv in pyspark shell. 
Launcher need this class + // to create virtualenv for driver. + def this(pythonExec: String, properties: JMap[String, String], isDriver: java.lang.Boolean) { +this(pythonExec, new SparkConf().setAll(properties.asScala), isDriver) +this.isLauncher = true + } + + /* + * Create virtualenv using native virtualenv or conda + * + */ + def setupVirtualEnv(): String = { --- End diff -- Might be better to pass args into this function so that it could be properly unit-tested. It seems that there are no unit-tests for this class, so that seems to be a necessary addition. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user ifilonenko commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r195920112 --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala --- @@ -475,6 +475,19 @@ object SparkSubmit extends CommandLineUtils with Logging { } } +// for pyspark virtualenv +if (args.isPython) { + if (clusterManager != YARN && +args.sparkProperties.getOrElse("spark.pyspark.virtualenv.enabled", "false") == "true") { +printErrorAndExit("virtualenv is only supported in yarn mode") --- End diff -- +1 for Kubernetes --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r193674158 --- Diff: python/pyspark/context.py --- @@ -1035,6 +1044,41 @@ def getConf(self): conf.setAll(self._conf.getAll()) return conf +def install_packages(self, packages): +""" +Install python packages on all executors and driver through pip. pip will be installed +by default no matter using native virtualenv or conda. So it is guaranteed that pip is +available if virtualenv is enabled. +:param packages: string for single package or a list of string for multiple packages --- End diff -- Shall we add: ``` .. versionadded:: 2.3.0 .. note:: Experimental ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r193673500 --- Diff: docs/submitting-applications.md --- @@ -218,6 +218,115 @@ These commands can be used with `pyspark`, `spark-shell`, and `spark-submit` to For Python, the equivalent `--py-files` option can be used to distribute `.egg`, `.zip` and `.py` libraries to executors. +# VirtualEnv for PySpark (This is an experimental feature and may evolve in future version) +For simple PySpark application, we can use `--py-files` to add its dependencies. While for a large PySpark application, +usually you will have many dependencies which may also have transitive dependencies and even some dependencies need to be compiled +first to be installed. In this case `--py-files` is not so convenient. Luckily, in python world we have virtualenv/conda to help create isolated +python work environment. We also implement virtualenv in PySpark (It is only supported in yarn mode for now). User can use this feature +in 2 scenarios: +* Batch mode (submit spark app via spark-submit) +* Interactive mode (PySpark shell or other third party Spark Notebook) + --- End diff -- Ah, maybe we can leave a note at the end instead of adding it in the title. ``` Note that this is an experimental feature added from Spark 2.4.0 and may evolve in the future version. ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r193674619 --- Diff: python/pyspark/context.py --- @@ -1035,6 +1044,41 @@ def getConf(self): conf.setAll(self._conf.getAll()) return conf +def install_packages(self, packages): +""" +Install python packages on all executors and driver through pip. pip will be installed +by default no matter using native virtualenv or conda. So it is guaranteed that pip is +available if virtualenv is enabled. +:param packages: string for single package or a list of string for multiple packages +""" +if self._conf.get("spark.pyspark.virtualenv.enabled") != "true": +raise RuntimeError("install_packages can only use called when " + "spark.pyspark.virtualenv.enabled is set as true") +if isinstance(packages, basestring): +packages = [packages] +# seems statusTracker.getExecutorInfos() will return driver + exeuctors, so -1 here. +num_executors = len(self._jsc.sc().statusTracker().getExecutorInfos()) - 1 +dummyRDD = self.parallelize(range(num_executors), num_executors) + +def _run_pip(packages, iterator): +import pip +return pip.main(["install"] + packages) + +# install package on driver first. if installation succeeded, continue the installation +# on executors, otherwise return directly. +if _run_pip(packages, None) != 0: +return + +virtualenvPackages = self._conf.get("spark.pyspark.virtualenv.packages") +if virtualenvPackages: +self._conf.set("spark.pyspark.virtualenv.packages", virtualenvPackages + ":" + + ":".join(packages)) +else: +self._conf.set("spark.pyspark.virtualenv.packages", ":".join(packages)) + +import functools --- End diff -- Can we move this up within this function? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r193672797 --- Diff: docs/submitting-applications.md --- @@ -218,6 +218,115 @@ These commands can be used with `pyspark`, `spark-shell`, and `spark-submit` to For Python, the equivalent `--py-files` option can be used to distribute `.egg`, `.zip` and `.py` libraries to executors. +# VirtualEnv for PySpark (This is an experimental feature and may evolve in future version) +For simple PySpark application, we can use `--py-files` to add its dependencies. While for a large PySpark application, +usually you will have many dependencies which may also have transitive dependencies and even some dependencies need to be compiled +first to be installed. In this case `--py-files` is not so convenient. Luckily, in python world we have virtualenv/conda to help create isolated +python work environment. We also implement virtualenv in PySpark (It is only supported in yarn mode for now). User can use this feature +in 2 scenarios: +* Batch mode (submit spark app via spark-submit) +* Interactive mode (PySpark shell or other third party Spark Notebook) + +## Prerequisites +- Each node have virtualenv/conda, python-devel installed +- Each node is internet accessible (for downloading packages) + +## Batch Mode + +In batch mode, user need to specify the additional python packages before launching spark app. There're 2 approaches to specify that: +* Provide a requirement file which contains all the packages for the virtualenv. +* Specify packages via spark configuration `spark.pyspark.virtualenv.packages`. 
+ +Here're several examples: + +{% highlight bash %} +### Setup virtualenv using native virtualenv on yarn-client mode +bin/spark-submit \ +--master yarn \ +--deploy-mode client \ +--conf "spark.pyspark.virtualenv.enabled=true" \ +--conf "spark.pyspark.virtualenv.type=native" \ +--conf "spark.pyspark.virtualenv.requirements=" \ +--conf "spark.pyspark.virtualenv.bin.path=" \ + + +### Setup virtualenv using conda on yarn-client mode +bin/spark-submit \ +--master yarn \ +--deploy-mode client \ +--conf "spark.pyspark.virtualenv.enabled=true" \ +--conf "spark.pyspark.virtualenv.type=conda" \ +--conf "spark.pyspark.virtualenv.requirements=" \ +--conf "spark.pyspark.virtualenv.bin.path=" \ + + +### Setup virtualenv using conda on yarn-client mode and specify packages via `spark.pyspark.virtualenv.packages` +bin/spark-submit \ +--master yarn \ +--deploy-mode client \ +--conf "spark.pyspark.virtualenv.enabled=true" \ +--conf "spark.pyspark.virtualenv.type=conda" \ +--conf "spark.pyspark.virtualenv.packages=numpy,pandas" \ +--conf "spark.pyspark.virtualenv.bin.path=" \ + +{% endhighlight %} + +### How to create requirement file ? +Usually before running distributed PySpark job, you need first to run it in local environment. It is encouraged to first create your own virtualenv for your project, so you know what packages you need. After you are confident with your work and want to move it to cluster, you can run the following command to generate the requirement file for virtualenv and conda. +- pip freeze > requirements.txt +- conda list --export > requirements.txt + +## Interactive Mode +In interactive mode,user can install python packages at runtime instead of specifying them in requirement file when submitting spark app. +Here are several ways to install packages + +{% highlight python %} +sc.install_packages("numpy") # install the latest numpy --- End diff -- Seems there are tabs here. Shall we replace them to spaces? 
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user zjffdu commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r193664416 --- Diff: docs/submitting-applications.md --- @@ -218,6 +218,115 @@ These commands can be used with `pyspark`, `spark-shell`, and `spark-submit` to For Python, the equivalent `--py-files` option can be used to distribute `.egg`, `.zip` and `.py` libraries to executors. +# VirtualEnv for PySpark --- End diff -- Thanks @HyukjinKwon , doc is updated --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r193659778 --- Diff: docs/submitting-applications.md --- @@ -218,6 +218,115 @@ These commands can be used with `pyspark`, `spark-shell`, and `spark-submit` to For Python, the equivalent `--py-files` option can be used to distribute `.egg`, `.zip` and `.py` libraries to executors. +# VirtualEnv for PySpark --- End diff -- @zjffdu, mind if I ask to describe this is an experimental feature and it's very likely to be unstable and it's still evolving? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r175670974 --- Diff: core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala --- @@ -39,12 +39,17 @@ object PythonRunner { val pyFiles = args(1) val otherArgs = args.slice(2, args.length) val sparkConf = new SparkConf() -val pythonExec = sparkConf.get(PYSPARK_DRIVER_PYTHON) +var pythonExec = sparkConf.get(PYSPARK_DRIVER_PYTHON) .orElse(sparkConf.get(PYSPARK_PYTHON)) .orElse(sys.env.get("PYSPARK_DRIVER_PYTHON")) .orElse(sys.env.get("PYSPARK_PYTHON")) .getOrElse("python") +if (sparkConf.getBoolean("spark.pyspark.virtualenv.enabled", false)) { + val virtualEnvFactory = new VirtualEnvFactory(pythonExec, sparkConf, true) + pythonExec = virtualEnvFactory.setupVirtualEnv() --- End diff -- correct me if I misunderstood. Couldn't we have some tests to check if `setupVirtualEnv` has a proper string at least? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user zjffdu commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r164646157 --- Diff: core/src/main/scala/org/apache/spark/api/python/VirtualEnvFactory.scala --- @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.api.python + +import java.io.File +import java.util.{Map => JMap} +import java.util.Arrays +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.JavaConverters._ + +import com.google.common.io.Files + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging + + +private[spark] class VirtualEnvFactory(pythonExec: String, conf: SparkConf, isDriver: Boolean) + extends Logging { + + private var virtualEnvType = conf.get("spark.pyspark.virtualenv.type", "native") + private var virtualEnvPath = conf.get("spark.pyspark.virtualenv.bin.path", "") + private var virtualEnvName: String = _ + private var virtualPythonExec: String = _ + private val VIRTUALENV_ID = new AtomicInteger() + private var isLauncher: Boolean = false + + // used by launcher when user want to use virtualenv in pyspark shell. Launcher need this class + // to create virtualenv for driver. 
+ def this(pythonExec: String, properties: JMap[String, String], isDriver: java.lang.Boolean) { --- End diff -- It is used by launcher module which doesn't depend on scala. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user zjffdu commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r164069473 --- Diff: core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala --- @@ -39,12 +39,17 @@ object PythonRunner { val pyFiles = args(1) val otherArgs = args.slice(2, args.length) val sparkConf = new SparkConf() -val pythonExec = sparkConf.get(PYSPARK_DRIVER_PYTHON) +var pythonExec = sparkConf.get(PYSPARK_DRIVER_PYTHON) --- End diff -- I am afraid I have to use var, because `VirtualenvFactory` also needs `pythonExec` as constructor arg. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user zjffdu commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r164068172 --- Diff: core/src/main/scala/org/apache/spark/api/python/VirtualEnvFactory.scala --- @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.api.python + +import java.io.File +import java.util.{Map => JMap} +import java.util.Arrays +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.JavaConverters._ + +import com.google.common.io.Files + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging + + +class VirtualEnvFactory(pythonExec: String, conf: SparkConf, isDriver: Boolean) + extends Logging { + + private val virtualEnvType = conf.get("spark.pyspark.virtualenv.type", "native") + private val virtualEnvBinPath = conf.get("spark.pyspark.virtualenv.bin.path", "") + private val initPythonPackages = conf.getOption("spark.pyspark.virtualenv.packages") --- End diff -- For the existing running executor, the only way to install additional packages is via `sc.intall_packages`. 
And `spark.pyspark.virtualenv.packages` will be updated on driver side first when `sc.install_packages` is called, then a newly allocated executor will fetch this property and install all the additional packages properly. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user zjffdu commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r164037516 --- Diff: python/pyspark/context.py --- @@ -1023,6 +1032,41 @@ def getConf(self): conf.setAll(self._conf.getAll()) return conf +def install_packages(self, packages): +""" +Install python packages on all executors and driver through pip. pip will be installed +by default no matter using native virtualenv or conda. So it is guaranteed that pip is +available if virtualenv is enabled. +:param packages: string for single package or a list of string for multiple packages +""" +if self._conf.get("spark.pyspark.virtualenv.enabled") != "true": +raise RuntimeError("install_packages can only use called when " + "spark.pyspark.virtualenv.enabled is set as true") +if isinstance(packages, basestring): +packages = [packages] +# seems statusTracker.getExecutorInfos() will return driver + exeuctors, so -1 here. +num_executors = len(self._jsc.sc().statusTracker().getExecutorInfos()) - 1 +dummyRDD = self.parallelize(range(num_executors), num_executors) + +def _run_pip(packages, iterator): +import pip +return pip.main(["install"] + packages) + +# install package on driver first. if installation succeeded, continue the installation +# on executors, otherwise return directly. +if _run_pip(packages, None) != 0: +return + +virtualenvPackages = self._conf.get("spark.pyspark.virtualenv.packages") +if virtualenvPackages: +self._conf.set("spark.pyspark.virtualenv.packages", virtualenvPackages + "," + + ",".join(packages)) +else: +self._conf.set("spark.pyspark.virtualenv.packages", ",".join(packages)) --- End diff -- Good catch, I will use other separator. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user zjffdu commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r164037239 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala --- @@ -98,7 +98,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp private val reviveThread = ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-revive-thread") - class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)]) + class DriverEndpoint(override val rpcEnv: RpcEnv) --- End diff -- Without this change, the following scenario won't work. 1. Launch spark app 2. call `sc.install_packages("numpy")` 3. run `sc.range(3).map(lambda x: np.__version__).collect()` 4. Restart executor (by killing it, the scheduler will schedule another executor) 5. run `sc.range(3).map(lambda x: np.__version__).collect()` again, this time it would fail. Because the new scheduled executor can not set up virtualenv correctly as it can not get the updated `spark.pyspark.virtualenv.packages`. That's why make this change in core. Now executor would always get the updated SparkConf instead of the SparkConf created when spark app is started. There's some overhead, but I believe it is very trivial, and could be improved later. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user zjffdu commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r164037055 --- Diff: launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java --- @@ -299,20 +300,34 @@ // 4. environment variable PYSPARK_PYTHON // 5. python List pyargs = new ArrayList<>(); -pyargs.add(firstNonEmpty(conf.get(SparkLauncher.PYSPARK_DRIVER_PYTHON), +String pythonExec = firstNonEmpty(conf.get(SparkLauncher.PYSPARK_DRIVER_PYTHON), conf.get(SparkLauncher.PYSPARK_PYTHON), System.getenv("PYSPARK_DRIVER_PYTHON"), System.getenv("PYSPARK_PYTHON"), - "python")); -String pyOpts = System.getenv("PYSPARK_DRIVER_PYTHON_OPTS"); -if (conf.containsKey(SparkLauncher.PYSPARK_PYTHON)) { - // pass conf spark.pyspark.python to python by environment variable. - env.put("PYSPARK_PYTHON", conf.get(SparkLauncher.PYSPARK_PYTHON)); + "python"); +if (conf.getOrDefault("spark.pyspark.virtualenv.enabled", "false").equals("true")) { + try { +// setup virtualenv in launcher when virtualenv is enabled in pyspark shell +Class virtualEnvClazz = Class.forName("org.apache.spark.api.python.VirtualEnvFactory"); --- End diff -- Because launcher module does not depend on core module explicitly in pom.xml --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user zjffdu commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r164036488 --- Diff: core/src/main/scala/org/apache/spark/api/python/VirtualEnvFactory.scala --- @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.api.python + +import java.io.File +import java.util.{Map => JMap} +import java.util.Arrays +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.JavaConverters._ + +import com.google.common.io.Files + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging + + +class VirtualEnvFactory(pythonExec: String, conf: SparkConf, isDriver: Boolean) + extends Logging { + + private val virtualEnvType = conf.get("spark.pyspark.virtualenv.type", "native") + private val virtualEnvBinPath = conf.get("spark.pyspark.virtualenv.bin.path", "") + private val initPythonPackages = conf.getOption("spark.pyspark.virtualenv.packages") + private var virtualEnvName: String = _ + private var virtualPythonExec: String = _ + private val VIRTUALENV_ID = new AtomicInteger() + private var isLauncher: Boolean = false + + // used by launcher when user want to use virtualenv in pyspark shell. 
Launcher need this class + // to create virtualenv for driver. + def this(pythonExec: String, properties: JMap[String, String], isDriver: java.lang.Boolean) { +this(pythonExec, new SparkConf().setAll(properties.asScala), isDriver) +this.isLauncher = true + } + + /* + * Create virtualenv using native virtualenv or conda + * + */ + def setupVirtualEnv(): String = { +/* + * + * Native Virtualenv: + * - Execute command: virtualenv -p --no-site-packages + * - Execute command: python -m pip --cache-dir install -r + * + * Conda + * - Execute command: conda create --prefix --file -y + * + */ +logInfo("Start to setup virtualenv...") +logDebug("user.dir=" + System.getProperty("user.dir")) +logDebug("user.home=" + System.getProperty("user.home")) + +require(virtualEnvType == "native" || virtualEnvType == "conda", + s"VirtualEnvType: $virtualEnvType is not supported." ) +require(new File(virtualEnvBinPath).exists(), + s"VirtualEnvBinPath: $virtualEnvBinPath is not defined or doesn't exist.") +// Two scenarios of creating virtualenv: +// 1. created in yarn container. Yarn will clean it up after container is exited +// 2. created outside yarn container. Spark need to create temp directory and clean it after app +//finish. +// - driver of PySpark shell +// - driver of yarn-client mode +if (isLauncher || + (isDriver && conf.get("spark.submit.deployMode") == "client")) { + val virtualenvBasedir = Files.createTempDir() + virtualenvBasedir.deleteOnExit() + virtualEnvName = virtualenvBasedir.getAbsolutePath +} else if (isDriver && conf.get("spark.submit.deployMode") == "cluster") { + virtualEnvName = "virtualenv_driver" +} else { + // use the working directory of Executor + virtualEnvName = "virtualenv_" + conf.getAppId + "_" + VIRTUALENV_ID.getAndIncrement() +} + +// Use the absolute path of requirement file in the following cases +// 1. driver of pyspark shell +// 2. 
driver of yarn-client mode +// otherwise just use filename as it would be downloaded to the working directory of Executor +val pysparkRequirements = + if (isLauncher || +(isDriver && conf.get("spark.submit.deployMode") == "client")) { +conf.getOption("spark.pyspark.virtualenv.requirements") + } else { + conf.getOption("spark.pyspark.virtualenv.requirements").map(_.split("/").last) + } + +val createEnvCommand =
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user zjffdu commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r164034980 --- Diff: core/src/main/scala/org/apache/spark/api/python/VirtualEnvFactory.scala --- @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.api.python + +import java.io.File +import java.util.{Map => JMap} +import java.util.Arrays +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.JavaConverters._ + +import com.google.common.io.Files + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging + + +class VirtualEnvFactory(pythonExec: String, conf: SparkConf, isDriver: Boolean) + extends Logging { + + private val virtualEnvType = conf.get("spark.pyspark.virtualenv.type", "native") + private val virtualEnvBinPath = conf.get("spark.pyspark.virtualenv.bin.path", "") + private val initPythonPackages = conf.getOption("spark.pyspark.virtualenv.packages") + private var virtualEnvName: String = _ + private var virtualPythonExec: String = _ + private val VIRTUALENV_ID = new AtomicInteger() + private var isLauncher: Boolean = false + + // used by launcher when user want to use virtualenv in pyspark shell. 
Launcher need this class + // to create virtualenv for driver. + def this(pythonExec: String, properties: JMap[String, String], isDriver: java.lang.Boolean) { +this(pythonExec, new SparkConf().setAll(properties.asScala), isDriver) +this.isLauncher = true + } + + /* + * Create virtualenv using native virtualenv or conda + * + */ + def setupVirtualEnv(): String = { +/* + * + * Native Virtualenv: + * - Execute command: virtualenv -p --no-site-packages + * - Execute command: python -m pip --cache-dir install -r + * + * Conda + * - Execute command: conda create --prefix --file -y + * + */ +logInfo("Start to setup virtualenv...") +logDebug("user.dir=" + System.getProperty("user.dir")) +logDebug("user.home=" + System.getProperty("user.home")) + +require(virtualEnvType == "native" || virtualEnvType == "conda", + s"VirtualEnvType: $virtualEnvType is not supported." ) +require(new File(virtualEnvBinPath).exists(), + s"VirtualEnvBinPath: $virtualEnvBinPath is not defined or doesn't exist.") +// Two scenarios of creating virtualenv: +// 1. created in yarn container. Yarn will clean it up after container is exited +// 2. created outside yarn container. Spark need to create temp directory and clean it after app +//finish. +// - driver of PySpark shell +// - driver of yarn-client mode +if (isLauncher || + (isDriver && conf.get("spark.submit.deployMode") == "client")) { + val virtualenvBasedir = Files.createTempDir() + virtualenvBasedir.deleteOnExit() + virtualEnvName = virtualenvBasedir.getAbsolutePath +} else if (isDriver && conf.get("spark.submit.deployMode") == "cluster") { + virtualEnvName = "virtualenv_driver" +} else { + // use the working directory of Executor + virtualEnvName = "virtualenv_" + conf.getAppId + "_" + VIRTUALENV_ID.getAndIncrement() +} + +// Use the absolute path of requirement file in the following cases +// 1. driver of pyspark shell +// 2. 
driver of yarn-client mode +// otherwise just use filename as it would be downloaded to the working directory of Executor +val pysparkRequirements = + if (isLauncher || +(isDriver && conf.get("spark.submit.deployMode") == "client")) { +conf.getOption("spark.pyspark.virtualenv.requirements") + } else { + conf.getOption("spark.pyspark.virtualenv.requirements").map(_.split("/").last) + } + +val createEnvCommand =
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user zjffdu commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r164034871 --- Diff: python/pyspark/context.py --- @@ -1023,6 +1032,41 @@ def getConf(self): conf.setAll(self._conf.getAll()) return conf +def install_packages(self, packages): +""" +Install python packages on all executors and driver through pip. pip will be installed +by default no matter using native virtualenv or conda. So it is guaranteed that pip is +available if virtualenv is enabled. +:param packages: string for single package or a list of string for multiple packages +""" +if self._conf.get("spark.pyspark.virtualenv.enabled") != "true": +raise RuntimeError("install_packages can only use called when " + "spark.pyspark.virtualenv.enabled is set as true") +if isinstance(packages, basestring): +packages = [packages] +# seems statusTracker.getExecutorInfos() will return driver + exeuctors, so -1 here. +num_executors = len(self._jsc.sc().statusTracker().getExecutorInfos()) - 1 +dummyRDD = self.parallelize(range(num_executors), num_executors) + +def _run_pip(packages, iterator): +import pip +return pip.main(["install"] + packages) + +# install package on driver first. if installation succeeded, continue the installation +# on executors, otherwise return directly. +if _run_pip(packages, None) != 0: +return --- End diff -- The pip install output will be displayed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r164032549 --- Diff: launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java --- @@ -299,20 +300,34 @@ // 4. environment variable PYSPARK_PYTHON // 5. python List pyargs = new ArrayList<>(); -pyargs.add(firstNonEmpty(conf.get(SparkLauncher.PYSPARK_DRIVER_PYTHON), +String pythonExec = firstNonEmpty(conf.get(SparkLauncher.PYSPARK_DRIVER_PYTHON), conf.get(SparkLauncher.PYSPARK_PYTHON), System.getenv("PYSPARK_DRIVER_PYTHON"), System.getenv("PYSPARK_PYTHON"), - "python")); -String pyOpts = System.getenv("PYSPARK_DRIVER_PYTHON_OPTS"); -if (conf.containsKey(SparkLauncher.PYSPARK_PYTHON)) { - // pass conf spark.pyspark.python to python by environment variable. - env.put("PYSPARK_PYTHON", conf.get(SparkLauncher.PYSPARK_PYTHON)); + "python"); +if (conf.getOrDefault("spark.pyspark.virtualenv.enabled", "false").equals("true")) { + try { +// setup virtualenv in launcher when virtualenv is enabled in pyspark shell +Class virtualEnvClazz = Class.forName("org.apache.spark.api.python.VirtualEnvFactory"); --- End diff -- Why are we using reflection here? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r164030979 --- Diff: core/src/main/scala/org/apache/spark/api/python/VirtualEnvFactory.scala --- @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.api.python + +import java.io.File +import java.util.{Map => JMap} +import java.util.Arrays +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.JavaConverters._ + +import com.google.common.io.Files + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging + + +class VirtualEnvFactory(pythonExec: String, conf: SparkConf, isDriver: Boolean) + extends Logging { + + private val virtualEnvType = conf.get("spark.pyspark.virtualenv.type", "native") + private val virtualEnvBinPath = conf.get("spark.pyspark.virtualenv.bin.path", "") + private val initPythonPackages = conf.getOption("spark.pyspark.virtualenv.packages") --- End diff -- So if the factory is made once then how will these get updated? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r164031483 --- Diff: core/src/main/scala/org/apache/spark/api/python/VirtualEnvFactory.scala --- @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.api.python + +import java.io.File +import java.util.{Map => JMap} +import java.util.Arrays +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.JavaConverters._ + +import com.google.common.io.Files + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging + + +class VirtualEnvFactory(pythonExec: String, conf: SparkConf, isDriver: Boolean) + extends Logging { + + private val virtualEnvType = conf.get("spark.pyspark.virtualenv.type", "native") + private val virtualEnvBinPath = conf.get("spark.pyspark.virtualenv.bin.path", "") + private val initPythonPackages = conf.getOption("spark.pyspark.virtualenv.packages") + private var virtualEnvName: String = _ + private var virtualPythonExec: String = _ + private val VIRTUALENV_ID = new AtomicInteger() + private var isLauncher: Boolean = false + + // used by launcher when user want to use virtualenv in pyspark shell. 
Launcher need this class + // to create virtualenv for driver. + def this(pythonExec: String, properties: JMap[String, String], isDriver: java.lang.Boolean) { +this(pythonExec, new SparkConf().setAll(properties.asScala), isDriver) +this.isLauncher = true + } + + /* + * Create virtualenv using native virtualenv or conda + * + */ + def setupVirtualEnv(): String = { +/* + * + * Native Virtualenv: + * - Execute command: virtualenv -p --no-site-packages + * - Execute command: python -m pip --cache-dir install -r + * + * Conda + * - Execute command: conda create --prefix --file -y + * + */ +logInfo("Start to setup virtualenv...") +logDebug("user.dir=" + System.getProperty("user.dir")) +logDebug("user.home=" + System.getProperty("user.home")) + +require(virtualEnvType == "native" || virtualEnvType == "conda", + s"VirtualEnvType: $virtualEnvType is not supported." ) +require(new File(virtualEnvBinPath).exists(), + s"VirtualEnvBinPath: $virtualEnvBinPath is not defined or doesn't exist.") +// Two scenarios of creating virtualenv: +// 1. created in yarn container. Yarn will clean it up after container is exited +// 2. created outside yarn container. Spark need to create temp directory and clean it after app +//finish. +// - driver of PySpark shell +// - driver of yarn-client mode +if (isLauncher || + (isDriver && conf.get("spark.submit.deployMode") == "client")) { + val virtualenvBasedir = Files.createTempDir() + virtualenvBasedir.deleteOnExit() + virtualEnvName = virtualenvBasedir.getAbsolutePath +} else if (isDriver && conf.get("spark.submit.deployMode") == "cluster") { + virtualEnvName = "virtualenv_driver" +} else { + // use the working directory of Executor + virtualEnvName = "virtualenv_" + conf.getAppId + "_" + VIRTUALENV_ID.getAndIncrement() +} + +// Use the absolute path of requirement file in the following cases +// 1. driver of pyspark shell +// 2. 
driver of yarn-client mode +// otherwise just use filename as it would be downloaded to the working directory of Executor +val pysparkRequirements = + if (isLauncher || +(isDriver && conf.get("spark.submit.deployMode") == "client")) { +conf.getOption("spark.pyspark.virtualenv.requirements") + } else { + conf.getOption("spark.pyspark.virtualenv.requirements").map(_.split("/").last) + } + +val createEnvCommand =
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r164031905 --- Diff: python/pyspark/context.py --- @@ -1023,6 +1032,42 @@ def getConf(self): conf.setAll(self._conf.getAll()) return conf +def install_packages(self, packages): +""" +install python packages on all executors and driver through pip. pip will be installed +by default no matter using native virtualenv or conda. So it is guaranteed that pip is +available if virtualenv is enabled. +:param packages: string for single package or a list of string for multiple packages +:param install_driver: whether to install packages in client +""" +if self._conf.get("spark.pyspark.virtualenv.enabled") != "true": +raise RuntimeError("install_packages can only use called when " + "spark.pyspark.virtualenv.enabled is set as true") +if isinstance(packages, basestring): +packages = [packages] +# seems statusTracker.getExecutorInfos() will return driver + exeuctors, so -1 here. +num_executors = len(self._jsc.sc().statusTracker().getExecutorInfos()) - 1 +dummyRDD = self.parallelize(range(num_executors), num_executors) + +def _run_pip(packages, iterator): +import pip +return pip.main(["install"] + packages) + +# install package on driver first. if installation succeeded, continue the installation +# on executors, otherwise return directly. +if _run_pip(packages, None) != 0: +return + +virtualenvPackages = self._conf.get("spark.pyspark.virtualenv.packages") +if virtualenvPackages: +self._conf.set("spark.pyspark.virtualenv.packages", virtualenvPackages + "," + + ",".join(packages)) +else: +self._conf.set("spark.pyspark.virtualenv.packages", ",".join(packages)) + +import functools +dummyRDD.foreachPartition(functools.partial(_run_pip, packages)) --- End diff -- When have you seen this fail? Just trying to follow along. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r164031985 --- Diff: python/pyspark/context.py --- @@ -1023,6 +1032,41 @@ def getConf(self): conf.setAll(self._conf.getAll()) return conf +def install_packages(self, packages): +""" +Install python packages on all executors and driver through pip. pip will be installed +by default no matter using native virtualenv or conda. So it is guaranteed that pip is +available if virtualenv is enabled. +:param packages: string for single package or a list of string for multiple packages +""" +if self._conf.get("spark.pyspark.virtualenv.enabled") != "true": +raise RuntimeError("install_packages can only use called when " + "spark.pyspark.virtualenv.enabled is set as true") +if isinstance(packages, basestring): +packages = [packages] +# seems statusTracker.getExecutorInfos() will return driver + exeuctors, so -1 here. +num_executors = len(self._jsc.sc().statusTracker().getExecutorInfos()) - 1 +dummyRDD = self.parallelize(range(num_executors), num_executors) + +def _run_pip(packages, iterator): +import pip +return pip.main(["install"] + packages) + +# install package on driver first. if installation succeeded, continue the installation +# on executors, otherwise return directly. +if _run_pip(packages, None) != 0: +return --- End diff -- So wait will this fail silently or how will the user know its failed? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r164032400 --- Diff: python/pyspark/context.py --- @@ -1023,6 +1032,41 @@ def getConf(self): conf.setAll(self._conf.getAll()) return conf +def install_packages(self, packages): +""" +Install python packages on all executors and driver through pip. pip will be installed +by default no matter using native virtualenv or conda. So it is guaranteed that pip is +available if virtualenv is enabled. +:param packages: string for single package or a list of string for multiple packages +""" +if self._conf.get("spark.pyspark.virtualenv.enabled") != "true": +raise RuntimeError("install_packages can only use called when " + "spark.pyspark.virtualenv.enabled is set as true") +if isinstance(packages, basestring): +packages = [packages] +# seems statusTracker.getExecutorInfos() will return driver + exeuctors, so -1 here. +num_executors = len(self._jsc.sc().statusTracker().getExecutorInfos()) - 1 +dummyRDD = self.parallelize(range(num_executors), num_executors) + +def _run_pip(packages, iterator): +import pip +return pip.main(["install"] + packages) + +# install package on driver first. if installation succeeded, continue the installation +# on executors, otherwise return directly. +if _run_pip(packages, None) != 0: +return + +virtualenvPackages = self._conf.get("spark.pyspark.virtualenv.packages") +if virtualenvPackages: +self._conf.set("spark.pyspark.virtualenv.packages", virtualenvPackages + "," + + ",".join(packages)) +else: +self._conf.set("spark.pyspark.virtualenv.packages", ",".join(packages)) --- End diff -- So here and other places you split on "," which isn't great for people who want to specify version ranges. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r164032982 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala --- @@ -98,7 +98,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp private val reviveThread = ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-revive-thread") - class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)]) + class DriverEndpoint(override val rpcEnv: RpcEnv) --- End diff -- My question here is why is this change needed? Changing the scheduler backend is weird for this change. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r164031719 --- Diff: core/src/main/scala/org/apache/spark/api/python/VirtualEnvFactory.scala --- @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.api.python + +import java.io.File +import java.util.{Map => JMap} +import java.util.Arrays +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.JavaConverters._ + +import com.google.common.io.Files + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging + + +class VirtualEnvFactory(pythonExec: String, conf: SparkConf, isDriver: Boolean) + extends Logging { + + private val virtualEnvType = conf.get("spark.pyspark.virtualenv.type", "native") + private val virtualEnvBinPath = conf.get("spark.pyspark.virtualenv.bin.path", "") + private val initPythonPackages = conf.getOption("spark.pyspark.virtualenv.packages") + private var virtualEnvName: String = _ + private var virtualPythonExec: String = _ + private val VIRTUALENV_ID = new AtomicInteger() + private var isLauncher: Boolean = false + + // used by launcher when user want to use virtualenv in pyspark shell. 
Launcher need this class + // to create virtualenv for driver. + def this(pythonExec: String, properties: JMap[String, String], isDriver: java.lang.Boolean) { +this(pythonExec, new SparkConf().setAll(properties.asScala), isDriver) +this.isLauncher = true + } + + /* + * Create virtualenv using native virtualenv or conda + * + */ + def setupVirtualEnv(): String = { +/* + * + * Native Virtualenv: + * - Execute command: virtualenv -p --no-site-packages + * - Execute command: python -m pip --cache-dir install -r + * + * Conda + * - Execute command: conda create --prefix --file -y + * + */ +logInfo("Start to setup virtualenv...") +logDebug("user.dir=" + System.getProperty("user.dir")) +logDebug("user.home=" + System.getProperty("user.home")) + +require(virtualEnvType == "native" || virtualEnvType == "conda", + s"VirtualEnvType: $virtualEnvType is not supported." ) +require(new File(virtualEnvBinPath).exists(), + s"VirtualEnvBinPath: $virtualEnvBinPath is not defined or doesn't exist.") +// Two scenarios of creating virtualenv: +// 1. created in yarn container. Yarn will clean it up after container is exited +// 2. created outside yarn container. Spark need to create temp directory and clean it after app +//finish. +// - driver of PySpark shell +// - driver of yarn-client mode +if (isLauncher || + (isDriver && conf.get("spark.submit.deployMode") == "client")) { + val virtualenvBasedir = Files.createTempDir() + virtualenvBasedir.deleteOnExit() + virtualEnvName = virtualenvBasedir.getAbsolutePath +} else if (isDriver && conf.get("spark.submit.deployMode") == "cluster") { + virtualEnvName = "virtualenv_driver" +} else { + // use the working directory of Executor + virtualEnvName = "virtualenv_" + conf.getAppId + "_" + VIRTUALENV_ID.getAndIncrement() +} + +// Use the absolute path of requirement file in the following cases +// 1. driver of pyspark shell +// 2. 
driver of yarn-client mode +// otherwise just use filename as it would be downloaded to the working directory of Executor +val pysparkRequirements = + if (isLauncher || +(isDriver && conf.get("spark.submit.deployMode") == "client")) { +conf.getOption("spark.pyspark.virtualenv.requirements") + } else { + conf.getOption("spark.pyspark.virtualenv.requirements").map(_.split("/").last) + } + +val createEnvCommand =
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r164032919 --- Diff: core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala --- @@ -39,12 +39,17 @@ object PythonRunner { val pyFiles = args(1) val otherArgs = args.slice(2, args.length) val sparkConf = new SparkConf() -val pythonExec = sparkConf.get(PYSPARK_DRIVER_PYTHON) +var pythonExec = sparkConf.get(PYSPARK_DRIVER_PYTHON) --- End diff -- Why is this made into a var? You could keep this as a val with a bit of refactoring. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user zjffdu commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r163427975 --- Diff: python/pyspark/context.py --- @@ -1023,6 +1032,42 @@ def getConf(self): conf.setAll(self._conf.getAll()) return conf +def install_packages(self, packages): +""" +install python packages on all executors and driver through pip. pip will be installed +by default no matter using native virtualenv or conda. So it is guaranteed that pip is +available if virtualenv is enabled. +:param packages: string for single package or a list of string for multiple packages +:param install_driver: whether to install packages in client +""" +if self._conf.get("spark.pyspark.virtualenv.enabled") != "true": +raise RuntimeError("install_packages can only use called when " + "spark.pyspark.virtualenv.enabled is set as true") +if isinstance(packages, basestring): +packages = [packages] +# seems statusTracker.getExecutorInfos() will return driver + exeuctors, so -1 here. +num_executors = len(self._jsc.sc().statusTracker().getExecutorInfos()) - 1 +dummyRDD = self.parallelize(range(num_executors), num_executors) + +def _run_pip(packages, iterator): +import pip +return pip.main(["install"] + packages) + +# install package on driver first. if installation succeeded, continue the installation +# on executors, otherwise return directly. +if _run_pip(packages, None) != 0: +return + +virtualenvPackages = self._conf.get("spark.pyspark.virtualenv.packages") +if virtualenvPackages: +self._conf.set("spark.pyspark.virtualenv.packages", virtualenvPackages + "," + + ",".join(packages)) +else: +self._conf.set("spark.pyspark.virtualenv.packages", ",".join(packages)) + +import functools +dummyRDD.foreachPartition(functools.partial(_run_pip, packages)) --- End diff -- You are right, it is not guaranteed. From my experiment, it works pretty well most of time. 
And even if it is not executed on all executors in this RDD operation, the packages will be installed when the python daemon is later started on that executor. So in any case, the python packages will be installed in all python daemons. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r163206632 --- Diff: core/src/main/scala/org/apache/spark/api/python/VirtualEnvFactory.scala --- @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.api.python + +import java.io.File +import java.util.{Map => JMap} +import java.util.Arrays +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.JavaConverters._ + +import com.google.common.io.Files + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging + + +private[spark] class VirtualEnvFactory(pythonExec: String, conf: SparkConf, isDriver: Boolean) + extends Logging { + + private var virtualEnvType = conf.get("spark.pyspark.virtualenv.type", "native") + private var virtualEnvPath = conf.get("spark.pyspark.virtualenv.bin.path", "") + private var virtualEnvName: String = _ + private var virtualPythonExec: String = _ + private val VIRTUALENV_ID = new AtomicInteger() + private var isLauncher: Boolean = false + + // used by launcher when user want to use virtualenv in pyspark shell. Launcher need this class + // to create virtualenv for driver. 
+ def this(pythonExec: String, properties: JMap[String, String], isDriver: java.lang.Boolean) { --- End diff -- I guess we can use `boolean.class` in java reflection. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r163203430 --- Diff: core/src/main/scala/org/apache/spark/api/python/VirtualEnvFactory.scala --- @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.api.python + +import java.io.File +import java.util.{Map => JMap} +import java.util.Arrays +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.JavaConverters._ + +import com.google.common.io.Files + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging + + +class VirtualEnvFactory(pythonExec: String, conf: SparkConf, isDriver: Boolean) + extends Logging { + + private var virtualEnvType = conf.get("spark.pyspark.virtualenv.type", "native") + private var virtualEnvBinPath = conf.get("spark.pyspark.virtualenv.bin.path", "") + private var initPythonPackages = conf.getOption("spark.pyspark.virtualenv.packages") + private var virtualEnvName: String = _ + private var virtualPythonExec: String = _ + private val VIRTUALENV_ID = new AtomicInteger() + private var isLauncher: Boolean = false + + // used by launcher when user want to use virtualenv in pyspark shell. 
Launcher need this class + // to create virtualenv for driver. + def this(pythonExec: String, properties: JMap[String, String], isDriver: java.lang.Boolean) { +this(pythonExec, new SparkConf().setAll(properties.asScala), isDriver) +this.isLauncher = true + } + + /* + * Create virtualenv using native virtualenv or conda + * + */ + def setupVirtualEnv(): String = { +/* + * + * Native Virtualenv: + * - Execute command: virtualenv -p --no-site-packages + * - Execute command: python -m pip --cache-dir install -r + * + * Conda + * - Execute command: conda create --prefix --file -y + * + */ +logInfo("Start to setup virtualenv...") +logDebug("user.dir=" + System.getProperty("user.dir")) +logDebug("user.home=" + System.getProperty("user.home")) + +require(virtualEnvType == "native" || virtualEnvType == "conda", + s"VirtualEnvType: $virtualEnvType is not supported." ) +require(new File(virtualEnvBinPath).exists(), + s"VirtualEnvBinPath: $virtualEnvBinPath is not defined or doesn't exist.") +// Two scenarios of creating virtualenv: +// 1. created in yarn container. Yarn will clean it up after container is exited +// 2. created outside yarn container. Spark need to create temp directory and clean it after app +//finish. +// - driver of PySpark shell +// - driver of yarn-client mode +if (isLauncher || + (isDriver && conf.get("spark.submit.deployMode") == "client")) { + val virtualenv_basedir = Files.createTempDir() --- End diff -- nit: `virtualenvBasedir`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r163202935 --- Diff: core/src/main/scala/org/apache/spark/api/python/VirtualEnvFactory.scala --- @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.api.python + +import java.io.File +import java.util.{Map => JMap} +import java.util.Arrays +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.JavaConverters._ + +import com.google.common.io.Files + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging + + +class VirtualEnvFactory(pythonExec: String, conf: SparkConf, isDriver: Boolean) + extends Logging { + + private var virtualEnvType = conf.get("spark.pyspark.virtualenv.type", "native") + private var virtualEnvBinPath = conf.get("spark.pyspark.virtualenv.bin.path", "") + private var initPythonPackages = conf.getOption("spark.pyspark.virtualenv.packages") + private var virtualEnvName: String = _ + private var virtualPythonExec: String = _ --- End diff -- Do we need to have these as an instance variables? 
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r163203798 --- Diff: core/src/main/scala/org/apache/spark/api/python/VirtualEnvFactory.scala --- @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.api.python + +import java.io.File +import java.util.{Map => JMap} +import java.util.Arrays +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.JavaConverters._ + +import com.google.common.io.Files + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging + + +class VirtualEnvFactory(pythonExec: String, conf: SparkConf, isDriver: Boolean) + extends Logging { + + private var virtualEnvType = conf.get("spark.pyspark.virtualenv.type", "native") + private var virtualEnvBinPath = conf.get("spark.pyspark.virtualenv.bin.path", "") + private var initPythonPackages = conf.getOption("spark.pyspark.virtualenv.packages") + private var virtualEnvName: String = _ + private var virtualPythonExec: String = _ + private val VIRTUALENV_ID = new AtomicInteger() + private var isLauncher: Boolean = false + + // used by launcher when user want to use virtualenv in pyspark shell. 
Launcher need this class + // to create virtualenv for driver. + def this(pythonExec: String, properties: JMap[String, String], isDriver: java.lang.Boolean) { +this(pythonExec, new SparkConf().setAll(properties.asScala), isDriver) +this.isLauncher = true + } + + /* + * Create virtualenv using native virtualenv or conda + * + */ + def setupVirtualEnv(): String = { +/* + * + * Native Virtualenv: + * - Execute command: virtualenv -p --no-site-packages + * - Execute command: python -m pip --cache-dir install -r + * + * Conda + * - Execute command: conda create --prefix --file -y + * + */ +logInfo("Start to setup virtualenv...") +logDebug("user.dir=" + System.getProperty("user.dir")) +logDebug("user.home=" + System.getProperty("user.home")) + +require(virtualEnvType == "native" || virtualEnvType == "conda", + s"VirtualEnvType: $virtualEnvType is not supported." ) +require(new File(virtualEnvBinPath).exists(), + s"VirtualEnvBinPath: $virtualEnvBinPath is not defined or doesn't exist.") +// Two scenarios of creating virtualenv: +// 1. created in yarn container. Yarn will clean it up after container is exited +// 2. created outside yarn container. Spark need to create temp directory and clean it after app +//finish. +// - driver of PySpark shell +// - driver of yarn-client mode +if (isLauncher || + (isDriver && conf.get("spark.submit.deployMode") == "client")) { + val virtualenv_basedir = Files.createTempDir() + virtualenv_basedir.deleteOnExit() + virtualEnvName = virtualenv_basedir.getAbsolutePath +} else if (isDriver && conf.get("spark.submit.deployMode") == "cluster") { + virtualEnvName = "virtualenv_driver" +} else { + // use the working directory of Executor + virtualEnvName = "virtualenv_" + conf.getAppId + "_" + VIRTUALENV_ID.getAndIncrement() +} + +// Use the absolute path of requirement file in the following cases +// 1. driver of pyspark shell +// 2. 
driver of yarn-client mode +// otherwise just use filename as it would be downloaded to the working directory of Executor +val pysparkRequirements = + if (isLauncher || +(isDriver && conf.get("spark.submit.deployMode") == "client")) { +conf.getOption("spark.pyspark.virtualenv.requirements") + } else { + conf.getOption("spark.pyspark.virtualenv.requirements").map(_.split("/").last) + } + +val createEnvCommand
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r163201794 --- Diff: core/src/main/scala/org/apache/spark/api/python/VirtualEnvFactory.scala --- @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.api.python + +import java.io.File +import java.util.{Map => JMap} +import java.util.Arrays +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.JavaConverters._ + +import com.google.common.io.Files + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging + + +class VirtualEnvFactory(pythonExec: String, conf: SparkConf, isDriver: Boolean) + extends Logging { + + private var virtualEnvType = conf.get("spark.pyspark.virtualenv.type", "native") + private var virtualEnvBinPath = conf.get("spark.pyspark.virtualenv.bin.path", "") + private var initPythonPackages = conf.getOption("spark.pyspark.virtualenv.packages") --- End diff -- Use `val`s for these three variables. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r163198358 --- Diff: python/pyspark/context.py --- @@ -1023,6 +1032,42 @@ def getConf(self): conf.setAll(self._conf.getAll()) return conf +def install_packages(self, packages): +""" +install python packages on all executors and driver through pip. pip will be installed --- End diff -- nit: `Install` instead of `install` at the beginning of the line. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r163199689 --- Diff: launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java --- @@ -299,20 +300,38 @@ // 4. environment variable PYSPARK_PYTHON // 5. python List pyargs = new ArrayList<>(); -pyargs.add(firstNonEmpty(conf.get(SparkLauncher.PYSPARK_DRIVER_PYTHON), +String pythonExec = firstNonEmpty(conf.get(SparkLauncher.PYSPARK_DRIVER_PYTHON), conf.get(SparkLauncher.PYSPARK_PYTHON), System.getenv("PYSPARK_DRIVER_PYTHON"), System.getenv("PYSPARK_PYTHON"), - "python")); -String pyOpts = System.getenv("PYSPARK_DRIVER_PYTHON_OPTS"); -if (conf.containsKey(SparkLauncher.PYSPARK_PYTHON)) { - // pass conf spark.pyspark.python to python by environment variable. - env.put("PYSPARK_PYTHON", conf.get(SparkLauncher.PYSPARK_PYTHON)); + "python"); +if (conf.getOrDefault("spark.pyspark.virtualenv.enabled", "false").equals("true")) { + try { +// setup virtualenv in launcher when virtualenv is enabled in pyspark shell +Class virtualEnvClazz = getClass().forName("org.apache.spark.api.python.VirtualEnvFactory"); +Object virtualEnv = virtualEnvClazz.getConstructor(String.class, Map.class, Boolean.class) + .newInstance(pythonExec, conf, true); +Method virtualEnvMethod = virtualEnvClazz.getMethod("setupVirtualEnv"); +pythonExec = (String) virtualEnvMethod.invoke(virtualEnv); +pyargs.add(pythonExec); + } catch (Exception e) { +throw new IOException(e); + } +} else { + pyargs.add(firstNonEmpty(conf.get(SparkLauncher.PYSPARK_DRIVER_PYTHON), +conf.get(SparkLauncher.PYSPARK_PYTHON), +System.getenv("PYSPARK_DRIVER_PYTHON"), +System.getenv("PYSPARK_PYTHON"), +"python")); --- End diff -- We can simplify as `pyargs.add(pythonExec);`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r163199169 --- Diff: python/pyspark/context.py --- @@ -1023,6 +1032,42 @@ def getConf(self): conf.setAll(self._conf.getAll()) return conf +def install_packages(self, packages): +""" +install python packages on all executors and driver through pip. pip will be installed +by default no matter using native virtualenv or conda. So it is guaranteed that pip is +available if virtualenv is enabled. +:param packages: string for single package or a list of string for multiple packages +:param install_driver: whether to install packages in client --- End diff -- What's `install_driver` parameter? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r163199771 --- Diff: launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java --- @@ -299,20 +300,38 @@ // 4. environment variable PYSPARK_PYTHON // 5. python List pyargs = new ArrayList<>(); -pyargs.add(firstNonEmpty(conf.get(SparkLauncher.PYSPARK_DRIVER_PYTHON), +String pythonExec = firstNonEmpty(conf.get(SparkLauncher.PYSPARK_DRIVER_PYTHON), conf.get(SparkLauncher.PYSPARK_PYTHON), System.getenv("PYSPARK_DRIVER_PYTHON"), System.getenv("PYSPARK_PYTHON"), - "python")); -String pyOpts = System.getenv("PYSPARK_DRIVER_PYTHON_OPTS"); -if (conf.containsKey(SparkLauncher.PYSPARK_PYTHON)) { - // pass conf spark.pyspark.python to python by environment variable. - env.put("PYSPARK_PYTHON", conf.get(SparkLauncher.PYSPARK_PYTHON)); + "python"); +if (conf.getOrDefault("spark.pyspark.virtualenv.enabled", "false").equals("true")) { + try { +// setup virtualenv in launcher when virtualenv is enabled in pyspark shell +Class virtualEnvClazz = getClass().forName("org.apache.spark.api.python.VirtualEnvFactory"); --- End diff -- `Class.forName` instead of `getClass().forName`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r163194199 --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala --- @@ -82,6 +90,12 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String envVars.getOrElse("PYTHONPATH", ""), sys.env.getOrElse("PYTHONPATH", "")) + + if (virtualEnvEnabled) { +val virtualEnvFactory = new VirtualEnvFactory(pythonExec, conf, false) +virtualenvPythonExec = Some(virtualEnvFactory.setupVirtualEnv()) + } --- End diff -- I guess we don't prefer unnecessary `var`s. How about the following with the diff above: ```scala val virtualEnvEnabled = conf.getBoolean("spark.pyspark.virtualenv.enabled", false) val virtualenvPythonExec = if (virtualEnvEnabled) { val virtualEnvFactory = new VirtualEnvFactory(pythonExec, conf, false) Some(virtualEnvFactory.setupVirtualEnv()) } else { None } ``` Or maybe we can: ```scala val virtualEnvEnabled = conf.getBoolean("spark.pyspark.virtualenv.enabled", false) val virtualenvPythonExec = if (virtualEnvEnabled) { val virtualEnvFactory = new VirtualEnvFactory(pythonExec, conf, false) virtualEnvFactory.setupVirtualEnv() } else { pythonExec } ``` and use this directly. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r163197957 --- Diff: python/pyspark/context.py --- @@ -1023,6 +1032,42 @@ def getConf(self): conf.setAll(self._conf.getAll()) return conf +def install_packages(self, packages): +""" +install python packages on all executors and driver through pip. pip will be installed +by default no matter using native virtualenv or conda. So it is guaranteed that pip is +available if virtualenv is enabled. +:param packages: string for single package or a list of string for multiple packages +:param install_driver: whether to install packages in client +""" +if self._conf.get("spark.pyspark.virtualenv.enabled") != "true": +raise RuntimeError("install_packages can only use called when " + "spark.pyspark.virtualenv.enabled is set as true") +if isinstance(packages, basestring): +packages = [packages] +# seems statusTracker.getExecutorInfos() will return driver + exeuctors, so -1 here. +num_executors = len(self._jsc.sc().statusTracker().getExecutorInfos()) - 1 +dummyRDD = self.parallelize(range(num_executors), num_executors) + +def _run_pip(packages, iterator): +import pip +return pip.main(["install"] + packages) + +# install package on driver first. if installation succeeded, continue the installation +# on executors, otherwise return directly. +if _run_pip(packages, None) != 0: +return + +virtualenvPackages = self._conf.get("spark.pyspark.virtualenv.packages") +if virtualenvPackages: +self._conf.set("spark.pyspark.virtualenv.packages", virtualenvPackages + "," + + ",".join(packages)) +else: +self._conf.set("spark.pyspark.virtualenv.packages", ",".join(packages)) + +import functools +dummyRDD.foreachPartition(functools.partial(_run_pip, packages)) --- End diff -- I guess this does not guarantee that the `_run_pip` is executed on all executors. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r162856254 --- Diff: python/pyspark/context.py --- @@ -1023,6 +1032,35 @@ def getConf(self): conf.setAll(self._conf.getAll()) return conf +def install_packages(self, packages, install_driver=True): +""" +install python packages on all executors and driver through pip. pip will be installed +by default no matter using native virtualenv or conda. So it is guaranteed that pip is +available if virtualenv is enabled. +:param packages: string for single package or a list of string for multiple packages +:param install_driver: whether to install packages in client +""" +if self._conf.get("spark.pyspark.virtualenv.enabled") != "true": +raise RuntimeError("install_packages can only use called when " + "spark.pyspark.virtualenv.enabled set as true") +if isinstance(packages, basestring): +packages = [packages] +# seems statusTracker.getExecutorInfos() will return driver + exeuctors, so -1 here. +num_executors = len(self._jsc.sc().statusTracker().getExecutorInfos()) - 1 +dummyRDD = self.parallelize(range(num_executors), num_executors) + +def _run_pip(packages, iterator): +import pip +pip.main(["install"] + packages) + +# run it in the main thread. Will do it in a separated thread after +# https://github.com/pypa/pip/issues/2553 is fixed +if install_driver: +_run_pip(packages, None) + +import functools
dummyRDD.foreachPartition(functools.partial(_run_pip, packages)) --- End diff -- @zjffdu No, it's not, hard -1. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user zjffdu commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r160572606 --- Diff: python/pyspark/context.py --- @@ -1023,6 +1032,35 @@ def getConf(self): conf.setAll(self._conf.getAll()) return conf +def install_packages(self, packages, install_driver=True): +""" +install python packages on all executors and driver through pip. pip will be installed +by default no matter using native virtualenv or conda. So it is guaranteed that pip is +available if virtualenv is enabled. +:param packages: string for single package or a list of string for multiple packages +:param install_driver: whether to install packages in client +""" +if self._conf.get("spark.pyspark.virtualenv.enabled") != "true": +raise RuntimeError("install_packages can only use called when " + "spark.pyspark.virtualenv.enabled set as true") +if isinstance(packages, basestring): +packages = [packages] +# seems statusTracker.getExecutorInfos() will return driver + exeuctors, so -1 here. +num_executors = len(self._jsc.sc().statusTracker().getExecutorInfos()) - 1 +dummyRDD = self.parallelize(range(num_executors), num_executors) + +def _run_pip(packages, iterator): +import pip +pip.main(["install"] + packages) + +# run it in the main thread. Will do it in a separated thread after +# https://github.com/pypa/pip/issues/2553 is fixed +if install_driver: +_run_pip(packages, None) + +import functools +dummyRDD.foreachPartition(functools.partial(_run_pip, packages)) --- End diff -- It makes sense to make this feature experimental. Because although it is not reliable in some cases, it is still pretty useful in interactive mode, e.g. In notebook, it is not possible to set down all the dependent packages before launching spark app. Installing packages at runtime is very useful for interactive mode. And since usually notebook is an experimental phase, not production phase. These corner cases should be acceptable IMHO as long as we document them and make users aware. 
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user Stibbons commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r160549913 --- Diff: python/pyspark/context.py --- @@ -1023,6 +1032,35 @@ def getConf(self): conf.setAll(self._conf.getAll()) return conf +def install_packages(self, packages, install_driver=True): +""" +install python packages on all executors and driver through pip. pip will be installed +by default no matter using native virtualenv or conda. So it is guaranteed that pip is +available if virtualenv is enabled. +:param packages: string for single package or a list of string for multiple packages +:param install_driver: whether to install packages in client +""" +if self._conf.get("spark.pyspark.virtualenv.enabled") != "true": +raise RuntimeError("install_packages can only use called when " + "spark.pyspark.virtualenv.enabled set as true") +if isinstance(packages, basestring): +packages = [packages] +# seems statusTracker.getExecutorInfos() will return driver + exeuctors, so -1 here. +num_executors = len(self._jsc.sc().statusTracker().getExecutorInfos()) - 1 +dummyRDD = self.parallelize(range(num_executors), num_executors) + +def _run_pip(packages, iterator): +import pip +pip.main(["install"] + packages) + +# run it in the main thread. Will do it in a separated thread after +# https://github.com/pypa/pip/issues/2553 is fixed +if install_driver: +_run_pip(packages, None) + +import functools +dummyRDD.foreachPartition(functools.partial(_run_pip, packages)) --- End diff -- what about making this feature experimental and so improving it gradually ? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r160521328 --- Diff: python/pyspark/context.py --- @@ -1023,6 +1032,35 @@ def getConf(self): conf.setAll(self._conf.getAll()) return conf +def install_packages(self, packages, install_driver=True): +""" +install python packages on all executors and driver through pip. pip will be installed +by default no matter using native virtualenv or conda. So it is guaranteed that pip is +available if virtualenv is enabled. +:param packages: string for single package or a list of string for multiple packages +:param install_driver: whether to install packages in client +""" +if self._conf.get("spark.pyspark.virtualenv.enabled") != "true": +raise RuntimeError("install_packages can only use called when " + "spark.pyspark.virtualenv.enabled set as true") +if isinstance(packages, basestring): +packages = [packages] +# seems statusTracker.getExecutorInfos() will return driver + exeuctors, so -1 here. +num_executors = len(self._jsc.sc().statusTracker().getExecutorInfos()) - 1 +dummyRDD = self.parallelize(range(num_executors), num_executors) + +def _run_pip(packages, iterator): +import pip +pip.main(["install"] + packages) + +# run it in the main thread. Will do it in a separated thread after +# https://github.com/pypa/pip/issues/2553 is fixed +if install_driver: +_run_pip(packages, None) + +import functools +dummyRDD.foreachPartition(functools.partial(_run_pip, packages)) --- End diff -- This approach is not resilient to executor failure/restart, dynamic allocation, and other possible changes. I'm not comfortable merging something which depends on this. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user zjffdu commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r160310618 --- Diff: python/pyspark/context.py --- @@ -1023,6 +1039,33 @@ def getConf(self): conf.setAll(self._conf.getAll()) return conf +def install_packages(self, packages, install_driver=True): +""" +install python packages on all executors and driver through pip +:param packages: string for single package or a list of string for multiple packages +:param install_driver: whether to install packages in client --- End diff -- @HyukjinKwon Could you guide me how to skip such doctests ? Thanks --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user zjffdu commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r160308321 --- Diff: launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java --- @@ -299,20 +301,39 @@ // 4. environment variable PYSPARK_PYTHON // 5. python List pyargs = new ArrayList<>(); -pyargs.add(firstNonEmpty(conf.get(SparkLauncher.PYSPARK_DRIVER_PYTHON), - conf.get(SparkLauncher.PYSPARK_PYTHON), - System.getenv("PYSPARK_DRIVER_PYTHON"), - System.getenv("PYSPARK_PYTHON"), - "python")); -String pyOpts = System.getenv("PYSPARK_DRIVER_PYTHON_OPTS"); -if (conf.containsKey(SparkLauncher.PYSPARK_PYTHON)) { - // pass conf spark.pyspark.python to python by environment variable. - env.put("PYSPARK_PYTHON", conf.get(SparkLauncher.PYSPARK_PYTHON)); -} -if (!isEmpty(pyOpts)) { - pyargs.addAll(parseOptionString(pyOpts)); -} +String pythonExec = firstNonEmpty(conf.get(SparkLauncher.PYSPARK_DRIVER_PYTHON), +conf.get(SparkLauncher.PYSPARK_PYTHON), +System.getenv("PYSPARK_DRIVER_PYTHON"), +System.getenv("PYSPARK_PYTHON"), +"python"); +if (conf.getOrDefault("spark.pyspark.virtualenv.enabled", "false").equals("true")) { + try { +// setup virtualenv in launcher when virtualenv is enabled in pyspark shell +Class virtualEnvClazz = getClass().forName("org.apache.spark.api.python.VirtualEnvFactory"); +Object virtualEnv = virtualEnvClazz.getConstructor(String.class, Map.class, Boolean.class) +.newInstance(pythonExec, conf, true); +Method virtualEnvMethod = virtualEnv.getClass().getMethod("setupVirtualEnv"); +pythonExec = (String) virtualEnvMethod.invoke(virtualEnv); +pyargs.add(pythonExec); + } catch (Exception e) { +throw new IOException(e); + } +} else { + pyargs.add(firstNonEmpty(conf.get(SparkLauncher.PYSPARK_DRIVER_PYTHON), + conf.get(SparkLauncher.PYSPARK_PYTHON), + System.getenv("PYSPARK_DRIVER_PYTHON"), + System.getenv("PYSPARK_PYTHON"), + "python")); + String pyOpts = System.getenv("PYSPARK_DRIVER_PYTHON_OPTS"); + if 
(conf.containsKey(SparkLauncher.PYSPARK_PYTHON)) { +// pass conf spark.pyspark.python to python by environment variable. +env.put("PYSPARK_PYTHON", conf.get(SparkLauncher.PYSPARK_PYTHON)); + } + if (!isEmpty(pyOpts)) { +pyargs.addAll(parseOptionString(pyOpts)); + } --- End diff -- Good point, fixed --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user zjffdu commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r160285377 --- Diff: core/src/main/scala/org/apache/spark/api/python/VirtualEnvFactory.scala --- @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.api.python + +import java.io.File +import java.util.{Map => JMap} +import java.util.Arrays +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.JavaConverters._ + +import com.google.common.io.Files + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging + + +private[spark] class VirtualEnvFactory(pythonExec: String, conf: SparkConf, isDriver: Boolean) + extends Logging { + + private var virtualEnvType = conf.get("spark.pyspark.virtualenv.type", "native") + private var virtualEnvPath = conf.get("spark.pyspark.virtualenv.bin.path", "") + private var virtualEnvName: String = _ + private var virtualPythonExec: String = _ + private val VIRTUALENV_ID = new AtomicInteger() + private var isLauncher: Boolean = false + + // used by launcher when user want to use virtualenv in pyspark shell. Launcher need this class + // to create virtualenv for driver. 
+ def this(pythonExec: String, properties: JMap[String, String], isDriver: java.lang.Boolean) { +this(pythonExec, new SparkConf(), isDriver) --- End diff -- I am afraid not, because this method is used by the launcher module, where SparkConf is unknown. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user zjffdu commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r160141363 --- Diff: python/pyspark/context.py --- @@ -1023,6 +1039,33 @@ def getConf(self): conf.setAll(self._conf.getAll()) return conf +def install_packages(self, packages, install_driver=True): --- End diff -- Users can call this method to install packages at runtime; e.g. a user can install packages in the PySpark shell. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user zjffdu commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r160140451 --- Diff: docs/submitting-applications.md --- @@ -218,6 +218,73 @@ These commands can be used with `pyspark`, `spark-shell`, and `spark-submit` to For Python, the equivalent `--py-files` option can be used to distribute `.egg`, `.zip` and `.py` libraries to executors. +# VirtualEnv for Pyspark +For simple PySpark application, we can use `--py-files` to add its dependencies. While for a large PySpark application, +usually you will have many dependencies which may also have transitive dependencies and even some dependencies need to be compiled +to be installed. In this case `--py-files` is not so convenient. Luckily, in python world we have virtualenv/conda to help create isolated +python work environment. We also implement virtualenv in PySpark (It is only supported in yarn mode for now). + +# Prerequisites +- Each node have virtualenv/conda, python-devel installed +- Each node is internet accessible (for downloading packages) + +{% highlight bash %} +# Setup virtualenv using native virtualenv on yarn-client mode +bin/spark-submit \ +--master yarn \ +--deploy-mode client \ +--conf "spark.pyspark.virtualenv.enabled=true" \ +--conf "spark.pyspark.virtualenv.type=native" \ +--conf "spark.pyspark.virtualenv.requirements=" \ +--conf "spark.pyspark.virtualenv.bin.path=" \ + + +# Setup virtualenv using conda on yarn-client mode +bin/spark-submit \ +--master yarn \ +--deploy-mode client \ +--conf "spark.pyspark.virtualenv.enabled=true" \ +--conf "spark.pyspark.virtualenv.type=conda" \ +--conf "spark.pyspark.virtualenv.requirements=<" \ +--conf "spark.pyspark.virtualenv.bin.path=" \ + +{% endhighlight %} + +## PySpark VirtualEnv Configurations + +Property NameDefaultMeaning + + spark.pyspark.virtualenv.enabled + false + Whether to enable virtualenv + + + Spark.pyspark.virtualenv.type + virtualenv --- End diff -- I don't have strong preference on 
`native`. The reason I use `native` is that I already use "virtualenv" in all the virtualenv-related configuration names, and I just don't want to introduce "virtualenv" here again, to avoid confusion. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user zjffdu commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r160139161 --- Diff: core/src/main/scala/org/apache/spark/api/python/VirtualEnvFactory.scala --- @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.api.python + +import java.io.File +import java.util.{Map => JMap} +import java.util.Arrays +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.JavaConverters._ + +import com.google.common.io.Files + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging + + +private[spark] class VirtualEnvFactory(pythonExec: String, conf: SparkConf, isDriver: Boolean) + extends Logging { + + private var virtualEnvType = conf.get("spark.pyspark.virtualenv.type", "native") + private var virtualEnvPath = conf.get("spark.pyspark.virtualenv.bin.path", "") + private var virtualEnvName: String = _ + private var virtualPythonExec: String = _ + private val VIRTUALENV_ID = new AtomicInteger() + private var isLauncher: Boolean = false + + // used by launcher when user want to use virtualenv in pyspark shell. Launcher need this class + // to create virtualenv for driver. 
+ def this(pythonExec: String, properties: JMap[String, String], isDriver: java.lang.Boolean) { +this(pythonExec, new SparkConf(), isDriver) +properties.asScala.foreach(entry => this.conf.set(entry._1, entry._2)) +virtualEnvType = conf.get("spark.pyspark.virtualenv.type", "native") +virtualEnvPath = conf.get("spark.pyspark.virtualenv.bin.path") +this.isLauncher = true + } + + /* + * Create virtualenv using native virtualenv or conda + * + * Native Virtualenv: + * - Execute command: virtualenv -p --no-site-packages + * - Execute command: python -m pip --cache-dir install -r + * + * Conda + * - Execute command: conda create --prefix --file -y + * + */ + def setupVirtualEnv(): String = { +logInfo("Start to setup virtualenv...") +logDebug("user.dir=" + System.getProperty("user.dir")) +logDebug("user.home=" + System.getProperty("user.home")) + +require(virtualEnvType == "native" || virtualEnvType == "conda", + s"VirtualEnvType: ${virtualEnvType} is not supported." ) +require(new File(virtualEnvPath).exists(), + s"VirtualEnvPath: ${virtualEnvPath} is not defined or doesn't exist.") +// Use a temp directory for virtualenv in the following cases: +// 1. driver of pyspark shell +// 2. driver of yarn-client mode +// otherwise create the virtualenv folder under the executor working directory. +if (isLauncher || + (isDriver && conf.get("spark.submit.deployMode") == "client")) { + val virtualenv_basedir = Files.createTempDir() + virtualenv_basedir.deleteOnExit() + virtualEnvName = virtualenv_basedir.getAbsolutePath +} else if (isDriver && conf.get("spark.submit.deployMode") == "cluster") { + virtualEnvName = "virtualenv_driver" +} else { + // use the working directory of Executor + virtualEnvName = "virtualenv_" + conf.getAppId + "_" + VIRTUALENV_ID.getAndIncrement() +} + +// Use the absolute path of requirement file in the following cases +// 1. driver of pyspark shell +// 2. 
driver of yarn-client mode +// otherwise just use filename as it would be downloaded to the working directory of Executor +val pyspark_requirements = --- End diff -- Fixed --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user zjffdu commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r160138782 --- Diff: core/src/main/scala/org/apache/spark/api/python/VirtualEnvFactory.scala --- @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.api.python + +import java.io.File +import java.util.{Map => JMap} +import java.util.Arrays +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.JavaConverters._ + +import com.google.common.io.Files + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging + + +private[spark] class VirtualEnvFactory(pythonExec: String, conf: SparkConf, isDriver: Boolean) + extends Logging { + + private var virtualEnvType = conf.get("spark.pyspark.virtualenv.type", "native") + private var virtualEnvPath = conf.get("spark.pyspark.virtualenv.bin.path", "") + private var virtualEnvName: String = _ + private var virtualPythonExec: String = _ + private val VIRTUALENV_ID = new AtomicInteger() + private var isLauncher: Boolean = false + + // used by launcher when user want to use virtualenv in pyspark shell. Launcher need this class + // to create virtualenv for driver. 
+ def this(pythonExec: String, properties: JMap[String, String], isDriver: java.lang.Boolean) { --- End diff -- This is because it will also be called on the Java side via Java reflection. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user zjffdu commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r160138391 --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala --- @@ -60,6 +66,12 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String envVars.getOrElse("PYTHONPATH", ""), sys.env.getOrElse("PYTHONPATH", "")) + + if (virtualEnvEnabled) { +val virtualEnvFactory = new VirtualEnvFactory(pythonExec, conf, false) +pythonExec = virtualEnvFactory.setupVirtualEnv() --- End diff -- Fixed --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user zjffdu commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r160138349 --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala --- @@ -29,7 +30,10 @@ import org.apache.spark._ import org.apache.spark.internal.Logging import org.apache.spark.util.{RedirectThread, Utils} -private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String, String]) + +private[spark] class PythonWorkerFactory(var pythonExec: String, --- End diff -- Fixed --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r160085499 --- Diff: docs/submitting-applications.md --- @@ -218,6 +218,73 @@ These commands can be used with `pyspark`, `spark-shell`, and `spark-submit` to For Python, the equivalent `--py-files` option can be used to distribute `.egg`, `.zip` and `.py` libraries to executors. +# VirtualEnv for Pyspark +For simple PySpark application, we can use `--py-files` to add its dependencies. While for a large PySpark application, +usually you will have many dependencies which may also have transitive dependencies and even some dependencies need to be compiled +to be installed. In this case `--py-files` is not so convenient. Luckily, in python world we have virtualenv/conda to help create isolated +python work environment. We also implement virtualenv in PySpark (It is only supported in yarn mode for now). + +# Prerequisites +- Each node have virtualenv/conda, python-devel installed +- Each node is internet accessible (for downloading packages) + +{% highlight bash %} +# Setup virtualenv using native virtualenv on yarn-client mode +bin/spark-submit \ +--master yarn \ +--deploy-mode client \ +--conf "spark.pyspark.virtualenv.enabled=true" \ +--conf "spark.pyspark.virtualenv.type=native" \ +--conf "spark.pyspark.virtualenv.requirements=" \ +--conf "spark.pyspark.virtualenv.bin.path=" \ + + +# Setup virtualenv using conda on yarn-client mode +bin/spark-submit \ +--master yarn \ +--deploy-mode client \ +--conf "spark.pyspark.virtualenv.enabled=true" \ +--conf "spark.pyspark.virtualenv.type=conda" \ +--conf "spark.pyspark.virtualenv.requirements=<" \ --- End diff -- nit: remove an extra `<`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r160083009 --- Diff: core/src/main/scala/org/apache/spark/api/python/VirtualEnvFactory.scala --- @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.api.python + +import java.io.File +import java.util.{Map => JMap} +import java.util.Arrays +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.JavaConverters._ + +import com.google.common.io.Files + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging + + +private[spark] class VirtualEnvFactory(pythonExec: String, conf: SparkConf, isDriver: Boolean) + extends Logging { + + private var virtualEnvType = conf.get("spark.pyspark.virtualenv.type", "native") + private var virtualEnvPath = conf.get("spark.pyspark.virtualenv.bin.path", "") + private var virtualEnvName: String = _ + private var virtualPythonExec: String = _ + private val VIRTUALENV_ID = new AtomicInteger() + private var isLauncher: Boolean = false + + // used by launcher when user want to use virtualenv in pyspark shell. Launcher need this class + // to create virtualenv for driver. 
+ def this(pythonExec: String, properties: JMap[String, String], isDriver: java.lang.Boolean) { +this(pythonExec, new SparkConf(), isDriver) +properties.asScala.foreach(entry => this.conf.set(entry._1, entry._2)) +virtualEnvType = conf.get("spark.pyspark.virtualenv.type", "native") +virtualEnvPath = conf.get("spark.pyspark.virtualenv.bin.path") +this.isLauncher = true + } + + /* + * Create virtualenv using native virtualenv or conda + * + * Native Virtualenv: + * - Execute command: virtualenv -p --no-site-packages + * - Execute command: python -m pip --cache-dir install -r + * + * Conda + * - Execute command: conda create --prefix --file -y + * + */ + def setupVirtualEnv(): String = { +logInfo("Start to setup virtualenv...") +logDebug("user.dir=" + System.getProperty("user.dir")) +logDebug("user.home=" + System.getProperty("user.home")) + +require(virtualEnvType == "native" || virtualEnvType == "conda", + s"VirtualEnvType: ${virtualEnvType} is not supported." ) +require(new File(virtualEnvPath).exists(), + s"VirtualEnvPath: ${virtualEnvPath} is not defined or doesn't exist.") +// Use a temp directory for virtualenv in the following cases: +// 1. driver of pyspark shell +// 2. driver of yarn-client mode +// otherwise create the virtualenv folder under the executor working directory. +if (isLauncher || + (isDriver && conf.get("spark.submit.deployMode") == "client")) { + val virtualenv_basedir = Files.createTempDir() --- End diff -- nit: `virtualenvBasedir`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r160081109 --- Diff: core/src/main/scala/org/apache/spark/api/python/VirtualEnvFactory.scala --- @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.api.python + +import java.io.File +import java.util.{Map => JMap} +import java.util.Arrays +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.JavaConverters._ + +import com.google.common.io.Files + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging + + +private[spark] class VirtualEnvFactory(pythonExec: String, conf: SparkConf, isDriver: Boolean) + extends Logging { + + private var virtualEnvType = conf.get("spark.pyspark.virtualenv.type", "native") + private var virtualEnvPath = conf.get("spark.pyspark.virtualenv.bin.path", "") + private var virtualEnvName: String = _ + private var virtualPythonExec: String = _ + private val VIRTUALENV_ID = new AtomicInteger() + private var isLauncher: Boolean = false + + // used by launcher when user want to use virtualenv in pyspark shell. Launcher need this class + // to create virtualenv for driver. 
+ def this(pythonExec: String, properties: JMap[String, String], isDriver: java.lang.Boolean) { +this(pythonExec, new SparkConf(), isDriver) --- End diff -- How about `this(pythonExec, new SparkConf().setAll(properties.asScala), isDriver)` and removing the following 3 lines so we can make `virtualEnvType` and `virtualEnvPath` val? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r160088202 --- Diff: launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java --- @@ -299,20 +301,39 @@ // 4. environment variable PYSPARK_PYTHON // 5. python List pyargs = new ArrayList<>(); -pyargs.add(firstNonEmpty(conf.get(SparkLauncher.PYSPARK_DRIVER_PYTHON), - conf.get(SparkLauncher.PYSPARK_PYTHON), - System.getenv("PYSPARK_DRIVER_PYTHON"), - System.getenv("PYSPARK_PYTHON"), - "python")); -String pyOpts = System.getenv("PYSPARK_DRIVER_PYTHON_OPTS"); -if (conf.containsKey(SparkLauncher.PYSPARK_PYTHON)) { - // pass conf spark.pyspark.python to python by environment variable. - env.put("PYSPARK_PYTHON", conf.get(SparkLauncher.PYSPARK_PYTHON)); -} -if (!isEmpty(pyOpts)) { - pyargs.addAll(parseOptionString(pyOpts)); -} +String pythonExec = firstNonEmpty(conf.get(SparkLauncher.PYSPARK_DRIVER_PYTHON), +conf.get(SparkLauncher.PYSPARK_PYTHON), +System.getenv("PYSPARK_DRIVER_PYTHON"), +System.getenv("PYSPARK_PYTHON"), +"python"); +if (conf.getOrDefault("spark.pyspark.virtualenv.enabled", "false").equals("true")) { + try { +// setup virtualenv in launcher when virtualenv is enabled in pyspark shell +Class virtualEnvClazz = getClass().forName("org.apache.spark.api.python.VirtualEnvFactory"); +Object virtualEnv = virtualEnvClazz.getConstructor(String.class, Map.class, Boolean.class) +.newInstance(pythonExec, conf, true); +Method virtualEnvMethod = virtualEnv.getClass().getMethod("setupVirtualEnv"); +pythonExec = (String) virtualEnvMethod.invoke(virtualEnv); +pyargs.add(pythonExec); + } catch (Exception e) { +throw new IOException(e); + } +} else { + pyargs.add(firstNonEmpty(conf.get(SparkLauncher.PYSPARK_DRIVER_PYTHON), + conf.get(SparkLauncher.PYSPARK_PYTHON), + System.getenv("PYSPARK_DRIVER_PYTHON"), + System.getenv("PYSPARK_PYTHON"), + "python")); --- End diff -- We can simplify as `pyargs.add(pythonExec);`? 
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r160093249 --- Diff: launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java --- @@ -299,20 +301,39 @@ // 4. environment variable PYSPARK_PYTHON // 5. python List pyargs = new ArrayList<>(); -pyargs.add(firstNonEmpty(conf.get(SparkLauncher.PYSPARK_DRIVER_PYTHON), - conf.get(SparkLauncher.PYSPARK_PYTHON), - System.getenv("PYSPARK_DRIVER_PYTHON"), - System.getenv("PYSPARK_PYTHON"), - "python")); -String pyOpts = System.getenv("PYSPARK_DRIVER_PYTHON_OPTS"); -if (conf.containsKey(SparkLauncher.PYSPARK_PYTHON)) { - // pass conf spark.pyspark.python to python by environment variable. - env.put("PYSPARK_PYTHON", conf.get(SparkLauncher.PYSPARK_PYTHON)); -} -if (!isEmpty(pyOpts)) { - pyargs.addAll(parseOptionString(pyOpts)); -} +String pythonExec = firstNonEmpty(conf.get(SparkLauncher.PYSPARK_DRIVER_PYTHON), +conf.get(SparkLauncher.PYSPARK_PYTHON), +System.getenv("PYSPARK_DRIVER_PYTHON"), +System.getenv("PYSPARK_PYTHON"), +"python"); +if (conf.getOrDefault("spark.pyspark.virtualenv.enabled", "false").equals("true")) { + try { +// setup virtualenv in launcher when virtualenv is enabled in pyspark shell +Class virtualEnvClazz = getClass().forName("org.apache.spark.api.python.VirtualEnvFactory"); --- End diff -- nit: `Class.forName(...);`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r160087899 --- Diff: launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java --- @@ -299,20 +301,39 @@ // 4. environment variable PYSPARK_PYTHON // 5. python List pyargs = new ArrayList<>(); -pyargs.add(firstNonEmpty(conf.get(SparkLauncher.PYSPARK_DRIVER_PYTHON), - conf.get(SparkLauncher.PYSPARK_PYTHON), - System.getenv("PYSPARK_DRIVER_PYTHON"), - System.getenv("PYSPARK_PYTHON"), - "python")); -String pyOpts = System.getenv("PYSPARK_DRIVER_PYTHON_OPTS"); -if (conf.containsKey(SparkLauncher.PYSPARK_PYTHON)) { - // pass conf spark.pyspark.python to python by environment variable. - env.put("PYSPARK_PYTHON", conf.get(SparkLauncher.PYSPARK_PYTHON)); -} -if (!isEmpty(pyOpts)) { - pyargs.addAll(parseOptionString(pyOpts)); -} +String pythonExec = firstNonEmpty(conf.get(SparkLauncher.PYSPARK_DRIVER_PYTHON), +conf.get(SparkLauncher.PYSPARK_PYTHON), --- End diff -- nit: indent --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r160091411 --- Diff: docs/submitting-applications.md --- @@ -218,6 +218,73 @@ These commands can be used with `pyspark`, `spark-shell`, and `spark-submit` to For Python, the equivalent `--py-files` option can be used to distribute `.egg`, `.zip` and `.py` libraries to executors. +# VirtualEnv for Pyspark +For simple PySpark application, we can use `--py-files` to add its dependencies. While for a large PySpark application, +usually you will have many dependencies which may also have transitive dependencies and even some dependencies need to be compiled +to be installed. In this case `--py-files` is not so convenient. Luckily, in python world we have virtualenv/conda to help create isolated +python work environment. We also implement virtualenv in PySpark (It is only supported in yarn mode for now). + +# Prerequisites +- Each node have virtualenv/conda, python-devel installed +- Each node is internet accessible (for downloading packages) + +{% highlight bash %} +# Setup virtualenv using native virtualenv on yarn-client mode +bin/spark-submit \ +--master yarn \ +--deploy-mode client \ +--conf "spark.pyspark.virtualenv.enabled=true" \ +--conf "spark.pyspark.virtualenv.type=native" \ +--conf "spark.pyspark.virtualenv.requirements=" \ +--conf "spark.pyspark.virtualenv.bin.path=" \ + + +# Setup virtualenv using conda on yarn-client mode +bin/spark-submit \ +--master yarn \ +--deploy-mode client \ +--conf "spark.pyspark.virtualenv.enabled=true" \ +--conf "spark.pyspark.virtualenv.type=conda" \ +--conf "spark.pyspark.virtualenv.requirements=<" \ +--conf "spark.pyspark.virtualenv.bin.path=" \ + +{% endhighlight %} + +## PySpark VirtualEnv Configurations + +Property NameDefaultMeaning + + spark.pyspark.virtualenv.enabled + false + Whether to enable virtualenv + + + Spark.pyspark.virtualenv.type + virtualenv --- End diff -- `native` instead of `virtualenv`? 
Btw, should we use `native` for the config value to indicate virtualenv? I'd prefer `virtualenv` instead. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r160090598 --- Diff: core/src/main/scala/org/apache/spark/api/python/VirtualEnvFactory.scala --- @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.api.python + +import java.io.File +import java.util.{Map => JMap} +import java.util.Arrays +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.JavaConverters._ + +import com.google.common.io.Files + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging + + +private[spark] class VirtualEnvFactory(pythonExec: String, conf: SparkConf, isDriver: Boolean) + extends Logging { + + private var virtualEnvType = conf.get("spark.pyspark.virtualenv.type", "native") + private var virtualEnvPath = conf.get("spark.pyspark.virtualenv.bin.path", "") + private var virtualEnvName: String = _ + private var virtualPythonExec: String = _ + private val VIRTUALENV_ID = new AtomicInteger() + private var isLauncher: Boolean = false + + // used by launcher when user want to use virtualenv in pyspark shell. Launcher need this class + // to create virtualenv for driver. 
+ def this(pythonExec: String, properties: JMap[String, String], isDriver: java.lang.Boolean) { +this(pythonExec, new SparkConf(), isDriver) +properties.asScala.foreach(entry => this.conf.set(entry._1, entry._2)) +virtualEnvType = conf.get("spark.pyspark.virtualenv.type", "native") +virtualEnvPath = conf.get("spark.pyspark.virtualenv.bin.path") +this.isLauncher = true + } + + /* + * Create virtualenv using native virtualenv or conda + * + * Native Virtualenv: + * - Execute command: virtualenv -p --no-site-packages + * - Execute command: python -m pip --cache-dir install -r + * + * Conda + * - Execute command: conda create --prefix --file -y + * + */ + def setupVirtualEnv(): String = { +logInfo("Start to setup virtualenv...") +logDebug("user.dir=" + System.getProperty("user.dir")) +logDebug("user.home=" + System.getProperty("user.home")) + +require(virtualEnvType == "native" || virtualEnvType == "conda", + s"VirtualEnvType: ${virtualEnvType} is not supported." ) +require(new File(virtualEnvPath).exists(), + s"VirtualEnvPath: ${virtualEnvPath} is not defined or doesn't exist.") +// Use a temp directory for virtualenv in the following cases: +// 1. driver of pyspark shell +// 2. driver of yarn-client mode +// otherwise create the virtualenv folder under the executor working directory. +if (isLauncher || + (isDriver && conf.get("spark.submit.deployMode") == "client")) { + val virtualenv_basedir = Files.createTempDir() + virtualenv_basedir.deleteOnExit() + virtualEnvName = virtualenv_basedir.getAbsolutePath +} else if (isDriver && conf.get("spark.submit.deployMode") == "cluster") { + virtualEnvName = "virtualenv_driver" +} else { + // use the working directory of Executor + virtualEnvName = "virtualenv_" + conf.getAppId + "_" + VIRTUALENV_ID.getAndIncrement() +} + +// Use the absolute path of requirement file in the following cases +// 1. driver of pyspark shell +// 2. 
driver of yarn-client mode +// otherwise just use filename as it would be downloaded to the working directory of Executor +val pyspark_requirements = + if (isLauncher || +(isDriver && conf.get("spark.submit.deployMode") == "client")) { +conf.getOption("spark.pyspark.virtualenv.requirements") + } else { + conf.getOption("spark.pyspark.virtualenv.requirements").map(_.split("/").last) + } + +val createEnvCommand = + if (virtualEnvType == "native") {
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r160079462 --- Diff: core/src/main/scala/org/apache/spark/api/python/VirtualEnvFactory.scala --- @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.api.python + +import java.io.File +import java.util.{Map => JMap} +import java.util.Arrays +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.JavaConverters._ + +import com.google.common.io.Files + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging + + +private[spark] class VirtualEnvFactory(pythonExec: String, conf: SparkConf, isDriver: Boolean) + extends Logging { + + private var virtualEnvType = conf.get("spark.pyspark.virtualenv.type", "native") + private var virtualEnvPath = conf.get("spark.pyspark.virtualenv.bin.path", "") + private var virtualEnvName: String = _ + private var virtualPythonExec: String = _ + private val VIRTUALENV_ID = new AtomicInteger() + private var isLauncher: Boolean = false + + // used by launcher when user want to use virtualenv in pyspark shell. Launcher need this class + // to create virtualenv for driver. 
+ def this(pythonExec: String, properties: JMap[String, String], isDriver: java.lang.Boolean) { --- End diff -- Do we need to use `java.lang.Boolean` instead of `Boolean`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r160083172 --- Diff: core/src/main/scala/org/apache/spark/api/python/VirtualEnvFactory.scala --- @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.api.python + +import java.io.File +import java.util.{Map => JMap} +import java.util.Arrays +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.JavaConverters._ + +import com.google.common.io.Files + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging + + +private[spark] class VirtualEnvFactory(pythonExec: String, conf: SparkConf, isDriver: Boolean) + extends Logging { + + private var virtualEnvType = conf.get("spark.pyspark.virtualenv.type", "native") + private var virtualEnvPath = conf.get("spark.pyspark.virtualenv.bin.path", "") + private var virtualEnvName: String = _ + private var virtualPythonExec: String = _ + private val VIRTUALENV_ID = new AtomicInteger() + private var isLauncher: Boolean = false + + // used by launcher when user want to use virtualenv in pyspark shell. Launcher need this class + // to create virtualenv for driver. 
+ def this(pythonExec: String, properties: JMap[String, String], isDriver: java.lang.Boolean) { +this(pythonExec, new SparkConf(), isDriver) +properties.asScala.foreach(entry => this.conf.set(entry._1, entry._2)) +virtualEnvType = conf.get("spark.pyspark.virtualenv.type", "native") +virtualEnvPath = conf.get("spark.pyspark.virtualenv.bin.path") +this.isLauncher = true + } + + /* + * Create virtualenv using native virtualenv or conda + * + * Native Virtualenv: + * - Execute command: virtualenv -p --no-site-packages + * - Execute command: python -m pip --cache-dir install -r + * + * Conda + * - Execute command: conda create --prefix --file -y + * + */ + def setupVirtualEnv(): String = { +logInfo("Start to setup virtualenv...") +logDebug("user.dir=" + System.getProperty("user.dir")) +logDebug("user.home=" + System.getProperty("user.home")) + +require(virtualEnvType == "native" || virtualEnvType == "conda", + s"VirtualEnvType: ${virtualEnvType} is not supported." ) +require(new File(virtualEnvPath).exists(), + s"VirtualEnvPath: ${virtualEnvPath} is not defined or doesn't exist.") +// Use a temp directory for virtualenv in the following cases: +// 1. driver of pyspark shell +// 2. driver of yarn-client mode +// otherwise create the virtualenv folder under the executor working directory. +if (isLauncher || + (isDriver && conf.get("spark.submit.deployMode") == "client")) { + val virtualenv_basedir = Files.createTempDir() + virtualenv_basedir.deleteOnExit() + virtualEnvName = virtualenv_basedir.getAbsolutePath +} else if (isDriver && conf.get("spark.submit.deployMode") == "cluster") { + virtualEnvName = "virtualenv_driver" +} else { + // use the working directory of Executor + virtualEnvName = "virtualenv_" + conf.getAppId + "_" + VIRTUALENV_ID.getAndIncrement() +} + +// Use the absolute path of requirement file in the following cases +// 1. driver of pyspark shell +// 2. 
driver of yarn-client mode +// otherwise just use filename as it would be downloaded to the working directory of Executor +val pyspark_requirements = --- End diff -- nit: `pysparkRequirements`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r160081357 --- Diff: core/src/main/scala/org/apache/spark/api/python/VirtualEnvFactory.scala --- @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.api.python + +import java.io.File +import java.util.{Map => JMap} +import java.util.Arrays +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.JavaConverters._ + +import com.google.common.io.Files + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging + + +private[spark] class VirtualEnvFactory(pythonExec: String, conf: SparkConf, isDriver: Boolean) + extends Logging { + + private var virtualEnvType = conf.get("spark.pyspark.virtualenv.type", "native") + private var virtualEnvPath = conf.get("spark.pyspark.virtualenv.bin.path", "") + private var virtualEnvName: String = _ + private var virtualPythonExec: String = _ + private val VIRTUALENV_ID = new AtomicInteger() + private var isLauncher: Boolean = false + + // used by launcher when user want to use virtualenv in pyspark shell. Launcher need this class + // to create virtualenv for driver. 
+ def this(pythonExec: String, properties: JMap[String, String], isDriver: java.lang.Boolean) { +this(pythonExec, new SparkConf(), isDriver) +properties.asScala.foreach(entry => this.conf.set(entry._1, entry._2)) +virtualEnvType = conf.get("spark.pyspark.virtualenv.type", "native") +virtualEnvPath = conf.get("spark.pyspark.virtualenv.bin.path") +this.isLauncher = true + } + + /* + * Create virtualenv using native virtualenv or conda + * + * Native Virtualenv: + * - Execute command: virtualenv -p --no-site-packages + * - Execute command: python -m pip --cache-dir install -r + * + * Conda + * - Execute command: conda create --prefix --file -y + * + */ + def setupVirtualEnv(): String = { +logInfo("Start to setup virtualenv...") +logDebug("user.dir=" + System.getProperty("user.dir")) +logDebug("user.home=" + System.getProperty("user.home")) + +require(virtualEnvType == "native" || virtualEnvType == "conda", + s"VirtualEnvType: ${virtualEnvType} is not supported." ) +require(new File(virtualEnvPath).exists(), + s"VirtualEnvPath: ${virtualEnvPath} is not defined or doesn't exist.") --- End diff -- nit: `$virtualEnvPath` instead of `${virtualEnvPath}`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r160081322 --- Diff: core/src/main/scala/org/apache/spark/api/python/VirtualEnvFactory.scala --- @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.api.python + +import java.io.File +import java.util.{Map => JMap} +import java.util.Arrays +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.JavaConverters._ + +import com.google.common.io.Files + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging + + +private[spark] class VirtualEnvFactory(pythonExec: String, conf: SparkConf, isDriver: Boolean) + extends Logging { + + private var virtualEnvType = conf.get("spark.pyspark.virtualenv.type", "native") + private var virtualEnvPath = conf.get("spark.pyspark.virtualenv.bin.path", "") + private var virtualEnvName: String = _ + private var virtualPythonExec: String = _ + private val VIRTUALENV_ID = new AtomicInteger() + private var isLauncher: Boolean = false + + // used by launcher when user want to use virtualenv in pyspark shell. Launcher need this class + // to create virtualenv for driver. 
+ def this(pythonExec: String, properties: JMap[String, String], isDriver: java.lang.Boolean) { +this(pythonExec, new SparkConf(), isDriver) +properties.asScala.foreach(entry => this.conf.set(entry._1, entry._2)) +virtualEnvType = conf.get("spark.pyspark.virtualenv.type", "native") +virtualEnvPath = conf.get("spark.pyspark.virtualenv.bin.path") +this.isLauncher = true + } + + /* + * Create virtualenv using native virtualenv or conda + * + * Native Virtualenv: + * - Execute command: virtualenv -p --no-site-packages + * - Execute command: python -m pip --cache-dir install -r + * + * Conda + * - Execute command: conda create --prefix --file -y + * + */ + def setupVirtualEnv(): String = { +logInfo("Start to setup virtualenv...") +logDebug("user.dir=" + System.getProperty("user.dir")) +logDebug("user.home=" + System.getProperty("user.home")) + +require(virtualEnvType == "native" || virtualEnvType == "conda", + s"VirtualEnvType: ${virtualEnvType} is not supported." ) --- End diff -- nit: `$virtualEnvType` instead of `${virtualEnvType}`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r160088373 --- Diff: python/pyspark/context.py --- @@ -1023,6 +1039,33 @@ def getConf(self): conf.setAll(self._conf.getAll()) return conf +def install_packages(self, packages, install_driver=True): --- End diff -- Who calls this method? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r160087986 --- Diff: launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java --- @@ -299,20 +301,39 @@ // 4. environment variable PYSPARK_PYTHON // 5. python List pyargs = new ArrayList<>(); -pyargs.add(firstNonEmpty(conf.get(SparkLauncher.PYSPARK_DRIVER_PYTHON), - conf.get(SparkLauncher.PYSPARK_PYTHON), - System.getenv("PYSPARK_DRIVER_PYTHON"), - System.getenv("PYSPARK_PYTHON"), - "python")); -String pyOpts = System.getenv("PYSPARK_DRIVER_PYTHON_OPTS"); -if (conf.containsKey(SparkLauncher.PYSPARK_PYTHON)) { - // pass conf spark.pyspark.python to python by environment variable. - env.put("PYSPARK_PYTHON", conf.get(SparkLauncher.PYSPARK_PYTHON)); -} -if (!isEmpty(pyOpts)) { - pyargs.addAll(parseOptionString(pyOpts)); -} +String pythonExec = firstNonEmpty(conf.get(SparkLauncher.PYSPARK_DRIVER_PYTHON), +conf.get(SparkLauncher.PYSPARK_PYTHON), +System.getenv("PYSPARK_DRIVER_PYTHON"), +System.getenv("PYSPARK_PYTHON"), +"python"); +if (conf.getOrDefault("spark.pyspark.virtualenv.enabled", "false").equals("true")) { + try { +// setup virtualenv in launcher when virtualenv is enabled in pyspark shell +Class virtualEnvClazz = getClass().forName("org.apache.spark.api.python.VirtualEnvFactory"); +Object virtualEnv = virtualEnvClazz.getConstructor(String.class, Map.class, Boolean.class) +.newInstance(pythonExec, conf, true); --- End diff -- nit: indent --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r160088297 --- Diff: launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java --- @@ -299,20 +301,39 @@ // 4. environment variable PYSPARK_PYTHON // 5. python List pyargs = new ArrayList<>(); -pyargs.add(firstNonEmpty(conf.get(SparkLauncher.PYSPARK_DRIVER_PYTHON), - conf.get(SparkLauncher.PYSPARK_PYTHON), - System.getenv("PYSPARK_DRIVER_PYTHON"), - System.getenv("PYSPARK_PYTHON"), - "python")); -String pyOpts = System.getenv("PYSPARK_DRIVER_PYTHON_OPTS"); -if (conf.containsKey(SparkLauncher.PYSPARK_PYTHON)) { - // pass conf spark.pyspark.python to python by environment variable. - env.put("PYSPARK_PYTHON", conf.get(SparkLauncher.PYSPARK_PYTHON)); -} -if (!isEmpty(pyOpts)) { - pyargs.addAll(parseOptionString(pyOpts)); -} +String pythonExec = firstNonEmpty(conf.get(SparkLauncher.PYSPARK_DRIVER_PYTHON), +conf.get(SparkLauncher.PYSPARK_PYTHON), +System.getenv("PYSPARK_DRIVER_PYTHON"), +System.getenv("PYSPARK_PYTHON"), +"python"); +if (conf.getOrDefault("spark.pyspark.virtualenv.enabled", "false").equals("true")) { + try { +// setup virtualenv in launcher when virtualenv is enabled in pyspark shell +Class virtualEnvClazz = getClass().forName("org.apache.spark.api.python.VirtualEnvFactory"); +Object virtualEnv = virtualEnvClazz.getConstructor(String.class, Map.class, Boolean.class) +.newInstance(pythonExec, conf, true); +Method virtualEnvMethod = virtualEnv.getClass().getMethod("setupVirtualEnv"); +pythonExec = (String) virtualEnvMethod.invoke(virtualEnv); +pyargs.add(pythonExec); + } catch (Exception e) { +throw new IOException(e); + } +} else { + pyargs.add(firstNonEmpty(conf.get(SparkLauncher.PYSPARK_DRIVER_PYTHON), + conf.get(SparkLauncher.PYSPARK_PYTHON), + System.getenv("PYSPARK_DRIVER_PYTHON"), + System.getenv("PYSPARK_PYTHON"), + "python")); + String pyOpts = System.getenv("PYSPARK_DRIVER_PYTHON_OPTS"); + if 
(conf.containsKey(SparkLauncher.PYSPARK_PYTHON)) { +// pass conf spark.pyspark.python to python by environment variable. +env.put("PYSPARK_PYTHON", conf.get(SparkLauncher.PYSPARK_PYTHON)); + } + if (!isEmpty(pyOpts)) { +pyargs.addAll(parseOptionString(pyOpts)); + } --- End diff -- Don't we need `pyOpts` when virtualenv is enabled? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r160093389 --- Diff: launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java --- @@ -299,20 +301,39 @@ // 4. environment variable PYSPARK_PYTHON // 5. python List pyargs = new ArrayList<>(); -pyargs.add(firstNonEmpty(conf.get(SparkLauncher.PYSPARK_DRIVER_PYTHON), - conf.get(SparkLauncher.PYSPARK_PYTHON), - System.getenv("PYSPARK_DRIVER_PYTHON"), - System.getenv("PYSPARK_PYTHON"), - "python")); -String pyOpts = System.getenv("PYSPARK_DRIVER_PYTHON_OPTS"); -if (conf.containsKey(SparkLauncher.PYSPARK_PYTHON)) { - // pass conf spark.pyspark.python to python by environment variable. - env.put("PYSPARK_PYTHON", conf.get(SparkLauncher.PYSPARK_PYTHON)); -} -if (!isEmpty(pyOpts)) { - pyargs.addAll(parseOptionString(pyOpts)); -} +String pythonExec = firstNonEmpty(conf.get(SparkLauncher.PYSPARK_DRIVER_PYTHON), +conf.get(SparkLauncher.PYSPARK_PYTHON), +System.getenv("PYSPARK_DRIVER_PYTHON"), +System.getenv("PYSPARK_PYTHON"), +"python"); +if (conf.getOrDefault("spark.pyspark.virtualenv.enabled", "false").equals("true")) { + try { +// setup virtualenv in launcher when virtualenv is enabled in pyspark shell +Class virtualEnvClazz = getClass().forName("org.apache.spark.api.python.VirtualEnvFactory"); +Object virtualEnv = virtualEnvClazz.getConstructor(String.class, Map.class, Boolean.class) +.newInstance(pythonExec, conf, true); +Method virtualEnvMethod = virtualEnv.getClass().getMethod("setupVirtualEnv"); --- End diff -- nit: `virtualEnvClazz.getMethod(...)`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r160085587 --- Diff: docs/submitting-applications.md --- @@ -218,6 +218,73 @@ These commands can be used with `pyspark`, `spark-shell`, and `spark-submit` to For Python, the equivalent `--py-files` option can be used to distribute `.egg`, `.zip` and `.py` libraries to executors. +# VirtualEnv for Pyspark +For simple PySpark application, we can use `--py-files` to add its dependencies. While for a large PySpark application, +usually you will have many dependencies which may also have transitive dependencies and even some dependencies need to be compiled +to be installed. In this case `--py-files` is not so convenient. Luckily, in python world we have virtualenv/conda to help create isolated +python work environment. We also implement virtualenv in PySpark (It is only supported in yarn mode for now). + +# Prerequisites +- Each node have virtualenv/conda, python-devel installed +- Each node is internet accessible (for downloading packages) + +{% highlight bash %} +# Setup virtualenv using native virtualenv on yarn-client mode +bin/spark-submit \ +--master yarn \ +--deploy-mode client \ +--conf "spark.pyspark.virtualenv.enabled=true" \ +--conf "spark.pyspark.virtualenv.type=native" \ +--conf "spark.pyspark.virtualenv.requirements=" \ +--conf "spark.pyspark.virtualenv.bin.path=" \ + + +# Setup virtualenv using conda on yarn-client mode +bin/spark-submit \ +--master yarn \ +--deploy-mode client \ +--conf "spark.pyspark.virtualenv.enabled=true" \ +--conf "spark.pyspark.virtualenv.type=conda" \ +--conf "spark.pyspark.virtualenv.requirements=<" \ +--conf "spark.pyspark.virtualenv.bin.path=" \ + +{% endhighlight %} + +## PySpark VirtualEnv Configurations + +Property NameDefaultMeaning + + spark.pyspark.virtualenv.enabled + false + Whether to enable virtualenv + + + Spark.pyspark.virtualenv.type --- End diff -- nit: `spark.pyspark...` instead of 
`Spark.pyspark...`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r160072831 --- Diff: python/pyspark/context.py --- @@ -1023,6 +1039,33 @@ def getConf(self): conf.setAll(self._conf.getAll()) return conf +def install_packages(self, packages, install_driver=True): +""" +install python packages on all executors and driver through pip +:param packages: string for single package or a list of string for multiple packages +:param install_driver: whether to install packages in client +""" +if self._conf.get("spark.pyspark.virtualenv.enabled") != "true": +raise Exception("install_packages can only use called when " +"spark.pyspark.virtualenv.enabled set as true") +if isinstance(packages, basestring): +packages = [packages] +num_executors = int(self._conf.get("spark.executor.instances")) +dummyRDD = self.parallelize(range(num_executors), num_executors) + +def _run_pip(packages, iterator): +import pip +pip.main(["install"] + packages) + +# run it in the main thread. Will do it in a separated thread after +# https://github.com/pypa/pip/issues/2553 is fixed +if install_driver: +import threading --- End diff -- Sorry, maybe I missed something. Do we need to import this? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r160072723 --- Diff: python/pyspark/context.py --- @@ -1023,6 +1039,33 @@ def getConf(self): conf.setAll(self._conf.getAll()) return conf +def install_packages(self, packages, install_driver=True): +""" +install python packages on all executors and driver through pip +:param packages: string for single package or a list of string for multiple packages +:param install_driver: whether to install packages in client --- End diff -- Can we have examples as doctests and skip to show how we use it? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r160072647 --- Diff: core/src/main/scala/org/apache/spark/api/python/VirtualEnvFactory.scala --- @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.api.python + +import java.io.File +import java.util.{Map => JMap} +import java.util.Arrays +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.JavaConverters._ + +import com.google.common.io.Files + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging + + +private[spark] class VirtualEnvFactory(pythonExec: String, conf: SparkConf, isDriver: Boolean) + extends Logging { + + private var virtualEnvType = conf.get("spark.pyspark.virtualenv.type", "native") + private var virtualEnvPath = conf.get("spark.pyspark.virtualenv.bin.path", "") + private var virtualEnvName: String = _ + private var virtualPythonExec: String = _ + private val VIRTUALENV_ID = new AtomicInteger() + private var isLauncher: Boolean = false + + // used by launcher when user want to use virtualenv in pyspark shell. Launcher need this class + // to create virtualenv for driver. 
+ def this(pythonExec: String, properties: JMap[String, String], isDriver: java.lang.Boolean) { +this(pythonExec, new SparkConf(), isDriver) +properties.asScala.foreach(entry => this.conf.set(entry._1, entry._2)) +virtualEnvType = conf.get("spark.pyspark.virtualenv.type", "native") +virtualEnvPath = conf.get("spark.pyspark.virtualenv.bin.path") +this.isLauncher = true + } + + /* + * Create virtualenv using native virtualenv or conda + * + * Native Virtualenv: + * - Execute command: virtualenv -p --no-site-packages + * - Execute command: python -m pip --cache-dir install -r + * + * Conda + * - Execute command: conda create --prefix --file -y + * + */ + def setupVirtualEnv(): String = { --- End diff -- Hey @zjffdu, couldn't we actually test this by checking if the generated commands are as expected in strings? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r160072684 --- Diff: python/pyspark/context.py --- @@ -1023,6 +1039,33 @@ def getConf(self): conf.setAll(self._conf.getAll()) return conf +def install_packages(self, packages, install_driver=True): +""" +install python packages on all executors and driver through pip +:param packages: string for single package or a list of string for multiple packages +:param install_driver: whether to install packages in client +""" +if self._conf.get("spark.pyspark.virtualenv.enabled") != "true": +raise Exception("install_packages can only use called when " --- End diff -- How about `Exception` -> `RuntimeError`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r160071888 --- Diff: python/pyspark/context.py --- @@ -980,6 +996,33 @@ def getConf(self): conf.setAll(self._conf.getAll()) return conf +def install_packages(self, packages, install_driver=True): +""" +install python packages on all executors and driver through pip +:param packages: string for single package or a list of string for multiple packages +:param install_driver: whether to install packages in client +""" +if self._conf.get("spark.pyspark.virtualenv.enabled") != "true": +raise Exception("install_packages can only use called when " +"spark.pyspark.virtualenv.enabled set as true") +if isinstance(packages, basestring): +packages = [packages] +num_executors = int(self._conf.get("spark.executor.instances")) +dummyRDD = self.parallelize(range(num_executors), num_executors) --- End diff -- Right, even without dynamic execution this depend on us contiuing to do uniform distribution of data with parallelize which I don't is gauranteed (and we have no test which would catch this breaking). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r160071773 --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala --- @@ -475,6 +475,19 @@ object SparkSubmit extends CommandLineUtils with Logging { } } +// for pyspark virtualenv +if (args.isPython) { + if (clusterManager != YARN && +args.sparkProperties.getOrElse("spark.pyspark.virtualenv.enabled", "false") == "true") { +printErrorAndExit("virtualenv is only supported in yarn mode") --- End diff -- Ah please make a JIRA to track this then. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user zjffdu commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r160070613 --- Diff: core/src/main/scala/org/apache/spark/api/python/VirtualEnvFactory.scala --- @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.api.python + +import java.io.File +import java.util.{Map => JMap} +import java.util.Arrays +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.JavaConverters._ + +import com.google.common.io.Files + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging + + +private[spark] class VirtualEnvFactory(pythonExec: String, conf: SparkConf, isDriver: Boolean) + extends Logging { + + private var virtualEnvType = conf.get("spark.pyspark.virtualenv.type", "native") + private var virtualEnvPath = conf.get("spark.pyspark.virtualenv.bin.path", "") + private var virtualEnvName: String = _ + private var virtualPythonExec: String = _ + private val VIRTUALENV_ID = new AtomicInteger() + private var isLauncher: Boolean = false + + // used by launcher when user want to use virtualenv in pyspark shell. Launcher need this class + // to create virtualenv for driver. 
+ def this(pythonExec: String, properties: JMap[String, String], isDriver: java.lang.Boolean) { +this(pythonExec, new SparkConf(), isDriver) +properties.asScala.foreach(entry => this.conf.set(entry._1, entry._2)) +virtualEnvType = conf.get("spark.pyspark.virtualenv.type", "native") +virtualEnvPath = conf.get("spark.pyspark.virtualenv.bin.path") +this.isLauncher = true + } + + /* + * Create virtualenv using native virtualenv or conda + * + * Native Virtualenv: + * - Execute command: virtualenv -p --no-site-packages + * - Execute command: python -m pip --cache-dir install -r + * + * Conda + * - Execute command: conda create --prefix --file -y + * + */ + def setupVirtualEnv(): String = { +logInfo("Start to setup virtualenv...") +logDebug("user.dir=" + System.getProperty("user.dir")) +logDebug("user.home=" + System.getProperty("user.home")) + +require(virtualEnvType == "native" || virtualEnvType == "conda", + s"VirtualEnvType: ${virtualEnvType} is not supported." ) +require(new File(virtualEnvPath).exists(), --- End diff -- Sorry, the variable name is a little misleading, it should be `virtualEnvBinPath` which is `spark.pyspark.virtualenv.bin.path` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user zjffdu commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r160070518 --- Diff: python/pyspark/context.py --- @@ -980,6 +996,33 @@ def getConf(self): conf.setAll(self._conf.getAll()) return conf +def install_packages(self, packages, install_driver=True): +""" +install python packages on all executors and driver through pip +:param packages: string for single package or a list of string for multiple packages +:param install_driver: whether to install packages in client +""" +if self._conf.get("spark.pyspark.virtualenv.enabled") != "true": +raise Exception("install_packages can only use called when " +"spark.pyspark.virtualenv.enabled set as true") +if isinstance(packages, basestring): +packages = [packages] +num_executors = int(self._conf.get("spark.executor.instances")) +dummyRDD = self.parallelize(range(num_executors), num_executors) --- End diff -- Right, it would not work when dynamic allocation is enabled. will add condition for that --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user zjffdu commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r160070457 --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala --- @@ -475,6 +475,19 @@ object SparkSubmit extends CommandLineUtils with Logging { } } +// for pyspark virtualenv +if (args.isPython) { + if (clusterManager != YARN && +args.sparkProperties.getOrElse("spark.pyspark.virtualenv.enabled", "false") == "true") { +printErrorAndExit("virtualenv is only supported in yarn mode") --- End diff -- I haven't tested it in standalone mode, so it is not guaranteed to work there; supporting standalone mode is on my plan for later. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r160069581 --- Diff: core/src/main/scala/org/apache/spark/api/python/VirtualEnvFactory.scala --- @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.api.python + +import java.io.File +import java.util.{Map => JMap} +import java.util.Arrays +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.JavaConverters._ + +import com.google.common.io.Files + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging + + +private[spark] class VirtualEnvFactory(pythonExec: String, conf: SparkConf, isDriver: Boolean) + extends Logging { + + private var virtualEnvType = conf.get("spark.pyspark.virtualenv.type", "native") + private var virtualEnvPath = conf.get("spark.pyspark.virtualenv.bin.path", "") + private var virtualEnvName: String = _ + private var virtualPythonExec: String = _ + private val VIRTUALENV_ID = new AtomicInteger() + private var isLauncher: Boolean = false + + // used by launcher when user want to use virtualenv in pyspark shell. Launcher need this class + // to create virtualenv for driver. 
+ def this(pythonExec: String, properties: JMap[String, String], isDriver: java.lang.Boolean) { +this(pythonExec, new SparkConf(), isDriver) +properties.asScala.foreach(entry => this.conf.set(entry._1, entry._2)) +virtualEnvType = conf.get("spark.pyspark.virtualenv.type", "native") +virtualEnvPath = conf.get("spark.pyspark.virtualenv.bin.path") --- End diff -- This `virtualEnvPath` variable is confusing since you append a dynamically generated name to it for actual path determination. (If I read this code correctly) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r160069346 --- Diff: core/src/main/scala/org/apache/spark/api/python/VirtualEnvFactory.scala --- @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.api.python + +import java.io.File +import java.util.{Map => JMap} +import java.util.Arrays +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.JavaConverters._ + +import com.google.common.io.Files + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging + + +private[spark] class VirtualEnvFactory(pythonExec: String, conf: SparkConf, isDriver: Boolean) + extends Logging { + + private var virtualEnvType = conf.get("spark.pyspark.virtualenv.type", "native") + private var virtualEnvPath = conf.get("spark.pyspark.virtualenv.bin.path", "") + private var virtualEnvName: String = _ + private var virtualPythonExec: String = _ + private val VIRTUALENV_ID = new AtomicInteger() + private var isLauncher: Boolean = false + + // used by launcher when user want to use virtualenv in pyspark shell. Launcher need this class + // to create virtualenv for driver. 
+ def this(pythonExec: String, properties: JMap[String, String], isDriver: java.lang.Boolean) { +this(pythonExec, new SparkConf(), isDriver) +properties.asScala.foreach(entry => this.conf.set(entry._1, entry._2)) +virtualEnvType = conf.get("spark.pyspark.virtualenv.type", "native") +virtualEnvPath = conf.get("spark.pyspark.virtualenv.bin.path") +this.isLauncher = true + } + + /* + * Create virtualenv using native virtualenv or conda --- End diff -- I wouldn't document the commands called in the function doc string. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r160069731 --- Diff: core/src/main/scala/org/apache/spark/api/python/VirtualEnvFactory.scala --- @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.api.python + +import java.io.File +import java.util.{Map => JMap} +import java.util.Arrays +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.JavaConverters._ + +import com.google.common.io.Files + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging + + +private[spark] class VirtualEnvFactory(pythonExec: String, conf: SparkConf, isDriver: Boolean) + extends Logging { + + private var virtualEnvType = conf.get("spark.pyspark.virtualenv.type", "native") + private var virtualEnvPath = conf.get("spark.pyspark.virtualenv.bin.path", "") + private var virtualEnvName: String = _ + private var virtualPythonExec: String = _ + private val VIRTUALENV_ID = new AtomicInteger() + private var isLauncher: Boolean = false + + // used by launcher when user want to use virtualenv in pyspark shell. Launcher need this class + // to create virtualenv for driver. 
+ def this(pythonExec: String, properties: JMap[String, String], isDriver: java.lang.Boolean) { +this(pythonExec, new SparkConf(), isDriver) +properties.asScala.foreach(entry => this.conf.set(entry._1, entry._2)) +virtualEnvType = conf.get("spark.pyspark.virtualenv.type", "native") +virtualEnvPath = conf.get("spark.pyspark.virtualenv.bin.path") +this.isLauncher = true + } + + /* + * Create virtualenv using native virtualenv or conda + * + * Native Virtualenv: + * - Execute command: virtualenv -p --no-site-packages + * - Execute command: python -m pip --cache-dir install -r + * + * Conda + * - Execute command: conda create --prefix --file -y + * + */ + def setupVirtualEnv(): String = { +logInfo("Start to setup virtualenv...") +logDebug("user.dir=" + System.getProperty("user.dir")) +logDebug("user.home=" + System.getProperty("user.home")) + +require(virtualEnvType == "native" || virtualEnvType == "conda", + s"VirtualEnvType: ${virtualEnvType} is not supported." ) +require(new File(virtualEnvPath).exists(), --- End diff -- I'm confused why were testing this path exists instead of trying to create it if it doesn't already. It also appears to be ignored if in client mode. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r160069473 --- Diff: python/pyspark/context.py --- @@ -189,6 +190,21 @@ def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize, self._jsc.sc().register(self._javaAccumulator) self.pythonExec = os.environ.get("PYSPARK_PYTHON", 'python') +if self._conf.get("spark.pyspark.virtualenv.enabled") == "true": +self.pythonExec = self._conf.get("spark.pyspark.python", self.pythonExec) +requirements = self._conf.get("spark.pyspark.virtualenv.requirements") +virtualEnvBinPath = self._conf.get("spark.pyspark.virtualenv.bin.path") +virtualEnvType = self._conf.get("spark.pyspark.virtualenv.type", "native") +python_version = self._conf.get("spark.pyspark.virtualenv.python_version") + +if virtualEnvType == "conda" and (requirements is None) and python_version is None: --- End diff -- Could we just assume it's the same python version as the version which is running context.py? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r160069334 --- Diff: core/src/main/scala/org/apache/spark/api/python/VirtualEnvFactory.scala --- @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.api.python + +import java.io.File +import java.util.{Map => JMap} +import java.util.Arrays +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.JavaConverters._ + +import com.google.common.io.Files + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging + + +private[spark] class VirtualEnvFactory(pythonExec: String, conf: SparkConf, isDriver: Boolean) + extends Logging { + + private var virtualEnvType = conf.get("spark.pyspark.virtualenv.type", "native") + private var virtualEnvPath = conf.get("spark.pyspark.virtualenv.bin.path", "") + private var virtualEnvName: String = _ + private var virtualPythonExec: String = _ + private val VIRTUALENV_ID = new AtomicInteger() + private var isLauncher: Boolean = false + + // used by launcher when user want to use virtualenv in pyspark shell. Launcher need this class + // to create virtualenv for driver. 
+ def this(pythonExec: String, properties: JMap[String, String], isDriver: java.lang.Boolean) { +this(pythonExec, new SparkConf(), isDriver) +properties.asScala.foreach(entry => this.conf.set(entry._1, entry._2)) +virtualEnvType = conf.get("spark.pyspark.virtualenv.type", "native") +virtualEnvPath = conf.get("spark.pyspark.virtualenv.bin.path") +this.isLauncher = true + } + + /* + * Create virtualenv using native virtualenv or conda + * + * Native Virtualenv: + * - Execute command: virtualenv -p --no-site-packages + * - Execute command: python -m pip --cache-dir install -r + * + * Conda + * - Execute command: conda create --prefix --file -y + * + */ + def setupVirtualEnv(): String = { +logInfo("Start to setup virtualenv...") +logDebug("user.dir=" + System.getProperty("user.dir")) +logDebug("user.home=" + System.getProperty("user.home")) + +require(virtualEnvType == "native" || virtualEnvType == "conda", + s"VirtualEnvType: ${virtualEnvType} is not supported." ) +require(new File(virtualEnvPath).exists(), + s"VirtualEnvPath: ${virtualEnvPath} is not defined or doesn't exist.") +// Use a temp directory for virtualenv in the following cases: +// 1. driver of pyspark shell +// 2. driver of yarn-client mode +// otherwise create the virtualenv folder under the executor working directory. --- End diff -- Maybe we can clarify why we do the logic this way? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r160069409 --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala --- @@ -475,6 +475,19 @@ object SparkSubmit extends CommandLineUtils with Logging { } } +// for pyspark virtualenv +if (args.isPython) { + if (clusterManager != YARN && +args.sparkProperties.getOrElse("spark.pyspark.virtualenv.enabled", "false") == "true") { +printErrorAndExit("virtualenv is only supported in yarn mode") --- End diff -- Why doesn't this work in standalone? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r160069131 --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala --- @@ -29,7 +30,10 @@ import org.apache.spark._ import org.apache.spark.internal.Logging import org.apache.spark.util.{RedirectThread, Utils} -private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String, String]) + +private[spark] class PythonWorkerFactory(var pythonExec: String, --- End diff -- Making pythonExec a var is maybe not ideal. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r160069167 --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala --- @@ -60,6 +66,12 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String envVars.getOrElse("PYTHONPATH", ""), sys.env.getOrElse("PYTHONPATH", "")) + + if (virtualEnvEnabled) { +val virtualEnvFactory = new VirtualEnvFactory(pythonExec, conf, false) +pythonExec = virtualEnvFactory.setupVirtualEnv() --- End diff -- Rather than updating a var, what about creating a new val which is set to the value passed in if virtualEnvEnabled is false, or this new value if it's true? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r122558038 --- Diff: python/pyspark/context.py --- @@ -980,6 +996,33 @@ def getConf(self): conf.setAll(self._conf.getAll()) return conf +def install_packages(self, packages, install_driver=True): +""" +install python packages on all executors and driver through pip +:param packages: string for single package or a list of string for multiple packages +:param install_driver: whether to install packages in client +""" +if self._conf.get("spark.pyspark.virtualenv.enabled") != "true": +raise Exception("install_packages can only use called when " +"spark.pyspark.virtualenv.enabled set as true") +if isinstance(packages, basestring): +packages = [packages] +num_executors = int(self._conf.get("spark.executor.instances")) +dummyRDD = self.parallelize(range(num_executors), num_executors) --- End diff -- This is not guaranteed to work, and overlooks the situation of having executors added. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user vogxn commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r86756833 --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala --- @@ -69,6 +84,66 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String } /** + * Create virtualenv using native virtualenv or conda + * + * Native Virtualenv: + * - Execute command: virtualenv -p pythonExec --no-site-packages virtualenvName + * - Execute command: python -m pip --cache-dir cache-dir install -r requirement_file + * + * Conda + * - Execute command: conda create --prefix prefix --file requirement_file -y + * + */ + def setupVirtualEnv(): Unit = { +logDebug("Start to setup virtualenv...") +logDebug("user.dir=" + System.getProperty("user.dir")) +logDebug("user.home=" + System.getProperty("user.home")) + +require(virtualEnvType == "native" || virtualEnvType == "conda", + s"VirtualEnvType: ${virtualEnvType} is not supported" ) +virtualEnvName = "virtualenv_" + conf.getAppId + "_" + VIRTUALENV_ID.getAndIncrement() +// use the absolute path when it is local mode otherwise just use filename as it would be +// fetched from FileServer +val pyspark_requirements = + if (Utils.isLocalMaster(conf)) { +conf.get("spark.pyspark.virtualenv.requirements") + } else { +conf.get("spark.pyspark.virtualenv.requirements").split("/").last + } + +val createEnvCommand = + if (virtualEnvType == "native") { +Arrays.asList(virtualEnvPath, + "-p", pythonExec, + "--no-site-packages", virtualEnvName) + } else { +Arrays.asList(virtualEnvPath, + "create", "--prefix", System.getProperty("user.dir") + "/" + virtualEnvName, --- End diff -- Started writing this comment and had to recompile my cluster. I figured I had made a mistake in the permissions. Apologise for the false alarm. The patch works fine and I'm able to run executors with the conda environment. I'll do some more testing from my end. 
= Following was my setup = Apache Spark (with this patch) is compiled with Apache Hadoop 2.6.0. I've installed `anaconda2-4.1.1` on all my nodes in the cluster under `/usr/lib/anaconda2`. I can create conda environments using the command `conda create --prefix test-env numpy -y` fine. The following shell script is used to submit my pyspark programs: ``` $ cat run.sh /usr/lib/spark/bin/spark-submit --master yarn --deploy-mode client \ --conf "spark.pyspark.virtualenv.enabled=true" \ --conf "spark.pyspark.virtualenv.type=conda" \ --conf "spark.pyspark.virtualenv.requirements=/home/tsp/conda.txt" \ --conf "spark.pyspark.virtualenv.bin.path=/usr/lib/anaconda2/bin/conda" "$@" ``` This is the program I've submitted to see if the anaconda environment is detected in the executors ``` $ cat execinfo.py from pyspark import SparkContext import sys if __name__ == '__main__': sc = SparkContext() print sys.version print sc.parallelize(range(1,2)).map(lambda x: sys.version).collect() ``` This is what is seen in the debug logs ``` Caused by: java.lang.RuntimeException: Fail to run command: /usr/lib/anaconda2/bin/conda create --prefix /media/ebs2/yarn/local/usercache/tsp/appcache/application_1478497303110_0005/container_1478497303110_0005_01_03/virtualenv_application_1478497303110_0005_3- -file conda.txt -y at org.apache.spark.api.python.PythonWorkerFactory.execCommand(PythonWorkerFactory.scala:142) at org.apache.spark.api.python.PythonWorkerFactory.setupVirtualEnv(PythonWorkerFactory.scala:124) at org.apache.spark.api.python.PythonWorkerFactory.(PythonWorkerFactory.scala:70) ``` `/media/ebs2/yarn` is owned by `yarn (id): hadoop (gid)` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. 
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user vogxn commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r86720667 --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala --- @@ -69,6 +84,66 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String } /** + * Create virtualenv using native virtualenv or conda + * + * Native Virtualenv: + * - Execute command: virtualenv -p pythonExec --no-site-packages virtualenvName + * - Execute command: python -m pip --cache-dir cache-dir install -r requirement_file + * + * Conda + * - Execute command: conda create --prefix prefix --file requirement_file -y + * + */ + def setupVirtualEnv(): Unit = { +logDebug("Start to setup virtualenv...") +logDebug("user.dir=" + System.getProperty("user.dir")) +logDebug("user.home=" + System.getProperty("user.home")) + +require(virtualEnvType == "native" || virtualEnvType == "conda", + s"VirtualEnvType: ${virtualEnvType} is not supported" ) +virtualEnvName = "virtualenv_" + conf.getAppId + "_" + VIRTUALENV_ID.getAndIncrement() +// use the absolute path when it is local mode otherwise just use filename as it would be +// fetched from FileServer +val pyspark_requirements = + if (Utils.isLocalMaster(conf)) { +conf.get("spark.pyspark.virtualenv.requirements") + } else { +conf.get("spark.pyspark.virtualenv.requirements").split("/").last + } + +val createEnvCommand = + if (virtualEnvType == "native") { +Arrays.asList(virtualEnvPath, + "-p", pythonExec, + "--no-site-packages", virtualEnvName) + } else { +Arrays.asList(virtualEnvPath, + "create", "--prefix", System.getProperty("user.dir") + "/" + virtualEnvName, --- End diff -- I was trying to test this patch with `conda` and ended up with permission errors on this directory when the virtualenv gets created. So this location is owned by yarn and my spark-submit client does not have permissions to write to the directory. 
Is it possible to use something more standard like the Java tmpdir, or maybe provide an option to override this location? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org