Yicong-Huang opened a new pull request, #55756:
URL: https://github.com/apache/spark/pull/55756

   ### What changes were proposed in this pull request?
   
   Refactor `SQL_SCALAR_PANDAS_ITER_UDF` to use `ArrowStreamSerializer` as pure 
I/O, moving the iterator scalar Pandas UDF logic from 
`ArrowStreamPandasUDFSerializer` into `read_udfs()` in `worker.py`.
   
   Specifically:
   - Drop `wrap_pandas_batch_iter_udf` for the scalar iter path (still used by 
`SQL_MAP_PANDAS_ITER_UDF`).
   - Route `SQL_SCALAR_PANDAS_ITER_UDF` through 
`ArrowStreamSerializer(write_start_stream=True)`.
   - In `read_udfs()`, add a self-contained handler that:
     - Streams Arrow `RecordBatch` -> pandas Series via 
`ArrowBatchTransformer.to_pandas()` (`struct_in_pandas="dict"`, 
`df_for_struct=True`, `ndarray_as_list=False`), one element per input batch.
     - Invokes the user iterator UDF with the per-batch pandas args iterator 
and validates the returned iterable.
     - For each yielded element: validates it is `pandas.DataFrame` (struct 
return type) or `pandas.Series` (otherwise), runs `verify_pandas_result` 
(column names / counts), and converts back to an Arrow `RecordBatch` via 
`PandasToArrowConversion.convert()`.
     - Reuses the iterator helpers introduced for `SQL_SCALAR_ARROW_ITER_UDF`: 
`verify_output_row_limit` (fail-fast `OUTPUT_EXCEEDS_INPUT_ROWS`), 
`verify_output_row_count` (final `RESULT_ROWS_MISMATCH`), and 
`verify_iterator_exhausted` (`INPUT_NOT_FULLY_CONSUMED`).
   - Split the trailing combined `is_scalar_iter or is_map_pandas_iter` block; 
