ueshin commented on code in PR #41240:
URL: https://github.com/apache/spark/pull/41240#discussion_r1203029027
##########
python/pyspark/sql/pandas/conversion.py:
##########
@@ -375,22 +379,105 @@ def _convert_from_pandas(
assert isinstance(self, SparkSession)
if timezone is not None:
- from pyspark.sql.pandas.types import
_check_series_convert_timestamps_tz_local
+ from pyspark.sql.pandas.types import (
+ _check_series_convert_timestamps_tz_local,
+ _get_local_timezone,
+ )
from pandas.core.dtypes.common import is_datetime64tz_dtype,
is_timedelta64_dtype
copied = False
if isinstance(schema, StructType):
- for field in schema:
- # TODO: handle nested timestamps, such as
ArrayType(TimestampType())?
- if isinstance(field.dataType, TimestampType):
- s =
_check_series_convert_timestamps_tz_local(pdf[field.name], timezone)
- if s is not pdf[field.name]:
- if not copied:
- # Copy once if the series is modified to
prevent the original
- # Pandas DataFrame from being updated
- pdf = pdf.copy()
- copied = True
- pdf[field.name] = s
+
+ def _create_converter(data_type: DataType) ->
Callable[[pd.Series], pd.Series]:
+ if isinstance(data_type, TimestampType):
+
+ def correct_timestamp(pser: pd.Series) -> pd.Series:
+ return
_check_series_convert_timestamps_tz_local(pser, timezone)
+
+ return correct_timestamp
+
+ def _converter(dt: DataType) -> Optional[Callable[[Any],
Any]]:
+ if isinstance(dt, ArrayType):
+ element_conv = _converter(dt.elementType) or
(lambda x: x)
+
+ def convert_array(value: Any) -> Any:
+ if value is None:
+ return None
+ else:
+ return [element_conv(v) for v in value]
+
+ return convert_array
+
+ elif isinstance(dt, MapType):
+ key_conv = _converter(dt.keyType) or (lambda x: x)
+ value_conv = _converter(dt.valueType) or (lambda
x: x)
+
+ def convert_map(value: Any) -> Any:
+ if value is None:
+ return None
+ else:
+ return {key_conv(k): value_conv(v) for k,
v in value.items()}
+
+ return convert_map
+
+ elif isinstance(dt, StructType):
+ field_names = dt.names
+ dedup_field_names = _dedup_names(field_names)
+ field_convs = [
+ _converter(f.dataType) or (lambda x: x) for f
in dt.fields
+ ]
+
+ def convert_struct(value: Any) -> Any:
+ if value is None:
+ return None
+ elif isinstance(value, dict):
+ _values = [
+ field_convs[i](value.get(name, None))
+ for i, name in
enumerate(dedup_field_names)
+ ]
+ return _create_row(field_names, _values)
+ else:
+ _values = [
+ field_convs[i](value[i]) for i, name
in enumerate(value)
+ ]
+ return _create_row(field_names, _values)
+
+ return convert_struct
+
+ elif isinstance(dt, TimestampType):
+
+ def convert_timestamp(value: Any) -> Any:
+ if value is None:
+ return None
+ else:
+ return (
+ pd.Timestamp(value)
+ .tz_localize(timezone,
ambiguous=False) # type: ignore
Review Comment:
Let me leave the type-ignore comment without the error tag. I see a weird
error in CI that I can't reproduce locally:
```
starting mypy annotations test...
annotations failed mypy checks:
python/pyspark/sql/pandas/conversion.py:454: error: Unexpected keyword
argument "ambiguous" for "tz_localize" of "Timestamp"; did you mean
"ambigious"? [call-arg]
/usr/local/lib/python3.9/dist-packages/pandas/_libs/tslibs/timestamps.pyi:38:
note: "tz_localize" of "Timestamp" defined here
python/pyspark/sql/pandas/conversion.py:456: error: unused "type: ignore"
comment
Found 2 errors in 1 file (checked 512 source files)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]