GitHub user ueshin commented on a diff in the pull request:
https://github.com/apache/spark/pull/19607#discussion_r148576678
--- Diff: python/pyspark/sql/types.py ---
@@ -1629,35 +1629,121 @@ def to_arrow_type(dt):
return arrow_type
-def _check_dataframe_localize_timestamps(pdf):
+def to_arrow_schema(schema):
+ """ Convert a schema from Spark to Arrow
+ """
+ import pyarrow as pa
+ fields = [pa.field(field.name, to_arrow_type(field.dataType),
nullable=field.nullable)
+ for field in schema]
+ return pa.schema(fields)
+
+
+def from_arrow_type(at):
+ """ Convert pyarrow type to Spark data type.
+ """
+ # TODO: newer pyarrow has is_boolean(at) functions that would be
better to check type
+ import pyarrow as pa
+ if at == pa.bool_():
+ spark_type = BooleanType()
+ elif at == pa.int8():
+ spark_type = ByteType()
+ elif at == pa.int16():
+ spark_type = ShortType()
+ elif at == pa.int32():
+ spark_type = IntegerType()
+ elif at == pa.int64():
+ spark_type = LongType()
+ elif at == pa.float32():
+ spark_type = FloatType()
+ elif at == pa.float64():
+ spark_type = DoubleType()
+ elif type(at) == pa.DecimalType:
+ spark_type = DecimalType(precision=at.precision, scale=at.scale)
+ elif at == pa.string():
+ spark_type = StringType()
+ elif at == pa.date32():
+ spark_type = DateType()
+ elif type(at) == pa.TimestampType:
+ spark_type = TimestampType()
+ else:
+ raise TypeError("Unsupported type in conversion from Arrow: " +
str(at))
+ return spark_type
+
+
+def from_arrow_schema(arrow_schema):
+ """ Convert schema from Arrow to Spark.
+ """
+ return StructType(
+ [StructField(field.name, from_arrow_type(field.type),
nullable=field.nullable)
+ for field in arrow_schema])
+
+
+def _check_dataframe_localize_timestamps(pdf, schema, timezone):
"""
Convert timezone aware timestamps to timezone-naive in local time
:param pdf: pandas.DataFrame
:return pandas.DataFrame where any timezone aware columns have been
converted to tz-naive
"""
- from pandas.api.types import is_datetime64tz_dtype
- for column, series in pdf.iteritems():
- # TODO: handle nested timestamps, such as
ArrayType(TimestampType())?
- if is_datetime64tz_dtype(series.dtype):
- pdf[column] =
series.dt.tz_convert('tzlocal()').dt.tz_localize(None)
+ import pandas as pd
+ try:
+ from pandas.api.types import is_datetime64tz_dtype,
is_datetime64_dtype
+ tz = timezone or 'tzlocal()'
+ for column, series in pdf.iteritems():
+ if type(schema[str(column)].dataType) == TimestampType:
+ # TODO: handle nested timestamps, such as
ArrayType(TimestampType())?
+ if is_datetime64tz_dtype(series.dtype):
+ pdf[column] =
series.dt.tz_convert(tz).dt.tz_localize(None)
+ elif is_datetime64_dtype(series.dtype) and timezone is not
None:
+ # `series.dt.tz_localize('tzlocal()')` doesn't work
properly when including NaT.
+ pdf[column] = series.apply(
+ lambda ts:
ts.tz_localize('tzlocal()').tz_convert(tz).tz_localize(None)
+ if ts is not pd.NaT else pd.NaT)
+ except ImportError:
+ from pandas.core.common import is_datetime64_dtype
+ from pandas.tslib import _dateutil_tzlocal
+ tzlocal = _dateutil_tzlocal()
+ tz = timezone or tzlocal
+ for column, series in pdf.iteritems():
+ if type(schema[str(column)].dataType) == TimestampType:
+ # TODO: handle nested timestamps, such as
ArrayType(TimestampType())?
+ if not is_datetime64_dtype(series.dtype):
+ # `series.dt.tz_convert(tzlocal).dt.tz_localize(None)`
doesn't work properly.
+ pdf[column] =
pd.Series([ts.tz_convert(tz).tz_localize(None)
+ if ts is not pd.NaT else
pd.NaT for ts in series])
+ elif is_datetime64_dtype(series.dtype) and timezone is not
None:
+ # `series.dt.tz_localize(tzlocal)` doesn't work
properly.
+ pdf[column] = pd.Series(
+
[ts.tz_localize(tzlocal).tz_convert(tz).tz_localize(None)
+ if ts is not pd.NaT else pd.NaT for ts in series])
return pdf
-def _check_series_convert_timestamps_internal(s):
+def _check_series_convert_timestamps_internal(s, timezone):
"""
Convert a tz-naive timestamp in local tz to UTC normalized for Spark
internal storage
:param s: a pandas.Series
:return pandas.Series where if it is a timestamp, has been UTC
normalized without a time zone
"""
- from pandas.api.types import is_datetime64_dtype, is_datetime64tz_dtype
- # TODO: handle nested timestamps, such as ArrayType(TimestampType())?
- if is_datetime64_dtype(s.dtype):
- return s.dt.tz_localize('tzlocal()').dt.tz_convert('UTC')
- elif is_datetime64tz_dtype(s.dtype):
- return s.dt.tz_convert('UTC')
- else:
- return s
+ try:
+ from pandas.api.types import is_datetime64tz_dtype,
is_datetime64_dtype
+ # TODO: handle nested timestamps, such as
ArrayType(TimestampType())?
+ if is_datetime64_dtype(s.dtype):
+ tz = timezone or 'tzlocal()'
+ return s.dt.tz_localize(tz).dt.tz_convert('UTC')
+ elif is_datetime64tz_dtype(s.dtype):
+ return s.dt.tz_convert('UTC')
+ else:
+ return s
+ except ImportError:
--- End diff --
Sure, let me look into it a little more and summarize which versions we can
support.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org