WeichenXu123 opened a new pull request, #40954:
URL: https://github.com/apache/spark/pull/40954
### What changes were proposed in this pull request?
#### Make PySpark UDFs support annotating Python dependencies; when executing the UDF, the UDF worker creates a new Python environment with the provided Python dependencies.
- Supported Spark modes: Spark Connect mode / legacy mode
- Supported UDF types: all kinds of PySpark UDFs
- SQL_BATCHED_UDF
- SQL_SCALAR_PANDAS_UDF
- SQL_GROUPED_MAP_PANDAS_UDF
- SQL_GROUPED_AGG_PANDAS_UDF
- SQL_WINDOW_AGG_PANDAS_UDF
- SQL_SCALAR_PANDAS_ITER_UDF
- SQL_MAP_PANDAS_ITER_UDF
- SQL_COGROUPED_MAP_PANDAS_UDF
- SQL_MAP_ARROW_ITER_UDF
- SQL_GROUPED_MAP_PANDAS_UDF_WITH_STATE
#### Implementation sketch
- Before starting the PySpark UDF worker process, a Python environment with the provided Python packages is created, and the PySpark UDF worker process is spawned using that environment instead of the default configured PySpark Python.
- `virtualenv` is used to create a Python environment based on the current Python environment that PySpark uses; `pip install` then installs the provided Python packages into it.
- If the user configures an NFS directory that is accessible by all Spark nodes (readable/writable by the Spark driver, readable by all Spark workers), the Python environment is prepared on the driver side; otherwise it is created on the Spark worker side.
- The Python environment is cached on the Spark driver or worker side (depending on whether the NFS directory is enabled); we use a SHA-1 hash of the sorted pip requirements list as the cache key.
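The caching and environment-creation steps above can be sketched roughly as follows. This is a hedged illustration, not the PR's actual code: `env_cache_key`, `get_or_create_env`, and `cache_root` are hypothetical names introduced here.

```python
import hashlib
import os
import subprocess
import sys


def env_cache_key(pip_requirements):
    # SHA-1 over the sorted requirement list, so logically identical
    # dependency sets map to the same cached environment.
    normalized = "\n".join(sorted(pip_requirements))
    return hashlib.sha1(normalized.encode("utf-8")).hexdigest()


def get_or_create_env(cache_root, pip_requirements):
    # Reuse a cached environment when one already exists for this key;
    # otherwise create it with virtualenv and install the packages.
    env_dir = os.path.join(cache_root, env_cache_key(pip_requirements))
    if not os.path.isdir(env_dir):
        subprocess.check_call([sys.executable, "-m", "virtualenv", env_dir])
        pip = os.path.join(env_dir, "bin", "pip")
        subprocess.check_call([pip, "install", *pip_requirements])
    return env_dir
```

Sorting before hashing means `["a", "b"]` and `["b", "a"]` resolve to the same cached environment.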
#### TODOs
- On the frontend, the PR currently only supports annotating `pip_requirements` for `pandas_udf`; the `pip_requirements` argument has not yet been added for other UDF types or for `mapInPandas` / `mapInArrow`.
- Support annotating a Python version for PySpark UDFs, so that UDF execution downloads the specified Python version and creates the Python environment with it.
- Use a file lock during Python environment creation to avoid race conditions.
- Unit tests
### Why are the changes needed?
- In the Spark Connect case, the client Python environment is very likely to differ from the PySpark server-side Python environment, which causes the user's UDF to fail on the server side.
- Some third-party machine learning libraries (e.g., MLflow) require PySpark UDFs to support dependencies, because in ML cases we need to run model inference in a PySpark UDF using exactly the same Python environment that trained the model. Currently MLflow supports this by creating a child Python process inside the PySpark UDF worker and redirecting all UDF input data to that child process to run model inference, which incurs significant overhead. If PySpark UDFs support built-in Python dependency management, we no longer need such a poorly performing approach.
### Does this PR introduce _any_ user-facing change?
```
@pandas_udf("string", pip_requirements=...)
```
The `pip_requirements` argument accepts either an iterable of pip requirement
strings (e.g. ``["scikit-learn", "-r /path/to/req2.txt", "-c
/path/to/constraints.txt"]``) or the string path to a pip requirements file
on the local filesystem (e.g. ``"/path/to/requirements.txt"``), representing
the pip requirements of the Python UDF.
### How was this patch tested?
Unit tests to be added.
Manual test:
```
import pandas as pd
from pyspark.sql.functions import pandas_udf

sc.setLogLevel("INFO")

@pandas_udf("string", pip_requirements=["PyYAML==6.0"])
def to_upper(s: pd.Series) -> pd.Series:
    import yaml
    return s.str.upper() + f"yaml-version: {yaml.__version__}"

df = spark.createDataFrame([("John Doe",)], ("name",))
df.select(to_upper("name")).show(truncate=False)
```
Run the above code in Spark legacy mode or Spark Connect mode.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]