Yicong-Huang opened a new pull request, #55750:
URL: https://github.com/apache/spark/pull/55750

   ### What changes were proposed in this pull request?
   
   Consolidate the `SQL_MAP_PANDAS_ITER_UDF` (`mapInPandas`) execution path so 
that all input transformation, UDF invocation, result verification, and output 
transformation live in a single block in `read_udfs()`, matching the patterns 
established by [SPARK-55389](https://issues.apache.org/jira/browse/SPARK-55389) 
(`SQL_MAP_ARROW_ITER_UDF`) and 
[SPARK-56691](https://issues.apache.org/jira/browse/SPARK-56691) 
(`SQL_GROUPED_MAP_PANDAS_ITER_UDF`).
   
   - Stop wrapping the UDF in `wrap_pandas_batch_iter_udf` for 
`MAP_PANDAS_ITER`. `read_single_udf` now returns `(func, return_type)` for this 
eval type.
   - Switch the serializer from `ArrowStreamPandasUDFSerializer` to 
`ArrowStreamSerializer(write_start_stream=True)` so the wrapper receives raw 
`Iterator[pa.RecordBatch]` and owns the Arrow<->pandas conversion.
   - Add a dedicated `SQL_MAP_PANDAS_ITER_UDF` branch in `read_udfs` that 
lazily converts each batch's struct column to `pd.DataFrame` (so peakmem stays 
bounded by a single batch), feeds the iterator to the UDF, verifies each 
yielded DataFrame against the return schema, and converts the result back to 
Arrow via `PandasToArrowConversion.convert`.
   - Drop `MAP_PANDAS_ITER` from the shared scalar-iter mapper (the 
`is_scalar_iter`/`is_map_pandas_iter` split collapses to a single 
`SCALAR_PANDAS_ITER` branch).
   - Make `verify_result` use the top-level package name only (e.g. 
`pandas.core` -> `pandas`) so the user-visible label remains `pandas.DataFrame` 
rather than `pandas.core.DataFrame`. The label for `pyarrow.RecordBatch` and 
`pyarrow.Array` is unchanged.
   
   ### Why are the changes needed?
   
   Part of [SPARK-55388](https://issues.apache.org/jira/browse/SPARK-55388). 
The existing logic for this eval type was scattered across a wrapper function, 
a shared mapper branch, and the serializer. After this change the full data 
flow for `mapInPandas` is visible in one place.
   
   ### Does this PR introduce _any_ user-facing change?
   
   No.
   
   ### How was this patch tested?
   
   Existing tests. No behavior change.
   
   - `pyspark/sql/tests/pandas/test_pandas_map.py` -- 28 passed
   - `pyspark/sql/tests/arrow/test_arrow_map.py` + `test_arrow_grouped_map.py` 
+ `pyspark/sql/tests/pandas/test_pandas_udf_scalar.py` -- 176 passed, 2 skipped
   - `pyspark/sql/tests/pandas/test_pandas_udf.py` + 
`test_pandas_udf_typehints.py` -- 36 passed
   
   ASV `MapPandasIterUDFTimeBench` comparison (`-a repeat=3`, before = 
`upstream/master`, after = this branch):
   
   ```text
   scenario             udf            before (ms)   after (ms)   diff (%)
   sm_batch_few_col     identity_udf       311            310         -0.3
   sm_batch_few_col     sort_udf           365            367         +0.5
   sm_batch_few_col     filter_udf         347            341         -1.7
   sm_batch_many_col    identity_udf       213            212         -0.5
   sm_batch_many_col    sort_udf           231            233         +0.9
   sm_batch_many_col    filter_udf         216            217         +0.5
   lg_batch_few_col     identity_udf       850            765        -10.0
   lg_batch_few_col     sort_udf          1030            965         -6.3
   lg_batch_few_col     filter_udf         815            804         -1.3
   lg_batch_many_col    identity_udf       791            787         -0.5
   lg_batch_many_col    sort_udf          1290           1280         -0.8
   lg_batch_many_col    filter_udf         889            811         -8.8
   pure_ints            identity_udf       152            140         -7.9
   pure_ints            sort_udf           168            166         -1.2
   pure_ints            filter_udf         164            151         -7.9
   pure_floats          identity_udf       158            139        -12.0
   pure_floats          sort_udf           183            166         -9.3
   pure_floats          filter_udf         179            161        -10.1
   pure_strings         identity_udf       636            609         -4.2
   pure_strings         sort_udf           894            799        -10.6
   pure_strings         filter_udf         762            650        -14.7
   pure_ts              identity_udf       267            211        -21.0
   pure_ts              sort_udf           303            230        -24.1
   pure_ts              filter_udf         273            233        -14.7
   mixed_types          identity_udf       571            435        -23.8
   mixed_types          sort_udf           668            518        -22.5
   mixed_types          filter_udf         507            460         -9.3
   ```
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   No.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to