Github user zjffdu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13599#discussion_r163427975
  
    --- Diff: python/pyspark/context.py ---
    @@ -1023,6 +1032,42 @@ def getConf(self):
             conf.setAll(self._conf.getAll())
             return conf
     
    +    def install_packages(self, packages):
    +        """
    +        Install python packages on all executors and the driver through pip. pip is installed
    +        by default whether native virtualenv or conda is used, so it is guaranteed that pip
    +        is available if virtualenv is enabled.
    +        :param packages: a string for a single package, or a list of strings for multiple packages
    +        """
    +        if self._conf.get("spark.pyspark.virtualenv.enabled") != "true":
    +            raise RuntimeError("install_packages can only be called when "
    +                               "spark.pyspark.virtualenv.enabled is set to true")
    +        if isinstance(packages, basestring):
    +            packages = [packages]
    +        # statusTracker.getExecutorInfos() seems to return the driver + executors, so -1 here.
    +        num_executors = len(self._jsc.sc().statusTracker().getExecutorInfos()) - 1
    +        dummyRDD = self.parallelize(range(num_executors), num_executors)
    +
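    +        # the iterator argument is unused; it is only there so that _run_pip can be passed
    +        # to foreachPartition (via functools.partial) below.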
    +        def _run_pip(packages, iterator):
    +            import pip
    +            return pip.main(["install"] + packages)
    +
    +        # install the packages on the driver first. If the installation succeeds, continue the
    +        # installation on the executors; otherwise return directly.
    +        if _run_pip(packages, None) != 0:
    +            return
    +
    +        virtualenvPackages = self._conf.get("spark.pyspark.virtualenv.packages")
    +        if virtualenvPackages:
    +            self._conf.set("spark.pyspark.virtualenv.packages",
    +                           virtualenvPackages + "," + ",".join(packages))
    +        else:
    +            self._conf.set("spark.pyspark.virtualenv.packages", ",".join(packages))
    +
    +        import functools
    +        dummyRDD.foreachPartition(functools.partial(_run_pip, packages))
    --- End diff --
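    
    For reference, a minimal usage sketch of the method in this diff (a sketch only, assuming
    virtualenv support is otherwise fully configured for the cluster; as the code above shows,
    calling it without spark.pyspark.virtualenv.enabled=true raises a RuntimeError):
    
        from pyspark import SparkConf, SparkContext
    
        # virtualenv must be enabled (plus whatever other virtualenv settings the deployment needs)
        conf = SparkConf().set("spark.pyspark.virtualenv.enabled", "true")
        sc = SparkContext(conf=conf)
    
        sc.install_packages("numpy")              # a single package as a string
        sc.install_packages(["pandas", "scipy"])  # or a list of packages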
    
    You are right, it is not guaranteed. From my experiments it works pretty well most of the
    time. And even if this RDD operation does not run on every executor, the packages will
    still be installed later, when the python daemon is started on that executor. So in any
    case the python packages end up installed in all python daemons.
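    
    To illustrate that fallback (a sketch only, not the actual PySpark worker code; the function
    name and the way the conf is read here are assumptions): since install_packages also appends
    the packages to spark.pyspark.virtualenv.packages, an executor that the dummy RDD missed can
    install the same list when its python daemon starts with virtualenv enabled.
    
        # hypothetical sketch of the daemon-startup fallback described above
        def _bootstrap_virtualenv_packages(conf):
            # conf is assumed to expose the Spark configuration on the executor side
            packages = conf.get("spark.pyspark.virtualenv.packages", "")
            if packages:
                import pip  # same pip entry point the diff uses; pip.main was removed in pip >= 10
                pip.main(["install"] + packages.split(","))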

