WeichenXu123 opened a new pull request, #40954:
URL: https://github.com/apache/spark/pull/40954

   ### What changes were proposed in this pull request?
   
   #### Make pyspark UDFs support annotating python dependencies. When a UDF is 
executed, the UDF worker creates a new python environment with the provided 
python dependencies.
   
    - Supported spark mode: spark connect mode / legacy mode
    - Supported UDF type: All kinds of pyspark UDFs
       - SQL_BATCHED_UDF
       - SQL_SCALAR_PANDAS_UDF
       - SQL_GROUPED_MAP_PANDAS_UDF
       - SQL_GROUPED_AGG_PANDAS_UDF
       - SQL_WINDOW_AGG_PANDAS_UDF
       - SQL_SCALAR_PANDAS_ITER_UDF
       - SQL_MAP_PANDAS_ITER_UDF
       - SQL_COGROUPED_MAP_PANDAS_UDF
       - SQL_MAP_ARROW_ITER_UDF
       - SQL_GROUPED_MAP_PANDAS_UDF_WITH_STATE
   
   
   #### Implementation sketch
    - Before the pyspark UDF worker process starts, a python environment with 
the provided python packages is created, and the pyspark UDF worker process is 
spawned using that environment instead of the default configured pyspark 
python. 
    - `virtualenv` creates the python environment based on the current python 
environment that pyspark uses, then `pip install` installs the provided python 
packages.
    - If the user configures an NFS directory that is accessible by all spark 
nodes (readable/writable by the spark driver, readable by all spark workers), 
the python environment is prepared on the driver side; otherwise it is created 
on the spark worker side.
    - The python environment is cached on the spark driver or worker side 
(depending on whether the NFS directory is enabled). The cache key is the SHA1 
over the sorted pip requirements list.
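
   The cache key and the environment creation steps above can be sketched 
   roughly as follows. This is only an illustration, not the code in this PR: 
   `env_cache_key` and `build_env_commands` are hypothetical helper names, the 
   `bin/python` layout assumes a POSIX host, and passing the requirements to 
   pip through a requirements file is an assumption made here so that entries 
   such as `-r ...` or `-c ...` are handled by pip itself.

```python
import hashlib
import os
import sys


def env_cache_key(pip_requirements):
    """Hypothetical helper: SHA1 hex digest over the sorted requirement strings."""
    normalized = sorted(r.strip() for r in pip_requirements if r.strip())
    return hashlib.sha1("\n".join(normalized).encode("utf-8")).hexdigest()


def build_env_commands(env_dir, requirements_file):
    """Hypothetical helper: return the two commands that would create the
    environment and install the packages. Nothing is executed here."""
    # Assumption: POSIX virtualenv layout (<env_dir>/bin/python).
    env_python = os.path.join(env_dir, "bin", "python")
    return [
        # Create the environment from the python that is currently running.
        [sys.executable, "-m", "virtualenv", env_dir],
        # Install the requested packages with the environment's own pip.
        [env_python, "-m", "pip", "install", "-r", requirements_file],
    ]
```

   Because the requirement strings are sorted before hashing, the same set of 
   requirements given in a different order maps to the same cached 
   environment. Each returned command could be run with `subprocess.run(cmd, 
   check=True)`.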
   
   #### TODOs
    - In the frontend, the PR currently only supports annotating 
`pip_requirements` for `pandas_udf`; the `pip_requirements` argument hasn't 
been added yet for other types of UDFs or for `mapInPandas` / `mapInArrow`.
    - Support annotating the python version for a pyspark UDF: in UDF 
execution, download python of the provided version and create the python 
environment with it.
    - Use a file lock during python environment creation to avoid race 
conditions.
    - Unit tests
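
   For the file lock item, one possible shape is sketched below. It is an 
   assumption about how the TODO could be addressed, not part of this PR: 
   `create_env_once`, the `.lock` file and the `.ready` marker are 
   hypothetical names, and `fcntl.flock` is POSIX-only and only coordinates 
   processes that see the same lock file.

```python
import fcntl
import os


def create_env_once(env_dir, build):
    """Hypothetical helper: run build(env_dir) at most once per env_dir,
    even if several worker processes ask for the same environment."""
    os.makedirs(os.path.dirname(env_dir) or ".", exist_ok=True)
    marker = os.path.join(env_dir, ".ready")
    with open(env_dir + ".lock", "w") as lock_file:
        # Block until this process holds the exclusive lock.
        fcntl.flock(lock_file, fcntl.LOCK_EX)
        try:
            # Another process may have finished the build while we waited.
            if not os.path.exists(marker):
                build(env_dir)
                os.makedirs(env_dir, exist_ok=True)
                # The marker is written last, so a half-built environment
                # is never mistaken for a finished one.
                open(marker, "w").close()
        finally:
            fcntl.flock(lock_file, fcntl.LOCK_UN)
    return env_dir
```

   Checking the marker only after the lock is acquired means that a process 
   which waited on the lock reuses the finished environment instead of 
   rebuilding it.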
   
   ### Why are the changes needed?
   
    - For the spark connect case, the client python environment is very likely 
to differ from the pyspark server side python environment, which causes the 
user's UDF to fail when executed on the pyspark server side.
    - Some third-party machine learning libraries (e.g. MLflow) require pyspark 
UDFs to support dependencies, because in ML cases model inference must run in 
a pyspark UDF in exactly the same python environment that trained the model. 
Currently MLflow supports this by creating a child python process in the 
pyspark UDF worker and redirecting all UDF input data to the child python 
process to run model inference, which causes significant overhead. If pyspark 
UDFs support builtin python dependency management, this poorly performing 
approach is no longer needed.
   
   
   ### Does this PR introduce _any_ user-facing change?
   ```
   
   @pandas_udf("string", pip_requirements=...)
   
   ```
   
   The `pip_requirements` argument represents the pip requirements for the 
python UDF. It is either an iterable of pip requirement strings (e.g. 
``["scikit-learn", "-r /path/to/req2.txt", "-c /path/to/constraints.txt"]``) 
or the string path to a pip requirements file on the local filesystem (e.g. 
``"/path/to/requirements.txt"``).
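
   To illustrate the two accepted forms, the sketch below turns either one 
   into a list of requirement strings. `normalize_pip_requirements` is a 
   hypothetical helper, not the function used in this PR, and dropping blank 
   lines and `#` comment lines when reading a file is an assumption made for 
   the example.

```python
def normalize_pip_requirements(pip_requirements):
    """Hypothetical helper: accept a requirements file path or an iterable
    of requirement strings and return a list of requirement strings."""
    if isinstance(pip_requirements, str):
        # A plain string is treated as a path to a requirements file.
        with open(pip_requirements, "r", encoding="utf-8") as f:
            lines = [line.strip() for line in f]
        # Assumption: blank lines and full-line comments carry no requirement.
        return [line for line in lines if line and not line.startswith("#")]
    # Anything else is treated as an iterable of requirement strings,
    # including entries such as "-r ..." or "-c ...", which pass through as is.
    return [str(r) for r in pip_requirements]
```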
   
   
   ### How was this patch tested?
   
   Unit tests to be added.
   
   Manual tests:
   
   ```
   import pandas as pd
   from pyspark.sql.functions import pandas_udf
   
   sc.setLogLevel("INFO")
   
   @pandas_udf("string", pip_requirements=["PyYAML==6.0"])
   def to_upper(s: pd.Series) -> pd.Series:
       import yaml
       return s.str.upper() + f"yaml-version: {yaml.__version__}"
   
   df = spark.createDataFrame([("John Doe",)], ("name",))
   df.select(to_upper("name")).show(truncate=False)
   ```
   
   Run the above code in spark legacy mode or spark connect mode.
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

