HyukjinKwon opened a new pull request #24752: [SPARK-27893][SQL][PYTHON] Create 
an integrated test base for Python, Scalar Pandas, Scala UDF by sql files
URL: https://github.com/apache/spark/pull/24752
 
 
   ## What changes were proposed in this pull request?
   
   This PR adds an integrated test base for various UDF test cases so that Scala UDFs, Python UDFs and Scalar Pandas UDFs can be tested in SBT & Maven tests.
   
   ### Problem
   
   One of the problems we face is that `ExtractPythonUDF[s|FromAggregate]` handles unevaluable expressions that always have to be wrapped with special plans. This special rule seems to produce many issues, for instance SPARK-27803, SPARK-26147, SPARK-26864, SPARK-26293, SPARK-25314 and SPARK-24721.
   
   ### Why do we have so few test cases dedicated to SQL and plans?
   
   We don't have such SQL (or plan) dedicated tests in PySpark to catch such issues because:
     - A developer should know SQL, PySpark, Py4J and the version differences in Python to write good test cases
     - To test plans, we would have to access the plans in the JVM via Py4J, which is tricky and messy, and duplicates JVM test cases
     - Usually we just add end-to-end test cases in PySpark, so there are not many examples to refer to
   
   It is a non-trivial overhead to switch the test base and method (IMHO).
   
   ### How does this PR fix it?
   
   This PR adds Python UDFs and Scalar Pandas UDFs to the runtime of SBT / Maven test cases. It generates a Python-pickled instance (consisting of the return type and a Python native function) that is used in a Python or Scalar Pandas UDF, and brings it directly into the JVM.
   
   After that, we don't interact via Py4J anymore but run the tests directly in the JVM: we can just register and run Python UDFs and Scalar Pandas UDFs in the JVM.
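   The idea can be sketched roughly as follows. This is a hypothetical illustration, not the PR's actual code: plain `pickle` stands in for the serializer Spark actually uses, and `udf_impl`/`payload` are made-up names. The Python side serializes a (native function, return type) pair once; the JVM side only keeps the bytes and feeds them to a Python worker at execution time, so no Py4J round trip happens while the test runs.

```python
import pickle

# Hypothetical sketch: pair a native Python function with its return
# type and pickle the pair, as a stand-in for the payload the test
# framework hands to the JVM.
def udf_impl(x):
    # identity function, like the test 'udf' in this PR
    return x

payload = pickle.dumps((udf_impl, "string"))

# The JVM side stores only the bytes; a Python worker unpickles and
# runs them at execution time.
func, return_type = pickle.loads(payload)
assert func(42) == 42 and return_type == "string"
```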
   
   Currently, I only integrated this change into SQL file based testing. This is how it works with `udf-*.sql` files:
   
   After test files named `udf-*.sql` are detected, it creates three test cases:
     - a Scala UDF test case with a Scala UDF registered as 'udf'.
     - a Python UDF test case with a Python UDF registered as 'udf', iff a Python executable and pyspark are available.
     - a Scalar Pandas UDF test case with a Scalar Pandas UDF registered as 'udf', iff a Python executable, pyspark, pandas and pyarrow are available.
   
   Therefore, each UDF test case has a single input and output file but is executed by three different types of UDFs.
   
   For instance, 
   
   ```sql
   CREATE TEMPORARY VIEW ta AS
   SELECT udf(a) AS a, udf('a') AS tag FROM t1
   UNION ALL
   SELECT udf(a) AS a, udf('b') AS tag FROM t2;
   
   CREATE TEMPORARY VIEW tb AS
   SELECT udf(a) AS a, udf('a') AS tag FROM t3
   UNION ALL
   SELECT udf(a) AS a, udf('b') AS tag FROM t4;
   
   SELECT tb.* FROM ta INNER JOIN tb ON ta.a = tb.a AND ta.tag = tb.tag;
   ```
   
   will be run 3 times, once each with a Scala UDF, a Python UDF and a Scalar Pandas UDF.
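   The fan-out described above can be sketched like this (a hypothetical illustration of the dispatch logic with made-up names, not the suite's actual code): one SQL file always yields the Scala UDF case, while the Python-based cases are generated but skipped when their dependencies are missing, matching the skip messages in the test output below.

```python
# Hypothetical sketch: each detected udf-*.sql file yields three test
# cases, one per UDF flavor; the Python-based flavors are marked to be
# skipped when their dependencies are unavailable.
def generate_cases(sql_file, python_available, pandas_arrow_available):
    cases = [(sql_file, "Scala UDF", "run")]
    cases.append((sql_file, "Python UDF",
                  "run" if python_available else "skip"))
    cases.append((sql_file, "Scalar Pandas UDF",
                  "run" if python_available and pandas_arrow_available
                  else "skip"))
    return cases

# e.g. a machine with Python but without pandas/pyarrow
for case in generate_cases("udf/udf-inner-join.sql", True, False):
    print(case)
```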
   
   ### Appendix
   
   In addition, this PR adds `IntegratedUDFTestUtils`, which enables testing and executing Python UDFs and Scalar Pandas UDFs as below:
   
   To register Python UDF in SQL:
   
   ```scala
   IntegratedUDFTestUtils.registerTestUDF(new TestPythonUDF, spark)
   ```
   
   To register Scalar Pandas UDF in SQL:
   
   ```scala
    IntegratedUDFTestUtils.registerTestUDF(new TestScalarPandasUDF, spark)
   ```
   
    To use it in Scala API:
   
   ```scala
    spark.range(1).select(expr("udf(1)")).show()
   ```
   
    To use it in SQL:
   
   ```scala
   sql("SELECT udf(1)").show()
   ```
   
   This util could also be used in the future for better coverage of Scala API combinations.
   
   ## How was this patch tested?
   
   Tested via the command below:
   
   ```bash
   build/sbt "sql/test-only *SQLQueryTestSuite -- -z udf/udf-inner-join.sql"
   ```
   
   ```
   [info] SQLQueryTestSuite:
   [info] - udf/udf-inner-join.sql - Scala UDF (5 seconds, 47 milliseconds)
   [info] - udf/udf-inner-join.sql - Python UDF (4 seconds, 335 milliseconds)
   [info] - udf/udf-inner-join.sql - Scalar Pandas UDF (5 seconds, 423 
milliseconds)
   ```
   
   Python executable unavailable (simulated with a nonexistent `pyton` executable):
   
   ```
   [info] SQLQueryTestSuite:
   [info] - udf/udf-inner-join.sql - Scala UDF (4 seconds, 577 milliseconds)
   [info] - udf/udf-inner-join.sql - Python UDF is skipped because [pyton] 
and/or pyspark were not available. !!! IGNORED !!!
   [info] - udf/udf-inner-join.sql - Scalar Pandas UDF is skipped because 
pyspark,pandas and/or pyarrow were not available in [pyton]. !!! IGNORED !!!
   ```
   
   pyspark unavailable:
   
   ```
   [info] SQLQueryTestSuite:
   [info] - udf/udf-inner-join.sql - Scala UDF (4 seconds, 991 milliseconds)
   [info] - udf/udf-inner-join.sql - Python UDF is skipped because [python] 
and/or pyspark were not available. !!! IGNORED !!!
   [info] - udf/udf-inner-join.sql - Scalar Pandas UDF is skipped because 
pyspark,pandas and/or pyarrow were not available in [python]. !!! IGNORED !!!
   ```
   
   pandas and/or pyarrow unavailable:
   
   ```
   [info] SQLQueryTestSuite:
   [info] - udf/udf-inner-join.sql - Scala UDF (4 seconds, 713 milliseconds)
   [info] - udf/udf-inner-join.sql - Python UDF (3 seconds, 89 milliseconds)
   [info] - udf/udf-inner-join.sql - Scalar Pandas UDF is skipped because 
pandas and/or pyarrow were not available in [python]. !!! IGNORED !!!
   ```
   
   
   
