gaogaotiantian commented on code in PR #53800:
URL: https://github.com/apache/spark/pull/53800#discussion_r2703284277


##########
python/pyspark/sql/conversion.py:
##########
@@ -845,3 +845,117 @@ def convert(
                 return [tuple()] * table.num_rows
             else:
                 return [_create_row(fields, tuple())] * table.num_rows
+
+
+class ArrowTimestampeConversion:

Review Comment:
   `ArrowTimestampConversion` instead of `ArrowTimestampeConversion`. You have 
an extra `e` :)



##########
python/pyspark/sql/conversion.py:
##########
@@ -845,3 +845,117 @@ def convert(
                 return [tuple()] * table.num_rows
             else:
                 return [_create_row(fields, tuple())] * table.num_rows
+
+
+class ArrowTimestampeConversion:
+    @staticmethod
+    def _need_localization(at: "pa.DataType") -> bool:
+        import pyarrow.types as types
+
+        if types.is_timestamp(at) and at.tz is not None:
+            return True
+        elif (
+            types.is_list(at)
+            or types.is_large_list(at)
+            or types.is_fixed_size_list(at)
+            or types.is_dictionary(at)
+        ):
+            return ArrowTimestampeConversion._need_localization(at.value_type)
+        elif types.is_map(at):
+            return any(
+                ArrowTimestampeConversion._need_localization(dt)
+                for dt in [at.key_type, at.item_type]
+            )
+        elif types.is_struct(at):
+            return 
any(ArrowTimestampeConversion._need_localization(field.type) for field in at)
+        else:
+            return False
+
+    @staticmethod
+    def localize_tz(a: "pa.Array") -> "pa.Array":
+        """
+        Convert Arrow timezone-aware timestamps to timezone-naive in the 
specified timezone.
+        This function works on Arrow Arrays, and it recurses to convert nested 
types.
+        This function is dedicated for Pandas UDF execution.
+
+        Differences from _create_converter_to_pandas + 
_check_series_convert_timestamps_local_tz:
+        1, respect the timezone field in pyarrow timestamp type;
+        2, do not use local time at any time;
+        3, handle nested types in a consistent way. 
(_create_converter_to_pandas handles
+        simple timestamp series with session timezone, but handles nested 
series with
+        datetime.timezone.utc)
+
+        Differences from _check_arrow_array_timestamps_localize:
+        1, respect the timezone field in pyarrow timestamp type;
+        2, do not handle timezone-naive timestamp;
+        3, do not support unit coercion which won't happen in UDF execution.
+
+        Parameters
+        ----------
+        a : :class:`pyarrow.Array`
+
+        Returns
+        -------
+        :class:`pyarrow.Array`
+
+        Notes
+        -----
+        Arrow UDF (@arrow_udf/mapInArrow/etc) always preserve the original 
timezone, and thus
+        doesn't need this conversion.
+        """
+        import pyarrow as pa
+        import pyarrow.types as types
+        import pyarrow.compute as pc
+
+        at = a.type
+
+        if not ArrowTimestampeConversion._need_localization(at):
+            return a
+
+        if types.is_timestamp(at) and at.tz is not None:
+            # import datetime

Review Comment:
   Are these comments left here intentionally?



##########
python/pyspark/sql/conversion.py:
##########
@@ -845,3 +845,117 @@ def convert(
                 return [tuple()] * table.num_rows
             else:
                 return [_create_row(fields, tuple())] * table.num_rows
+
+
+class ArrowTimestampeConversion:
+    @staticmethod

Review Comment:
   I know this is kind of how other code does it, but do we have a thorough 
consideration about `staticmethod` vs `classmethod`? One of the good thing 
about `classmethod` is that, instead of 
`ArrowTimestampeConversion._need_localization`, you can just do 
`cls._need_localization`, which is helpful when you need to change the class 
name (like now).



##########
python/pyspark/sql/conversion.py:
##########
@@ -845,3 +845,117 @@ def convert(
                 return [tuple()] * table.num_rows
             else:
                 return [_create_row(fields, tuple())] * table.num_rows
+
+
+class ArrowTimestampeConversion:
+    @staticmethod
+    def _need_localization(at: "pa.DataType") -> bool:
+        import pyarrow.types as types
+
+        if types.is_timestamp(at) and at.tz is not None:
+            return True
+        elif (
+            types.is_list(at)
+            or types.is_large_list(at)
+            or types.is_fixed_size_list(at)
+            or types.is_dictionary(at)
+        ):
+            return ArrowTimestampeConversion._need_localization(at.value_type)
+        elif types.is_map(at):
+            return any(
+                ArrowTimestampeConversion._need_localization(dt)
+                for dt in [at.key_type, at.item_type]
+            )
+        elif types.is_struct(at):
+            return 
any(ArrowTimestampeConversion._need_localization(field.type) for field in at)
+        else:
+            return False
+
+    @staticmethod
+    def localize_tz(a: "pa.Array") -> "pa.Array":
+        """
+        Convert Arrow timezone-aware timestamps to timezone-naive in the 
specified timezone.
+        This function works on Arrow Arrays, and it recurses to convert nested 
types.
+        This function is dedicated for Pandas UDF execution.
+
+        Differences from _create_converter_to_pandas + 
_check_series_convert_timestamps_local_tz:
+        1, respect the timezone field in pyarrow timestamp type;
+        2, do not use local time at any time;
+        3, handle nested types in a consistent way. 
(_create_converter_to_pandas handles
+        simple timestamp series with session timezone, but handles nested 
series with
+        datetime.timezone.utc)
+
+        Differences from _check_arrow_array_timestamps_localize:
+        1, respect the timezone field in pyarrow timestamp type;
+        2, do not handle timezone-naive timestamp;
+        3, do not support unit coercion which won't happen in UDF execution.
+
+        Parameters
+        ----------
+        a : :class:`pyarrow.Array`
+
+        Returns
+        -------
+        :class:`pyarrow.Array`
+
+        Notes
+        -----
+        Arrow UDF (@arrow_udf/mapInArrow/etc) always preserve the original 
timezone, and thus
+        doesn't need this conversion.
+        """
+        import pyarrow as pa
+        import pyarrow.types as types
+        import pyarrow.compute as pc
+
+        at = a.type
+
+        if not ArrowTimestampeConversion._need_localization(at):
+            return a
+
+        if types.is_timestamp(at) and at.tz is not None:
+            # import datetime
+            # from zoneinfo import ZoneInfo
+            # ts = datetime.datetime(2022, 1, 5, 15, 0, 1, 
tzinfo=ZoneInfo('Asia/Singapore'))
+            # arr = pa.array([ts])
+            # arr[0]
+            # <pyarrow.TimestampScalar: '2022-01-05T15:00:01.000000+0800'>
+            # arr = pc.local_timestamp(arr)
+            # arr[0]
+            # <pyarrow.TimestampScalar: '2022-01-05T15:00:01.000000'>
+
+            return pc.local_timestamp(a)
+        elif types.is_list(a.type):
+            return pa.ListArray.from_arrays(
+                offsets=a.offsets,
+                values=ArrowTimestampeConversion.localize_tz(a.values),
+            )
+        elif types.is_large_list(a.type):
+            return pa.LargeListType.from_arrays(
+                offsets=a.offsets,
+                values=ArrowTimestampeConversion.localize_tz(a.values),
+            )
+        elif types.is_fixed_size_list(at):
+            return pa.FixedSizeListArray.from_arrays(
+                values=ArrowTimestampeConversion.localize_tz(a.values),
+            )
+        elif types.is_dictionary(a.type):
+            return pa.DictionaryArray.from_arrays(
+                indices=a.indices,
+                dictionary=ArrowTimestampeConversion.localize_tz(a.dictionary),
+            )
+        elif types.is_map(at):
+            return pa.MapArray.from_arrays(
+                offsets=a.offsets,
+                keys=ArrowTimestampeConversion.localize_tz(a.keys),
+                items=ArrowTimestampeConversion.localize_tz(a.items),
+            )
+        elif types.is_struct(a.type):
+            return pa.StructArray.from_arrays(
+                arrays=[
+                    ArrowTimestampeConversion.localize_tz(a.field(i)) for i in 
range(len(a.type))
+                ],
+                names=a.type.names,
+            )
+

Review Comment:
   An extra blank line here.



##########
python/pyspark/sql/conversion.py:
##########
@@ -845,3 +845,117 @@ def convert(
                 return [tuple()] * table.num_rows
             else:
                 return [_create_row(fields, tuple())] * table.num_rows
+
+
+class ArrowTimestampeConversion:
+    @staticmethod
+    def _need_localization(at: "pa.DataType") -> bool:
+        import pyarrow.types as types
+
+        if types.is_timestamp(at) and at.tz is not None:
+            return True
+        elif (
+            types.is_list(at)
+            or types.is_large_list(at)
+            or types.is_fixed_size_list(at)
+            or types.is_dictionary(at)
+        ):
+            return ArrowTimestampeConversion._need_localization(at.value_type)
+        elif types.is_map(at):
+            return any(
+                ArrowTimestampeConversion._need_localization(dt)
+                for dt in [at.key_type, at.item_type]
+            )
+        elif types.is_struct(at):
+            return 
any(ArrowTimestampeConversion._need_localization(field.type) for field in at)
+        else:
+            return False
+
+    @staticmethod
+    def localize_tz(a: "pa.Array") -> "pa.Array":
+        """
+        Convert Arrow timezone-aware timestamps to timezone-naive in the 
specified timezone.
+        This function works on Arrow Arrays, and it recurses to convert nested 
types.
+        This function is dedicated for Pandas UDF execution.
+
+        Differences from _create_converter_to_pandas + 
_check_series_convert_timestamps_local_tz:
+        1, respect the timezone field in pyarrow timestamp type;
+        2, do not use local time at any time;
+        3, handle nested types in a consistent way. 
(_create_converter_to_pandas handles
+        simple timestamp series with session timezone, but handles nested 
series with
+        datetime.timezone.utc)
+
+        Differences from _check_arrow_array_timestamps_localize:
+        1, respect the timezone field in pyarrow timestamp type;
+        2, do not handle timezone-naive timestamp;
+        3, do not support unit coercion which won't happen in UDF execution.
+
+        Parameters
+        ----------
+        a : :class:`pyarrow.Array`
+
+        Returns
+        -------
+        :class:`pyarrow.Array`
+
+        Notes
+        -----
+        Arrow UDF (@arrow_udf/mapInArrow/etc) always preserve the original 
timezone, and thus
+        doesn't need this conversion.
+        """
+        import pyarrow as pa
+        import pyarrow.types as types
+        import pyarrow.compute as pc
+
+        at = a.type

Review Comment:
   Hmm, I have doubts about variable name `a` and `at` - it would be much 
better if we name them `arr` and `arr_type`. `a` is just too ... short. It's 
not obvious to people (me?) that `at` is "type of a".



##########
python/pyspark/sql/tests/test_conversion.py:
##########
@@ -15,11 +15,14 @@
 # limitations under the License.
 #
 import unittest
+import datetime

Review Comment:
   nit: we don't have isort now for package import orders, but to minimize the 
future hazzle, maybe move `datetime` above `unittest` for alphabetic order.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to