[GitHub] spark pull request #22610: [WIP][SPARK-25461][PySpark][SQL] Print warning wh...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/22610#discussion_r223177785 --- Diff: python/pyspark/sql/functions.py --- @@ -2909,6 +2909,11 @@ def pandas_udf(f=None, returnType=None, functionType=None): can fail on special rows, the workaround is to incorporate the condition into the functions. .. note:: The user-defined functions do not take keyword arguments on the calling side. + +.. note:: The data type of returned `pandas.Series` from the user-defined functions should be +matched with defined returnType. When there is mismatch between them, it is not guaranteed +that the conversion by SparkSQL during serialization is correct at all and users might get --- End diff -- Yeah, as actually we don't intentionally cast the returned data. How about: ``` When there is mismatch between them, Spark might do conversion on returned data. The conversion is not guaranteed to be correct and results should be checked for accuracy by users. ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22610: [WIP][SPARK-25461][PySpark][SQL] Print warning wh...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22610#discussion_r223173637 --- Diff: python/pyspark/sql/functions.py --- @@ -2909,6 +2909,11 @@ def pandas_udf(f=None, returnType=None, functionType=None): can fail on special rows, the workaround is to incorporate the condition into the functions. .. note:: The user-defined functions do not take keyword arguments on the calling side. + +.. note:: The data type of returned `pandas.Series` from the user-defined functions should be +matched with defined returnType. When there is mismatch between them, it is not guaranteed +that the conversion by SparkSQL during serialization is correct at all and users might get --- End diff -- > an attempt will be made to cast the data and results should be checked for accuracy." it sounds like the casting is intentional. I think the casting logic is not that clear as far as I can tell, comparing SQL casting logic. Can we leave this not guaranteed for now and document the casting logic here instead? Does Arrow have some kind of documentation for type conversion BTW? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22610: [WIP][SPARK-25461][PySpark][SQL] Print warning wh...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/22610#discussion_r223070065 --- Diff: python/pyspark/sql/functions.py --- @@ -2909,6 +2909,11 @@ def pandas_udf(f=None, returnType=None, functionType=None): can fail on special rows, the workaround is to incorporate the condition into the functions. .. note:: The user-defined functions do not take keyword arguments on the calling side. + +.. note:: The data type of returned `pandas.Series` from the user-defined functions should be +matched with defined returnType. When there is mismatch between them, it is not guaranteed +that the conversion by SparkSQL during serialization is correct at all and users might get --- End diff -- instead of saying "conversion is not guaranteed" which sounds like results might be arbitrary, could we say "..mismatch between them, an attempt will be made to cast the data and results should be checked for accuracy."? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22610: [WIP][SPARK-25461][PySpark][SQL] Print warning wh...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22610#discussion_r222885910 --- Diff: python/pyspark/sql/functions.py --- @@ -2909,6 +2909,11 @@ def pandas_udf(f=None, returnType=None, functionType=None): can fail on special rows, the workaround is to incorporate the condition into the functions. .. note:: The user-defined functions do not take keyword arguments on the calling side. + +.. note:: The data type of returned `pandas.Series` from the user-defined functions should be +matched with defined returnType. When there is mismatch between them, it is not guaranteed +that the conversion by SparkSQL during serialization is correct at all and users might get --- End diff -- maybe I am concerning too much .. but how about just say .. ``` ... defined returnType (see :meth:`types.to_arrow_type` and :meth:`types.from_arrow_type`). When there is mismatch between them, the conversion is not guaranteed. ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22610: [WIP][SPARK-25461][PySpark][SQL] Print warning wh...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22610#discussion_r222885267 --- Diff: python/pyspark/worker.py --- @@ -84,13 +84,36 @@ def wrap_scalar_pandas_udf(f, return_type): arrow_return_type = to_arrow_type(return_type) def verify_result_length(*a): +import pyarrow as pa result = f(*a) if not hasattr(result, "__len__"): raise TypeError("Return type of the user-defined function should be " "Pandas.Series, but is {}".format(type(result))) if len(result) != len(a[0]): raise RuntimeError("Result vector from pandas_udf was not the required length: " "expected %d, got %d" % (len(a[0]), len(result))) + +# Ensure return type of Pandas.Series matches the arrow return type of the user-defined +# function. Otherwise, we may produce incorrect serialized data. +# Note: for timestamp type, we only need to ensure both types are timestamp because the +# serializer will do conversion. +try: +arrow_type_of_result = pa.from_numpy_dtype(result.dtype) +both_are_timestamp = pa.types.is_timestamp(arrow_type_of_result) and \ +pa.types.is_timestamp(arrow_return_type) +if not both_are_timestamp and arrow_return_type != arrow_type_of_result: +print("WARN: Arrow type %s of return Pandas.Series of the user-defined function's " --- End diff -- Yes .. I support to just fix the doc first here only and make a PR separately later if needed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22610: [WIP][SPARK-25461][PySpark][SQL] Print warning wh...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/22610#discussion_r222617651 --- Diff: python/pyspark/worker.py --- @@ -84,13 +84,36 @@ def wrap_scalar_pandas_udf(f, return_type): arrow_return_type = to_arrow_type(return_type) def verify_result_length(*a): +import pyarrow as pa result = f(*a) if not hasattr(result, "__len__"): raise TypeError("Return type of the user-defined function should be " "Pandas.Series, but is {}".format(type(result))) if len(result) != len(a[0]): raise RuntimeError("Result vector from pandas_udf was not the required length: " "expected %d, got %d" % (len(a[0]), len(result))) + +# Ensure return type of Pandas.Series matches the arrow return type of the user-defined +# function. Otherwise, we may produce incorrect serialized data. +# Note: for timestamp type, we only need to ensure both types are timestamp because the +# serializer will do conversion. +try: +arrow_type_of_result = pa.from_numpy_dtype(result.dtype) +both_are_timestamp = pa.types.is_timestamp(arrow_type_of_result) and \ +pa.types.is_timestamp(arrow_return_type) +if not both_are_timestamp and arrow_return_type != arrow_type_of_result: +print("WARN: Arrow type %s of return Pandas.Series of the user-defined function's " --- End diff -- hmm, I'm neutral on whether we should display this warning message, before we have an option to check the unsafe conversion by PyArrow. @HyukjinKwon if you are also supportive, I will remove this and leave this PR as documentation only. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22610: [WIP][SPARK-25461][PySpark][SQL] Print warning wh...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/22610#discussion_r222616380 --- Diff: python/pyspark/worker.py --- @@ -84,13 +84,36 @@ def wrap_scalar_pandas_udf(f, return_type): arrow_return_type = to_arrow_type(return_type) def verify_result_length(*a): +import pyarrow as pa result = f(*a) if not hasattr(result, "__len__"): raise TypeError("Return type of the user-defined function should be " "Pandas.Series, but is {}".format(type(result))) if len(result) != len(a[0]): raise RuntimeError("Result vector from pandas_udf was not the required length: " "expected %d, got %d" % (len(a[0]), len(result))) + +# Ensure return type of Pandas.Series matches the arrow return type of the user-defined +# function. Otherwise, we may produce incorrect serialized data. +# Note: for timestamp type, we only need to ensure both types are timestamp because the +# serializer will do conversion. +try: +arrow_type_of_result = pa.from_numpy_dtype(result.dtype) +both_are_timestamp = pa.types.is_timestamp(arrow_type_of_result) and \ +pa.types.is_timestamp(arrow_return_type) +if not both_are_timestamp and arrow_return_type != arrow_type_of_result: +print("WARN: Arrow type %s of return Pandas.Series of the user-defined function's " + "dtype %s doesn't match the arrow type %s " + "of defined return type %s" % (arrow_type_of_result, result.dtype, + arrow_return_type, return_type), + file=sys.stderr) +except: +print("WARN: Can't infer arrow type of Pandas.Series's dtype: %s, which might not " + "match the arrow type %s of defined return type %s" % (result.dtype, + arrow_return_type, + return_type), --- End diff -- Sorry I may misunderstand, do you mean L113 and L114 should be aligned with L112? But after that, lint-python will complain. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22610: [WIP][SPARK-25461][PySpark][SQL] Print warning wh...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/22610#discussion_r222501309 --- Diff: python/pyspark/worker.py --- @@ -84,13 +84,36 @@ def wrap_scalar_pandas_udf(f, return_type): arrow_return_type = to_arrow_type(return_type) def verify_result_length(*a): +import pyarrow as pa result = f(*a) if not hasattr(result, "__len__"): raise TypeError("Return type of the user-defined function should be " "Pandas.Series, but is {}".format(type(result))) if len(result) != len(a[0]): raise RuntimeError("Result vector from pandas_udf was not the required length: " "expected %d, got %d" % (len(a[0]), len(result))) + +# Ensure return type of Pandas.Series matches the arrow return type of the user-defined +# function. Otherwise, we may produce incorrect serialized data. +# Note: for timestamp type, we only need to ensure both types are timestamp because the +# serializer will do conversion. +try: +arrow_type_of_result = pa.from_numpy_dtype(result.dtype) +both_are_timestamp = pa.types.is_timestamp(arrow_type_of_result) and \ +pa.types.is_timestamp(arrow_return_type) +if not both_are_timestamp and arrow_return_type != arrow_type_of_result: +print("WARN: Arrow type %s of return Pandas.Series of the user-defined function's " --- End diff -- Yeah, it might be useful to see the warning if doing some local tests etc. My only concern is that users might be confused why they see a warning locally, but doesn't appear in logs.. Man, it would be nice to have some proper python logging for this! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22610: [WIP][SPARK-25461][PySpark][SQL] Print warning wh...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/22610#discussion_r222173904 --- Diff: python/pyspark/worker.py --- @@ -84,13 +84,36 @@ def wrap_scalar_pandas_udf(f, return_type): arrow_return_type = to_arrow_type(return_type) def verify_result_length(*a): +import pyarrow as pa result = f(*a) if not hasattr(result, "__len__"): raise TypeError("Return type of the user-defined function should be " "Pandas.Series, but is {}".format(type(result))) if len(result) != len(a[0]): raise RuntimeError("Result vector from pandas_udf was not the required length: " "expected %d, got %d" % (len(a[0]), len(result))) + +# Ensure return type of Pandas.Series matches the arrow return type of the user-defined +# function. Otherwise, we may produce incorrect serialized data. +# Note: for timestamp type, we only need to ensure both types are timestamp because the +# serializer will do conversion. +try: +arrow_type_of_result = pa.from_numpy_dtype(result.dtype) +both_are_timestamp = pa.types.is_timestamp(arrow_type_of_result) and \ +pa.types.is_timestamp(arrow_return_type) +if not both_are_timestamp and arrow_return_type != arrow_type_of_result: +print("WARN: Arrow type %s of return Pandas.Series of the user-defined function's " + "dtype %s doesn't match the arrow type %s " + "of defined return type %s" % (arrow_type_of_result, result.dtype, + arrow_return_type, return_type), + file=sys.stderr) +except: +print("WARN: Can't infer arrow type of Pandas.Series's dtype: %s, which might not " + "match the arrow type %s of defined return type %s" % (result.dtype, + arrow_return_type, + return_type), --- End diff -- ok. thanks. :-) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22610: [WIP][SPARK-25461][PySpark][SQL] Print warning wh...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22610#discussion_r222173421 --- Diff: python/pyspark/worker.py --- @@ -84,13 +84,36 @@ def wrap_scalar_pandas_udf(f, return_type): arrow_return_type = to_arrow_type(return_type) def verify_result_length(*a): +import pyarrow as pa result = f(*a) if not hasattr(result, "__len__"): raise TypeError("Return type of the user-defined function should be " "Pandas.Series, but is {}".format(type(result))) if len(result) != len(a[0]): raise RuntimeError("Result vector from pandas_udf was not the required length: " "expected %d, got %d" % (len(a[0]), len(result))) + +# Ensure return type of Pandas.Series matches the arrow return type of the user-defined +# function. Otherwise, we may produce incorrect serialized data. +# Note: for timestamp type, we only need to ensure both types are timestamp because the +# serializer will do conversion. +try: +arrow_type_of_result = pa.from_numpy_dtype(result.dtype) +both_are_timestamp = pa.types.is_timestamp(arrow_type_of_result) and \ +pa.types.is_timestamp(arrow_return_type) +if not both_are_timestamp and arrow_return_type != arrow_type_of_result: +print("WARN: Arrow type %s of return Pandas.Series of the user-defined function's " + "dtype %s doesn't match the arrow type %s " + "of defined return type %s" % (arrow_type_of_result, result.dtype, + arrow_return_type, return_type), + file=sys.stderr) +except: +print("WARN: Can't infer arrow type of Pandas.Series's dtype: %s, which might not " + "match the arrow type %s of defined return type %s" % (result.dtype, + arrow_return_type, + return_type), --- End diff -- I would fix the indentation here tho :-) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22610: [WIP][SPARK-25461][PySpark][SQL] Print warning wh...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/22610#discussion_r222015287 --- Diff: python/pyspark/worker.py --- @@ -84,13 +84,36 @@ def wrap_scalar_pandas_udf(f, return_type): arrow_return_type = to_arrow_type(return_type) def verify_result_length(*a): +import pyarrow as pa result = f(*a) if not hasattr(result, "__len__"): raise TypeError("Return type of the user-defined function should be " "Pandas.Series, but is {}".format(type(result))) if len(result) != len(a[0]): raise RuntimeError("Result vector from pandas_udf was not the required length: " "expected %d, got %d" % (len(a[0]), len(result))) + +# Ensure return type of Pandas.Series matches the arrow return type of the user-defined +# function. Otherwise, we may produce incorrect serialized data. +# Note: for timestamp type, we only need to ensure both types are timestamp because the +# serializer will do conversion. +try: +arrow_type_of_result = pa.from_numpy_dtype(result.dtype) +both_are_timestamp = pa.types.is_timestamp(arrow_type_of_result) and \ +pa.types.is_timestamp(arrow_return_type) +if not both_are_timestamp and arrow_return_type != arrow_type_of_result: +print("WARN: Arrow type %s of return Pandas.Series of the user-defined function's " --- End diff -- No, but as the other print usage in `worker.py`, I think this can be seen in the worker log? This is also useful when testing in pyspark shell. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22610: [WIP][SPARK-25461][PySpark][SQL] Print warning wh...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/22610#discussion_r222007837 --- Diff: python/pyspark/worker.py --- @@ -84,13 +84,36 @@ def wrap_scalar_pandas_udf(f, return_type): arrow_return_type = to_arrow_type(return_type) def verify_result_length(*a): +import pyarrow as pa result = f(*a) if not hasattr(result, "__len__"): raise TypeError("Return type of the user-defined function should be " "Pandas.Series, but is {}".format(type(result))) if len(result) != len(a[0]): raise RuntimeError("Result vector from pandas_udf was not the required length: " "expected %d, got %d" % (len(a[0]), len(result))) + +# Ensure return type of Pandas.Series matches the arrow return type of the user-defined +# function. Otherwise, we may produce incorrect serialized data. +# Note: for timestamp type, we only need to ensure both types are timestamp because the +# serializer will do conversion. +try: +arrow_type_of_result = pa.from_numpy_dtype(result.dtype) +both_are_timestamp = pa.types.is_timestamp(arrow_type_of_result) and \ +pa.types.is_timestamp(arrow_return_type) +if not both_are_timestamp and arrow_return_type != arrow_type_of_result: +print("WARN: Arrow type %s of return Pandas.Series of the user-defined function's " --- End diff -- Will this appear when being run in an executor? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22610: [WIP][SPARK-25461][PySpark][SQL] Print warning wh...
GitHub user viirya opened a pull request: https://github.com/apache/spark/pull/22610 [WIP][SPARK-25461][PySpark][SQL] Print warning when return type of Pandas.Series mismatches the arrow return type of pandas udf ## What changes were proposed in this pull request? For Pandas UDFs, we get arrow type from defined Catalyst return data type of UDFs. We use this arrow type to do serialization of data. If the defined return data type doesn't match with actual return type of Pandas.Series returned by Pandas UDFs, it has a risk to return incorrect data from Python side. This WIP work proposes to check if returned Pandas.Series's dtype matches with defined return type of Pandas UDFs. Although we can disallow it by throwing an exception to let users know they might need to set correct return type. But looks like we leverage such behavior in current codebase. For example, there is a test `test_vectorized_udf_null_short`: ```python data = [(None,), (2,), (3,), (4,)] schema = StructType().add("short", ShortType()) df = self.spark.createDataFrame(data, schema) short_f = pandas_udf(lambda x: x, ShortType()) res = df.select(short_f(col('short'))) self.assertEquals(df.collect(), res.collect()) ``` So instead, this work for now just prints warning message if such mismatching is detected. So users can read this message when debugging that their Pandas UDFs don't produce expected results. ## How was this patch tested? Manually test by running: ```python from pyspark.sql.functions import pandas_udf import pandas as pd values = [1.0] * 5 + [2.0] * 5 pdf = pd.DataFrame({'A': values}) df = spark.createDataFrame(pdf) @pandas_udf(returnType=BooleanType()) def to_boolean(column): return column df.select(['A']).withColumn('to_boolean', to_boolean('A')).show() ``` Output: ``` WARN: Arrow type double of return Pandas.Series of the user-defined function's dtype float64 doesn't match the arrow type bool of defined return type B ooleanType +---+--+ | A|to_boolean| +---+--+ |1.0| false| |1.0| false| |1.0| false| |1.0| false| |1.0| false| |2.0| false| |2.0| false| |2.0| false| |2.0| false| |2.0| false| +---+--+ ``` You can merge this pull request into a Git repository by running: $ git pull https://github.com/viirya/spark-1 SPARK-25461 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22610.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22610 commit 2fa15bda48ba64a102f114dc9119cb3c310200c4 Author: Liang-Chi Hsieh Date: 2018-09-26T09:01:40Z Ensure return type of Pandas.Series matches the arrow return type of pandas udf. commit d206b7cf78f898e622f539a15e45515fcbd9e54a Author: Liang-Chi Hsieh Date: 2018-10-02T05:29:44Z Print warning message instead of throwing exception. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org