Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13599#discussion_r163197957
  
    --- Diff: python/pyspark/context.py ---
    @@ -1023,6 +1032,42 @@ def getConf(self):
             conf.setAll(self._conf.getAll())
             return conf
     
    +    def install_packages(self, packages):
    +        """
    +        Install Python packages on all executors and the driver through pip. pip is installed
    +        by default whether native virtualenv or conda is used, so pip is guaranteed to be
    +        available when virtualenv is enabled.
    +
    +        :param packages: a string for a single package, or a list of strings for multiple packages
    +        """
    +        if self._conf.get("spark.pyspark.virtualenv.enabled") != "true":
    +            raise RuntimeError("install_packages can only be called when "
    +                               "spark.pyspark.virtualenv.enabled is set to true")
    +        if isinstance(packages, basestring):
    +            packages = [packages]
    +        # statusTracker.getExecutorInfos() returns the driver plus the executors, so -1 here.
    +        num_executors = len(self._jsc.sc().statusTracker().getExecutorInfos()) - 1
    +        dummyRDD = self.parallelize(range(num_executors), num_executors)
    +
    +        def _run_pip(packages, iterator):
    +            import pip
    +            return pip.main(["install"] + packages)
    +
    +        # Install the packages on the driver first. If the installation succeeds, continue
    +        # the installation on the executors; otherwise return directly.
    +        if _run_pip(packages, None) != 0:
    +            return
    +
    +        virtualenvPackages = self._conf.get("spark.pyspark.virtualenv.packages")
    +        if virtualenvPackages:
    +            self._conf.set("spark.pyspark.virtualenv.packages",
    +                           virtualenvPackages + "," + ",".join(packages))
    +        else:
    +            self._conf.set("spark.pyspark.virtualenv.packages", ",".join(packages))
    +
    +        import functools
    +        dummyRDD.foreachPartition(functools.partial(_run_pip, packages))
    --- End diff --
    
    I guess this does not guarantee that `_run_pip` is executed on all executors.
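    Nothing pins one of these `num_executors` tasks to each executor: an executor with free cores can pick up several of the short tasks while another executor gets none. A rough way to observe which workers actually ran the partitions (just a sketch, assuming a live `SparkContext` named `sc` in a shell; `_report_host` is an illustrative helper, not part of this PR):
    
    ```python
    import socket
    
    def _report_host(_iterator):
        # Each task yields the hostname of the worker that actually executed it.
        yield socket.gethostname()
    
    # Same executor count the PR computes (getExecutorInfos() includes the driver).
    num_executors = len(sc._jsc.sc().statusTracker().getExecutorInfos()) - 1
    
    hosts = (sc.parallelize(range(num_executors), num_executors)
             .mapPartitions(_report_host)
             .collect())
    
    # Fewer distinct hosts than executors means some executors never ran a task,
    # i.e. they would never have executed _run_pip.
    print(set(hosts))
    ```
    
    If `len(set(hosts))` comes back smaller than `num_executors`, at least one executor missed the pip install.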


---
