Yicong-Huang opened a new pull request, #56440: URL: https://github.com/apache/spark/pull/56440
### What changes were proposed in this pull request?

This PR refactors `SQL_ARROW_UDTF` so that the worker uses the plain `ArrowStreamSerializer` for pure Arrow stream I/O, moving the per-batch transformation logic from `ArrowStreamArrowUDTFSerializer` into `read_udtf()` in `worker.py`:

- The input-side flattening (struct columns at `table_arg_offsets` are flattened into `pa.RecordBatch`, other columns are passed as `pa.Array`) moves from `ArrowStreamArrowUDTFSerializer.load_stream` into the `mapper`.
- The output-side type coercion (`ArrowBatchTransformer.enforce_schema` against the Arrow return type) and struct wrapping (`ArrowBatchTransformer.wrap_struct`) move from the serializer `dump_stream` chain into the `evaluate` wrapper, which now yields ready-to-write record batches instead of `(batch, arrow_return_type)` tuples.

`ArrowStreamArrowUDTFSerializer` itself is left in place and will be removed in a follow-up once it has no remaining usages.

### Why are the changes needed?

Part of [SPARK-55388](https://issues.apache.org/jira/browse/SPARK-55388) (Refactor PythonEvalType processing logic). Keeping serializers as pure Arrow stream I/O and concentrating eval-type-specific logic in `worker.py` makes the per-eval-type data flow explicit and removes serializer subclasses that exist only to carry per-eval-type transforms.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests (`pyspark.sql.tests.arrow.test_arrow_udtf`, `pyspark.sql.tests.test_udtf`).

No behavior change: replaying identical worker input through the old and new code paths produces byte-identical worker output (modulo the timing section) across 9 scenario/UDTF combinations.
ASV benchmark comparison (`bench_eval_type.ArrowUDTFTimeBench`, `-a repeat=3`, 3 runs per side, averaged):

```text
scenario            udtf            before (ms)  after (ms)  diff (%)
sm_batch_few_col    identity_udtf          1.37        1.35      -1.5
sm_batch_few_col    filter_udtf            1.81        1.84      +1.7
sm_batch_few_col    count_udtf             1.07        1.05      -2.2
sm_batch_many_col   identity_udtf          2.42        2.33      -3.9
sm_batch_many_col   filter_udtf            3.50        3.43      -2.2
sm_batch_many_col   count_udtf             1.14        1.11      -2.6
lg_batch_few_col    identity_udtf          2.00        2.21     +10.7
lg_batch_few_col    filter_udtf            3.33        3.40      +2.2
lg_batch_few_col    count_udtf             1.20        1.20      +0.3
lg_batch_many_col   identity_udtf          9.72        9.85      +1.4
lg_batch_many_col   filter_udtf           15.57       15.70      +0.9
lg_batch_many_col   count_udtf             3.54        3.49      -1.5
pure_ints           identity_udtf          3.20        3.19      -0.2
pure_ints           filter_udtf            4.77        4.38      -8.1
pure_ints           count_udtf             1.90        1.85      -2.6
pure_strings        identity_udtf          5.16        4.79      -7.2
pure_strings        filter_udtf            9.15        9.02      -1.4
pure_strings        count_udtf             2.28        2.25      -1.2
```

The `lg_batch_few_col / identity_udtf` cell is a noise artifact: its per-run confidence intervals overlap (both sides spike to ~2.3ms intermittently on this machine), and re-running it in isolation with min-of-30 direct timing shows the refactored path is faster (min 2.34ms vs 2.59ms, median 2.36ms vs 2.76ms).

### Was this patch authored or co-authored using generative AI tooling?

No.

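
The "min-of-30 direct timing" used to recheck the `lg_batch_few_col / identity_udtf` cell can be approximated with a small harness like the one below. This is a generic sketch, not the script used for the numbers above; the function name `time_min_and_median` and the placeholder workload are assumptions.

```python
import statistics
import time


def time_min_and_median(fn, n=30):
    """Run fn n times and return (min, median) wall-clock time in milliseconds.

    The minimum is less sensitive to intermittent spikes than the mean,
    which is why it is useful for separating noise from a real regression.
    """
    samples_ms = []
    for _ in range(n):
        start = time.perf_counter()
        fn()
        samples_ms.append((time.perf_counter() - start) * 1e3)
    return min(samples_ms), statistics.median(samples_ms)


# Placeholder workload; in practice this would replay one worker input
# through the code path being measured.
best_ms, median_ms = time_min_and_median(lambda: sum(range(10_000)), n=30)
```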