Yicong-Huang opened a new pull request, #56458:

URL: https://github.com/apache/spark/pull/56458
### What changes were proposed in this pull request?

This PR refactors `SQL_ARROW_TABLE_UDF` (arrow-optimized Python UDTF, `@udtf(useArrow=True)`) so that the worker uses the plain `ArrowStreamSerializer` for pure Arrow stream I/O. The remaining per-batch transformation logic moves from `ArrowStreamUDTFSerializer` into `read_udtf()` in `worker.py`:

- The input side already received raw Arrow record batches (`ArrowStreamUDTFSerializer.load_stream` delegates to `ArrowStreamSerializer.load_stream`), so loading is unchanged.
- The output-side struct wrapping (`ArrowBatchTransformer.wrap_struct`) moves from the serializer's `dump_stream` chain into the `evaluate` wrapper, which now yields ready-to-write record batches instead of `(batch, arrow_return_type)` tuples.

The legacy pandas conversion path (`use_legacy_pandas_udtf_conversion`, using `ArrowStreamPandasUDTFSerializer`) is unchanged. `ArrowStreamUDTFSerializer` itself is left in place and will be removed in a follow-up once it has no remaining usages.

### Why are the changes needed?

Part of [SPARK-55388](https://issues.apache.org/jira/browse/SPARK-55388). Keeping serializers as pure Arrow stream I/O and concentrating eval-type-specific logic in `worker.py` makes the per-eval-type data flow explicit and allows removing serializer subclasses that exist only to carry per-eval-type transforms.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests (`pyspark.sql.tests.test_udtf`). No behavior change: replaying identical worker input through the old and new code paths produces byte-identical worker output (modulo the timing section) across 24 scenario/UDTF combinations.

ASV benchmark comparison (`bench_eval_type.ArrowTableUDFTimeBench`, `-a repeat=3`, 3 runs per side, averaged); before = `upstream/master`, after = this PR.
```text
scenario            udtf            before (ms)  after (ms)     diff
------------------- --------------- ----------- ----------- --------
sm_batch_few_col    identity_udtf         155.7       155.3    -0.2%
sm_batch_few_col    explode_udtf          157.3       164.7    +4.7%
sm_batch_few_col    filter_udtf           124.7       123.0    -1.3%
sm_batch_few_col    stringify_udtf        155.7       155.7    +0.0%
sm_batch_many_col   identity_udtf          51.8        51.4    -0.8%
sm_batch_many_col   explode_udtf           52.6        52.3    -0.7%
sm_batch_many_col   filter_udtf            44.9        43.1    -4.1%
sm_batch_many_col   stringify_udtf         51.7        51.4    -0.5%
lg_batch_few_col    identity_udtf         397.7       387.0    -2.7%
lg_batch_few_col    explode_udtf          393.7       393.3    -0.1%
lg_batch_few_col    filter_udtf           302.0       300.7    -0.4%
lg_batch_few_col    stringify_udtf        386.7       388.3    +0.4%
lg_batch_many_col   identity_udtf         207.0       203.7    -1.6%
lg_batch_many_col   explode_udtf          207.7       205.0    -1.3%
lg_batch_many_col   filter_udtf           175.3       169.0    -3.6%
lg_batch_many_col   stringify_udtf        206.7       207.0    +0.2%
pure_ints           identity_udtf         392.7       401.3    +2.2%
pure_ints           explode_udtf          399.0       399.7    +0.2%
pure_ints           filter_udtf           310.3       307.3    -1.0%
pure_ints           stringify_udtf        392.0       394.7    +0.7%
pure_strings        identity_udtf         410.7       419.0    +2.0%
pure_strings        explode_udtf          416.0       427.3    +2.7%
pure_strings        filter_udtf           329.0       324.3    -1.4%
pure_strings        stringify_udtf        412.0       409.0    -0.7%
```

The +4.7% in the `sm_batch_few_col` / `explode_udtf` row comes from a single noisy ASV run: one after-side run measured 180ms, while the other two measured 159ms and 155ms, in line with before. Re-running this combination in isolation with direct timing (min of 30 runs) shows no regression: min 153.0ms before vs 153.3ms after, median 154.6ms vs 154.9ms.

### Was this patch authored or co-authored using generative AI tooling?

No.
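Returning to the `sm_batch_few_col` / `explode_udtf` row in the benchmark results: the Python sketch below, using only the standard library, shows why one slow run moves a 3-run mean while the median and minimum stay close to the baseline. It plugs in the three after-side run times reported above (180, 159 and 155 ms, as rounded in the text) and adds a min/median-of-N timing helper in the spirit of the isolated re-run. The helper names `summarize` and `min_and_median_ms` and the placeholder workload are hypothetical; this is not ASV's statistics code or the harness actually used for the re-run.

```python
import statistics
import timeit
from typing import Callable, Dict, List, Tuple


def summarize(samples_ms: List[float]) -> Dict[str, float]:
    """Return the mean, median and minimum of a list of timings in ms."""
    return {
        "mean": statistics.mean(samples_ms),
        "median": statistics.median(samples_ms),
        "min": min(samples_ms),
    }


def min_and_median_ms(fn: Callable[[], object], repeat: int = 30) -> Tuple[float, float]:
    """Hypothetical helper: call fn `repeat` times (one call per sample).

    Returns (min, median) in milliseconds. The minimum is least affected by
    background noise; the median shows the typical run.
    """
    samples_s = timeit.repeat(fn, repeat=repeat, number=1)  # seconds per sample
    samples_ms = [s * 1000.0 for s in samples_s]
    return min(samples_ms), statistics.median(samples_ms)


if __name__ == "__main__":
    # After-side ASV runs for sm_batch_few_col / explode_udtf, as reported.
    stats = summarize([180.0, 159.0, 155.0])
    # The mean reproduces the 164.7ms table cell; median and min stay near "before".
    print(f"mean={stats['mean']:.1f}ms median={stats['median']:.1f}ms min={stats['min']:.1f}ms")
    # mean=164.7ms median=159.0ms min=155.0ms

    # Placeholder workload (assumption); a real comparison would time the UDTF worker path.
    lo, med = min_and_median_ms(lambda: sum(range(100_000)), repeat=30)
    print(f"min={lo:.3f}ms median={med:.3f}ms")
```

Reporting the minimum and median alongside the mean, as the isolated re-run did, keeps a single slow run from reading as a regression.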
