Yicong-Huang commented on code in PR #54125:
URL: https://github.com/apache/spark/pull/54125#discussion_r2800824996


##########
python/pyspark/sql/pandas/serializers.py:
##########
@@ -439,122 +430,52 @@ def __init__(
         self._input_type = input_type
         self._arrow_cast = arrow_cast
 
-    def _create_array(self, series, spark_type, *, arrow_cast=False, prefers_large_types=False):
+    def dump_stream(self, iterator, stream):
         """
-        Create an Arrow Array from the given pandas.Series and Spark type.
-
-        Parameters
-        ----------
-        series : pandas.Series
-            A single series
-        spark_type : DataType, optional
-            The Spark return type. For UDF return types, this should always be provided
-            and should never be None. If None, pyarrow's inferred type will be used
-            (for backward compatibility).
-        arrow_cast : bool, optional
-            Whether to apply Arrow casting when the user-specified return type mismatches the
-            actual return values.
-        prefers_large_types : bool, optional
-            Whether to prefer large Arrow types (e.g., large_string instead of string).
-
-        Returns
-        -------
-        pyarrow.Array
+        Make ArrowRecordBatches from Pandas Series and serialize.
+        Each element in iterator is:
+        - For batched UDFs: tuple of (series, spark_type) tuples: ((s1, t1), (s2, t2), ...)
+        - For iterator UDFs: single (series, spark_type) tuple directly
         """
-        import pyarrow as pa
-        import pandas as pd
 
-        if isinstance(series.dtype, pd.CategoricalDtype):
-            series = series.astype(series.dtypes.categories.dtype)
-
-        # Derive arrow_type from spark_type
-        arrow_type = (
-            to_arrow_type(
-                spark_type, timezone=self._timezone, prefers_large_types=prefers_large_types
-            )
-            if spark_type is not None
-            else None
-        )
+        def create_batch(
+            packed: Union[
+                Tuple["pd.Series", DataType],
+                Tuple[Tuple["pd.Series", DataType], ...],

Review Comment:
   Addressed: extracted a `_normalize_packed` helper to normalize the input upfront, so `create_batch` now always receives a uniform tuple-of-tuples form. Changing all the call sites (the eval type wrappers in `worker.py`) to always produce uniform output is a larger change that would be out of scope for this PR; we'll extract that logic out to each eval type in the future.
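   
   For illustration, a minimal sketch of what such a normalization step could look like. The helper name comes from this comment; the exact signature and the Series-based detection are assumptions, not the PR's actual code:
   
   ```python
   from typing import Tuple, Union
   
   import pandas as pd
   
   from pyspark.sql.types import DataType
   
   def _normalize_packed(
       packed: Union[
           Tuple["pd.Series", DataType],
           Tuple[Tuple["pd.Series", DataType], ...],
       ]
   ) -> Tuple[Tuple["pd.Series", DataType], ...]:
       # Hypothetical sketch: iterator UDFs pass a single (series, spark_type)
       # pair, detected here by the first element being a Series; batched UDFs
       # already pass a tuple of such pairs, which is returned unchanged.
       if isinstance(packed[0], pd.Series):
           return (packed,)
       return packed
   ```
   
   With the input normalized this way, `create_batch` only needs to handle the tuple-of-tuples case.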



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

