Github user HyukjinKwon commented on a diff in the pull request:
https://github.com/apache/spark/pull/13599#discussion_r193674619
--- Diff: python/pyspark/context.py ---
@@ -1035,6 +1044,41 @@ def getConf(self):
conf.setAll(self._conf.getAll())
return conf
+ def install_packages(self, packages):
+ """
+ Install python packages on all executors and driver through pip. pip will be installed
+ by default no matter using native virtualenv or conda. So it is guaranteed that pip is
+ available if virtualenv is enabled.
+ :param packages: string for a single package or a list of strings for multiple packages
+ """
+ if self._conf.get("spark.pyspark.virtualenv.enabled") != "true":
+ raise RuntimeError("install_packages can only be called when "
+ "spark.pyspark.virtualenv.enabled is set as true")
+ if isinstance(packages, basestring):
+ packages = [packages]
+ # seems statusTracker.getExecutorInfos() will return driver + executors, so -1 here.
+ num_executors = len(self._jsc.sc().statusTracker().getExecutorInfos()) - 1
+ dummyRDD = self.parallelize(range(num_executors), num_executors)
+
+ def _run_pip(packages, iterator):
+ import pip
+ return pip.main(["install"] + packages)
+
+ # install package on driver first. if installation succeeded, continue the installation
+ # on executors, otherwise return directly.
+ if _run_pip(packages, None) != 0:
+ return
+
+ virtualenvPackages = self._conf.get("spark.pyspark.virtualenv.packages")
+ if virtualenvPackages:
+ self._conf.set("spark.pyspark.virtualenv.packages",
+ virtualenvPackages + ":" + ":".join(packages))
+ else:
+ self._conf.set("spark.pyspark.virtualenv.packages", ":".join(packages))
+
+ import functools
--- End diff --
Can we move this up within this function?
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]