Yicong-Huang commented on code in PR #54125:
URL: https://github.com/apache/spark/pull/54125#discussion_r2800824996
##########
python/pyspark/sql/pandas/serializers.py:
##########
@@ -439,122 +430,52 @@ def __init__(
self._input_type = input_type
self._arrow_cast = arrow_cast
- def _create_array(self, series, spark_type, *, arrow_cast=False,
prefers_large_types=False):
+ def dump_stream(self, iterator, stream):
"""
- Create an Arrow Array from the given pandas.Series and Spark type.
-
- Parameters
- ----------
- series : pandas.Series
- A single series
- spark_type : DataType, optional
- The Spark return type. For UDF return types, this should always be
provided
- and should never be None. If None, pyarrow's inferred type will be
used
- (for backward compatibility).
- arrow_cast : bool, optional
- Whether to apply Arrow casting when the user-specified return type
mismatches the
- actual return values.
- prefers_large_types : bool, optional
- Whether to prefer large Arrow types (e.g., large_string instead of
string).
-
- Returns
- -------
- pyarrow.Array
+ Make ArrowRecordBatches from Pandas Series and serialize.
+ Each element in iterator is:
+ - For batched UDFs: tuple of (series, spark_type) tuples: ((s1, t1),
(s2, t2), ...)
+ - For iterator UDFs: single (series, spark_type) tuple directly
"""
- import pyarrow as pa
- import pandas as pd
- if isinstance(series.dtype, pd.CategoricalDtype):
- series = series.astype(series.dtypes.categories.dtype)
-
- # Derive arrow_type from spark_type
- arrow_type = (
- to_arrow_type(
- spark_type, timezone=self._timezone,
prefers_large_types=prefers_large_types
- )
- if spark_type is not None
- else None
- )
+ def create_batch(
+ packed: Union[
+ Tuple["pd.Series", DataType],
+ Tuple[Tuple["pd.Series", DataType], ...],
Review Comment:
Addressed — extracted `_normalize_packed` helper to normalize the input
upfront. Now `create_batch` always receives a uniform tuple-of-tuples form.
Changing all the callsites (eval type wrappers in `worker.py`) to always
produce uniform output is a larger change that would be out of scope for this
PR. We'll extract those logic out to each eval type in the future.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]