zhengruifeng commented on code in PR #54105:
URL: https://github.com/apache/spark/pull/54105#discussion_r2757406882


##########
python/pyspark/sql/conversion.py:
##########
@@ -1112,47 +1179,154 @@ def convert_legacy(
         return converter(ser)
 
     @classmethod
-    def convert(
+    def _prefer_convert_numpy(
         cls,
-        arrow_column: Union["pa.Array", "pa.ChunkedArray"],
-        target_type: DataType,
+        spark_type: DataType,
+        df_for_struct: bool,
+    ) -> bool:
+        supported_types = (
+            NullType,
+            BinaryType,
+            BooleanType,
+            FloatType,
+            DoubleType,
+            ByteType,
+            ShortType,
+            IntegerType,
+            LongType,
+        )
+        if df_for_struct and isinstance(spark_type, StructType):
+            return all(isinstance(f.dataType, supported_types) for f in spark_type.fields)
+        else:
+            return isinstance(spark_type, supported_types)
+
+    @classmethod
+    def convert_numpy(
+        cls,
+        arr: Union["pa.Array", "pa.ChunkedArray"],
+        spark_type: DataType,
         *,
         timezone: Optional[str] = None,
-        struct_in_pandas: str = "dict",
+        struct_in_pandas: Optional[str] = None,
         ndarray_as_list: bool = False,
         df_for_struct: bool = False,
     ) -> Union["pd.Series", "pd.DataFrame"]:
-        """
-        Convert a PyArrow Array or ChunkedArray to a pandas Series or DataFrame.
+        import pyarrow as pa
+        import pandas as pd
 
-        Parameters
-        ----------
-        arrow_column : pa.Array or pa.ChunkedArray
-            The Arrow column to convert.
-        target_type : DataType
-            The target Spark type for the column to be converted to.
-        timezone : str, optional
-            Timezone for timestamp conversion. Required if the data contains timestamp types.
-        struct_in_pandas : str, optional
-            How to represent struct types in pandas. Valid values are "dict", "row", or "legacy".
-            Default is "dict".
-        ndarray_as_list : bool, optional
-            Whether to convert numpy ndarrays to Python lists. Default is False.
-        df_for_struct : bool, optional
-            If True, convert struct columns to a DataFrame with columns corresponding
-            to struct fields instead of a Series. Default is False.
+        assert isinstance(arr, (pa.Array, pa.ChunkedArray))
 
-        Returns
-        -------
-        pd.Series or pd.DataFrame
-            Converted pandas Series. If df_for_struct is True and the type is StructType,
-            returns a DataFrame with columns corresponding to struct fields.
-        """
-        return cls.convert_legacy(
-            arrow_column,
-            target_type,
-            timezone=timezone,
-            struct_in_pandas=struct_in_pandas,
-            ndarray_as_list=ndarray_as_list,
-            df_for_struct=df_for_struct,
-        )
+        if df_for_struct and isinstance(spark_type, StructType):
+            import pyarrow.types as types
+
+            assert types.is_struct(arr.type)
+            assert len(spark_type.names) == len(arr.type.names), f"{spark_type} {arr.type} "
+
+            series = [
+                cls.convert_numpy(
+                    field_arr,
+                    spark_type=field.dataType,
+                    timezone=timezone,
+                    struct_in_pandas=struct_in_pandas,
+                    ndarray_as_list=ndarray_as_list,
+                    df_for_struct=False,  # always False for child fields
+                )
+                for field_arr, field in zip(arr.flatten(), spark_type)
+            ]
+            pdf = pd.concat(series, axis=1)
+            pdf.columns = spark_type.names  # type: ignore[assignment]
+            return pdf
+
+        arr = ArrowTimestampConversion.localize_tz(arr)
+
+        # TODO(fangchen): introduce benchmark for such conversions
+        # 1, benchmark a nullable integral array
+        # a = pa.array(list(range(10000000)) + [9223372036854775707, None], type=pa.int64())
+        # %timeit a.to_pandas(types_mapper=pd.ArrowDtype)
+        # 11.9 μs ± 407 ns per loop (mean ± std. dev. of 7 runs, 100,000 loops each)
+        # %timeit a.to_pandas(types_mapper=pd.ArrowDtype).astype(pd.Int64Dtype())
+        # 589 ms ± 9.35 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
+        # %timeit pd.Series(a.to_pylist(), dtype=pd.Int64Dtype())
+        # 2.94 s ± 19.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
+        # %timeit a.to_pandas(integer_object_nulls=True).astype(pd.Int64Dtype())
+        # 2.05 s ± 22.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
+        # pd.Series(a, dtype=pd.Int64Dtype())
+        # fails due to internal np.float64 coercion
+        # OverflowError: Python int too large to convert to C long
+        #
+        # 2, benchmark a non-nullable integral array
+        # b = pa.array(list(range(10000000)) + [9223372036854775707, 1], type=pa.int64())
+        # %timeit b.to_pandas(types_mapper=pd.ArrowDtype).astype(np.int64)
+        # 30.2 μs ± 831 ns per loop (mean ± std. dev. of 7 runs, 10,000 loops each)
+        # %timeit pd.Series(b.to_pandas(types_mapper=pd.ArrowDtype), dtype=np.int64)
+        # 33.3 μs ± 928 ns per loop (mean ± std. dev. of 7 runs, 10,000 loops each)
+        # %timeit pd.Series(b, dtype=np.int64) <- loses the name
+        # 11.9 μs ± 125 ns per loop (mean ± std. dev. of 7 runs, 100,000 loops each)
+        # %timeit b.to_pandas()
+        # 7.56 μs ± 96.5 ns per loop (mean ± std. dev. of 7 runs, 100,000 loops each)
+        # %timeit b.to_pandas().astype(np.int64) <- astype is non-trivial
+        # 19.1 μs ± 242 ns per loop (mean ± std. dev. of 7 runs, 100,000 loops each)
+        if isinstance(spark_type, ByteType):
+            if arr.null_count > 0:
+                return arr.to_pandas(types_mapper=pd.ArrowDtype).astype(pd.Int8Dtype())
+            else:
+                return arr.to_pandas()
+        elif isinstance(spark_type, ShortType):
+            if arr.null_count > 0:
+                return arr.to_pandas(types_mapper=pd.ArrowDtype).astype(pd.Int16Dtype())
+            else:
+                return arr.to_pandas()
+        elif isinstance(spark_type, IntegerType):
+            if arr.null_count > 0:
+                return arr.to_pandas(types_mapper=pd.ArrowDtype).astype(pd.Int32Dtype())
+            else:
+                return arr.to_pandas()
+        elif isinstance(spark_type, LongType):
+            if arr.null_count > 0:
+                return arr.to_pandas(types_mapper=pd.ArrowDtype).astype(pd.Int64Dtype())
+            else:
+                return arr.to_pandas()
+        elif isinstance(
+            spark_type,
+            (
+                NullType,
+                BinaryType,
+                BooleanType,
+                FloatType,
+                DoubleType,
+                DecimalType,
+                StringType,
+                DateType,
+                TimeType,
+                TimestampType,
+                TimestampNTZType,
+                DayTimeIntervalType,
+                YearMonthIntervalType,
+            ),
+        ):
+            # TODO(ruifeng): revisit date_as_object
+            # TODO(ruifeng): implement coerce_temporal_nanoseconds

Review Comment:
   sure!
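
   For reference, a minimal standalone sketch of the two conversion paths the benchmark comments above compare (assuming pyarrow and a pandas version with pd.ArrowDtype support are installed; variable names are illustrative, the tiny arrays stand in for the 10M-element benchmark arrays, and timings will differ by machine):

    import pyarrow as pa
    import pandas as pd

    # Nullable int64 array: convert via the Arrow-backed dtype first, then
    # cast to the pandas nullable extension dtype -- the fastest of the
    # measured options that yields pd.Int64Dtype (benchmark 1 above).
    a = pa.array([1, 2, None, 9223372036854775707], type=pa.int64())
    s_nullable = a.to_pandas(types_mapper=pd.ArrowDtype).astype(pd.Int64Dtype())
    print(s_nullable.dtype)  # Int64 (pandas nullable extension dtype)

    # Non-nullable int64 array: plain to_pandas() already yields np.int64
    # and skips the extra astype (the fast path in benchmark 2 above).
    b = pa.array([1, 2, 3], type=pa.int64())
    s_plain = b.to_pandas()
    print(s_plain.dtype)  # int64 (numpy dtype)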



