Yicong-Huang opened a new pull request, #56440:
URL: https://github.com/apache/spark/pull/56440

   ### What changes were proposed in this pull request?
   
   This PR refactors `SQL_ARROW_UDTF` so that the worker uses the plain 
`ArrowStreamSerializer` for pure Arrow stream I/O, moving the per-batch 
transformation logic from `ArrowStreamArrowUDTFSerializer` into `read_udtf()` 
in `worker.py`:
   
   - The input-side flattening (struct columns at `table_arg_offsets` are 
flattened into `pa.RecordBatch`, other columns are passed as `pa.Array`) moves 
from `ArrowStreamArrowUDTFSerializer.load_stream` into the `mapper`.
   - The output-side type coercion (`ArrowBatchTransformer.enforce_schema` 
against the Arrow return type) and struct wrapping 
(`ArrowBatchTransformer.wrap_struct`) move from the serializer `dump_stream` 
chain into the `evaluate` wrapper, which now yields ready-to-write record 
batches instead of `(batch, arrow_return_type)` tuples.
   
   `ArrowStreamArrowUDTFSerializer` itself is left in place and will be removed 
in a follow-up once it has no remaining usages.
   
   ### Why are the changes needed?
   
   Part of [SPARK-55388](https://issues.apache.org/jira/browse/SPARK-55388) 
(Refactor PythonEvalType processing logic). Keeping serializers as pure Arrow 
stream I/O and concentrating eval-type-specific logic in `worker.py` makes the 
per-eval-type data flow explicit and removes serializer subclasses that exist 
only to carry per-eval-type transforms.
   
   ### Does this PR introduce _any_ user-facing change?
   
   No.
   
   ### How was this patch tested?
   
   Existing tests (`pyspark.sql.tests.arrow.test_arrow_udtf`, 
`pyspark.sql.tests.test_udtf`). No behavior change: replaying identical worker 
input through the old and new code paths produces byte-identical worker output 
(modulo the timing section) across 9 scenario/UDTF combinations.
   
   ASV benchmark comparison (`bench_eval_type.ArrowUDTFTimeBench`, `-a 
repeat=3`, 3 runs per side, averaged):
   
   ```text
   scenario            udtf             before (ms)  after (ms)  diff (%)
   sm_batch_few_col    identity_udtf           1.37        1.35      -1.5
   sm_batch_few_col    filter_udtf             1.81        1.84      +1.7
   sm_batch_few_col    count_udtf              1.07        1.05      -2.2
   sm_batch_many_col   identity_udtf           2.42        2.33      -3.9
   sm_batch_many_col   filter_udtf             3.50        3.43      -2.2
   sm_batch_many_col   count_udtf              1.14        1.11      -2.6
   lg_batch_few_col    identity_udtf           2.00        2.21     +10.7
   lg_batch_few_col    filter_udtf             3.33        3.40      +2.2
   lg_batch_few_col    count_udtf              1.20        1.20      +0.3
   lg_batch_many_col   identity_udtf           9.72        9.85      +1.4
   lg_batch_many_col   filter_udtf            15.57       15.70      +0.9
   lg_batch_many_col   count_udtf              3.54        3.49      -1.5
   pure_ints           identity_udtf           3.20        3.19      -0.2
   pure_ints           filter_udtf             4.77        4.38      -8.1
   pure_ints           count_udtf              1.90        1.85      -2.6
   pure_strings        identity_udtf           5.16        4.79      -7.2
   pure_strings        filter_udtf             9.15        9.02      -1.4
   pure_strings        count_udtf              2.28        2.25      -1.2
   ```
   
   The `lg_batch_few_col / identity_udtf` cell is a noise artifact: its per-run 
confidence intervals overlap (both sides spike to ~2.3ms intermittently on this 
machine), and re-running it in isolation with min-of-30 direct timing shows the 
refactored path is faster (min 2.34ms vs 2.59ms, median 2.36ms vs 2.76ms).
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   No.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to