GitHub user HyukjinKwon opened a pull request:

    https://github.com/apache/spark/pull/21928

    [SPARK-24976][PYTHON] Allow None for Decimal type conversion (specific to Arrow 0.9.0)

    ## What changes were proposed in this pull request?
    
    See [ARROW-2432](https://jira.apache.org/jira/browse/ARROW-2432). It seems 
that using `from_pandas` to convert decimals fails if it encounters a value of `None`:
    
    **Arrow 0.8.0**
    
    ```python
    import pyarrow as pa
    import pandas as pd
    from decimal import Decimal
    
    pa.Array.from_pandas(
        pd.Series([Decimal('3.14'), None]), type=pa.decimal128(3, 2))
    ```
    
    ```
    <pyarrow.lib.Decimal128Array object at 0x10a572c58>
    [
      Decimal('3.14'),
      NA
    ]
    ```
    
    **Arrow 0.9.0**
    
    ```
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "array.pxi", line 383, in pyarrow.lib.Array.from_pandas
      File "array.pxi", line 177, in pyarrow.lib.array
      File "error.pxi", line 77, in pyarrow.lib.check_status
      File "error.pxi", line 77, in pyarrow.lib.check_status
    pyarrow.lib.ArrowInvalid: Error converting from Python objects to Decimal: Got Python object of type NoneType but can only handle these types: decimal.Decimal
    ```
    
    This PR proposes to work around this by passing `Decimal('NaN')` in place of `None`, which Arrow converts to a null (NA) entry:
    
    ```python
    pa.Array.from_pandas(
        pd.Series([Decimal('3.14'), Decimal('NaN')]), type=pa.decimal128(3, 2))
    ```
    
    ```
    <pyarrow.lib.Decimal128Array object at 0x10ffd2e68>
    [
      Decimal('3.14'),
      NA
    ]
    ```
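The substitution step itself can be sketched in isolation. This is only an illustration of the idea, not the code in the patch (the diff is not part of this message): the helper name `none_to_decimal_nan` is hypothetical, and it works on a plain Python list rather than a pandas Series so that it runs with the standard library alone.

```python
from decimal import Decimal


def none_to_decimal_nan(values):
    """Return a copy of `values` with each None replaced by Decimal('NaN')."""
    # Hypothetical helper for illustration; not taken from the patch itself.
    return [Decimal("NaN") if v is None else v for v in values]


# Same sample data as the examples above: one real decimal and one missing value.
converted = none_to_decimal_nan([Decimal("3.14"), None])
print(converted)
```

Assuming the behaviour shown in the Arrow 0.9.0 output above, the resulting values could then be wrapped in a `pd.Series` and passed to `pa.Array.from_pandas` with `type=pa.decimal128(3, 2)`.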
    
    ## How was this patch tested?
    
    Manually tested:
    
    ```bash
    SPARK_TESTING=1 ./bin/pyspark pyspark.sql.tests ScalarPandasUDFTests
    ```
    
    **Before**
    
    ```
    Traceback (most recent call last):
      File "/.../spark/python/pyspark/sql/tests.py", line 4672, in test_vectorized_udf_null_decimal
        self.assertEquals(df.collect(), res.collect())
      File "/.../spark/python/pyspark/sql/dataframe.py", line 533, in collect
        sock_info = self._jdf.collectToPython()
      File "/.../spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
        answer, self.gateway_client, self.target_id, self.name)
      File "/.../spark/python/pyspark/sql/utils.py", line 63, in deco
        return f(*a, **kw)
      File "/.../spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
        format(target_id, ".", name), value)
    Py4JJavaError: An error occurred while calling o51.collectToPython.
    : org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 1.0 failed 1 times, most recent failure: Lost task 3.0 in stage 1.0 (TID 7, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
      File "/.../spark/python/pyspark/worker.py", line 320, in main
        process()
      File "/.../spark/python/pyspark/worker.py", line 315, in process
        serializer.dump_stream(func(split_index, iterator), outfile)
      File "/.../spark/python/pyspark/serializers.py", line 274, in dump_stream
        batch = _create_batch(series, self._timezone)
      File "/.../spark/python/pyspark/serializers.py", line 243, in _create_batch
        arrs = [create_array(s, t) for s, t in series]
      File "/.../spark/python/pyspark/serializers.py", line 241, in create_array
        return pa.Array.from_pandas(s, mask=mask, type=t)
      File "array.pxi", line 383, in pyarrow.lib.Array.from_pandas
      File "array.pxi", line 177, in pyarrow.lib.array
      File "error.pxi", line 77, in pyarrow.lib.check_status
      File "error.pxi", line 77, in pyarrow.lib.check_status
    ArrowInvalid: Error converting from Python objects to Decimal: Got Python object of type NoneType but can only handle these types: decimal.Decimal
    ```
    
    **After**
    
    ```
    Running tests...
    ----------------------------------------------------------------------
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    .......S.............................
    ----------------------------------------------------------------------
    Ran 37 tests in 21.980s
    ```

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/HyukjinKwon/spark SPARK-24976

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21928.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #21928
    
----
commit 652afd0e6f156330d8b0dc28ee519605ae32e971
Author: hyukjinkwon <gurwls223@...>
Date:   2018-07-31T03:37:43Z

    Allow None for Decimal type conversion (specific to Arrow 0.9.0)

----

