Yicong-Huang opened a new pull request, #55613:
URL: https://github.com/apache/spark/pull/55613

   ### What changes were proposed in this pull request?
   
   Refactor `SQL_SCALAR_PANDAS_UDF` to use `ArrowStreamSerializer` as pure I/O, 
moving Arrow-to-Pandas and Pandas-to-Arrow conversion logic from 
`ArrowStreamPandasUDFSerializer` into `read_udfs()` in `worker.py`.
   
   Specifically:
   - Remove the dedicated `wrap_scalar_pandas_udf` wrapper.
   - Route `SQL_SCALAR_PANDAS_UDF` through 
`ArrowStreamSerializer(write_start_stream=True)`.
   - In `read_udfs()`, add a self-contained handler that:
     - Converts each Arrow `RecordBatch` to pandas Series via 
`ArrowBatchTransformer.to_pandas()` (with `struct_in_pandas=\"dict\"`, 
`df_for_struct=True`, `ndarray_as_list=False`).
     - Invokes each UDF column-wise on the pandas inputs and validates the 
return type (must be array-like) and row count (must match input).
     - Enforces the existing rule that struct return types must be 
`pandas.DataFrame`.
     - Converts results back to an Arrow `RecordBatch` via 
`PandasToArrowConversion.convert()`.
   
   ### Why are the changes needed?
   
   Part of [SPARK-55388](https://issues.apache.org/jira/browse/SPARK-55388). 
This consolidates UDF dispatch, verification, and conversion logic for 
`SQL_SCALAR_PANDAS_UDF` into a single inline handler in `read_udfs()`, 
mirroring the pattern already applied to `SQL_SCALAR_ARROW_UDF` 
([SPARK-55390](https://issues.apache.org/jira/browse/SPARK-55390)) and 
`SQL_ARROW_BATCHED_UDF` 
([SPARK-55902](https://issues.apache.org/jira/browse/SPARK-55902)). The 
dedicated `ArrowStreamPandasUDFSerializer` is no longer used by the scalar 
pandas path, reducing indirection and bringing the eval-type processing paths 
closer to a uniform structure.
   
   ### Does this PR introduce _any_ user-facing change?
   
   No.
   
   ### How was this patch tested?
   
   Existing tests. No behavior change.
   
   `pyspark.sql.tests.pandas.test_pandas_udf_scalar` (81 tests) plus 
`test_pandas_udf`, `test_pandas_udf_typehints`, `test_pandas_udf_window`, and 
`test_arrow_python_udf` all pass.
   
   ASV benchmark comparison via `COLUMNS=120 asv run --python=same --bench 
\"ScalarPandasUDF\" --attribute \"repeat=(3,5,5.0)\"`:
   
   **ScalarPandasUDFTimeBench** - Before (master):
   ```text
   =================== ============== ============ ===============
   --                                      udf
   ------------------- -------------------------------------------
         scenario       identity_udf    sort_udf    nullcheck_udf
   =================== ============== ============ ===============
     sm_batch_few_col     366+-2ms       509+-1ms      445+-2ms
    sm_batch_many_col     283+-2ms       305+-2ms      293+-2ms
     lg_batch_few_col    1.08+-0.01s   1.34+-0.01s   1.16+-0.01s
    lg_batch_many_col     1.22+-0s     1.28+-0.01s   1.27+-0.01s
        pure_ints        190+-0.8ms     268+-0.4ms     217+-2ms
       pure_floats       187+-0.4ms     280+-0.3ms     218+-1ms
       pure_strings     1.16+-0.02s    1.70+-0.02s   1.11+-0.01s
          pure_ts         348+-3ms       441+-10ms     370+-5ms
       mixed_types        650+-2ms       745+-7ms      692+-7ms
   =================== ============== ============ ===============
   ```
   
   **ScalarPandasUDFTimeBench** - After (this PR):
   ```text
   =================== ============== ============ ===============
   --                                      udf
   ------------------- -------------------------------------------
         scenario       identity_udf    sort_udf    nullcheck_udf
   =================== ============== ============ ===============
     sm_batch_few_col     381+-10ms     503+-2ms      439+-2ms
    sm_batch_many_col     289+-3ms      302+-1ms      291+-2ms
     lg_batch_few_col    1.09+-0s     1.33+-0.02s   1.17+-0.01s
    lg_batch_many_col   1.24+-0.01s   1.29+-0.02s   1.26+-0.02s
        pure_ints       190+-0.4ms     270+-0.9ms    220+-0.4ms
       pure_floats      188+-0.8ms     277+-0.6ms     218+-1ms
       pure_strings    1.13+-0.01s   1.68+-0.01s   1.11+-0.01s
          pure_ts         360+-10ms     419+-4ms      369+-1ms
       mixed_types        661+-8ms      748+-10ms    697+-20ms
   =================== ============== ============ ===============
   ```
   
   **ScalarPandasUDFPeakmemBench** - Before (master):
   ```text
   =================== ============== ========== ===============
   --                                     udf
   ------------------- -----------------------------------------
         scenario       identity_udf   sort_udf   nullcheck_udf
   =================== ============== ========== ===============
     sm_batch_few_col       481M         482M         479M
    sm_batch_many_col       481M         482M         481M
     lg_batch_few_col       621M         619M         602M
    lg_batch_many_col       627M         629M         628M
        pure_ints           546M         547M         544M
       pure_floats          543M         545M         544M
       pure_strings         564M         565M         561M
          pure_ts           546M         547M         547M
       mixed_types          525M         526M         525M
   =================== ============== ========== ===============
   ```
   
   **ScalarPandasUDFPeakmemBench** - After (this PR):
   ```text
   =================== ============== ========== ===============
   --                                     udf
   ------------------- -----------------------------------------
         scenario       identity_udf   sort_udf   nullcheck_udf
   =================== ============== ========== ===============
     sm_batch_few_col       481M         482M         479M
    sm_batch_many_col       481M         482M         481M
     lg_batch_few_col       621M         618M         602M
    lg_batch_many_col       627M         628M         627M
        pure_ints           546M         547M         544M
       pure_floats          543M         544M         544M
       pure_strings         563M         564M         561M
          pure_ts           546M         547M         546M
       mixed_types          525M         526M         525M
   =================== ============== ========== ===============
   ```
   
   **Summary**: Latency and peak memory are essentially neutral (within 
run-to-run noise). The refactor reorganizes logic without changing data layout 
or buffering.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   No


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to