Yicong-Huang opened a new pull request, #55613:
URL: https://github.com/apache/spark/pull/55613
### What changes were proposed in this pull request?
Refactor `SQL_SCALAR_PANDAS_UDF` to use `ArrowStreamSerializer` as pure I/O,
moving Arrow-to-Pandas and Pandas-to-Arrow conversion logic from
`ArrowStreamPandasUDFSerializer` into `read_udfs()` in `worker.py`.
Specifically:
- Remove the dedicated `wrap_scalar_pandas_udf` wrapper.
- Route `SQL_SCALAR_PANDAS_UDF` through
`ArrowStreamSerializer(write_start_stream=True)`.
- In `read_udfs()`, add a self-contained handler that:
  - Converts each Arrow `RecordBatch` to pandas Series via
    `ArrowBatchTransformer.to_pandas()` (with `struct_in_pandas="dict"`,
    `df_for_struct=True`, `ndarray_as_list=False`).
  - Invokes each UDF column-wise on the pandas inputs and validates the
    return type (must be array-like) and row count (must match input).
  - Enforces the existing rule that struct return types must be
    `pandas.DataFrame`.
  - Converts results back to an Arrow `RecordBatch` via
    `PandasToArrowConversion.convert()`.
### Why are the changes needed?
Part of [SPARK-55388](https://issues.apache.org/jira/browse/SPARK-55388).
This consolidates UDF dispatch, verification, and conversion logic for
`SQL_SCALAR_PANDAS_UDF` into a single inline handler in `read_udfs()`,
mirroring the pattern already applied to `SQL_SCALAR_ARROW_UDF`
([SPARK-55390](https://issues.apache.org/jira/browse/SPARK-55390)) and
`SQL_ARROW_BATCHED_UDF`
([SPARK-55902](https://issues.apache.org/jira/browse/SPARK-55902)). The
dedicated `ArrowStreamPandasUDFSerializer` is no longer used by the scalar
pandas path, reducing indirection and bringing the eval-type processing paths
closer to a uniform structure.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests. No behavior change.
`pyspark.sql.tests.pandas.test_pandas_udf_scalar` (81 tests) plus
`test_pandas_udf`, `test_pandas_udf_typehints`, `test_pandas_udf_window`, and
`test_arrow_python_udf` all pass.
ASV benchmark comparison via `COLUMNS=120 asv run --python=same --bench
"ScalarPandasUDF" --attribute "repeat=(3,5,5.0)"`:
**ScalarPandasUDFTimeBench** - Before (master):
```text
=================== ============== ============ ===============
--                                      udf
------------------- -------------------------------------------
scenario            identity_udf   sort_udf     nullcheck_udf
=================== ============== ============ ===============
sm_batch_few_col    366+-2ms       509+-1ms     445+-2ms
sm_batch_many_col   283+-2ms       305+-2ms     293+-2ms
lg_batch_few_col    1.08+-0.01s    1.34+-0.01s  1.16+-0.01s
lg_batch_many_col   1.22+-0s       1.28+-0.01s  1.27+-0.01s
pure_ints           190+-0.8ms     268+-0.4ms   217+-2ms
pure_floats         187+-0.4ms     280+-0.3ms   218+-1ms
pure_strings        1.16+-0.02s    1.70+-0.02s  1.11+-0.01s
pure_ts             348+-3ms       441+-10ms    370+-5ms
mixed_types         650+-2ms       745+-7ms     692+-7ms
=================== ============== ============ ===============
```
**ScalarPandasUDFTimeBench** - After (this PR):
```text
=================== ============== ============ ===============
--                                      udf
------------------- -------------------------------------------
scenario            identity_udf   sort_udf     nullcheck_udf
=================== ============== ============ ===============
sm_batch_few_col    381+-10ms      503+-2ms     439+-2ms
sm_batch_many_col   289+-3ms       302+-1ms     291+-2ms
lg_batch_few_col    1.09+-0s       1.33+-0.02s  1.17+-0.01s
lg_batch_many_col   1.24+-0.01s    1.29+-0.02s  1.26+-0.02s
pure_ints           190+-0.4ms     270+-0.9ms   220+-0.4ms
pure_floats         188+-0.8ms     277+-0.6ms   218+-1ms
pure_strings        1.13+-0.01s    1.68+-0.01s  1.11+-0.01s
pure_ts             360+-10ms      419+-4ms     369+-1ms
mixed_types         661+-8ms       748+-10ms    697+-20ms
=================== ============== ============ ===============
```
**ScalarPandasUDFPeakmemBench** - Before (master):
```text
=================== ============== ========== ===============
--                                     udf
------------------- -----------------------------------------
scenario            identity_udf   sort_udf   nullcheck_udf
=================== ============== ========== ===============
sm_batch_few_col    481M           482M       479M
sm_batch_many_col   481M           482M       481M
lg_batch_few_col    621M           619M       602M
lg_batch_many_col   627M           629M       628M
pure_ints           546M           547M       544M
pure_floats         543M           545M       544M
pure_strings        564M           565M       561M
pure_ts             546M           547M       547M
mixed_types         525M           526M       525M
=================== ============== ========== ===============
```
**ScalarPandasUDFPeakmemBench** - After (this PR):
```text
=================== ============== ========== ===============
--                                     udf
------------------- -----------------------------------------
scenario            identity_udf   sort_udf   nullcheck_udf
=================== ============== ========== ===============
sm_batch_few_col    481M           482M       479M
sm_batch_many_col   481M           482M       481M
lg_batch_few_col    621M           618M       602M
lg_batch_many_col   627M           628M       627M
pure_ints           546M           547M       544M
pure_floats         543M           544M       544M
pure_strings        563M           564M       561M
pure_ts             546M           547M       546M
mixed_types         525M           526M       525M
=================== ============== ========== ===============
```
**Summary**: Latency and peak memory are essentially neutral (within
run-to-run noise). The refactor reorganizes logic without changing data layout
or buffering.
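
To put a number on "neutral", the relative change per cell can be recomputed from the timing tables above. The snippet below is only a convenience for readers, not part of the benchmark suite: it copies the reported mean timings (converted to milliseconds) and ignores the `+-` spreads, so it shows the size of each shift rather than its statistical significance.

```python
# Mean timings in milliseconds, copied from the tables above as (before, after).
# Cell order within each row: identity_udf, sort_udf, nullcheck_udf.
UDFS = ("identity_udf", "sort_udf", "nullcheck_udf")
TIMINGS_MS = {
    "sm_batch_few_col": [(366, 381), (509, 503), (445, 439)],
    "sm_batch_many_col": [(283, 289), (305, 302), (293, 291)],
    "lg_batch_few_col": [(1080, 1090), (1340, 1330), (1160, 1170)],
    "lg_batch_many_col": [(1220, 1240), (1280, 1290), (1270, 1260)],
    "pure_ints": [(190, 190), (268, 270), (217, 220)],
    "pure_floats": [(187, 188), (280, 277), (218, 218)],
    "pure_strings": [(1160, 1130), (1700, 1680), (1110, 1110)],
    "pure_ts": [(348, 360), (441, 419), (370, 369)],
    "mixed_types": [(650, 661), (745, 748), (692, 697)],
}


def pct_change(before, after):
    """Relative change in percent; positive means slower after the PR."""
    return (after - before) / before * 100.0


changes = {
    (scenario, udf): pct_change(before, after)
    for scenario, cells in TIMINGS_MS.items()
    for udf, (before, after) in zip(UDFS, cells)
}

# The cell with the largest shift in either direction.
worst = max(changes, key=lambda key: abs(changes[key]))
print(worst, f"{changes[worst]:+.1f}%")
```

On these numbers every cell moves by less than 5% in either direction, and the largest shift is an improvement (`pure_ts` / `sort_udf`).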
### Was this patch authored or co-authored using generative AI tooling?
No