Github user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/19607#discussion_r149002542
--- Diff: python/pyspark/sql/types.py ---
@@ -1629,35 +1629,112 @@ def to_arrow_type(dt):
return arrow_type
-def _check_dataframe_localize_timestamps(pdf):
+def from_arrow_type(at):
+ """ Convert pyarrow type to Spark data type.
+ """
+ # TODO: newer pyarrow has is_boolean(at) functions that would be
better to check type
+ import pyarrow as pa
+ if at == pa.bool_():
+ spark_type = BooleanType()
+ elif at == pa.int8():
+ spark_type = ByteType()
+ elif at == pa.int16():
+ spark_type = ShortType()
+ elif at == pa.int32():
+ spark_type = IntegerType()
+ elif at == pa.int64():
+ spark_type = LongType()
+ elif at == pa.float32():
+ spark_type = FloatType()
+ elif at == pa.float64():
+ spark_type = DoubleType()
+ elif type(at) == pa.DecimalType:
+ spark_type = DecimalType(precision=at.precision, scale=at.scale)
+ elif at == pa.string():
+ spark_type = StringType()
+ elif at == pa.date32():
+ spark_type = DateType()
+ elif type(at) == pa.TimestampType:
+ spark_type = TimestampType()
+ else:
+ raise TypeError("Unsupported type in conversion from Arrow: " +
str(at))
+ return spark_type
+
+
+def from_arrow_schema(arrow_schema):
+ """ Convert schema from Arrow to Spark.
+ """
+ return StructType(
+ [StructField(field.name, from_arrow_type(field.type),
nullable=field.nullable)
+ for field in arrow_schema])
+
+
+def _check_dataframe_localize_timestamps(pdf, schema, timezone):
"""
Convert timezone aware timestamps to timezone-naive in local time
:param pdf: pandas.DataFrame
:return pandas.DataFrame where any timezone aware columns have be
converted to tz-naive
"""
- from pandas.api.types import is_datetime64tz_dtype
- for column, series in pdf.iteritems():
- # TODO: handle nested timestamps, such as
ArrayType(TimestampType())?
- if is_datetime64tz_dtype(series.dtype):
- pdf[column] =
series.dt.tz_convert('tzlocal()').dt.tz_localize(None)
+ import pandas as pd
+ try:
+ from pandas.api.types import is_datetime64tz_dtype,
is_datetime64_dtype
+ tz = timezone or 'tzlocal()'
+ for column, series in pdf.iteritems():
+ if type(schema[str(column)].dataType) == TimestampType:
+ # TODO: handle nested timestamps, such as
ArrayType(TimestampType())?
+ if is_datetime64tz_dtype(series.dtype):
+ pdf[column] =
series.dt.tz_convert(tz).dt.tz_localize(None)
+ elif is_datetime64_dtype(series.dtype) and timezone is not
None:
+ # `series.dt.tz_localize('tzlocal()')` doesn't work
properly when including NaT.
+ pdf[column] = series.apply(
+ lambda ts:
ts.tz_localize('tzlocal()').tz_convert(tz).tz_localize(None)
+ if ts is not pd.NaT else pd.NaT)
+ except ImportError:
+ from pandas.core.common import is_datetime64_dtype
+ from pandas.tslib import _dateutil_tzlocal
+ tzlocal = _dateutil_tzlocal()
+ tz = timezone or tzlocal
+ for column, series in pdf.iteritems():
+ if type(schema[str(column)].dataType) == TimestampType:
+ # TODO: handle nested timestamps, such as
ArrayType(TimestampType())?
+ if not is_datetime64_dtype(series.dtype):
--- End diff --
Can't we just use `is_datetime64tz_dtype` as above?
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]