Yicong-Huang opened a new pull request, #56464:

URL: https://github.com/apache/spark/pull/56464
### What changes were proposed in this pull request?

This PR refactors `SQL_TRANSFORM_WITH_STATE_PANDAS_UDF` so that the worker uses the plain `ArrowStreamSerializer` for pure Arrow stream I/O. The per-eval-type logic moves from `TransformWithStateInPandasSerializer` into `read_udfs()` in `worker.py`. That logic covers three steps: regrouping rows by grouping key, re-chunking them into pandas DataFrames bounded by `arrow_max_records_per_batch`/`arrow_max_bytes_per_batch`, and converting result DataFrames back to Arrow.

The serializer class is kept for now because `TransformWithStateInPandasInitStateSerializer` (the init-state variant, tracked separately) still subclasses it.

### Why are the changes needed?

Part of [SPARK-55388](https://issues.apache.org/jira/browse/SPARK-55388). Keeping serializers as pure Arrow stream I/O and concentrating eval-type-specific logic in `worker.py` makes the per-eval-type data flow explicit.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests (`pyspark.sql.tests.pandas.streaming.test_pandas_transform_with_state`).

No behavior change: replaying identical worker input through the old and new code paths produces byte-identical worker output across 12 scenario/UDF combinations.

ASV comparison (`bench_eval_type.TransformWithStatePandasUDFTimeBench` / `TransformWithStatePandasUDFPeakmemBench`, `-a repeat=3`): before = `upstream/master`, after = this PR. Values come from one representative run per side. The conclusion is consistent across the 2 runs performed per side.
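For reviewers, here is a minimal, stdlib-only sketch of the regroup-and-re-chunk step described under "What changes were proposed". It is not the code in `worker.py`. The names `rechunk_by_group` and `row_size` are hypothetical, and plain tuples stand in for Arrow record batches and pandas DataFrames. The sketch also assumes rows arrive already clustered by grouping key.

```python
from itertools import groupby
from typing import Any, Callable, Iterable, Iterator, List, Tuple

Row = Tuple[Any, ...]


def rechunk_by_group(
    rows: Iterable[Row],
    key_index: int,
    max_records: int,
    max_bytes: int,
    row_size: Callable[[Row], int],
) -> Iterator[Tuple[Any, List[Row]]]:
    """Hypothetical helper: yield (key, chunk) pairs where every chunk holds
    rows of a single grouping key and respects both a record limit and an
    estimated byte limit (both assumed to be positive here)."""
    # Assumption: rows are already clustered by key, so groupby() sees each
    # grouping key as one consecutive run.
    for key, group in groupby(rows, key=lambda r: r[key_index]):
        chunk: List[Row] = []
        chunk_bytes = 0
        for row in group:
            size = row_size(row)
            # Flush the current chunk before a row that would exceed either
            # limit; a single oversized row still ends up in its own chunk.
            if chunk and (len(chunk) >= max_records or chunk_bytes + size > max_bytes):
                yield key, chunk
                chunk, chunk_bytes = [], 0
            chunk.append(row)
            chunk_bytes += size
        if chunk:
            yield key, chunk


if __name__ == "__main__":
    rows = [("a", 1), ("a", 2), ("a", 3), ("b", 4), ("b", 5)]
    # Record limit of 2, generous byte limit, every row estimated at 8 bytes.
    for key, chunk in rechunk_by_group(rows, 0, 2, 100, lambda r: 8):
        print(key, chunk)
```

The flush-before-append ordering means a chunk never exceeds either limit, except for a lone row that is larger than `max_bytes` by itself. That row is emitted as its own chunk rather than dropped.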
```text
time_worker
scenario        udf          before       after        diff
--------------- ------------ ------------ ------------ ------
few_groups_sm   identity_udf 754+-5ms     743+-3ms     -1.5%
few_groups_sm   sort_udf     781+-20ms    760+-0.6ms   -2.7%
few_groups_sm   count_udf    726+-3ms     722+-1ms     -0.6%
few_groups_lg   identity_udf 7.01+-0.03s  6.99+-0.01s  -0.3%
few_groups_lg   sort_udf     7.21+-0.03s  7.10+-0.02s  -1.5%
few_groups_lg   count_udf    6.67+-0.04s  6.60+-0.01s  -1.0%
many_groups_sm  identity_udf 6.24+-0.03s  6.13+-0.01s  -1.8%
many_groups_sm  sort_udf     6.64+-0.01s  6.69+-0.02s  +0.8%
many_groups_sm  count_udf    5.76+-0.07s  5.89+-0.1s   +2.3%
many_groups_lg  identity_udf 3.54+-0.02s  3.53+-0.04s  -0.3%
many_groups_lg  sort_udf     3.65+-0.03s  3.63+-0.01s  -0.5%
many_groups_lg  count_udf    3.40+-0.04s  3.39+-0.03s  -0.3%
wide_cols       identity_udf 7.45+-0.07s  7.35+-0.03s  -1.3%
wide_cols       sort_udf     7.51+-0.03s  7.42+-0.03s  -1.2%
wide_cols       count_udf    6.85+-0.03s  6.80+-0.01s  -0.7%
mixed_cols      identity_udf 3.15+-0.01s  3.16+-0.01s  +0.3%
mixed_cols      sort_udf     3.36+-0.06s  3.28+-0s     -2.4%
mixed_cols      count_udf    2.85+-0.02s  2.91+-0.04s  +2.1%
nested_struct   identity_udf 7.47+-0.03s  7.33+-0.01s  -1.9%
nested_struct   sort_udf     8.63+-0.2s   8.36+-0.07s  -3.1%
nested_struct   count_udf    5.27+-0.03s  5.16+-0.01s  -2.1%
```

```text
peakmem_worker
scenario        udf          before  after
--------------- ------------ ------- -------
few_groups_sm   identity_udf 104M    106M
few_groups_sm   sort_udf     108M    107M
few_groups_sm   count_udf    97.9M   98.1M
few_groups_lg   identity_udf 200M    200M
few_groups_lg   sort_udf     201M    189M
few_groups_lg   count_udf    161M    161M
many_groups_sm  identity_udf 130M    130M
many_groups_sm  sort_udf     131M    131M
many_groups_sm  count_udf    113M    113M
many_groups_lg  identity_udf 136M    133M
many_groups_lg  sort_udf     132M    134M
many_groups_lg  count_udf    113M    113M
wide_cols       identity_udf 240M    242M
wide_cols       sort_udf     241M    241M
wide_cols       count_udf    205M    205M
mixed_cols      identity_udf 182M    182M
mixed_cols      sort_udf     182M    182M
mixed_cols      count_udf    182M    182M
nested_struct   identity_udf 210M    210M
nested_struct   sort_udf     210M    210M
nested_struct   count_udf    210M    210M
```

### Was this patch authored or co-authored using generative AI tooling?

No.
