HyukjinKwon opened a new pull request #24752: [SPARK-27893][SQL][PYTHON] Create an integrated test base for Python, Scalar Pandas, Scala UDF by sql files
URL: https://github.com/apache/spark/pull/24752

## What changes were proposed in this pull request?

This PR aims to add an integrated test base for various UDF test cases so that Scala UDFs, Python UDFs and Scalar Pandas UDFs can be tested in SBT and Maven tests.

### Problem

One of the problems we face is that `ExtractPythonUDF[s|FromAggregate]` has unevaluable expressions that always have to be wrapped with special plans. This special rule seems to produce many issues, for instance SPARK-27803, SPARK-26147, SPARK-26864, SPARK-26293, SPARK-25314 and SPARK-24721.

### Why do we have so few test cases dedicated to SQL and plans?

We don't have such SQL (or plan) dedicated tests in PySpark to catch such issues because:

- A developer has to know SQL, PySpark, Py4J and the version differences in Python to write good test cases of this kind.
- To test plans, we would have to access plans in the JVM via Py4J, which is tricky, messy and duplicates JVM test cases.
- Usually we just add end-to-end test cases in PySpark, so there are not many examples to refer to.

Switching the test base and method is a non-trivial overhead (IMHO).

### How does this PR fix it?

This PR adds Python UDFs and Scalar Pandas UDFs at runtime in SBT / Maven test cases. It generates a Python-pickled instance (consisting of a return type and a Python native function) that is used in the Python or Scalar Pandas UDF, and brings it directly into the JVM. After that, we don't interact via Py4J anymore but run the tests directly in the JVM - we can just register and run Python UDFs and Scalar Pandas UDFs in the JVM.

Currently, I only integrated this change into the SQL-file-based testing. This is how it works with `udf-*.sql` files: once a test file matching `udf-*.sql` is detected, three test cases are created for it:

- A Scala UDF test case with a Scala UDF registered as 'udf'.
- A Python UDF test case with a Python UDF registered as 'udf', iff a Python executable and pyspark are available.
- A Scalar Pandas UDF test case with a Scalar Pandas UDF registered as 'udf', iff a Python executable, pandas, pyspark and pyarrow are available.

Therefore, each UDF test case has a single input file and a single output file, but is executed with three different types of UDFs. For instance,

```sql
CREATE TEMPORARY VIEW ta AS
SELECT udf(a) AS a, udf('a') AS tag FROM t1
UNION ALL
SELECT udf(a) AS a, udf('b') AS tag FROM t2;

CREATE TEMPORARY VIEW tb AS
SELECT udf(a) AS a, udf('a') AS tag FROM t3
UNION ALL
SELECT udf(a) AS a, udf('b') AS tag FROM t4;

SELECT tb.* FROM ta INNER JOIN tb ON ta.a = tb.a AND ta.tag = tb.tag;
```

will be run three times, once each with a Scala UDF, a Python UDF and a Scalar Pandas UDF.

### Appendix

In addition, this PR adds `IntegratedUDFTestUtils`, which enables testing and executing Python UDFs and Scalar Pandas UDFs as shown below.

To register a Python UDF in SQL:

```scala
IntegratedUDFTestUtils.registerTestUDF(new TestPythonUDF, spark)
```

To register a Scalar Pandas UDF in SQL:

```scala
IntegratedUDFTestUtils.registerTestUDF(new TestScalarPandasUDF, spark)
```

To use it in the Scala API:

```scala
spark.range(1).select(expr("udf(1)")).show()
```

To use it in SQL:

```scala
sql("SELECT udf(1)").show()
```

This utility could also be used in the future for better coverage of Scala API combinations.
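For readers who want to try the utility outside of `SQLQueryTestSuite`, here is a minimal sketch that combines the snippets above into one standalone program. It is illustrative only: the wildcard import, the availability flag `shouldTestPythonUDFs`, and the exact package of `IntegratedUDFTestUtils` are assumptions, and the constructor form `new TestPythonUDF` is taken verbatim from the description above.

```scala
// Sketch only: `shouldTestPythonUDFs` and the location/members of
// IntegratedUDFTestUtils are assumptions made for illustration.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.IntegratedUDFTestUtils
import org.apache.spark.sql.IntegratedUDFTestUtils._
import org.apache.spark.sql.functions.expr

object PythonUDFRegistrationSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").getOrCreate()

    // Register only when a Python executable and pyspark are detected,
    // mirroring how the generated test cases are skipped otherwise.
    if (IntegratedUDFTestUtils.shouldTestPythonUDFs) {
      IntegratedUDFTestUtils.registerTestUDF(new TestPythonUDF, spark)

      // Once registered under the name 'udf', it is callable from both
      // the Scala API and SQL, without going through Py4J.
      spark.range(3).select(expr("udf(id)")).show()
      spark.sql("SELECT udf(1) AS result").show()
    }

    spark.stop()
  }
}
```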
## How was this patch tested?

Tested via the command below:

```bash
build/sbt "sql/test-only *SQLQueryTestSuite -- -z udf/udf-inner-join.sql"
```

```
[info] SQLQueryTestSuite:
[info] - udf/udf-inner-join.sql - Scala UDF (5 seconds, 47 milliseconds)
[info] - udf/udf-inner-join.sql - Python UDF (4 seconds, 335 milliseconds)
[info] - udf/udf-inner-join.sql - Scalar Pandas UDF (5 seconds, 423 milliseconds)
```

[python] unavailable:

```
[info] SQLQueryTestSuite:
[info] - udf/udf-inner-join.sql - Scala UDF (4 seconds, 577 milliseconds)
[info] - udf/udf-inner-join.sql - Python UDF is skipped because [pyton] and/or pyspark were not available. !!! IGNORED !!!
[info] - udf/udf-inner-join.sql - Scalar Pandas UDF is skipped because pyspark,pandas and/or pyarrow were not available in [pyton]. !!! IGNORED !!!
```

pyspark unavailable:

```
[info] SQLQueryTestSuite:
[info] - udf/udf-inner-join.sql - Scala UDF (4 seconds, 991 milliseconds)
[info] - udf/udf-inner-join.sql - Python UDF is skipped because [python] and/or pyspark were not available. !!! IGNORED !!!
[info] - udf/udf-inner-join.sql - Scalar Pandas UDF is skipped because pyspark,pandas and/or pyarrow were not available in [python]. !!! IGNORED !!!
```

pandas and/or pyarrow unavailable:

```
[info] SQLQueryTestSuite:
[info] - udf/udf-inner-join.sql - Scala UDF (4 seconds, 713 milliseconds)
[info] - udf/udf-inner-join.sql - Python UDF (3 seconds, 89 milliseconds)
[info] - udf/udf-inner-join.sql - Scalar Pandas UDF is skipped because pandas and/or pyarrow were not available in [python]. !!! IGNORED !!!
```
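When adding a new `udf-*.sql` file, its expected output can be (re)generated via `SQLQueryTestSuite`'s existing golden-file mode rather than written by hand; the command below follows the suite's current convention and is not something introduced by this PR.

```bash
# Regenerate the expected output file for a single udf test
# (SPARK_GENERATE_GOLDEN_FILES is SQLQueryTestSuite's golden-file switch;
# run with the Python dependencies installed so no UDF variant is skipped).
SPARK_GENERATE_GOLDEN_FILES=1 build/sbt "sql/test-only *SQLQueryTestSuite -- -z udf/udf-inner-join.sql"
```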
