zhengruifeng commented on code in PR #54105:
URL: https://github.com/apache/spark/pull/54105#discussion_r2757406882
##########
python/pyspark/sql/conversion.py:
##########
@@ -1112,47 +1179,154 @@ def convert_legacy(
        return converter(ser)
    @classmethod
-    def convert(
+    def _prefer_convert_numpy(
        cls,
-        arrow_column: Union["pa.Array", "pa.ChunkedArray"],
-        target_type: DataType,
+        spark_type: DataType,
+        df_for_struct: bool,
+    ) -> bool:
+        supported_types = (
+            NullType,
+            BinaryType,
+            BooleanType,
+            FloatType,
+            DoubleType,
+            ByteType,
+            ShortType,
+            IntegerType,
+            LongType,
+        )
+        if df_for_struct and isinstance(spark_type, StructType):
+            return all(isinstance(f.dataType, supported_types) for f in spark_type.fields)
+        else:
+            return isinstance(spark_type, supported_types)
+
+    @classmethod
+    def convert_numpy(
+        cls,
+        arr: Union["pa.Array", "pa.ChunkedArray"],
+        spark_type: DataType,
        *,
        timezone: Optional[str] = None,
-        struct_in_pandas: str = "dict",
+        struct_in_pandas: Optional[str] = None,
        ndarray_as_list: bool = False,
        df_for_struct: bool = False,
    ) -> Union["pd.Series", "pd.DataFrame"]:
- """
- Convert a PyArrow Array or ChunkedArray to a pandas Series or
DataFrame.
+ import pyarrow as pa
+ import pandas as pd
- Parameters
- ----------
- arrow_column : pa.Array or pa.ChunkedArray
- The Arrow column to convert.
- target_type : DataType
- The target Spark type for the column to be converted to.
- timezone : str, optional
- Timezone for timestamp conversion. Required if the data contains
timestamp types.
- struct_in_pandas : str, optional
- How to represent struct types in pandas. Valid values are "dict",
"row", or "legacy".
- Default is "dict".
- ndarray_as_list : bool, optional
- Whether to convert numpy ndarrays to Python lists. Default is
False.
- df_for_struct : bool, optional
- If True, convert struct columns to a DataFrame with columns
corresponding
- to struct fields instead of a Series. Default is False.
+ assert isinstance(arr, (pa.Array, pa.ChunkedArray))
- Returns
- -------
- pd.Series or pd.DataFrame
- Converted pandas Series. If df_for_struct is True and the type is
StructType,
- returns a DataFrame with columns corresponding to struct fields.
- """
- return cls.convert_legacy(
- arrow_column,
- target_type,
- timezone=timezone,
- struct_in_pandas=struct_in_pandas,
- ndarray_as_list=ndarray_as_list,
- df_for_struct=df_for_struct,
- )
+        if df_for_struct and isinstance(spark_type, StructType):
+            import pyarrow.types as types
+
+            assert types.is_struct(arr.type)
+            assert len(spark_type.names) == len(arr.type.names), f"{spark_type} {arr.type} "
+
+            series = [
+                cls.convert_numpy(
+                    field_arr,
+                    spark_type=field.dataType,
+                    timezone=timezone,
+                    struct_in_pandas=struct_in_pandas,
+                    ndarray_as_list=ndarray_as_list,
+                    df_for_struct=False,  # always False for child fields
+                )
+                for field_arr, field in zip(arr.flatten(), spark_type)
+            ]
+            pdf = pd.concat(series, axis=1)
+            pdf.columns = spark_type.names  # type: ignore[assignment]
+            return pdf
+
+        arr = ArrowTimestampConversion.localize_tz(arr)
+
+        # TODO(fangchen): introduce benchmark for such conversions
+        # 1, benchmark a nullable integral array
+        # a = pa.array(list(range(10000000)) + [9223372036854775707, None], type=pa.int64())
+        # %timeit a.to_pandas(types_mapper=pd.ArrowDtype)
+        # 11.9 μs ± 407 ns per loop (mean ± std. dev. of 7 runs, 100,000 loops each)
+        # %timeit a.to_pandas(types_mapper=pd.ArrowDtype).astype(pd.Int64Dtype())
+        # 589 ms ± 9.35 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
+        # %timeit pd.Series(a.to_pylist(), dtype=pd.Int64Dtype())
+        # 2.94 s ± 19.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
+        # %timeit a.to_pandas(integer_object_nulls=True).astype(pd.Int64Dtype())
+        # 2.05 s ± 22.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
+        # pd.Series(a, dtype=pd.Int64Dtype())
+        # fails due to internal np.float64 coercion
+        # OverflowError: Python int too large to convert to C long
+        #
+        # 2, benchmark a non-nullable integral array
+        # b = pa.array(list(range(10000000)) + [9223372036854775707, 1], type=pa.int64())
+        # %timeit b.to_pandas(types_mapper=pd.ArrowDtype).astype(np.int64)
+        # 30.2 μs ± 831 ns per loop (mean ± std. dev. of 7 runs, 10,000 loops each)
+        # %timeit pd.Series(b.to_pandas(types_mapper=pd.ArrowDtype), dtype=np.int64)
+        # 33.3 μs ± 928 ns per loop (mean ± std. dev. of 7 runs, 10,000 loops each)
+        # %timeit pd.Series(b, dtype=np.int64)  <- loses the name
+        # 11.9 μs ± 125 ns per loop (mean ± std. dev. of 7 runs, 100,000 loops each)
+        # %timeit b.to_pandas()
+        # 7.56 μs ± 96.5 ns per loop (mean ± std. dev. of 7 runs, 100,000 loops each)
+        # %timeit b.to_pandas().astype(np.int64)  <- astype is non-trivial
+        # 19.1 μs ± 242 ns per loop (mean ± std. dev. of 7 runs, 100,000 loops each)
+        if isinstance(spark_type, ByteType):
+            if arr.null_count > 0:
+                return arr.to_pandas(types_mapper=pd.ArrowDtype).astype(pd.Int8Dtype())
+            else:
+                return arr.to_pandas()
+        elif isinstance(spark_type, ShortType):
+            if arr.null_count > 0:
+                return arr.to_pandas(types_mapper=pd.ArrowDtype).astype(pd.Int16Dtype())
+            else:
+                return arr.to_pandas()
+        elif isinstance(spark_type, IntegerType):
+            if arr.null_count > 0:
+                return arr.to_pandas(types_mapper=pd.ArrowDtype).astype(pd.Int32Dtype())
+            else:
+                return arr.to_pandas()
+        elif isinstance(spark_type, LongType):
+            if arr.null_count > 0:
+                return arr.to_pandas(types_mapper=pd.ArrowDtype).astype(pd.Int64Dtype())
+            else:
+                return arr.to_pandas()
+        elif isinstance(
+            spark_type,
+            (
+                NullType,
+                BinaryType,
+                BooleanType,
+                FloatType,
+                DoubleType,
+                DecimalType,
+                StringType,
+                DateType,
+                TimeType,
+                TimestampType,
+                TimestampNTZType,
+                DayTimeIntervalType,
+                YearMonthIntervalType,
+            ),
+        ):
+            # TODO(ruifeng): revisit date_as_object
+            # TODO(ruifeng): implement coerce_temporal_nanoseconds
Review Comment:
sure!
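
For illustration only (not part of the PR): a standalone sketch of the gating predicate above, with an abbreviated supported-type set and a hypothetical prefer_numpy helper. For a top-level struct that will be expanded into DataFrame columns, every field must be on the numpy fast path; otherwise the legacy converter stays in use.

    from pyspark.sql.types import (
        StructType, StructField, BooleanType, IntegerType, LongType, StringType,
    )

    # Abbreviated stand-in for the full supported_types tuple in the diff.
    SUPPORTED = (BooleanType, IntegerType, LongType)

    def prefer_numpy(spark_type, df_for_struct: bool) -> bool:
        # A struct expanded to DataFrame columns qualifies only if
        # every field type is individually supported.
        if df_for_struct and isinstance(spark_type, StructType):
            return all(isinstance(f.dataType, SUPPORTED) for f in spark_type.fields)
        return isinstance(spark_type, SUPPORTED)

    schema = StructType([StructField("a", LongType()), StructField("b", StringType())])
    print(prefer_numpy(schema, df_for_struct=True))       # False: StringType is not supported
    print(prefer_numpy(LongType(), df_for_struct=False))  # True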
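
Likewise, a minimal sketch of the struct-to-DataFrame expansion the diff performs: flatten() yields one child array per struct field, each child is converted independently, and the results are reassembled as named DataFrame columns. The sample schema is made up; nullable children would go through the extension-dtype path in the real code.

    import pyarrow as pa
    import pandas as pd

    struct_arr = pa.array(
        [{"x": 1, "y": 10}, {"x": 2, "y": 20}],
        type=pa.struct([("x", pa.int64()), ("y", pa.int64())]),
    )

    # One child array per struct field, converted independently.
    children = struct_arr.flatten()
    pdf = pd.concat([child.to_pandas() for child in children], axis=1)
    pdf.columns = [field.name for field in struct_arr.type]
    print(pdf)  # columns "x" and "y", one row per struct value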
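
And a standalone sketch of the null-count fast path for integral columns, mirroring the LongType branch: the ArrowDtype/Int64Dtype round-trip is only paid when the column actually contains nulls, in line with the benchmark numbers quoted in the diff.

    import pyarrow as pa
    import pandas as pd

    def to_pandas_int64(arr: pa.Array) -> pd.Series:
        if arr.null_count > 0:
            # Nulls present: Arrow-backed extension dtype, then the
            # nullable Int64 dtype; cheap relative to object fallbacks.
            return arr.to_pandas(types_mapper=pd.ArrowDtype).astype(pd.Int64Dtype())
        # Null-free: plain to_pandas() yields numpy int64 directly.
        return arr.to_pandas()

    print(to_pandas_int64(pa.array([1, 2, None])).dtype)  # Int64 (nullable extension)
    print(to_pandas_int64(pa.array([1, 2, 3])).dtype)     # int64 (numpy)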