Yicong-Huang opened a new pull request, #56464:
URL: https://github.com/apache/spark/pull/56464

   ### What changes were proposed in this pull request?
   
   This PR refactors `SQL_TRANSFORM_WITH_STATE_PANDAS_UDF` so that the worker 
uses the plain `ArrowStreamSerializer` for pure Arrow stream I/O, moving the 
per-eval-type logic (regrouping rows by grouping key, re-chunking into pandas 
DataFrames bounded by 
`arrow_max_records_per_batch`/`arrow_max_bytes_per_batch`, and converting 
result DataFrames back to Arrow) from `TransformWithStateInPandasSerializer` 
into `read_udfs()` in `worker.py`. The serializer class is kept for now since 
`TransformWithStateInPandasInitStateSerializer` (init-state variant, tracked 
separately) still subclasses it.
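
To make the regroup-and-re-chunk step concrete, here is a minimal sketch of that logic, not the code in `worker.py`. It uses plain Python lists of dicts as stand-ins for Arrow batches and pandas DataFrames so that it runs without dependencies. The function name `regroup_and_rechunk` and its parameters are hypothetical, it assumes rows for one grouping key arrive contiguously, and it models only the record-count bound (the byte-size bound is omitted).

```python
from itertools import groupby
from typing import Any, Dict, Iterable, Iterator, List, Tuple

Row = Dict[str, Any]


def regroup_and_rechunk(
    batches: Iterable[List[Row]],
    key_col: str,
    max_records_per_batch: int,
) -> Iterator[Tuple[Any, List[Row]]]:
    """Yield (key, chunk) pairs with at most max_records_per_batch rows each.

    Hypothetical helper: assumes rows with the same key are contiguous
    in the incoming stream, even when they span batch boundaries.
    """
    # Flatten the incoming batches into a single row stream.
    rows = (row for batch in batches for row in batch)
    # Regroup by key across the original batch boundaries.
    for key, group in groupby(rows, key=lambda r: r[key_col]):
        chunk: List[Row] = []
        for row in group:
            chunk.append(row)
            if len(chunk) >= max_records_per_batch:
                yield key, chunk
                chunk = []
        if chunk:  # flush the final partial chunk for this key
            yield key, chunk


batches = [
    [{"k": "a", "v": 1}, {"k": "a", "v": 2}],
    [{"k": "a", "v": 3}, {"k": "b", "v": 4}],
]
chunks = list(regroup_and_rechunk(batches, "k", max_records_per_batch=2))
summary = [(key, [row["v"] for row in chunk]) for key, chunk in chunks]
print(summary)
```

In this toy input, key `a` spans two incoming batches and is re-chunked into one full chunk and one partial chunk, while key `b` gets its own chunk.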
   
   ### Why are the changes needed?
   
   Part of [SPARK-55388](https://issues.apache.org/jira/browse/SPARK-55388). 
Keeping serializers as pure Arrow stream I/O and concentrating 
eval-type-specific logic in `worker.py` makes the per-eval-type data flow 
explicit.
   
   ### Does this PR introduce _any_ user-facing change?
   
   No.
   
   ### How was this patch tested?
   
   Existing tests 
(`pyspark.sql.tests.pandas.streaming.test_pandas_transform_with_state`). No 
behavior change: replaying identical worker input through the old and new code 
paths produces byte-identical worker output across 12 scenario/UDF combinations.
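
A byte-identical replay check of this kind can be sketched as follows. This is an illustrative outline only, not the harness used for this PR: `compare_code_paths`, `old_path`, and `new_path` are hypothetical names, and the two toy callables merely stand in for the old and new worker code paths.

```python
import hashlib
from typing import Callable, Dict


def digest(data: bytes) -> str:
    """Return a SHA-256 hex digest of the given bytes."""
    return hashlib.sha256(data).hexdigest()


def compare_code_paths(
    inputs: Dict[str, bytes],
    old_path: Callable[[bytes], bytes],
    new_path: Callable[[bytes], bytes],
) -> Dict[str, bool]:
    """Replay each recorded input through both paths and compare outputs."""
    return {
        name: digest(old_path(payload)) == digest(new_path(payload))
        for name, payload in inputs.items()
    }


# Toy stand-ins (assumptions): both "paths" reverse the payload,
# implemented in two different ways.
def old_path(payload: bytes) -> bytes:
    return payload[::-1]


def new_path(payload: bytes) -> bytes:
    return bytes(reversed(payload))


recorded_inputs = {"few_groups_sm/identity_udf": b"abc", "wide_cols/sort_udf": b"xyz"}
results = compare_code_paths(recorded_inputs, old_path, new_path)
print(results)
```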
   
   ASV comparison (`bench_eval_type.TransformWithStatePandasUDFTimeBench` / 
`TransformWithStatePandasUDFPeakmemBench`, `-a repeat=3`): before = 
`upstream/master`, after = this PR. Values are from one representative run per 
side; the conclusion is consistent across the 2 runs performed per side.
   
   ```text
   time_worker
   
   scenario         udf            before        after         diff
   ---------------  ------------  ------------  ------------  ------
   few_groups_sm    identity_udf   754+-5ms      743+-3ms      -1.5%
   few_groups_sm    sort_udf       781+-20ms     760+-0.6ms    -2.7%
   few_groups_sm    count_udf      726+-3ms      722+-1ms      -0.6%
   few_groups_lg    identity_udf   7.01+-0.03s   6.99+-0.01s   -0.3%
   few_groups_lg    sort_udf       7.21+-0.03s   7.10+-0.02s   -1.5%
   few_groups_lg    count_udf      6.67+-0.04s   6.60+-0.01s   -1.0%
   many_groups_sm   identity_udf   6.24+-0.03s   6.13+-0.01s   -1.8%
   many_groups_sm   sort_udf       6.64+-0.01s   6.69+-0.02s   +0.8%
   many_groups_sm   count_udf      5.76+-0.07s   5.89+-0.1s    +2.3%
   many_groups_lg   identity_udf   3.54+-0.02s   3.53+-0.04s   -0.3%
   many_groups_lg   sort_udf       3.65+-0.03s   3.63+-0.01s   -0.5%
   many_groups_lg   count_udf      3.40+-0.04s   3.39+-0.03s   -0.3%
   wide_cols        identity_udf   7.45+-0.07s   7.35+-0.03s   -1.3%
   wide_cols        sort_udf       7.51+-0.03s   7.42+-0.03s   -1.2%
   wide_cols        count_udf      6.85+-0.03s   6.80+-0.01s   -0.7%
   mixed_cols       identity_udf   3.15+-0.01s   3.16+-0.01s   +0.3%
   mixed_cols       sort_udf       3.36+-0.06s   3.28+-0s      -2.4%
   mixed_cols       count_udf      2.85+-0.02s   2.91+-0.04s   +2.1%
   nested_struct    identity_udf   7.47+-0.03s   7.33+-0.01s   -1.9%
   nested_struct    sort_udf       8.63+-0.2s    8.36+-0.07s   -3.1%
   nested_struct    count_udf      5.27+-0.03s   5.16+-0.01s   -2.1%
   ```
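
The `diff` column appears to be the relative change `(after - before) / before`, judging from the rows above; negative values mean the PR is faster. A small sketch for reproducing it (the helper name `relative_diff_pct` is hypothetical):

```python
def relative_diff_pct(before: float, after: float) -> float:
    """Relative change from before to after, in percent."""
    return (after - before) / before * 100.0


# few_groups_sm / identity_udf: 754 ms before, 743 ms after
small = round(relative_diff_pct(754, 743), 1)
# many_groups_sm / count_udf: 5.76 s before, 5.89 s after
many = round(relative_diff_pct(5.76, 5.89), 1)
print(small, many)
```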
   
   ```text
   peakmem_worker
   
   scenario         udf            before   after
   ---------------  ------------  -------  -------
   few_groups_sm    identity_udf   104M     106M
   few_groups_sm    sort_udf       108M     107M
   few_groups_sm    count_udf      97.9M    98.1M
   few_groups_lg    identity_udf   200M     200M
   few_groups_lg    sort_udf       201M     189M
   few_groups_lg    count_udf      161M     161M
   many_groups_sm   identity_udf   130M     130M
   many_groups_sm   sort_udf       131M     131M
   many_groups_sm   count_udf      113M     113M
   many_groups_lg   identity_udf   136M     133M
   many_groups_lg   sort_udf       132M     134M
   many_groups_lg   count_udf      113M     113M
   wide_cols        identity_udf   240M     242M
   wide_cols        sort_udf       241M     241M
   wide_cols        count_udf      205M     205M
   mixed_cols       identity_udf   182M     182M
   mixed_cols       sort_udf       182M     182M
   mixed_cols       count_udf      182M     182M
   nested_struct    identity_udf   210M     210M
   nested_struct    sort_udf       210M     210M
   nested_struct    count_udf      210M     210M
   ```
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   No.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

