zhengruifeng commented on code in PR #54125:
URL: https://github.com/apache/spark/pull/54125#discussion_r2797524499


##########
python/pyspark/sql/pandas/serializers.py:
##########
@@ -1582,13 +1279,25 @@ def construct_record_batch(pdfs, pdf_data_cnt, 
pdf_schema, state_pdfs, state_dat
             merged_pdf = pd.concat(pdfs, ignore_index=True)
             merged_state_pdf = pd.concat(state_pdfs, ignore_index=True)
 
-            return self._create_batch(
+            # Create batch from list of (DataFrame, spark_type) tuples
+            # Each DataFrame is wrapped as a StructArray
+            data = [count_pdf, merged_pdf, merged_state_pdf]
+            schema = StructType(

Review Comment:
   where is this `schema` from?



##########
python/pyspark/sql/tests/test_conversion.py:
##########
@@ -144,6 +149,199 @@ def test_wrap_struct_empty_batch(self):
         self.assertEqual(wrapped.num_columns, 1)
 
 
+@unittest.skipIf(not have_pyarrow, pyarrow_requirement_message)

Review Comment:
   Does it also require pandas?



##########
python/pyspark/sql/conversion.py:
##########
@@ -162,6 +162,239 @@ def to_pandas(
         ]
 
 
+# TODO: elevate to ArrowBatchTransformer and operate on full RecordBatch schema
+#       instead of per-column coercion.
+def coerce_arrow_array(
+    arr: "pa.Array",
+    target_type: "pa.DataType",
+    *,
+    safecheck: bool = True,
+    arrow_cast: bool = True,
+) -> "pa.Array":
+    """
+    Coerce an Arrow Array to a target type, with optional type-mismatch 
enforcement.
+
+    When ``arrow_cast`` is True (default), mismatched types are cast to the
+    target type.  When False, a type mismatch raises an error instead.
+
+    Parameters
+    ----------
+    arr : pa.Array
+        Input Arrow array
+    target_type : pa.DataType
+        Target Arrow type
+    safecheck : bool
+        Whether to use safe casting (default True)
+    arrow_cast : bool
+        Whether to allow casting when types don't match (default True)
+
+    Returns
+    -------
+    pa.Array
+    """
+    from pyspark.errors import PySparkTypeError
+
+    if arr.type == target_type:
+        return arr
+
+    if not arrow_cast:
+        raise PySparkTypeError(
+            "Arrow UDFs require the return type to match the expected Arrow 
type. "
+            f"Expected: {target_type}, but got: {arr.type}."
+        )
+
+    # when safe is True, the cast will fail if there's a overflow or other
+    # unsafe conversion.
+    # RecordBatch.cast(...) isn't used as minimum PyArrow version
+    # required for RecordBatch.cast(...) is v16.0
+    return arr.cast(target_type=target_type, safe=safecheck)
+
+
+class PandasToArrowConversion:
+    """
+    Conversion utilities from pandas data to Arrow.
+    """
+
+    @classmethod
+    def convert(
+        cls,
+        data: Union["pd.DataFrame", List[Union["pd.Series", "pd.DataFrame"]]],
+        schema: StructType,
+        *,
+        timezone: Optional[str] = None,
+        safecheck: bool = True,
+        arrow_cast: bool = False,
+        prefers_large_types: bool = False,
+        assign_cols_by_name: bool = False,
+        int_to_decimal_coercion_enabled: bool = False,
+        ignore_unexpected_complex_type_values: bool = False,
+        is_udtf: bool = False,
+    ) -> "pa.RecordBatch":
+        """
+        Convert a pandas DataFrame or list of Series/DataFrames to an Arrow 
RecordBatch.
+
+        Parameters
+        ----------
+        data : pd.DataFrame or list of pd.Series/pd.DataFrame

Review Comment:
   In what case is the input a list of DataFrames?



##########
python/pyspark/sql/conversion.py:
##########
@@ -162,6 +162,239 @@ def to_pandas(
         ]
 
 
+# TODO: elevate to ArrowBatchTransformer and operate on full RecordBatch schema
+#       instead of per-column coercion.
+def coerce_arrow_array(
+    arr: "pa.Array",
+    target_type: "pa.DataType",
+    *,
+    safecheck: bool = True,
+    arrow_cast: bool = True,
+) -> "pa.Array":
+    """
+    Coerce an Arrow Array to a target type, with optional type-mismatch 
enforcement.
+
+    When ``arrow_cast`` is True (default), mismatched types are cast to the
+    target type.  When False, a type mismatch raises an error instead.
+
+    Parameters
+    ----------
+    arr : pa.Array
+        Input Arrow array
+    target_type : pa.DataType
+        Target Arrow type
+    safecheck : bool
+        Whether to use safe casting (default True)
+    arrow_cast : bool
+        Whether to allow casting when types don't match (default True)
+
+    Returns
+    -------
+    pa.Array
+    """
+    from pyspark.errors import PySparkTypeError
+
+    if arr.type == target_type:
+        return arr
+
+    if not arrow_cast:
+        raise PySparkTypeError(
+            "Arrow UDFs require the return type to match the expected Arrow 
type. "
+            f"Expected: {target_type}, but got: {arr.type}."
+        )
+
+    # when safe is True, the cast will fail if there's a overflow or other
+    # unsafe conversion.
+    # RecordBatch.cast(...) isn't used as minimum PyArrow version
+    # required for RecordBatch.cast(...) is v16.0
+    return arr.cast(target_type=target_type, safe=safecheck)
+
+
+class PandasToArrowConversion:
+    """
+    Conversion utilities from pandas data to Arrow.
+    """
+
+    @classmethod
+    def convert(
+        cls,
+        data: Union["pd.DataFrame", List[Union["pd.Series", "pd.DataFrame"]]],
+        schema: StructType,
+        *,
+        timezone: Optional[str] = None,
+        safecheck: bool = True,
+        arrow_cast: bool = False,
+        prefers_large_types: bool = False,
+        assign_cols_by_name: bool = False,
+        int_to_decimal_coercion_enabled: bool = False,
+        ignore_unexpected_complex_type_values: bool = False,
+        is_udtf: bool = False,
+    ) -> "pa.RecordBatch":
+        """
+        Convert a pandas DataFrame or list of Series/DataFrames to an Arrow 
RecordBatch.
+
+        Parameters
+        ----------
+        data : pd.DataFrame or list of pd.Series/pd.DataFrame
+            Input data - either a DataFrame or a list of Series/DataFrames.
+        schema : StructType
+            Spark schema defining the types for each column
+        timezone : str, optional
+            Timezone for timestamp conversion
+        safecheck : bool
+            Whether to use safe Arrow conversion (default True)
+        arrow_cast : bool
+            Whether to allow Arrow casting on type mismatch (default False)
+        prefers_large_types : bool
+            Whether to prefer large Arrow types (default False)
+        assign_cols_by_name : bool
+            Whether to reorder DataFrame columns by name to match schema 
(default False)
+        int_to_decimal_coercion_enabled : bool
+            Whether to enable int to decimal coercion (default False)
+        ignore_unexpected_complex_type_values : bool
+            Whether to ignore unexpected complex type values in converter 
(default False)
+        is_udtf : bool
+            Whether this conversion is for a UDTF. UDTFs use broader Arrow 
exception
+            handling to allow more type coercions (e.g., struct field casting 
via
+            ArrowTypeError), and convert errors to UDTF_ARROW_TYPE_CAST_ERROR.
+            Regular UDFs only catch ArrowInvalid to preserve legacy behavior 
where
+            e.g. string→decimal must raise an error. (default False)
+
+        Returns
+        -------
+        pa.RecordBatch
+        """
+        import pyarrow as pa
+        import pandas as pd
+
+        from pyspark.errors import PySparkTypeError, PySparkValueError, 
PySparkRuntimeError
+        from pyspark.sql.pandas.types import to_arrow_type, 
_create_converter_from_pandas
+
+        # Handle empty schema (0 columns)
+        # Use dummy column + select([]) to preserve row count (PyArrow 
limitation workaround)
+        if not schema.fields:
+            num_rows = len(data[0]) if isinstance(data, list) and data else 
len(data)
+            return pa.RecordBatch.from_pydict({"_": [None] * 
num_rows}).select([])
+
+        # Handle empty DataFrame (0 columns) with non-empty schema
+        # This happens when user returns pd.DataFrame() for struct types
+        if isinstance(data, pd.DataFrame) and len(data.columns) == 0:
+            arrow_type = to_arrow_type(
+                schema, timezone=timezone, 
prefers_large_types=prefers_large_types
+            )
+            return pa.RecordBatch.from_struct_array(pa.array([{}] * len(data), 
arrow_type))
+
+        # Normalize input: reorder DataFrame columns by schema names if needed,
+        # then extract columns as a list for uniform iteration.
+        if isinstance(data, list):
+            columns = data
+        else:
+            if assign_cols_by_name and any(isinstance(c, str) for c in 
data.columns):
+                data = data[schema.names]
+            columns = [data.iloc[:, i] for i in range(len(schema.fields))]
+
+        def series_to_array(series: "pd.Series", ret_type: DataType, 
field_name: str) -> "pa.Array":
+            """Convert a pandas Series to an Arrow Array (closure over 
conversion params).
+
+            Uses field_name for error messages instead of series.name to avoid
+            copying the Series via rename() — a ~20% overhead on the hot path.
+            """
+            if isinstance(series.dtype, pd.CategoricalDtype):
+                series = series.astype(series.dtype.categories.dtype)
+
+            arrow_type = to_arrow_type(
+                ret_type, timezone=timezone, 
prefers_large_types=prefers_large_types
+            )
+            series = _create_converter_from_pandas(
+                ret_type,
+                timezone=timezone,
+                error_on_duplicated_field_names=False,
+                
int_to_decimal_coercion_enabled=int_to_decimal_coercion_enabled,
+                
ignore_unexpected_complex_type_values=ignore_unexpected_complex_type_values,
+            )(series)
+
+            mask = None if hasattr(series.array, "__arrow_array__") else 
series.isnull()
+
+            if is_udtf:

Review Comment:
   shouldn't the `is_udtf` handling be inside `coerce_arrow_array`?



##########
python/pyspark/sql/tests/test_conversion.py:
##########
@@ -144,6 +149,199 @@ def test_wrap_struct_empty_batch(self):
         self.assertEqual(wrapped.num_columns, 1)
 
 
+@unittest.skipIf(not have_pyarrow, pyarrow_requirement_message)
+class PandasToArrowConversionTests(unittest.TestCase):
+    def test_convert(self):
+        """Test basic DataFrame/Series to Arrow RecordBatch conversion."""
+        import pandas as pd
+        import pyarrow as pa
+
+        # Basic DataFrame conversion
+        df = pd.DataFrame({"a": [1, 2, 3], "b": [1.0, 2.0, 3.0]})
+        schema = StructType([StructField("a", IntegerType()), StructField("b", 
DoubleType())])
+        result = PandasToArrowConversion.convert(df, schema)

Review Comment:
   I see you add a batch of tests here, is the `convert` method the final 
version?



##########
python/pyspark/sql/conversion.py:
##########
@@ -162,6 +162,239 @@ def to_pandas(
         ]
 
 
+# TODO: elevate to ArrowBatchTransformer and operate on full RecordBatch schema
+#       instead of per-column coercion.
+def coerce_arrow_array(
+    arr: "pa.Array",
+    target_type: "pa.DataType",
+    *,
+    safecheck: bool = True,
+    arrow_cast: bool = True,
+) -> "pa.Array":
+    """
+    Coerce an Arrow Array to a target type, with optional type-mismatch 
enforcement.
+
+    When ``arrow_cast`` is True (default), mismatched types are cast to the
+    target type.  When False, a type mismatch raises an error instead.
+
+    Parameters
+    ----------
+    arr : pa.Array
+        Input Arrow array
+    target_type : pa.DataType
+        Target Arrow type
+    safecheck : bool
+        Whether to use safe casting (default True)
+    arrow_cast : bool
+        Whether to allow casting when types don't match (default True)
+
+    Returns
+    -------
+    pa.Array
+    """
+    from pyspark.errors import PySparkTypeError
+
+    if arr.type == target_type:
+        return arr
+
+    if not arrow_cast:
+        raise PySparkTypeError(
+            "Arrow UDFs require the return type to match the expected Arrow 
type. "
+            f"Expected: {target_type}, but got: {arr.type}."
+        )
+
+    # when safe is True, the cast will fail if there's a overflow or other
+    # unsafe conversion.
+    # RecordBatch.cast(...) isn't used as minimum PyArrow version
+    # required for RecordBatch.cast(...) is v16.0
+    return arr.cast(target_type=target_type, safe=safecheck)
+
+
+class PandasToArrowConversion:
+    """
+    Conversion utilities from pandas data to Arrow.
+    """
+
+    @classmethod
+    def convert(
+        cls,
+        data: Union["pd.DataFrame", List[Union["pd.Series", "pd.DataFrame"]]],
+        schema: StructType,
+        *,
+        timezone: Optional[str] = None,
+        safecheck: bool = True,
+        arrow_cast: bool = False,
+        prefers_large_types: bool = False,
+        assign_cols_by_name: bool = False,
+        int_to_decimal_coercion_enabled: bool = False,
+        ignore_unexpected_complex_type_values: bool = False,
+        is_udtf: bool = False,
+    ) -> "pa.RecordBatch":
+        """
+        Convert a pandas DataFrame or list of Series/DataFrames to an Arrow 
RecordBatch.
+
+        Parameters
+        ----------
+        data : pd.DataFrame or list of pd.Series/pd.DataFrame
+            Input data - either a DataFrame or a list of Series/DataFrames.
+        schema : StructType
+            Spark schema defining the types for each column
+        timezone : str, optional
+            Timezone for timestamp conversion
+        safecheck : bool
+            Whether to use safe Arrow conversion (default True)
+        arrow_cast : bool
+            Whether to allow Arrow casting on type mismatch (default False)
+        prefers_large_types : bool
+            Whether to prefer large Arrow types (default False)
+        assign_cols_by_name : bool
+            Whether to reorder DataFrame columns by name to match schema 
(default False)
+        int_to_decimal_coercion_enabled : bool
+            Whether to enable int to decimal coercion (default False)
+        ignore_unexpected_complex_type_values : bool
+            Whether to ignore unexpected complex type values in converter 
(default False)
+        is_udtf : bool
+            Whether this conversion is for a UDTF. UDTFs use broader Arrow 
exception
+            handling to allow more type coercions (e.g., struct field casting 
via
+            ArrowTypeError), and convert errors to UDTF_ARROW_TYPE_CAST_ERROR.
+            Regular UDFs only catch ArrowInvalid to preserve legacy behavior 
where
+            e.g. string→decimal must raise an error. (default False)
+
+        Returns
+        -------
+        pa.RecordBatch
+        """
+        import pyarrow as pa
+        import pandas as pd
+
+        from pyspark.errors import PySparkTypeError, PySparkValueError, 
PySparkRuntimeError
+        from pyspark.sql.pandas.types import to_arrow_type, 
_create_converter_from_pandas
+
+        # Handle empty schema (0 columns)
+        # Use dummy column + select([]) to preserve row count (PyArrow 
limitation workaround)
+        if not schema.fields:

Review Comment:
   What does `not schema.fields` mean? Is `fields` None?



##########
python/pyspark/sql/conversion.py:
##########
@@ -162,6 +162,239 @@ def to_pandas(
         ]
 
 
+# TODO: elevate to ArrowBatchTransformer and operate on full RecordBatch schema
+#       instead of per-column coercion.
+def coerce_arrow_array(
+    arr: "pa.Array",
+    target_type: "pa.DataType",
+    *,
+    safecheck: bool = True,
+    arrow_cast: bool = True,
+) -> "pa.Array":
+    """
+    Coerce an Arrow Array to a target type, with optional type-mismatch 
enforcement.
+
+    When ``arrow_cast`` is True (default), mismatched types are cast to the
+    target type.  When False, a type mismatch raises an error instead.
+
+    Parameters
+    ----------
+    arr : pa.Array
+        Input Arrow array
+    target_type : pa.DataType
+        Target Arrow type
+    safecheck : bool
+        Whether to use safe casting (default True)
+    arrow_cast : bool
+        Whether to allow casting when types don't match (default True)
+
+    Returns
+    -------
+    pa.Array
+    """
+    from pyspark.errors import PySparkTypeError
+
+    if arr.type == target_type:
+        return arr
+
+    if not arrow_cast:
+        raise PySparkTypeError(
+            "Arrow UDFs require the return type to match the expected Arrow 
type. "
+            f"Expected: {target_type}, but got: {arr.type}."
+        )
+
+    # when safe is True, the cast will fail if there's a overflow or other
+    # unsafe conversion.
+    # RecordBatch.cast(...) isn't used as minimum PyArrow version
+    # required for RecordBatch.cast(...) is v16.0
+    return arr.cast(target_type=target_type, safe=safecheck)
+
+
+class PandasToArrowConversion:
+    """
+    Conversion utilities from pandas data to Arrow.
+    """
+
+    @classmethod
+    def convert(
+        cls,
+        data: Union["pd.DataFrame", List[Union["pd.Series", "pd.DataFrame"]]],
+        schema: StructType,
+        *,
+        timezone: Optional[str] = None,
+        safecheck: bool = True,
+        arrow_cast: bool = False,
+        prefers_large_types: bool = False,
+        assign_cols_by_name: bool = False,
+        int_to_decimal_coercion_enabled: bool = False,
+        ignore_unexpected_complex_type_values: bool = False,
+        is_udtf: bool = False,
+    ) -> "pa.RecordBatch":
+        """
+        Convert a pandas DataFrame or list of Series/DataFrames to an Arrow 
RecordBatch.
+
+        Parameters
+        ----------
+        data : pd.DataFrame or list of pd.Series/pd.DataFrame
+            Input data - either a DataFrame or a list of Series/DataFrames.
+        schema : StructType
+            Spark schema defining the types for each column
+        timezone : str, optional
+            Timezone for timestamp conversion
+        safecheck : bool
+            Whether to use safe Arrow conversion (default True)
+        arrow_cast : bool
+            Whether to allow Arrow casting on type mismatch (default False)
+        prefers_large_types : bool
+            Whether to prefer large Arrow types (default False)
+        assign_cols_by_name : bool
+            Whether to reorder DataFrame columns by name to match schema 
(default False)
+        int_to_decimal_coercion_enabled : bool
+            Whether to enable int to decimal coercion (default False)
+        ignore_unexpected_complex_type_values : bool
+            Whether to ignore unexpected complex type values in converter 
(default False)
+        is_udtf : bool

Review Comment:
   please add a TODO with JIRA to unify it in the future



##########
python/pyspark/sql/conversion.py:
##########
@@ -162,6 +162,239 @@ def to_pandas(
         ]
 
 
+# TODO: elevate to ArrowBatchTransformer and operate on full RecordBatch schema
+#       instead of per-column coercion.
+def coerce_arrow_array(

Review Comment:
   We have an `ArrowArrayConversion`; should this function be in it?



##########
python/pyspark/sql/pandas/serializers.py:
##########
@@ -439,122 +430,52 @@ def __init__(
         self._input_type = input_type
         self._arrow_cast = arrow_cast
 
-    def _create_array(self, series, spark_type, *, arrow_cast=False, 
prefers_large_types=False):
+    def dump_stream(self, iterator, stream):
         """
-        Create an Arrow Array from the given pandas.Series and Spark type.
-
-        Parameters
-        ----------
-        series : pandas.Series
-            A single series
-        spark_type : DataType, optional
-            The Spark return type. For UDF return types, this should always be 
provided
-            and should never be None. If None, pyarrow's inferred type will be 
used
-            (for backward compatibility).
-        arrow_cast : bool, optional
-            Whether to apply Arrow casting when the user-specified return type 
mismatches the
-            actual return values.
-        prefers_large_types : bool, optional
-            Whether to prefer large Arrow types (e.g., large_string instead of 
string).
-
-        Returns
-        -------
-        pyarrow.Array
+        Make ArrowRecordBatches from Pandas Series and serialize.
+        Each element in iterator is:
+        - For batched UDFs: tuple of (series, spark_type) tuples: ((s1, t1), 
(s2, t2), ...)
+        - For iterator UDFs: single (series, spark_type) tuple directly
         """
-        import pyarrow as pa
-        import pandas as pd
 
-        if isinstance(series.dtype, pd.CategoricalDtype):
-            series = series.astype(series.dtypes.categories.dtype)
-
-        # Derive arrow_type from spark_type
-        arrow_type = (
-            to_arrow_type(
-                spark_type, timezone=self._timezone, 
prefers_large_types=prefers_large_types
-            )
-            if spark_type is not None
-            else None
-        )
+        def create_batch(
+            packed: Union[
+                Tuple["pd.Series", DataType],
+                Tuple[Tuple["pd.Series", DataType], ...],

Review Comment:
   Can we clearly define the input type and avoid the usage of `Union` here?



##########
python/pyspark/sql/conversion.py:
##########
@@ -162,6 +162,239 @@ def to_pandas(
         ]
 
 
+# TODO: elevate to ArrowBatchTransformer and operate on full RecordBatch schema
+#       instead of per-column coercion.
+def coerce_arrow_array(
+    arr: "pa.Array",
+    target_type: "pa.DataType",
+    *,
+    safecheck: bool = True,
+    arrow_cast: bool = True,
+) -> "pa.Array":
+    """
+    Coerce an Arrow Array to a target type, with optional type-mismatch 
enforcement.
+
+    When ``arrow_cast`` is True (default), mismatched types are cast to the
+    target type.  When False, a type mismatch raises an error instead.
+
+    Parameters
+    ----------
+    arr : pa.Array
+        Input Arrow array
+    target_type : pa.DataType
+        Target Arrow type
+    safecheck : bool
+        Whether to use safe casting (default True)
+    arrow_cast : bool
+        Whether to allow casting when types don't match (default True)
+
+    Returns
+    -------
+    pa.Array
+    """
+    from pyspark.errors import PySparkTypeError
+
+    if arr.type == target_type:
+        return arr
+
+    if not arrow_cast:
+        raise PySparkTypeError(
+            "Arrow UDFs require the return type to match the expected Arrow 
type. "
+            f"Expected: {target_type}, but got: {arr.type}."
+        )
+
+    # when safe is True, the cast will fail if there's a overflow or other
+    # unsafe conversion.
+    # RecordBatch.cast(...) isn't used as minimum PyArrow version
+    # required for RecordBatch.cast(...) is v16.0
+    return arr.cast(target_type=target_type, safe=safecheck)
+
+
+class PandasToArrowConversion:
+    """
+    Conversion utilities from pandas data to Arrow.
+    """
+
+    @classmethod
+    def convert(
+        cls,
+        data: Union["pd.DataFrame", List[Union["pd.Series", "pd.DataFrame"]]],
+        schema: StructType,
+        *,
+        timezone: Optional[str] = None,
+        safecheck: bool = True,
+        arrow_cast: bool = False,
+        prefers_large_types: bool = False,
+        assign_cols_by_name: bool = False,
+        int_to_decimal_coercion_enabled: bool = False,
+        ignore_unexpected_complex_type_values: bool = False,
+        is_udtf: bool = False,
+    ) -> "pa.RecordBatch":
+        """
+        Convert a pandas DataFrame or list of Series/DataFrames to an Arrow 
RecordBatch.
+
+        Parameters
+        ----------
+        data : pd.DataFrame or list of pd.Series/pd.DataFrame
+            Input data - either a DataFrame or a list of Series/DataFrames.
+        schema : StructType
+            Spark schema defining the types for each column
+        timezone : str, optional
+            Timezone for timestamp conversion
+        safecheck : bool
+            Whether to use safe Arrow conversion (default True)
+        arrow_cast : bool
+            Whether to allow Arrow casting on type mismatch (default False)
+        prefers_large_types : bool
+            Whether to prefer large Arrow types (default False)
+        assign_cols_by_name : bool
+            Whether to reorder DataFrame columns by name to match schema 
(default False)
+        int_to_decimal_coercion_enabled : bool
+            Whether to enable int to decimal coercion (default False)
+        ignore_unexpected_complex_type_values : bool
+            Whether to ignore unexpected complex type values in converter 
(default False)
+        is_udtf : bool
+            Whether this conversion is for a UDTF. UDTFs use broader Arrow 
exception
+            handling to allow more type coercions (e.g., struct field casting 
via
+            ArrowTypeError), and convert errors to UDTF_ARROW_TYPE_CAST_ERROR.
+            Regular UDFs only catch ArrowInvalid to preserve legacy behavior 
where
+            e.g. string→decimal must raise an error. (default False)
+
+        Returns
+        -------
+        pa.RecordBatch
+        """
+        import pyarrow as pa
+        import pandas as pd
+
+        from pyspark.errors import PySparkTypeError, PySparkValueError, 
PySparkRuntimeError
+        from pyspark.sql.pandas.types import to_arrow_type, 
_create_converter_from_pandas
+
+        # Handle empty schema (0 columns)
+        # Use dummy column + select([]) to preserve row count (PyArrow 
limitation workaround)
+        if not schema.fields:
+            num_rows = len(data[0]) if isinstance(data, list) and data else 
len(data)
+            return pa.RecordBatch.from_pydict({"_": [None] * 
num_rows}).select([])
+
+        # Handle empty DataFrame (0 columns) with non-empty schema
+        # This happens when user returns pd.DataFrame() for struct types
+        if isinstance(data, pd.DataFrame) and len(data.columns) == 0:
+            arrow_type = to_arrow_type(
+                schema, timezone=timezone, 
prefers_large_types=prefers_large_types
+            )
+            return pa.RecordBatch.from_struct_array(pa.array([{}] * len(data), 
arrow_type))
+
+        # Normalize input: reorder DataFrame columns by schema names if needed,
+        # then extract columns as a list for uniform iteration.
+        if isinstance(data, list):
+            columns = data
+        else:
+            if assign_cols_by_name and any(isinstance(c, str) for c in 
data.columns):
+                data = data[schema.names]
+            columns = [data.iloc[:, i] for i in range(len(schema.fields))]
+
+        def series_to_array(series: "pd.Series", ret_type: DataType, 
field_name: str) -> "pa.Array":
+            """Convert a pandas Series to an Arrow Array (closure over 
conversion params).
+
+            Uses field_name for error messages instead of series.name to avoid
+            copying the Series via rename() — a ~20% overhead on the hot path.
+            """
+            if isinstance(series.dtype, pd.CategoricalDtype):
+                series = series.astype(series.dtype.categories.dtype)
+
+            arrow_type = to_arrow_type(
+                ret_type, timezone=timezone, 
prefers_large_types=prefers_large_types
+            )
+            series = _create_converter_from_pandas(
+                ret_type,
+                timezone=timezone,
+                error_on_duplicated_field_names=False,
+                
int_to_decimal_coercion_enabled=int_to_decimal_coercion_enabled,
+                
ignore_unexpected_complex_type_values=ignore_unexpected_complex_type_values,
+            )(series)
+
+            mask = None if hasattr(series.array, "__arrow_array__") else 
series.isnull()
+
+            if is_udtf:
+                # UDTF path: broad ArrowException catch so that both 
ArrowInvalid
+                # AND ArrowTypeError (e.g. dict→struct) trigger the cast 
fallback.
+                try:
+                    try:
+                        return pa.Array.from_pandas(
+                            series, mask=mask, type=arrow_type, safe=safecheck
+                        )
+                    except pa.lib.ArrowException:  # broad: includes 
ArrowTypeError
+                        if arrow_cast:
+                            return pa.Array.from_pandas(series, 
mask=mask).cast(
+                                target_type=arrow_type, safe=safecheck
+                            )
+                        raise
+                except pa.lib.ArrowException:  # convert any Arrow error to 
user-friendly message
+                    raise PySparkRuntimeError(
+                        errorClass="UDTF_ARROW_TYPE_CAST_ERROR",
+                        messageParameters={
+                            "col_name": field_name,
+                            "col_type": str(series.dtype),
+                            "arrow_type": str(arrow_type),
+                        },
+                    ) from None
+            else:
+                # UDF path: only ArrowInvalid triggers the cast fallback.
+                # ArrowTypeError (e.g. string→decimal) must NOT be silently 
cast.
+                try:
+                    try:
+                        return pa.Array.from_pandas(
+                            series, mask=mask, type=arrow_type, safe=safecheck
+                        )
+                    except pa.lib.ArrowInvalid:  # narrow: skip ArrowTypeError
+                        if arrow_cast:
+                            return pa.Array.from_pandas(series, 
mask=mask).cast(
+                                target_type=arrow_type, safe=safecheck
+                            )
+                        raise
+                except TypeError as e:  # includes pa.lib.ArrowTypeError
+                    raise PySparkTypeError(
+                        f"Exception thrown when converting pandas.Series 
({series.dtype}) "
+                        f"with name '{field_name}' to Arrow Array 
({arrow_type})."
+                    ) from e
+                except ValueError as e:  # includes pa.lib.ArrowInvalid
+                    error_msg = (
+                        f"Exception thrown when converting pandas.Series 
({series.dtype}) "
+                        f"with name '{field_name}' to Arrow Array 
({arrow_type})."
+                    )
+                    if safecheck:
+                        error_msg += (
+                            " It can be caused by overflows or other unsafe 
conversions "
+                            "warned by Arrow. Arrow safe type check can be 
disabled by using "
+                            "SQL config 
`spark.sql.execution.pandas.convertToArrowArraySafely`."
+                        )
+                    raise PySparkValueError(error_msg) from e
+
+        def convert_column(
+            col: Union["pd.Series", "pd.DataFrame"], field: StructField
+        ) -> "pa.Array":
+            """Convert a single column (Series or DataFrame) to an Arrow 
Array."""
+            if isinstance(col, pd.DataFrame):
+                assert isinstance(field.dataType, StructType)
+                nested_batch = cls.convert(
+                    col,
+                    field.dataType,
+                    timezone=timezone,
+                    safecheck=safecheck,
+                    arrow_cast=arrow_cast,
+                    prefers_large_types=prefers_large_types,
+                    assign_cols_by_name=assign_cols_by_name,
+                    
int_to_decimal_coercion_enabled=int_to_decimal_coercion_enabled,
+                    
ignore_unexpected_complex_type_values=ignore_unexpected_complex_type_values,
+                    is_udtf=is_udtf,
+                )
+                return 
ArrowBatchTransformer.wrap_struct(nested_batch).column(0)

Review Comment:
   this line really takes me some seconds to remember what it does



##########
python/pyspark/sql/pandas/serializers.py:
##########
@@ -816,17 +626,24 @@ def dump_stream(self, iterator, stream):
         """
         import pyarrow as pa
 
-        def create_batches():
-            for packed in iterator:
-                if len(packed) == 2 and isinstance(packed[1], pa.DataType):
-                    # single array UDF in a projection
-                    arrs = [self._create_array(packed[0], packed[1], 
self._arrow_cast)]
-                else:
-                    # multiple array UDFs in a projection
-                    arrs = [self._create_array(t[0], t[1], self._arrow_cast) 
for t in packed]
-                yield pa.RecordBatch.from_arrays(arrs, ["_%d" % i for i in 
range(len(arrs))])
+        def create_batch(
+            packed: Union[
+                Tuple["pa.Array", "pa.DataType"],
+                List[Tuple["pa.Array", "pa.DataType"]],

Review Comment:
   can we unify the input type to always match the multiple UDF cases?



##########
python/pyspark/sql/conversion.py:
##########
@@ -162,6 +162,239 @@ def to_pandas(
         ]
 
 
+# TODO: elevate to ArrowBatchTransformer and operate on full RecordBatch schema
+#       instead of per-column coercion.
+def coerce_arrow_array(
+    arr: "pa.Array",
+    target_type: "pa.DataType",
+    *,
+    safecheck: bool = True,
+    arrow_cast: bool = True,
+) -> "pa.Array":
+    """
+    Coerce an Arrow Array to a target type, with optional type-mismatch 
enforcement.
+
+    When ``arrow_cast`` is True (default), mismatched types are cast to the
+    target type.  When False, a type mismatch raises an error instead.
+
+    Parameters
+    ----------
+    arr : pa.Array
+        Input Arrow array
+    target_type : pa.DataType
+        Target Arrow type
+    safecheck : bool
+        Whether to use safe casting (default True)
+    arrow_cast : bool
+        Whether to allow casting when types don't match (default True)
+
+    Returns
+    -------
+    pa.Array
+    """
+    from pyspark.errors import PySparkTypeError
+
+    if arr.type == target_type:
+        return arr
+
+    if not arrow_cast:
+        raise PySparkTypeError(
+            "Arrow UDFs require the return type to match the expected Arrow 
type. "
+            f"Expected: {target_type}, but got: {arr.type}."
+        )
+
+    # when safe is True, the cast will fail if there's an overflow or other
+    # unsafe conversion.
+    # RecordBatch.cast(...) isn't used as minimum PyArrow version
+    # required for RecordBatch.cast(...) is v16.0
+    return arr.cast(target_type=target_type, safe=safecheck)
+
+
+class PandasToArrowConversion:
+    """
+    Conversion utilities from pandas data to Arrow.
+    """
+
+    @classmethod
+    def convert(
+        cls,
+        data: Union["pd.DataFrame", List[Union["pd.Series", "pd.DataFrame"]]],
+        schema: StructType,
+        *,
+        timezone: Optional[str] = None,
+        safecheck: bool = True,
+        arrow_cast: bool = False,
+        prefers_large_types: bool = False,
+        assign_cols_by_name: bool = False,
+        int_to_decimal_coercion_enabled: bool = False,
+        ignore_unexpected_complex_type_values: bool = False,
+        is_udtf: bool = False,
+    ) -> "pa.RecordBatch":
+        """
+        Convert a pandas DataFrame or list of Series/DataFrames to an Arrow 
RecordBatch.
+
+        Parameters
+        ----------
+        data : pd.DataFrame or list of pd.Series/pd.DataFrame
+            Input data - either a DataFrame or a list of Series/DataFrames.
+        schema : StructType
+            Spark schema defining the types for each column
+        timezone : str, optional
+            Timezone for timestamp conversion
+        safecheck : bool
+            Whether to use safe Arrow conversion (default True)
+        arrow_cast : bool
+            Whether to allow Arrow casting on type mismatch (default False)
+        prefers_large_types : bool
+            Whether to prefer large Arrow types (default False)
+        assign_cols_by_name : bool
+            Whether to reorder DataFrame columns by name to match schema 
(default False)
+        int_to_decimal_coercion_enabled : bool
+            Whether to enable int to decimal coercion (default False)
+        ignore_unexpected_complex_type_values : bool
+            Whether to ignore unexpected complex type values in converter 
(default False)
+        is_udtf : bool
+            Whether this conversion is for a UDTF. UDTFs use broader Arrow 
exception
+            handling to allow more type coercions (e.g., struct field casting 
via
+            ArrowTypeError), and convert errors to UDTF_ARROW_TYPE_CAST_ERROR.
+            Regular UDFs only catch ArrowInvalid to preserve legacy behavior 
where
+            e.g. string→decimal must raise an error. (default False)
+
+        Returns
+        -------
+        pa.RecordBatch
+        """
+        import pyarrow as pa
+        import pandas as pd
+
+        from pyspark.errors import PySparkTypeError, PySparkValueError, 
PySparkRuntimeError
+        from pyspark.sql.pandas.types import to_arrow_type, 
_create_converter_from_pandas
+
+        # Handle empty schema (0 columns)
+        # Use dummy column + select([]) to preserve row count (PyArrow 
limitation workaround)
+        if not schema.fields:
+            num_rows = len(data[0]) if isinstance(data, list) and data else 
len(data)
+            return pa.RecordBatch.from_pydict({"_": [None] * 
num_rows}).select([])
+
+        # Handle empty DataFrame (0 columns) with non-empty schema
+        # This happens when user returns pd.DataFrame() for struct types
+        if isinstance(data, pd.DataFrame) and len(data.columns) == 0:
+            arrow_type = to_arrow_type(
+                schema, timezone=timezone, 
prefers_large_types=prefers_large_types
+            )
+            return pa.RecordBatch.from_struct_array(pa.array([{}] * len(data), 
arrow_type))
+
+        # Normalize input: reorder DataFrame columns by schema names if needed,
+        # then extract columns as a list for uniform iteration.
+        if isinstance(data, list):
+            columns = data
+        else:
+            if assign_cols_by_name and any(isinstance(c, str) for c in 
data.columns):
+                data = data[schema.names]
+            columns = [data.iloc[:, i] for i in range(len(schema.fields))]
+
+        def series_to_array(series: "pd.Series", ret_type: DataType, 
field_name: str) -> "pa.Array":
+            """Convert a pandas Series to an Arrow Array (closure over 
conversion params).
+
+            Uses field_name for error messages instead of series.name to avoid
+            copying the Series via rename() — a ~20% overhead on the hot path.
+            """
+            if isinstance(series.dtype, pd.CategoricalDtype):
+                series = series.astype(series.dtype.categories.dtype)
+
+            arrow_type = to_arrow_type(
+                ret_type, timezone=timezone, 
prefers_large_types=prefers_large_types
+            )
+            series = _create_converter_from_pandas(
+                ret_type,
+                timezone=timezone,
+                error_on_duplicated_field_names=False,
+                
int_to_decimal_coercion_enabled=int_to_decimal_coercion_enabled,
+                
ignore_unexpected_complex_type_values=ignore_unexpected_complex_type_values,
+            )(series)
+
+            mask = None if hasattr(series.array, "__arrow_array__") else 
series.isnull()
+
+            if is_udtf:
+                # UDTF path: broad ArrowException catch so that both 
ArrowInvalid
+                # AND ArrowTypeError (e.g. dict→struct) trigger the cast 
fallback.
+                try:
+                    try:
+                        return pa.Array.from_pandas(
+                            series, mask=mask, type=arrow_type, safe=safecheck
+                        )
+                    except pa.lib.ArrowException:  # broad: includes 
ArrowTypeError
+                        if arrow_cast:
+                            return pa.Array.from_pandas(series, 
mask=mask).cast(
+                                target_type=arrow_type, safe=safecheck
+                            )
+                        raise
+                except pa.lib.ArrowException:  # convert any Arrow error to 
user-friendly message
+                    raise PySparkRuntimeError(
+                        errorClass="UDTF_ARROW_TYPE_CAST_ERROR",
+                        messageParameters={
+                            "col_name": field_name,
+                            "col_type": str(series.dtype),
+                            "arrow_type": str(arrow_type),
+                        },
+                    ) from None
+            else:
+                # UDF path: only ArrowInvalid triggers the cast fallback.
+                # ArrowTypeError (e.g. string→decimal) must NOT be silently 
cast.
+                try:
+                    try:
+                        return pa.Array.from_pandas(
+                            series, mask=mask, type=arrow_type, safe=safecheck
+                        )
+                    except pa.lib.ArrowInvalid:  # narrow: skip ArrowTypeError
+                        if arrow_cast:
+                            return pa.Array.from_pandas(series, 
mask=mask).cast(
+                                target_type=arrow_type, safe=safecheck
+                            )
+                        raise
+                except TypeError as e:  # includes pa.lib.ArrowTypeError
+                    raise PySparkTypeError(
+                        f"Exception thrown when converting pandas.Series 
({series.dtype}) "
+                        f"with name '{field_name}' to Arrow Array 
({arrow_type})."
+                    ) from e
+                except ValueError as e:  # includes pa.lib.ArrowInvalid
+                    error_msg = (
+                        f"Exception thrown when converting pandas.Series 
({series.dtype}) "
+                        f"with name '{field_name}' to Arrow Array 
({arrow_type})."
+                    )
+                    if safecheck:
+                        error_msg += (
+                            " It can be caused by overflows or other unsafe 
conversions "
+                            "warned by Arrow. Arrow safe type check can be 
disabled by using "
+                            "SQL config 
`spark.sql.execution.pandas.convertToArrowArraySafely`."
+                        )
+                    raise PySparkValueError(error_msg) from e
+
+        def convert_column(

Review Comment:
   can we consolidate `convert_column` and `series_to_array`?



##########
python/pyspark/sql/pandas/serializers.py:
##########
@@ -439,122 +430,52 @@ def __init__(
         self._input_type = input_type
         self._arrow_cast = arrow_cast
 
-    def _create_array(self, series, spark_type, *, arrow_cast=False, 
prefers_large_types=False):
+    def dump_stream(self, iterator, stream):
         """
-        Create an Arrow Array from the given pandas.Series and Spark type.
-
-        Parameters
-        ----------
-        series : pandas.Series
-            A single series
-        spark_type : DataType, optional
-            The Spark return type. For UDF return types, this should always be 
provided
-            and should never be None. If None, pyarrow's inferred type will be 
used
-            (for backward compatibility).
-        arrow_cast : bool, optional
-            Whether to apply Arrow casting when the user-specified return type 
mismatches the
-            actual return values.
-        prefers_large_types : bool, optional
-            Whether to prefer large Arrow types (e.g., large_string instead of 
string).
-
-        Returns
-        -------
-        pyarrow.Array
+        Make ArrowRecordBatches from Pandas Series and serialize.
+        Each element in iterator is:
+        - For batched UDFs: tuple of (series, spark_type) tuples: ((s1, t1), 
(s2, t2), ...)
+        - For iterator UDFs: single (series, spark_type) tuple directly
         """
-        import pyarrow as pa
-        import pandas as pd
 
-        if isinstance(series.dtype, pd.CategoricalDtype):
-            series = series.astype(series.dtypes.categories.dtype)
-
-        # Derive arrow_type from spark_type
-        arrow_type = (
-            to_arrow_type(
-                spark_type, timezone=self._timezone, 
prefers_large_types=prefers_large_types
-            )
-            if spark_type is not None
-            else None
-        )
+        def create_batch(
+            packed: Union[
+                Tuple["pd.Series", DataType],
+                Tuple[Tuple["pd.Series", DataType], ...],

Review Comment:
   The mixture of input types is also the source of confusion



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to