Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19607#discussion_r148576678
  
    --- Diff: python/pyspark/sql/types.py ---
    @@ -1629,35 +1629,121 @@ def to_arrow_type(dt):
         return arrow_type
     
     
    -def _check_dataframe_localize_timestamps(pdf):
    +def to_arrow_schema(schema):
    +    """ Convert a schema from Spark to Arrow
    +    """
    +    import pyarrow as pa
    +    fields = [pa.field(field.name, to_arrow_type(field.dataType), nullable=field.nullable)
    +              for field in schema]
    +    return pa.schema(fields)
    +
    +
    +def from_arrow_type(at):
    +    """ Convert pyarrow type to Spark data type.
    +    """
    +    # TODO: newer pyarrow has is_boolean(at) functions that would be better to check type
    +    import pyarrow as pa
    +    if at == pa.bool_():
    +        spark_type = BooleanType()
    +    elif at == pa.int8():
    +        spark_type = ByteType()
    +    elif at == pa.int16():
    +        spark_type = ShortType()
    +    elif at == pa.int32():
    +        spark_type = IntegerType()
    +    elif at == pa.int64():
    +        spark_type = LongType()
    +    elif at == pa.float32():
    +        spark_type = FloatType()
    +    elif at == pa.float64():
    +        spark_type = DoubleType()
    +    elif type(at) == pa.DecimalType:
    +        spark_type = DecimalType(precision=at.precision, scale=at.scale)
    +    elif at == pa.string():
    +        spark_type = StringType()
    +    elif at == pa.date32():
    +        spark_type = DateType()
    +    elif type(at) == pa.TimestampType:
    +        spark_type = TimestampType()
    +    else:
    +        raise TypeError("Unsupported type in conversion from Arrow: " + str(at))
    +    return spark_type
    +
    +
    +def from_arrow_schema(arrow_schema):
    +    """ Convert schema from Arrow to Spark.
    +    """
    +    return StructType(
    +        [StructField(field.name, from_arrow_type(field.type), nullable=field.nullable)
    +         for field in arrow_schema])
    +
    +
    +def _check_dataframe_localize_timestamps(pdf, schema, timezone):
         """
         Convert timezone aware timestamps to timezone-naive in local time
     
         :param pdf: pandas.DataFrame
         :return pandas.DataFrame where any timezone aware columns have been converted to tz-naive
         """
    -    from pandas.api.types import is_datetime64tz_dtype
    -    for column, series in pdf.iteritems():
    -        # TODO: handle nested timestamps, such as ArrayType(TimestampType())?
    -        if is_datetime64tz_dtype(series.dtype):
    -            pdf[column] = series.dt.tz_convert('tzlocal()').dt.tz_localize(None)
    +    import pandas as pd
    +    try:
    +        from pandas.api.types import is_datetime64tz_dtype, is_datetime64_dtype
    +        tz = timezone or 'tzlocal()'
    +        for column, series in pdf.iteritems():
    +            if type(schema[str(column)].dataType) == TimestampType:
    +                # TODO: handle nested timestamps, such as ArrayType(TimestampType())?
    +                if is_datetime64tz_dtype(series.dtype):
    +                    pdf[column] = series.dt.tz_convert(tz).dt.tz_localize(None)
    +                elif is_datetime64_dtype(series.dtype) and timezone is not None:
    +                    # `series.dt.tz_localize('tzlocal()')` doesn't work properly when including NaT.
    +                    pdf[column] = series.apply(
    +                        lambda ts: ts.tz_localize('tzlocal()').tz_convert(tz).tz_localize(None)
    +                        if ts is not pd.NaT else pd.NaT)
    +    except ImportError:
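    +        # pandas.api.types is unavailable in this pandas version; fall back to the older APIs below.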
    +        from pandas.core.common import is_datetime64_dtype
    +        from pandas.tslib import _dateutil_tzlocal
    +        tzlocal = _dateutil_tzlocal()
    +        tz = timezone or tzlocal
    +        for column, series in pdf.iteritems():
    +            if type(schema[str(column)].dataType) == TimestampType:
    +                # TODO: handle nested timestamps, such as ArrayType(TimestampType())?
    +                if not is_datetime64_dtype(series.dtype):
    +                    # `series.dt.tz_convert(tzlocal).dt.tz_localize(None)` doesn't work properly.
    +                    pdf[column] = pd.Series([ts.tz_convert(tz).tz_localize(None)
    +                                             if ts is not pd.NaT else pd.NaT for ts in series])
    +                elif is_datetime64_dtype(series.dtype) and timezone is not None:
    +                    # `series.dt.tz_localize(tzlocal)` doesn't work properly.
    +                    pdf[column] = pd.Series(
    +                        [ts.tz_localize(tzlocal).tz_convert(tz).tz_localize(None)
    +                         if ts is not pd.NaT else pd.NaT for ts in series])
         return pdf
     
     
    -def _check_series_convert_timestamps_internal(s):
    +def _check_series_convert_timestamps_internal(s, timezone):
         """
         Convert a tz-naive timestamp in local tz to UTC normalized for Spark internal storage
         :param s: a pandas.Series
         :return pandas.Series where, if it is a timestamp, it has been UTC normalized without a time zone
         """
    -    from pandas.api.types import is_datetime64_dtype, is_datetime64tz_dtype
    -    # TODO: handle nested timestamps, such as ArrayType(TimestampType())?
    -    if is_datetime64_dtype(s.dtype):
    -        return s.dt.tz_localize('tzlocal()').dt.tz_convert('UTC')
    -    elif is_datetime64tz_dtype(s.dtype):
    -        return s.dt.tz_convert('UTC')
    -    else:
    -        return s
    +    try:
    +        from pandas.api.types import is_datetime64tz_dtype, is_datetime64_dtype
    +        # TODO: handle nested timestamps, such as ArrayType(TimestampType())?
    +        if is_datetime64_dtype(s.dtype):
    +            tz = timezone or 'tzlocal()'
    +            return s.dt.tz_localize(tz).dt.tz_convert('UTC')
    +        elif is_datetime64tz_dtype(s.dtype):
    +            return s.dt.tz_convert('UTC')
    +        else:
    +            return s
    +    except ImportError:
    --- End diff ---
    
    Sure, let me look into it a little more and summarize what version we can support.
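
    For example, we could switch on the pandas version explicitly instead of catching ImportError, along the lines of the sketch below (the 0.19.2 floor is only a guess until I check; `pandas.api.types` is the piece that exists only in newer releases):

        from distutils.version import LooseVersion

        import pandas as pd

        if LooseVersion(pd.__version__) >= LooseVersion('0.19.2'):  # assumed minimum, to be confirmed
            # newer pandas exposes the dtype checks under pandas.api.types
            from pandas.api.types import is_datetime64_dtype, is_datetime64tz_dtype
        else:
            # older pandas keeps is_datetime64_dtype under pandas.core.common
            from pandas.core.common import is_datetime64_dtype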

