This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 3165a95 [SPARK-31287][PYTHON][SQL] Ignore type hints in groupby.(cogroup.)applyInPandas and mapInPandas 3165a95 is described below commit 3165a95a04448546ae8955020566d718c6960223 Author: HyukjinKwon <gurwls...@apache.org> AuthorDate: Sun Mar 29 13:59:18 2020 +0900 [SPARK-31287][PYTHON][SQL] Ignore type hints in groupby.(cogroup.)applyInPandas and mapInPandas ### What changes were proposed in this pull request? This PR proposes to make the pandas function APIs (`groupby.(cogroup.)applyInPandas` and `mapInPandas`) ignore Python type hints. ### Why are the changes needed? Python type hints are optional, so they should not affect APIs where pandas UDFs are not used. Supporting other type hints in these APIs is future work; for now, we should at least not throw an exception. ### Does this PR introduce any user-facing change? No, it's a master-only change. ```python import pandas as pd def pandas_plus_one(pdf: pd.DataFrame) -> pd.DataFrame: return pdf + 1 spark.range(10).groupby('id').applyInPandas(pandas_plus_one, schema="id long").show() ``` ```python import pandas as pd def pandas_plus_one(left: pd.DataFrame, right: pd.DataFrame) -> pd.DataFrame: return left + 1 spark.range(10).groupby('id').cogroup(spark.range(10).groupby("id")).applyInPandas(pandas_plus_one, schema="id long").show() ``` ```python from typing import Iterator import pandas as pd def pandas_plus_one(iter: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]: return map(lambda v: v + 1, iter) spark.range(10).mapInPandas(pandas_plus_one, schema="id long").show() ``` **Before:** Exception **After:** ``` +---+ | id| +---+ | 1| | 2| | 3| | 4| | 5| | 6| | 7| | 8| | 9| | 10| +---+ ``` ### How was this patch tested? Unit tests were added. Closes #28052 from HyukjinKwon/SPARK-31287. 
Authored-by: HyukjinKwon <gurwls...@apache.org> Signed-off-by: HyukjinKwon <gurwls...@apache.org> --- python/pyspark/sql/pandas/functions.py | 8 +++++ .../pyspark/sql/tests/test_pandas_udf_typehints.py | 42 ++++++++++++++++++++++ 2 files changed, 50 insertions(+) diff --git a/python/pyspark/sql/pandas/functions.py b/python/pyspark/sql/pandas/functions.py index 31aa321..f43ebf8 100644 --- a/python/pyspark/sql/pandas/functions.py +++ b/python/pyspark/sql/pandas/functions.py @@ -384,6 +384,14 @@ def _create_pandas_udf(f, returnType, evalType): "In Python 3.6+ and Spark 3.0+, it is preferred to specify type hints for " "pandas UDF instead of specifying pandas UDF type which will be deprecated " "in the future releases. See SPARK-28264 for more details.", UserWarning) + elif evalType in [PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF, + PythonEvalType.SQL_MAP_PANDAS_ITER_UDF, + PythonEvalType.SQL_COGROUPED_MAP_PANDAS_UDF]: + # In case of 'SQL_GROUPED_MAP_PANDAS_UDF', deprecation warning is being triggered + # at `apply` instead. + # In case of 'SQL_MAP_PANDAS_ITER_UDF' and 'SQL_COGROUPED_MAP_PANDAS_UDF', the + # evaluation type will always be set. 
+ pass elif len(argspec.annotations) > 0: evalType = infer_eval_type(signature(f)) assert evalType is not None diff --git a/python/pyspark/sql/tests/test_pandas_udf_typehints.py b/python/pyspark/sql/tests/test_pandas_udf_typehints.py index 7c83c78..2582080 100644 --- a/python/pyspark/sql/tests/test_pandas_udf_typehints.py +++ b/python/pyspark/sql/tests/test_pandas_udf_typehints.py @@ -261,6 +261,48 @@ class PandasUDFTypeHintsTests(ReusedSQLTestCase): expected = df.groupby('id').agg(mean(df.v).alias('weighted_mean(v, 1.0)')).sort('id') assert_frame_equal(expected.toPandas(), actual.toPandas()) + def test_ignore_type_hint_in_group_apply_in_pandas(self): + df = self.spark.range(10) + exec( + "def pandas_plus_one(v: pd.DataFrame) -> pd.DataFrame:\n" + " return v + 1", + self.local) + + pandas_plus_one = self.local["pandas_plus_one"] + + actual = df.groupby('id').applyInPandas(pandas_plus_one, schema=df.schema).sort('id') + expected = df.selectExpr("id + 1 as id") + assert_frame_equal(expected.toPandas(), actual.toPandas()) + + def test_ignore_type_hint_in_cogroup_apply_in_pandas(self): + df = self.spark.range(10) + exec( + "def pandas_plus_one(left: pd.DataFrame, right: pd.DataFrame) -> pd.DataFrame:\n" + " return left + 1", + self.local) + + pandas_plus_one = self.local["pandas_plus_one"] + + actual = df.groupby('id').cogroup( + self.spark.range(10).groupby("id") + ).applyInPandas(pandas_plus_one, schema=df.schema).sort('id') + expected = df.selectExpr("id + 1 as id") + assert_frame_equal(expected.toPandas(), actual.toPandas()) + + def test_ignore_type_hint_in_map_in_pandas(self): + df = self.spark.range(10) + exec( + "from typing import Iterator\n" + "def pandas_plus_one(iter: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:\n" + " return map(lambda v: v + 1, iter)", + self.local) + + pandas_plus_one = self.local["pandas_plus_one"] + + actual = df.mapInPandas(pandas_plus_one, schema=df.schema) + expected = df.selectExpr("id + 1 as id") + 
assert_frame_equal(expected.toPandas(), actual.toPandas()) + if __name__ == "__main__": from pyspark.sql.tests.test_pandas_udf_typehints import * --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org