ueshin commented on code in PR #41240:
URL: https://github.com/apache/spark/pull/41240#discussion_r1203029027


##########
python/pyspark/sql/pandas/conversion.py:
##########
@@ -375,22 +379,105 @@ def _convert_from_pandas(
         assert isinstance(self, SparkSession)
 
         if timezone is not None:
-            from pyspark.sql.pandas.types import 
_check_series_convert_timestamps_tz_local
+            from pyspark.sql.pandas.types import (
+                _check_series_convert_timestamps_tz_local,
+                _get_local_timezone,
+            )
             from pandas.core.dtypes.common import is_datetime64tz_dtype, 
is_timedelta64_dtype
 
             copied = False
             if isinstance(schema, StructType):
-                for field in schema:
-                    # TODO: handle nested timestamps, such as 
ArrayType(TimestampType())?
-                    if isinstance(field.dataType, TimestampType):
-                        s = 
_check_series_convert_timestamps_tz_local(pdf[field.name], timezone)
-                        if s is not pdf[field.name]:
-                            if not copied:
-                                # Copy once if the series is modified to 
prevent the original
-                                # Pandas DataFrame from being updated
-                                pdf = pdf.copy()
-                                copied = True
-                            pdf[field.name] = s
+
+                def _create_converter(data_type: DataType) -> 
Callable[[pd.Series], pd.Series]:
+                    if isinstance(data_type, TimestampType):
+
+                        def correct_timestamp(pser: pd.Series) -> pd.Series:
+                            return 
_check_series_convert_timestamps_tz_local(pser, timezone)
+
+                        return correct_timestamp
+
+                    def _converter(dt: DataType) -> Optional[Callable[[Any], 
Any]]:
+                        if isinstance(dt, ArrayType):
+                            element_conv = _converter(dt.elementType) or 
(lambda x: x)
+
+                            def convert_array(value: Any) -> Any:
+                                if value is None:
+                                    return None
+                                else:
+                                    return [element_conv(v) for v in value]
+
+                            return convert_array
+
+                        elif isinstance(dt, MapType):
+                            key_conv = _converter(dt.keyType) or (lambda x: x)
+                            value_conv = _converter(dt.valueType) or (lambda 
x: x)
+
+                            def convert_map(value: Any) -> Any:
+                                if value is None:
+                                    return None
+                                else:
+                                    return {key_conv(k): value_conv(v) for k, 
v in value.items()}
+
+                            return convert_map
+
+                        elif isinstance(dt, StructType):
+                            field_names = dt.names
+                            dedup_field_names = _dedup_names(field_names)
+                            field_convs = [
+                                _converter(f.dataType) or (lambda x: x) for f 
in dt.fields
+                            ]
+
+                            def convert_struct(value: Any) -> Any:
+                                if value is None:
+                                    return None
+                                elif isinstance(value, dict):
+                                    _values = [
+                                        field_convs[i](value.get(name, None))
+                                        for i, name in 
enumerate(dedup_field_names)
+                                    ]
+                                    return _create_row(field_names, _values)
+                                else:
+                                    _values = [
+                                        field_convs[i](value[i]) for i, name 
in enumerate(value)
+                                    ]
+                                    return _create_row(field_names, _values)
+
+                            return convert_struct
+
+                        elif isinstance(dt, TimestampType):
+
+                            def convert_timestamp(value: Any) -> Any:
+                                if value is None:
+                                    return None
+                                else:
+                                    return (
+                                        pd.Timestamp(value)
+                                        .tz_localize(timezone, 
ambiguous=False)  # type: ignore

Review Comment:
   Let me leave it without the error tag. I see a weird error in CI I can't 
reproduce in my local:
   
   ```
   starting mypy annotations test...
   annotations failed mypy checks:
   python/pyspark/sql/pandas/conversion.py:454: error: Unexpected keyword 
argument "ambiguous" for "tz_localize" of "Timestamp"; did you mean 
"ambigious"?  [call-arg]
   
/usr/local/lib/python3.9/dist-packages/pandas/_libs/tslibs/timestamps.pyi:38: 
note: "tz_localize" of "Timestamp" defined here
   python/pyspark/sql/pandas/conversion.py:456: error: unused "type: ignore" 
comment
   Found 2 errors in 1 file (checked 512 source files)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to