Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18664#discussion_r144248880
  
    --- Diff: python/pyspark/sql/types.py ---
    @@ -1619,11 +1619,47 @@ def to_arrow_type(dt):
             arrow_type = pa.decimal(dt.precision, dt.scale)
         elif type(dt) == StringType:
             arrow_type = pa.string()
    +    elif type(dt) == DateType:
    +        arrow_type = pa.date32()
    +    elif type(dt) == TimestampType:
    +        arrow_type = pa.timestamp('us', tz='UTC')
         else:
             raise TypeError("Unsupported type in conversion to Arrow: " + 
str(dt))
         return arrow_type
     
     
    +def _check_localize_series_timestamps(s):
    +    from pandas.types.common import is_datetime64_dtype
    +    # TODO: handle nested timestamps?
    +    if is_datetime64_dtype(s.dtype):
    +        # TODO: pyarrow.Column.to_pandas keeps data in UTC but removes 
timezone
    --- End diff --
    
    I'd suspect it's a bug because we expect the both behave the same way and 
they shouldn't drop the timezone info.
    Anyway, how about creating pandas DataFrame first and split into Series.
    
    I mean as follows:
    
    - python/pyspark/serializers.py
    ```diff
             reader = pa.open_stream(stream)
             for batch in reader:
    -            table = pa.Table.from_batches([batch])
    -            yield [_check_localize_series_timestamps(c.to_pandas()) for c 
in table.itercolumns()]
    +            pdf = batch.to_pandas()
    +            yield [_check_localize_series_timestamps(c) for _, c in 
pdf.iteritems()]
    ```
    
    - python/pyspark/sql/types.py
    ```diff
     def _check_localize_series_timestamps(s):
    -    from pandas.types.common import is_datetime64_dtype
    +    from pandas.types.common import is_datetime64tz_dtype
         # TODO: handle nested timestamps?
    -    if is_datetime64_dtype(s.dtype):
    -        # TODO: pyarrow.Column.to_pandas keeps data in UTC but removes 
timezone
    -        return 
s.dt.tz_localize('UTC').dt.tz_convert('tzlocal()').dt.tz_localize(None)
    +    if is_datetime64tz_dtype(s.dtype):
    +        return s.dt.tz_convert('tzlocal()').dt.tz_localize(None)
         else:
             return s
    ```
    
    We need to check if this causes performance regression or not.
    
    Btw, when using this patch, we can use `date(1969, 1, 1)` and 
`datetime(1969, 1, 1, 1, 1, 1)` for the test you marked as TODO in 
`test_vectorized_udf_timestamps`. I'm not exactly sure the reason, though.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to