HyukjinKwon opened a new pull request #29569:
URL: https://github.com/apache/spark/pull/29569


   ### What changes were proposed in this pull request?
   
   This PR updates the chart generated at SPARK-25666. We bumped up the minimal 
PyArrow version. It's better to use PyArrow 0.15.1+
   
   ### Why are the changes needed?
   
   To track the changes in type coercion of PySpark <> PyArrow <> pandas.
   
   ### Does this PR introduce _any_ user-facing change?
   
   No.
   
   ### How was this patch tested?
   
   Use this code to generate the chart:
   
   ```python
   from pyspark.sql.types import *
   from pyspark.sql.functions import pandas_udf
   
   columns = [
       ('none', 'object(NoneType)'),
       ('bool', 'bool'),
       ('int8', 'int8'),
       ('int16', 'int16'),
       ('int32', 'int32'),
       ('int64', 'int64'),
       ('uint8', 'uint8'),
       ('uint16', 'uint16'),
       ('uint32', 'uint32'),
       ('uint64', 'uint64'),
       ('float64', 'float16'),
       ('float64', 'float32'),
       ('float64', 'float64'),
       ('date', 'datetime64[ns]'),
       ('tz_aware_dates', 'datetime64[ns, US/Eastern]'),
       ('string', 'object(string)'),
       ('decimal', 'object(Decimal)'),
       ('array', 'object(array[int32])'),
       ('float128', 'float128'),
       ('complex64', 'complex64'),
       ('complex128', 'complex128'),
       ('category', 'category'),
       ('tdeltas', 'timedelta64[ns]'),
   ]
   
   def create_dataframe():
       import pandas as pd
       import numpy as np
       import decimal
       pdf = pd.DataFrame({
           'none': [None, None],
           'bool': [True, False],
           'int8': np.arange(1, 3).astype('int8'),
           'int16': np.arange(1, 3).astype('int16'),
           'int32': np.arange(1, 3).astype('int32'),
           'int64': np.arange(1, 3).astype('int64'),
           'uint8': np.arange(1, 3).astype('uint8'),
           'uint16': np.arange(1, 3).astype('uint16'),
           'uint32': np.arange(1, 3).astype('uint32'),
           'uint64': np.arange(1, 3).astype('uint64'),
           'float16': np.arange(1, 3).astype('float16'),
           'float32': np.arange(1, 3).astype('float32'),
           'float64': np.arange(1, 3).astype('float64'),
           'float128': np.arange(1, 3).astype('float128'),
           'complex64': np.arange(1, 3).astype('complex64'),
           'complex128': np.arange(1, 3).astype('complex128'),
           'string': list('ab'),
           'array': pd.Series([np.array([1, 2, 3], dtype=np.int32), 
np.array([1, 2, 3], dtype=np.int32)]),
           'decimal': pd.Series([decimal.Decimal('1'), decimal.Decimal('2')]),
           'date': pd.date_range('19700101', periods=2).values,
           'category': pd.Series(list("AB")).astype('category')})
       pdf['tdeltas'] = [pdf.date.diff()[1], pdf.date.diff()[0]]
       pdf['tz_aware_dates'] = pd.date_range('19700101', periods=2, 
tz='US/Eastern')
       return pdf
   
   types =  [
       BooleanType(),
       ByteType(),
       ShortType(),
       IntegerType(),
       LongType(),
       FloatType(),
       DoubleType(),
       DateType(),
       TimestampType(),
       StringType(),
       DecimalType(10, 0),
       ArrayType(IntegerType()),
       MapType(StringType(), IntegerType()),
       StructType([StructField("_1", IntegerType())]),
       BinaryType(),
   ]
   
   df = spark.range(2).repartition(1)
   results = []
   count = 0
   total = len(types) * len(columns)
   values = []
   spark.sparkContext.setLogLevel("FATAL")
   for t in types:
       result = []
       for column, pandas_t in columns:
           v = create_dataframe()[column][0]
           values.append(v)
           try:
               row = df.select(pandas_udf(lambda _: create_dataframe()[column], 
t)(df.id)).first()
               ret_str = repr(row[0])
           except Exception:
               ret_str = "X"
           result.append(ret_str)
           progress = "SQL Type: [%s]\n  Pandas Value(Type): %s(%s)]\n  Result 
Python Value: [%s]" % (
               t.simpleString(), v, pandas_t, ret_str)
           count += 1
           print("%s/%s:\n  %s" % (count, total, progress))
       results.append([t.simpleString()] + list(map(str, result)))
   
   
   schema = ["SQL Type \\ Pandas Value(Type)"] + list(map(lambda values_column: 
"%s(%s)" % (values_column[0], values_column[1][1]), zip(values, columns)))
   strings = spark.createDataFrame(results, schema=schema)._jdf.showString(20, 
20, False)
   print("\n".join(map(lambda line: "    # %s  # noqa" % line, 
strings.strip().split("\n"))))
   ```
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to