zhengruifeng commented on code in PR #54125:
URL: https://github.com/apache/spark/pull/54125#discussion_r2761916242
##########
python/pyspark/sql/pandas/serializers.py:
##########
@@ -453,121 +432,26 @@ def arrow_to_pandas(
ndarray_as_list=ndarray_as_list,
)
- def _create_array(self, series, spark_type, *, arrow_cast=False, prefers_large_types=False):
+ def dump_stream(self, iterator, stream):
"""
- Create an Arrow Array from the given pandas.Series and Spark type.
-
- Parameters
- ----------
- series : pandas.Series
- A single series
- spark_type : DataType, optional
- The Spark return type. For UDF return types, this should always be provided
- and should never be None. If None, pyarrow's inferred type will be used
- (for backward compatibility).
- arrow_cast : bool, optional
- Whether to apply Arrow casting when the user-specified return type mismatches the
- actual return values.
- prefers_large_types : bool, optional
- Whether to prefer large Arrow types (e.g., large_string instead of string).
-
- Returns
- -------
- pyarrow.Array
+ Make ArrowRecordBatches from Pandas Series and serialize.
+ Each element in iterator is an iterable of (series, spark_type) tuples.
"""
- import pyarrow as pa
- import pandas as pd
-
- if isinstance(series.dtype, pd.CategoricalDtype):
- series = series.astype(series.dtypes.categories.dtype)
-
- # Derive arrow_type from spark_type
- arrow_type = (
- to_arrow_type(
- spark_type, timezone=self._timezone, prefers_large_types=prefers_large_types
- )
- if spark_type is not None
- else None
- )
+ from pyspark.sql.types import StructType, StructField
Review Comment:
I think this import should be placed at the top of the file.
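A minimal sketch of the suggestion (exact placement assumed, not taken from the PR): hoist the import to the module-level import block of serializers.py so `dump_stream` does not re-import it on every call:

```python
# at the top of python/pyspark/sql/pandas/serializers.py, next to the other imports
from pyspark.sql.types import StructType, StructField
```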
##########
python/pyspark/sql/pandas/serializers.py:
##########
@@ -631,170 +517,78 @@ def __init__(
arrow_cast,
)
self._assign_cols_by_name = assign_cols_by_name
+ self._ignore_unexpected_complex_type_values = ignore_unexpected_complex_type_values
+ self._error_class = error_class
- def _create_struct_array(
- self,
- df: "pd.DataFrame",
- return_type: StructType,
- *,
- prefers_large_types: bool = False,
- ):
- """
- Create an Arrow StructArray from the given pandas.DataFrame and Spark StructType.
-
- Parameters
- ----------
- df : pandas.DataFrame
- A pandas DataFrame
- return_type : StructType
- The Spark return type (StructType) to use
- prefers_large_types : bool, optional
- Whether to prefer large Arrow types (e.g., large_string instead of string).
-
- Returns
- -------
- pyarrow.Array
- """
- import pyarrow as pa
-
- # Derive arrow_struct_type from return_type
- arrow_struct_type = to_arrow_type(
- return_type, timezone=self._timezone, prefers_large_types=prefers_large_types
- )
-
- if len(df.columns) == 0:
- return pa.array([{}] * len(df), arrow_struct_type)
- # Assign result columns by schema name if user labeled with strings
- if self._assign_cols_by_name and any(isinstance(name, str) for name in df.columns):
- struct_arrs = [
- self._create_array(
- df[spark_field.name],
- spark_field.dataType,
- arrow_cast=self._arrow_cast,
- prefers_large_types=prefers_large_types,
- )
- for spark_field in return_type
- ]
- # Assign result columns by position
- else:
- struct_arrs = [
- # the selected series has name '1', so we rename it to spark_field.name
- # as the name is used by _create_array to provide a meaningful error message
- self._create_array(
- df[df.columns[i]].rename(spark_field.name),
- spark_field.dataType,
- arrow_cast=self._arrow_cast,
- prefers_large_types=prefers_large_types,
- )
- for i, spark_field in enumerate(return_type)
- ]
-
- return pa.StructArray.from_arrays(struct_arrs, fields=list(arrow_struct_type))
-
- def _create_batch(
- self, series, *, arrow_cast=False, prefers_large_types=False, struct_in_pandas="dict"
- ):
+ def dump_stream(self, iterator, stream):
"""
- Create an Arrow record batch from the given pandas.Series, pandas.DataFrame,
- or list of Series/DataFrame, with optional Spark type.
-
- Parameters
- ----------
- series : pandas.Series or pandas.DataFrame or list
- A single series or dataframe, list of series or dataframe,
- or list of (series or dataframe, spark_type) tuples.
- arrow_cast : bool, optional
- If True, use Arrow's cast method for type conversion.
- prefers_large_types : bool, optional
- Whether to prefer large Arrow types (e.g., large_string instead of string).
- struct_in_pandas : str, optional
- How to represent struct types in pandas: "dict" or "row".
- Default is "dict".
-
- Returns
- -------
- pyarrow.RecordBatch
- Arrow RecordBatch
+ Override because Pandas UDFs require a START_ARROW_STREAM before the Arrow stream is sent.
+ This should be sent after creating the first record batch so in case of an error, it can
+ be sent back to the JVM before the Arrow stream starts.
"""
import pandas as pd
import pyarrow as pa
- # Normalize input to list of (data, spark_type) tuples
- # Handle: single series, (series, type) tuple, or list of tuples
- if not isinstance(series, (list, tuple)) or (
- len(series) == 2 and isinstance(series[1], DataType)
- ):
- series = [series]
- # Ensure each element is a (data, spark_type) tuple
- series = [(s, None) if not isinstance(s, (list, tuple)) else s for s in series]
-
- arrs = []
- for s, spark_type in series:
- # Convert spark_type to arrow_type for type checking (similar to master branch)
- arrow_type = (
- to_arrow_type(
- spark_type, timezone=self._timezone, prefers_large_types=prefers_large_types
+ def create_batch(series_with_types):
Review Comment:
Add type hints for the new functions?
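A minimal sketch of the kind of annotation being asked for; the element type of the iterable is an assumption, not taken from the PR:

```python
from typing import Iterable, Optional, Tuple, Union

import pandas as pd
import pyarrow as pa

from pyspark.sql.types import DataType


def create_batch(
    series_with_types: Iterable[Tuple[Union[pd.Series, pd.DataFrame], Optional[DataType]]],
) -> pa.RecordBatch:
    """Create an Arrow RecordBatch from (data, spark_type) tuples."""
    ...
```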
##########
python/pyspark/sql/pandas/serializers.py:
##########
@@ -319,11 +294,15 @@ def apply_type_coercion():
f"Expected: {expected_field_names}, but got:
{actual_field_names}."
)
- coerced_arrays = []
- for i, field in enumerate(arrow_return_type):
- original_array = batch.column(i)
- coerced_array = self._create_array(original_array, field.type)
- coerced_arrays.append(coerced_array)
+ coerced_arrays = [
+ ArrowBatchTransformer.cast_array(
Review Comment:
Should `cast_array` be in `ArrowBatchTransformer`?
Is it meant to operate on a batch?
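For illustration of the distinction being raised, an array-level cast versus a batch-level cast; both helpers below are hypothetical sketches, not the PR's API:

```python
import pyarrow as pa


def cast_array(arr: pa.Array, target_type: pa.DataType, safe: bool = True) -> pa.Array:
    """Cast a single Arrow array to the target type (array-level operation)."""
    return arr.cast(target_type, safe=safe)


def cast_batch(batch: pa.RecordBatch, target_schema: pa.Schema) -> pa.RecordBatch:
    """Cast every column of a record batch to the target schema (batch-level operation)."""
    columns = [cast_array(col, field.type) for col, field in zip(batch.columns, target_schema)]
    return pa.RecordBatch.from_arrays(columns, schema=target_schema)
```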
##########
python/pyspark/sql/tests/test_conversion.py:
##########
@@ -144,6 +145,117 @@ def test_wrap_struct_empty_batch(self):
self.assertEqual(wrapped.num_columns, 1)
+@unittest.skipIf(not have_pyarrow, pyarrow_requirement_message)
+class PandasToArrowConversionTests(unittest.TestCase):
+ def test_dataframe_to_batch(self):
+ """Test basic DataFrame/Series to Arrow RecordBatch conversion."""
+ import pandas as pd
+ import pyarrow as pa
+
+ from pyspark.sql.types import IntegerType, DoubleType, StructType, StructField
Review Comment:
This import should be at the top of the file.
##########
python/pyspark/sql/pandas/serializers.py:
##########
@@ -631,170 +517,78 @@ def __init__(
arrow_cast,
)
self._assign_cols_by_name = assign_cols_by_name
+ self._ignore_unexpected_complex_type_values = ignore_unexpected_complex_type_values
+ self._error_class = error_class
- def _create_struct_array(
- self,
- df: "pd.DataFrame",
- return_type: StructType,
- *,
- prefers_large_types: bool = False,
- ):
- """
- Create an Arrow StructArray from the given pandas.DataFrame and Spark StructType.
-
- Parameters
- ----------
- df : pandas.DataFrame
- A pandas DataFrame
- return_type : StructType
- The Spark return type (StructType) to use
- prefers_large_types : bool, optional
- Whether to prefer large Arrow types (e.g., large_string instead of string).
-
- Returns
- -------
- pyarrow.Array
- """
- import pyarrow as pa
-
- # Derive arrow_struct_type from return_type
- arrow_struct_type = to_arrow_type(
- return_type, timezone=self._timezone, prefers_large_types=prefers_large_types
- )
-
- if len(df.columns) == 0:
- return pa.array([{}] * len(df), arrow_struct_type)
- # Assign result columns by schema name if user labeled with strings
- if self._assign_cols_by_name and any(isinstance(name, str) for name in df.columns):
- struct_arrs = [
- self._create_array(
- df[spark_field.name],
- spark_field.dataType,
- arrow_cast=self._arrow_cast,
- prefers_large_types=prefers_large_types,
- )
- for spark_field in return_type
- ]
- # Assign result columns by position
- else:
- struct_arrs = [
- # the selected series has name '1', so we rename it to spark_field.name
- # as the name is used by _create_array to provide a meaningful error message
- self._create_array(
- df[df.columns[i]].rename(spark_field.name),
- spark_field.dataType,
- arrow_cast=self._arrow_cast,
- prefers_large_types=prefers_large_types,
- )
- for i, spark_field in enumerate(return_type)
- ]
-
- return pa.StructArray.from_arrays(struct_arrs, fields=list(arrow_struct_type))
-
- def _create_batch(
- self, series, *, arrow_cast=False, prefers_large_types=False, struct_in_pandas="dict"
- ):
+ def dump_stream(self, iterator, stream):
"""
- Create an Arrow record batch from the given pandas.Series, pandas.DataFrame,
- or list of Series/DataFrame, with optional Spark type.
-
- Parameters
- ----------
- series : pandas.Series or pandas.DataFrame or list
- A single series or dataframe, list of series or dataframe,
- or list of (series or dataframe, spark_type) tuples.
- arrow_cast : bool, optional
- If True, use Arrow's cast method for type conversion.
- prefers_large_types : bool, optional
- Whether to prefer large Arrow types (e.g., large_string instead of string).
- struct_in_pandas : str, optional
- How to represent struct types in pandas: "dict" or "row".
- Default is "dict".
-
- Returns
- -------
- pyarrow.RecordBatch
- Arrow RecordBatch
+ Override because Pandas UDFs require a START_ARROW_STREAM before the Arrow stream is sent.
+ This should be sent after creating the first record batch so in case of an error, it can
+ be sent back to the JVM before the Arrow stream starts.
"""
import pandas as pd
import pyarrow as pa
- # Normalize input to list of (data, spark_type) tuples
- # Handle: single series, (series, type) tuple, or list of tuples
- if not isinstance(series, (list, tuple)) or (
- len(series) == 2 and isinstance(series[1], DataType)
- ):
- series = [series]
- # Ensure each element is a (data, spark_type) tuple
- series = [(s, None) if not isinstance(s, (list, tuple)) else s for s in series]
-
- arrs = []
- for s, spark_type in series:
- # Convert spark_type to arrow_type for type checking (similar to master branch)
- arrow_type = (
- to_arrow_type(
- spark_type, timezone=self._timezone, prefers_large_types=prefers_large_types
+ def create_batch(series_with_types):
+ """Create batch from list of (data, spark_type) tuples."""
+ arrs = []
+ for s, spark_type in series_with_types:
Review Comment:
Is spark_type the return type?
Shall we always use `ret_type` for return types?
##########
python/pyspark/sql/pandas/serializers.py:
##########
@@ -631,170 +517,78 @@ def __init__(
arrow_cast,
)
self._assign_cols_by_name = assign_cols_by_name
+ self._ignore_unexpected_complex_type_values = ignore_unexpected_complex_type_values
+ self._error_class = error_class
- def _create_struct_array(
- self,
- df: "pd.DataFrame",
- return_type: StructType,
- *,
- prefers_large_types: bool = False,
- ):
- """
- Create an Arrow StructArray from the given pandas.DataFrame and Spark StructType.
-
- Parameters
- ----------
- df : pandas.DataFrame
- A pandas DataFrame
- return_type : StructType
- The Spark return type (StructType) to use
- prefers_large_types : bool, optional
- Whether to prefer large Arrow types (e.g., large_string instead of string).
-
- Returns
- -------
- pyarrow.Array
- """
- import pyarrow as pa
-
- # Derive arrow_struct_type from return_type
- arrow_struct_type = to_arrow_type(
- return_type, timezone=self._timezone, prefers_large_types=prefers_large_types
- )
-
- if len(df.columns) == 0:
- return pa.array([{}] * len(df), arrow_struct_type)
- # Assign result columns by schema name if user labeled with strings
- if self._assign_cols_by_name and any(isinstance(name, str) for name in df.columns):
- struct_arrs = [
- self._create_array(
- df[spark_field.name],
- spark_field.dataType,
- arrow_cast=self._arrow_cast,
- prefers_large_types=prefers_large_types,
- )
- for spark_field in return_type
- ]
- # Assign result columns by position
- else:
- struct_arrs = [
- # the selected series has name '1', so we rename it to spark_field.name
- # as the name is used by _create_array to provide a meaningful error message
- self._create_array(
- df[df.columns[i]].rename(spark_field.name),
- spark_field.dataType,
- arrow_cast=self._arrow_cast,
- prefers_large_types=prefers_large_types,
- )
- for i, spark_field in enumerate(return_type)
- ]
-
- return pa.StructArray.from_arrays(struct_arrs, fields=list(arrow_struct_type))
-
- def _create_batch(
- self, series, *, arrow_cast=False, prefers_large_types=False, struct_in_pandas="dict"
- ):
+ def dump_stream(self, iterator, stream):
"""
- Create an Arrow record batch from the given pandas.Series, pandas.DataFrame,
- or list of Series/DataFrame, with optional Spark type.
-
- Parameters
- ----------
- series : pandas.Series or pandas.DataFrame or list
- A single series or dataframe, list of series or dataframe,
- or list of (series or dataframe, spark_type) tuples.
- arrow_cast : bool, optional
- If True, use Arrow's cast method for type conversion.
- prefers_large_types : bool, optional
- Whether to prefer large Arrow types (e.g., large_string instead of string).
- struct_in_pandas : str, optional
- How to represent struct types in pandas: "dict" or "row".
- Default is "dict".
-
- Returns
- -------
- pyarrow.RecordBatch
- Arrow RecordBatch
+ Override because Pandas UDFs require a START_ARROW_STREAM before the Arrow stream is sent.
+ This should be sent after creating the first record batch so in case of an error, it can
+ be sent back to the JVM before the Arrow stream starts.
"""
import pandas as pd
import pyarrow as pa
- # Normalize input to list of (data, spark_type) tuples
- # Handle: single series, (series, type) tuple, or list of tuples
- if not isinstance(series, (list, tuple)) or (
- len(series) == 2 and isinstance(series[1], DataType)
- ):
- series = [series]
- # Ensure each element is a (data, spark_type) tuple
- series = [(s, None) if not isinstance(s, (list, tuple)) else s for s in series]
-
- arrs = []
- for s, spark_type in series:
- # Convert spark_type to arrow_type for type checking (similar to master branch)
- arrow_type = (
- to_arrow_type(
- spark_type, timezone=self._timezone, prefers_large_types=prefers_large_types
+ def create_batch(series_with_types):
+ """Create batch from list of (data, spark_type) tuples."""
+ arrs = []
+ for s, spark_type in series_with_types:
+ arrow_type = (
+ to_arrow_type(
+ spark_type,
+ timezone=self._timezone,
+ prefers_large_types=self._prefers_large_types,
+ )
+ if spark_type is not None
+ else None
)
- if spark_type is not None
- else None
- )
- # Variants are represented in arrow as structs with additional metadata (checked by
- # is_variant). If the data type is Variant, return a VariantVal atomic type instead of
- # a dict of two binary values.
- if (
- struct_in_pandas == "dict"
- and arrow_type is not None
- and pa.types.is_struct(arrow_type)
- and not is_variant(arrow_type)
- ):
- # A pandas UDF should return pd.DataFrame when the return type is a struct type.
- # If it returns a pd.Series, it should throw an error.
- if not isinstance(s, pd.DataFrame):
+ # Struct type validation: must return DataFrame for struct types
+ is_struct_type = (
Review Comment:
I think I mentioned this in previous PRs: a check based on
`pa.types.is_struct(arrow_type)` is not correct, since we are adding more and
more complex types built on top of the Arrow struct type, for example the geo types.
We should always determine `is_struct_type` from the Spark type.
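A minimal sketch of what this suggests: derive the struct check from the Spark return type rather than from `pa.types.is_struct`. The helper name and the `struct_in_pandas` parameter are assumptions:

```python
from typing import Optional

from pyspark.sql.types import DataType, StructType


def is_struct_return(spark_type: Optional[DataType], struct_in_pandas: str = "dict") -> bool:
    """True only when the logical Spark return type is a StructType, so types that
    merely use an Arrow struct as their physical layout (e.g. Variant, geo types)
    are not treated as structs."""
    return struct_in_pandas == "dict" and isinstance(spark_type, StructType)
```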
##########
python/pyspark/sql/pandas/serializers.py:
##########
@@ -631,170 +517,78 @@ def __init__(
arrow_cast,
)
self._assign_cols_by_name = assign_cols_by_name
+ self._ignore_unexpected_complex_type_values = ignore_unexpected_complex_type_values
+ self._error_class = error_class
- def _create_struct_array(
- self,
- df: "pd.DataFrame",
- return_type: StructType,
- *,
- prefers_large_types: bool = False,
- ):
- """
- Create an Arrow StructArray from the given pandas.DataFrame and Spark StructType.
-
- Parameters
- ----------
- df : pandas.DataFrame
- A pandas DataFrame
- return_type : StructType
- The Spark return type (StructType) to use
- prefers_large_types : bool, optional
- Whether to prefer large Arrow types (e.g., large_string instead of string).
-
- Returns
- -------
- pyarrow.Array
- """
- import pyarrow as pa
-
- # Derive arrow_struct_type from return_type
- arrow_struct_type = to_arrow_type(
- return_type, timezone=self._timezone, prefers_large_types=prefers_large_types
- )
-
- if len(df.columns) == 0:
- return pa.array([{}] * len(df), arrow_struct_type)
- # Assign result columns by schema name if user labeled with strings
- if self._assign_cols_by_name and any(isinstance(name, str) for name in df.columns):
- struct_arrs = [
- self._create_array(
- df[spark_field.name],
- spark_field.dataType,
- arrow_cast=self._arrow_cast,
- prefers_large_types=prefers_large_types,
- )
- for spark_field in return_type
- ]
- # Assign result columns by position
- else:
- struct_arrs = [
- # the selected series has name '1', so we rename it to spark_field.name
- # as the name is used by _create_array to provide a meaningful error message
- self._create_array(
- df[df.columns[i]].rename(spark_field.name),
- spark_field.dataType,
- arrow_cast=self._arrow_cast,
- prefers_large_types=prefers_large_types,
- )
- for i, spark_field in enumerate(return_type)
- ]
-
- return pa.StructArray.from_arrays(struct_arrs, fields=list(arrow_struct_type))
-
- def _create_batch(
- self, series, *, arrow_cast=False, prefers_large_types=False, struct_in_pandas="dict"
- ):
+ def dump_stream(self, iterator, stream):
"""
- Create an Arrow record batch from the given pandas.Series, pandas.DataFrame,
- or list of Series/DataFrame, with optional Spark type.
-
- Parameters
- ----------
- series : pandas.Series or pandas.DataFrame or list
- A single series or dataframe, list of series or dataframe,
- or list of (series or dataframe, spark_type) tuples.
- arrow_cast : bool, optional
- If True, use Arrow's cast method for type conversion.
- prefers_large_types : bool, optional
- Whether to prefer large Arrow types (e.g., large_string instead of string).
- struct_in_pandas : str, optional
- How to represent struct types in pandas: "dict" or "row".
- Default is "dict".
-
- Returns
- -------
- pyarrow.RecordBatch
- Arrow RecordBatch
+ Override because Pandas UDFs require a START_ARROW_STREAM before the Arrow stream is sent.
+ This should be sent after creating the first record batch so in case of an error, it can
+ be sent back to the JVM before the Arrow stream starts.
"""
import pandas as pd
import pyarrow as pa
- # Normalize input to list of (data, spark_type) tuples
- # Handle: single series, (series, type) tuple, or list of tuples
- if not isinstance(series, (list, tuple)) or (
- len(series) == 2 and isinstance(series[1], DataType)
- ):
- series = [series]
- # Ensure each element is a (data, spark_type) tuple
- series = [(s, None) if not isinstance(s, (list, tuple)) else s for s in series]
-
- arrs = []
- for s, spark_type in series:
- # Convert spark_type to arrow_type for type checking (similar to master branch)
- arrow_type = (
- to_arrow_type(
- spark_type, timezone=self._timezone, prefers_large_types=prefers_large_types
+ def create_batch(series_with_types):
+ """Create batch from list of (data, spark_type) tuples."""
+ arrs = []
+ for s, spark_type in series_with_types:
+ arrow_type = (
+ to_arrow_type(
+ spark_type,
+ timezone=self._timezone,
+ prefers_large_types=self._prefers_large_types,
+ )
+ if spark_type is not None
Review Comment:
In what case will `spark_type` be None?
##########
python/pyspark/sql/pandas/serializers.py:
##########
@@ -618,6 +502,8 @@ def __init__(
input_type: Optional[StructType] = None,
int_to_decimal_coercion_enabled: bool = False,
prefers_large_types: bool = False,
+ ignore_unexpected_complex_type_values: bool = False,
+ error_class: Optional[str] = None,
Review Comment:
Where does `error_class` come from?
##########
python/pyspark/sql/pandas/serializers.py:
##########
@@ -453,121 +432,26 @@ def arrow_to_pandas(
ndarray_as_list=ndarray_as_list,
)
- def _create_array(self, series, spark_type, *, arrow_cast=False, prefers_large_types=False):
+ def dump_stream(self, iterator, stream):
"""
- Create an Arrow Array from the given pandas.Series and Spark type.
-
- Parameters
- ----------
- series : pandas.Series
- A single series
- spark_type : DataType, optional
- The Spark return type. For UDF return types, this should always be provided
- and should never be None. If None, pyarrow's inferred type will be used
- (for backward compatibility).
- arrow_cast : bool, optional
- Whether to apply Arrow casting when the user-specified return type mismatches the
- actual return values.
- prefers_large_types : bool, optional
- Whether to prefer large Arrow types (e.g., large_string instead of string).
-
- Returns
- -------
- pyarrow.Array
+ Make ArrowRecordBatches from Pandas Series and serialize.
+ Each element in iterator is an iterable of (series, spark_type) tuples.
"""
- import pyarrow as pa
- import pandas as pd
-
- if isinstance(series.dtype, pd.CategoricalDtype):
- series = series.astype(series.dtypes.categories.dtype)
-
- # Derive arrow_type from spark_type
- arrow_type = (
- to_arrow_type(
- spark_type, timezone=self._timezone, prefers_large_types=prefers_large_types
- )
- if spark_type is not None
- else None
- )
+ from pyspark.sql.types import StructType, StructField
- if spark_type is not None:
- conv = _create_converter_from_pandas(
- spark_type,
+ def create_batch(series_tuples):
Review Comment:
Add type hints for the new functions?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.