Yicong-Huang opened a new pull request, #56458:
URL: https://github.com/apache/spark/pull/56458

   ### What changes were proposed in this pull request?
   
   This PR refactors `SQL_ARROW_TABLE_UDF` (arrow-optimized Python UDTF, 
`@udtf(useArrow=True)`) so that the worker uses the plain 
`ArrowStreamSerializer` for pure Arrow stream I/O, moving the remaining 
per-batch transformation logic from `ArrowStreamUDTFSerializer` into 
`read_udtf()` in `worker.py`:
   
   - The input side already received raw Arrow record batches 
(`ArrowStreamUDTFSerializer.load_stream` delegates to 
`ArrowStreamSerializer.load_stream`), so loading is unchanged.
   - The output-side struct wrapping (`ArrowBatchTransformer.wrap_struct`) 
moves from the serializer `dump_stream` chain into the `evaluate` wrapper, 
which now yields ready-to-write record batches instead of `(batch, 
arrow_return_type)` tuples.
   
   The legacy pandas conversion path (`use_legacy_pandas_udtf_conversion`, 
using `ArrowStreamPandasUDTFSerializer`) is unchanged. 
`ArrowStreamUDTFSerializer` itself is left in place and will be removed in a 
follow-up once it has no remaining usages.
   
   ### Why are the changes needed?
   
   Part of [SPARK-55388](https://issues.apache.org/jira/browse/SPARK-55388). 
Keeping serializers as pure Arrow stream I/O and concentrating 
eval-type-specific logic in `worker.py` makes the per-eval-type data flow 
explicit and removes serializer subclasses that exist only to carry 
per-eval-type transforms.
   
   ### Does this PR introduce _any_ user-facing change?
   
   No.
   
   ### How was this patch tested?
   
   Existing tests (`pyspark.sql.tests.test_udtf`). No behavior change: 
replaying identical worker input through the old and new code paths produces 
byte-identical worker output (modulo the timing section) across 24 
scenario/UDTF combinations.
   
   ASV benchmark comparison (`bench_eval_type.ArrowTableUDFTimeBench`, `-a 
repeat=3`, 3 runs per side, averaged). before = `upstream/master`, after = this 
PR.
   
   ```text
   scenario            udtf            before (ms)  after (ms)     diff
   ------------------- --------------- ----------- ----------- --------
   sm_batch_few_col    identity_udtf        155.7ms      155.3ms    -0.2%
   sm_batch_few_col    explode_udtf         157.3ms      164.7ms    +4.7%
   sm_batch_few_col    filter_udtf          124.7ms      123.0ms    -1.3%
   sm_batch_few_col    stringify_udtf       155.7ms      155.7ms    +0.0%
   sm_batch_many_col   identity_udtf         51.8ms       51.4ms    -0.8%
   sm_batch_many_col   explode_udtf          52.6ms       52.3ms    -0.7%
   sm_batch_many_col   filter_udtf           44.9ms       43.1ms    -4.1%
   sm_batch_many_col   stringify_udtf        51.7ms       51.4ms    -0.5%
   lg_batch_few_col    identity_udtf        397.7ms      387.0ms    -2.7%
   lg_batch_few_col    explode_udtf         393.7ms      393.3ms    -0.1%
   lg_batch_few_col    filter_udtf          302.0ms      300.7ms    -0.4%
   lg_batch_few_col    stringify_udtf       386.7ms      388.3ms    +0.4%
   lg_batch_many_col   identity_udtf        207.0ms      203.7ms    -1.6%
   lg_batch_many_col   explode_udtf         207.7ms      205.0ms    -1.3%
   lg_batch_many_col   filter_udtf          175.3ms      169.0ms    -3.6%
   lg_batch_many_col   stringify_udtf       206.7ms      207.0ms    +0.2%
   pure_ints           identity_udtf        392.7ms      401.3ms    +2.2%
   pure_ints           explode_udtf         399.0ms      399.7ms    +0.2%
   pure_ints           filter_udtf          310.3ms      307.3ms    -1.0%
   pure_ints           stringify_udtf       392.0ms      394.7ms    +0.7%
   pure_strings        identity_udtf        410.7ms      419.0ms    +2.0%
   pure_strings        explode_udtf         416.0ms      427.3ms    +2.7%
   pure_strings        filter_udtf          329.0ms      324.3ms    -1.4%
   pure_strings        stringify_udtf       412.0ms      409.0ms    -0.7%
   ```
   
   The `sm_batch_few_col / explode_udtf` cell is a noise artifact from one 
noisy ASV run (a 180ms outlier; the other two runs measured 159ms/155ms, in 
line with before). Re-running it in isolation with min-of-30 direct timing 
shows no regression (min 153.0ms before vs 153.3ms after, median 154.6ms vs 
154.9ms).
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   No.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to