`SQL_MAP_PANDAS_ITER_UDF` now lives in its own slim block (no row-count guards, 
since they only applied to scalar iter).
   
   ### Why are the changes needed?
   
   Part of [SPARK-55388](https://issues.apache.org/jira/browse/SPARK-55388). 
This consolidates UDF dispatch, conversion, and verification logic for 
`SQL_SCALAR_PANDAS_ITER_UDF` into a single inline handler in `read_udfs()`, 
mirroring the pattern already applied to `SQL_SCALAR_PANDAS_UDF` 
([SPARK-56648](https://issues.apache.org/jira/browse/SPARK-56648)) and 
`SQL_SCALAR_ARROW_ITER_UDF` 
([SPARK-55577](https://issues.apache.org/jira/browse/SPARK-55577)). The 
dedicated `ArrowStreamPandasUDFSerializer` is no longer used by the scalar 
pandas iter path, reducing indirection and bringing the eval-type processing 
paths closer to a uniform structure.
   
   ### Does this PR introduce _any_ user-facing change?
   
   No.
   
   ### How was this patch tested?
   
   Existing tests. No behavior change.
   
   `pyspark.sql.tests.pandas.test_pandas_udf_scalar` plus `test_pandas_udf`, 
`test_pandas_udf_typehints`, `test_pandas_udf_window`, `test_arrow_python_udf`, 
and `test_pandas_map` (covering the split MAP_PANDAS_ITER block) all pass.
   
   ASV benchmark comparison via `COLUMNS=120 asv run --python=same --bench 
"ScalarPandasIterUDF" -a repeat=3`. before = `upstream/master`, after = this PR.
   
   **ScalarPandasIterUDFTimeBench** (latency):
   
   ```text
   scenario             udf              before    after     diff
   -------------------  --------------  -------  -------  -------
   sm_batch_few_col     identity_udf      391ms    377ms    -3.6%
   sm_batch_few_col     sort_udf          537ms    519ms    -3.2%
   sm_batch_few_col     nullcheck_udf     460ms    449ms    -2.5%
   sm_batch_many_col    identity_udf      338ms    302ms   -10.8%
   sm_batch_many_col    sort_udf          353ms    328ms    -7.2%
   sm_batch_many_col    nullcheck_udf     337ms    318ms    -5.6%
   lg_batch_few_col     identity_udf      1.21s    1.15s    -5.0%
   lg_batch_few_col     sort_udf          1.48s    1.42s    -4.0%
   lg_batch_few_col     nullcheck_udf     1.35s    1.22s    -9.0%
   lg_batch_many_col    identity_udf      1.42s    1.34s    -5.7%
   lg_batch_many_col    sort_udf          1.46s    1.36s    -7.0%
   lg_batch_many_col    nullcheck_udf     1.43s    1.34s    -6.5%
   pure_ints            identity_udf      203ms    193ms    -4.9%
   pure_ints            sort_udf          291ms    274ms    -5.8%
   pure_ints            nullcheck_udf     235ms    223ms    -5.4%
   pure_floats          identity_udf      202ms    192ms    -4.8%
   pure_floats          sort_udf          297ms    282ms    -5.0%
   pure_floats          nullcheck_udf     235ms    222ms    -5.6%
   pure_strings         identity_udf      1.38s    1.31s    -5.1%
   pure_strings         sort_udf          1.96s    1.73s   -11.5%
   pure_strings         nullcheck_udf     1.35s    1.15s   -14.7%
   pure_ts              identity_udf      381ms    357ms    -6.3%
   pure_ts              sort_udf          457ms    427ms    -6.4%
   pure_ts              nullcheck_udf     408ms    380ms    -6.7%
   mixed_types          identity_udf      713ms    673ms    -5.6%
   mixed_types          sort_udf          828ms    747ms    -9.8%
   mixed_types          nullcheck_udf     755ms    709ms    -6.0%
   ```
   
   **ScalarPandasIterUDFPeakmemBench** (peak memory):
   
   ```text
   scenario             udf              before    after     diff
   -------------------  --------------  -------  -------  -------
   sm_batch_few_col     identity_udf       459M     459M    -0.0%
   sm_batch_few_col     sort_udf           460M     460M    -0.0%
   sm_batch_few_col     nullcheck_udf      457M     457M    -0.0%
   sm_batch_many_col    identity_udf       459M     459M    -0.0%
   sm_batch_many_col    sort_udf           460M     460M    -0.0%
   sm_batch_many_col    nullcheck_udf      459M     459M    -0.0%
   lg_batch_few_col     identity_udf       591M     591M    +0.1%
   lg_batch_few_col     sort_udf           588M     589M    +0.2%
   lg_batch_few_col     nullcheck_udf      574M     574M    -0.0%
   lg_batch_many_col    identity_udf       598M     598M    -0.0%
   lg_batch_many_col    sort_udf           600M     599M    -0.0%
   lg_batch_many_col    nullcheck_udf      599M     599M    +0.0%
   pure_ints            identity_udf       521M     521M    -0.0%
   pure_ints            sort_udf           522M     522M    -0.0%
   pure_ints            nullcheck_udf      519M     519M    +0.0%
   pure_floats          identity_udf       518M     518M    +0.0%
   pure_floats          sort_udf           519M     520M    +0.0%
   pure_floats          nullcheck_udf      519M     519M    +0.0%
   pure_strings         identity_udf       538M     537M    -0.1%
   pure_strings         sort_udf           539M     538M    -0.1%
   pure_strings         nullcheck_udf      535M     535M    -0.1%
   pure_ts              identity_udf       521M     521M    +0.0%
   pure_ts              sort_udf           522M     522M    +0.0%
   pure_ts              nullcheck_udf      521M     522M    +0.0%
   mixed_types          identity_udf       501M     501M    -0.0%
   mixed_types          sort_udf           502M     501M    -0.0%
   mixed_types          nullcheck_udf      501M     501M    -0.0%
   ```
   
   **Summary**: latency improves -2.5% to -14.7% across all 27 (scenario, udf) 
combos (likely from dropped indirection and removal of the per-batch `(result, 
return_type)` tuple yielding); peak memory is flat (within +/-0.2%).
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   No


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to