HyukjinKwon opened a new pull request #25132: [SPARK-28359][SQL][PYTHON][TESTS] 
Make integrated UDF tests robust by making them no-op
URL: https://github.com/apache/spark/pull/25132
 
 
   **This alternative was abandoned  for two reasons below:**
   
   1. Seems Pyrolite doesn't look it guarantees floats to be corrected after 
the roundtrip between PVM and JVM. 
   
   2. There seems a bug in Arrow conversions for decimal precision and scale:
   
   ```
   java.lang.AssertionError: assertion failed: Invalid schema from pandas_udf: 
expected DecimalType(38,0), got DecimalType(10,0)
        at scala.Predef$.assert(Predef.scala:223)
        at 
org.apache.spark.sql.execution.python.ArrowEvalPythonExec.$anonfun$evaluate$2(ArrowEvalPythonExec.scala:92)
        at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.agg_doAggregateWithoutKey_0$(Unknown
 Source)
        at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown
 Source)
        at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
   ```
   
   ## What changes were proposed in this pull request?
   
   This is another alternative take comparing to 
https://github.com/apache/spark/pull/25130.
   Current UDFs available in `IntegratedUDFTestUtils` are not exactly no-op. It 
converts input column to strings and outputs to strings.
   
   It causes some issues when we convert and port the tests at SPARK-27921. 
Integrated UDF test cases share one output file and it should outputs the same. 
However,
   
   1. Special values are converted into strings differently:
   
       | Scala      | Python |
       | ---------- | ------ |
       | `null`     | `None` |
       | `Infinity` | `inf`  |
       | `-Infinity`| `-inf` |
       | `NaN`      | `nan`  |
   
   2. Due to float limitation at Python (see 
https://docs.python.org/3/tutorial/floatingpoint.html), if float is passed into 
Python and sent back to JVM, the values are potentially not exactly correct. 
See https://github.com/apache/spark/pull/25128 and 
https://github.com/apache/spark/pull/25110
   
   To work around this, this PR targets to change the UDF that returns as are 
(input column to output column). In this way, we can also handle complex type 
like map, array or structs too; however, it's too hacky and a bit overkill.
   
   
   **Before:**
   
   ```
   JVM (col1) -> (cast to string within Python) Python (string) -> (string) JVM
   ```
   
   **After:**
   
   ```
   JVM (col1) -> (col) Python (col's type) -> (col's type) JVM
   ```
   
   However, in this way, it requires to launch an external Python process to 
generate Python functions for each return type everytime it evaluates. Without 
caching, it increases the testing time twoice. So, I had to add a cache to keep 
the testing time almost same. However, now it looks pretty complicated and a 
bit. overkill.
   
   
   In this way, UDF is almost completely no-op.
   
   There's no diff in terms of output comparing to 
https://github.com/apache/spark/pull/25130
   
   <details><summary>Diff comparing to the PR 25180</summary>
   <p>
   
   ```diff
   diff --git 
a/sql/core/src/test/resources/sql-tests/results/udf/pgSQL/udf-aggregates_part1.sql.out
 
b/sql/core/src/test/resources/sql-tests/results/udf/pgSQL/udf-aggregates_part1.sql.out
   index a2f64717d73..3360350f782 100644
   --- 
a/sql/core/src/test/resources/sql-tests/results/udf/pgSQL/udf-aggregates_part1.sql.out
   +++ 
b/sql/core/src/test/resources/sql-tests/results/udf/pgSQL/udf-aggregates_part1.sql.out
   @@ -77,7 +77,7 @@ struct<max_324_78:float>
    -- !query 9
    SELECT stddev_pop(udf(b)) FROM aggtest
    -- !query 9 schema
   -struct<stddev_pop(CAST(CAST(udf(cast(b as string)) AS FLOAT) AS 
DOUBLE)):double>
   +struct<stddev_pop(CAST(udf(b) AS DOUBLE)):double>
    -- !query 9 output
    131.10703231895047
   
   @@ -85,7 +85,7 @@ struct<stddev_pop(CAST(CAST(udf(cast(b as string)) AS 
FLOAT) AS DOUBLE)):double>
    -- !query 10
    SELECT udf(stddev_samp(b)) FROM aggtest
    -- !query 10 schema
   -struct<CAST(udf(cast(stddev_samp(cast(b as double)) as string)) AS 
DOUBLE):double>
   +struct<udf(stddev_samp(cast(b as double))):double>
    -- !query 10 output
    151.38936080399804
   
   @@ -93,7 +93,7 @@ struct<CAST(udf(cast(stddev_samp(cast(b as double)) as 
string)) AS DOUBLE):doubl
    -- !query 11
    SELECT var_pop(udf(b)) FROM aggtest
    -- !query 11 schema
   -struct<var_pop(CAST(CAST(udf(cast(b as string)) AS FLOAT) AS 
DOUBLE)):double>
   +struct<var_pop(CAST(udf(b) AS DOUBLE)):double>
    -- !query 11 output
    17189.053923482323
   
   @@ -101,7 +101,7 @@ struct<var_pop(CAST(CAST(udf(cast(b as string)) AS 
FLOAT) AS DOUBLE)):double>
    -- !query 12
    SELECT udf(var_samp(b)) FROM aggtest
    -- !query 12 schema
   -struct<CAST(udf(cast(var_samp(cast(b as double)) as string)) AS 
DOUBLE):double>
   +struct<udf(var_samp(cast(b as double))):double>
    -- !query 12 output
    22918.738564643096
   
   @@ -109,7 +109,7 @@ struct<CAST(udf(cast(var_samp(cast(b as double)) as 
string)) AS DOUBLE):double>
    -- !query 13
    SELECT udf(stddev_pop(CAST(b AS Decimal(38,0)))) FROM aggtest
    -- !query 13 schema
   -struct<CAST(udf(cast(stddev_pop(cast(cast(b as decimal(38,0)) as double)) 
as string)) AS DOUBLE):double>
   +struct<udf(stddev_pop(cast(cast(b as decimal(38,0)) as double))):double>
    -- !query 13 output
    131.18117242958306
   
   @@ -117,7 +117,7 @@ struct<CAST(udf(cast(stddev_pop(cast(cast(b as 
decimal(38,0)) as double)) as str
    -- !query 14
    SELECT stddev_samp(CAST(udf(b) AS Decimal(38,0))) FROM aggtest
    -- !query 14 schema
   -struct<stddev_samp(CAST(CAST(CAST(udf(cast(b as string)) AS FLOAT) AS 
DECIMAL(38,0)) AS DOUBLE)):double>
   +struct<stddev_samp(CAST(CAST(udf(b) AS DECIMAL(38,0)) AS DOUBLE)):double>
    -- !query 14 output
    151.47497042966097
   
   @@ -125,7 +125,7 @@ struct<stddev_samp(CAST(CAST(CAST(udf(cast(b as string)) 
AS FLOAT) AS DECIMAL(38
    -- !query 15
    SELECT udf(var_pop(CAST(b AS Decimal(38,0)))) FROM aggtest
    -- !query 15 schema
   -struct<CAST(udf(cast(var_pop(cast(cast(b as decimal(38,0)) as double)) as 
string)) AS DOUBLE):double>
   +struct<udf(var_pop(cast(cast(b as decimal(38,0)) as double))):double>
    -- !query 15 output
    17208.5
   
   @@ -133,7 +133,7 @@ struct<CAST(udf(cast(var_pop(cast(cast(b as 
decimal(38,0)) as double)) as string
    -- !query 16
    SELECT var_samp(udf(CAST(b AS Decimal(38,0)))) FROM aggtest
    -- !query 16 schema
   -struct<var_samp(CAST(CAST(udf(cast(cast(b as decimal(38,0)) as string)) AS 
DECIMAL(38,0)) AS DOUBLE)):double>
   +struct<var_samp(CAST(udf(cast(b as decimal(38,0))) AS DOUBLE)):double>
    -- !query 16 output
    22944.666666666668
   
   @@ -141,7 +141,7 @@ struct<var_samp(CAST(CAST(udf(cast(cast(b as 
decimal(38,0)) as string)) AS DECIM
    -- !query 17
    SELECT udf(var_pop(1.0)), var_samp(udf(2.0))
    -- !query 17 schema
   -struct<CAST(udf(cast(var_pop(cast(1.0 as double)) as string)) AS 
DOUBLE):double,var_samp(CAST(CAST(udf(cast(2.0 as string)) AS DECIMAL(2,1)) AS 
DOUBLE)):double>
   +struct<udf(var_pop(cast(1.0 as double))):double,var_samp(CAST(udf(2.0) AS 
DOUBLE)):double>
    -- !query 17 output
    0.0    NaN
   
   @@ -149,7 +149,7 @@ struct<CAST(udf(cast(var_pop(cast(1.0 as double)) as 
string)) AS DOUBLE):double,
    -- !query 18
    SELECT stddev_pop(udf(CAST(3.0 AS Decimal(38,0)))), 
stddev_samp(CAST(udf(4.0) AS Decimal(38,0)))
    -- !query 18 schema
   -struct<stddev_pop(CAST(CAST(udf(cast(cast(3.0 as decimal(38,0)) as string)) 
AS DECIMAL(38,0)) AS DOUBLE)):double,stddev_samp(CAST(CAST(CAST(udf(cast(4.0 as 
string)) AS DECIMAL(2,1)) AS DECIMAL(38,0)) AS DOUBLE)):double>
   +struct<stddev_pop(CAST(udf(cast(3.0 as decimal(38,0))) AS 
DOUBLE)):double,stddev_samp(CAST(CAST(udf(4.0) AS DECIMAL(38,0)) AS 
DOUBLE)):double>
    -- !query 18 output
    0.0    NaN
   
   @@ -157,7 +157,7 @@ struct<stddev_pop(CAST(CAST(udf(cast(cast(3.0 as 
decimal(38,0)) as string)) AS D
    -- !query 19
    select sum(udf(CAST(null AS int))) from range(1,4)
    -- !query 19 schema
   -struct<sum(CAST(udf(cast(cast(null as int) as string)) AS INT)):bigint>
   +struct<sum(udf(cast(null as int))):bigint>
    -- !query 19 output
    NULL
   
   @@ -165,7 +165,7 @@ NULL
    -- !query 20
    select sum(udf(CAST(null AS long))) from range(1,4)
    -- !query 20 schema
   -struct<sum(CAST(udf(cast(cast(null as bigint) as string)) AS 
BIGINT)):bigint>
   +struct<sum(udf(cast(null as bigint))):bigint>
    -- !query 20 output
    NULL
   
   @@ -173,7 +173,7 @@ NULL
    -- !query 21
    select sum(udf(CAST(null AS Decimal(38,0)))) from range(1,4)
    -- !query 21 schema
   -struct<sum(CAST(udf(cast(cast(null as decimal(38,0)) as string)) AS 
DECIMAL(38,0))):decimal(38,0)>
   +struct<sum(udf(cast(null as decimal(38,0)))):decimal(38,0)>
    -- !query 21 output
    NULL
   
   @@ -181,7 +181,7 @@ NULL
    -- !query 22
    select sum(udf(CAST(null AS DOUBLE))) from range(1,4)
    -- !query 22 schema
   -struct<sum(CAST(udf(cast(cast(null as double) as string)) AS 
DOUBLE)):double>
   +struct<sum(udf(cast(null as double))):double>
    -- !query 22 output
    NULL
   
   @@ -189,7 +189,7 @@ NULL
    -- !query 23
    select avg(udf(CAST(null AS int))) from range(1,4)
    -- !query 23 schema
   -struct<avg(CAST(udf(cast(cast(null as int) as string)) AS INT)):double>
   +struct<avg(udf(cast(null as int))):double>
    -- !query 23 output
    NULL
   
   @@ -197,7 +197,7 @@ NULL
    -- !query 24
    select avg(udf(CAST(null AS long))) from range(1,4)
    -- !query 24 schema
   -struct<avg(CAST(udf(cast(cast(null as bigint) as string)) AS 
BIGINT)):double>
   +struct<avg(udf(cast(null as bigint))):double>
    -- !query 24 output
    NULL
   
   @@ -205,7 +205,7 @@ NULL
    -- !query 25
    select avg(udf(CAST(null AS Decimal(38,0)))) from range(1,4)
    -- !query 25 schema
   -struct<avg(CAST(udf(cast(cast(null as decimal(38,0)) as string)) AS 
DECIMAL(38,0))):decimal(38,4)>
   +struct<avg(udf(cast(null as decimal(38,0)))):decimal(38,4)>
    -- !query 25 output
    NULL
   
   @@ -213,7 +213,7 @@ NULL
    -- !query 26
    select avg(udf(CAST(null AS DOUBLE))) from range(1,4)
    -- !query 26 schema
   -struct<avg(CAST(udf(cast(cast(null as double) as string)) AS 
DOUBLE)):double>
   +struct<avg(udf(cast(null as double))):double>
    -- !query 26 output
    NULL
   
   @@ -221,7 +221,7 @@ NULL
    -- !query 27
    select sum(CAST(udf('NaN') AS DOUBLE)) from range(1,4)
    -- !query 27 schema
   -struct<sum(CAST(CAST(udf(cast(NaN as string)) AS STRING) AS DOUBLE)):double>
   +struct<sum(CAST(udf(NaN) AS DOUBLE)):double>
    -- !query 27 output
    NaN
   
   @@ -229,7 +229,7 @@ NaN
    -- !query 28
    select avg(CAST(udf('NaN') AS DOUBLE)) from range(1,4)
    -- !query 28 schema
   -struct<avg(CAST(CAST(udf(cast(NaN as string)) AS STRING) AS DOUBLE)):double>
   +struct<avg(CAST(udf(NaN) AS DOUBLE)):double>
    -- !query 28 output
    NaN
   
   @@ -238,7 +238,7 @@ NaN
    SELECT avg(CAST(udf(x) AS DOUBLE)), var_pop(CAST(udf(x) AS DOUBLE))
    FROM (VALUES ('Infinity'), ('1')) v(x)
    -- !query 29 schema
   -struct<avg(CAST(CAST(udf(cast(x as string)) AS STRING) AS 
DOUBLE)):double,var_pop(CAST(CAST(udf(cast(x as string)) AS STRING) AS 
DOUBLE)):double>
   +struct<avg(CAST(udf(x) AS DOUBLE)):double,var_pop(CAST(udf(x) AS 
DOUBLE)):double>
    -- !query 29 output
    Infinity       NaN
   
   @@ -247,7 +247,7 @@ Infinity    NaN
    SELECT avg(CAST(udf(x) AS DOUBLE)), var_pop(CAST(udf(x) AS DOUBLE))
    FROM (VALUES ('Infinity'), ('Infinity')) v(x)
    -- !query 30 schema
   -struct<avg(CAST(CAST(udf(cast(x as string)) AS STRING) AS 
DOUBLE)):double,var_pop(CAST(CAST(udf(cast(x as string)) AS STRING) AS 
DOUBLE)):double>
   +struct<avg(CAST(udf(x) AS DOUBLE)):double,var_pop(CAST(udf(x) AS 
DOUBLE)):double>
    -- !query 30 output
    Infinity       NaN
   
   @@ -256,7 +256,7 @@ Infinity    NaN
    SELECT avg(CAST(udf(x) AS DOUBLE)), var_pop(CAST(udf(x) AS DOUBLE))
    FROM (VALUES ('-Infinity'), ('Infinity')) v(x)
    -- !query 31 schema
   -struct<avg(CAST(CAST(udf(cast(x as string)) AS STRING) AS 
DOUBLE)):double,var_pop(CAST(CAST(udf(cast(x as string)) AS STRING) AS 
DOUBLE)):double>
   +struct<avg(CAST(udf(x) AS DOUBLE)):double,var_pop(CAST(udf(x) AS 
DOUBLE)):double>
    -- !query 31 output
    NaN    NaN
   
   @@ -265,7 +265,7 @@ NaN NaN
    SELECT avg(udf(CAST(x AS DOUBLE))), udf(var_pop(CAST(x AS DOUBLE)))
    FROM (VALUES (100000003), (100000004), (100000006), (100000007)) v(x)
    -- !query 32 schema
   -struct<avg(CAST(udf(cast(cast(x as double) as string)) AS 
DOUBLE)):double,CAST(udf(cast(var_pop(cast(x as double)) as string)) AS 
DOUBLE):double>
   +struct<avg(udf(cast(x as double))):double,udf(var_pop(cast(x as 
double))):double>
    -- !query 32 output
    1.00000005E8   2.5
   
   @@ -274,7 +274,7 @@ struct<avg(CAST(udf(cast(cast(x as double) as string)) 
AS DOUBLE)):double,CAST(u
    SELECT avg(udf(CAST(x AS DOUBLE))), udf(var_pop(CAST(x AS DOUBLE)))
    FROM (VALUES (7000000000005), (7000000000007)) v(x)
    -- !query 33 schema
   -struct<avg(CAST(udf(cast(cast(x as double) as string)) AS 
DOUBLE)):double,CAST(udf(cast(var_pop(cast(x as double)) as string)) AS 
DOUBLE):double>
   +struct<avg(udf(cast(x as double))):double,udf(var_pop(cast(x as 
double))):double>
    -- !query 33 output
    7.000000000006E12      1.0
   
   @@ -282,7 +282,7 @@ struct<avg(CAST(udf(cast(cast(x as double) as string)) 
AS DOUBLE)):double,CAST(u
    -- !query 34
    SELECT udf(covar_pop(b, udf(a))), covar_samp(udf(b), a) FROM aggtest
    -- !query 34 schema
   -struct<CAST(udf(cast(covar_pop(cast(b as double), cast(cast(udf(cast(a as 
string)) as int) as double)) as string)) AS 
DOUBLE):double,covar_samp(CAST(CAST(udf(cast(b as string)) AS FLOAT) AS 
DOUBLE), CAST(a AS DOUBLE)):double>
   +struct<udf(covar_pop(cast(b as double), cast(udf(a) as 
double))):double,covar_samp(CAST(udf(b) AS DOUBLE), CAST(a AS DOUBLE)):double>
    -- !query 34 output
    653.6289553875104      871.5052738500139
   
   @@ -290,7 +290,7 @@ struct<CAST(udf(cast(covar_pop(cast(b as double), 
cast(cast(udf(cast(a as string
    -- !query 35
    SELECT corr(b, udf(a)) FROM aggtest
    -- !query 35 schema
   -struct<corr(CAST(b AS DOUBLE), CAST(CAST(udf(cast(a as string)) AS INT) AS 
DOUBLE)):double>
   +struct<corr(CAST(b AS DOUBLE), CAST(udf(a) AS DOUBLE)):double>
    -- !query 35 output
    0.1396345165178734
   
   @@ -315,7 +315,7 @@ struct<cnt_4:bigint>
    select ten, udf(count(*)), sum(udf(four)) from onek
    group by ten order by ten
    -- !query 38 schema
   -struct<ten:int,CAST(udf(cast(count(1) as string)) AS 
BIGINT):bigint,sum(CAST(udf(cast(four as string)) AS INT)):bigint>
   +struct<ten:int,udf(count(1)):bigint,sum(udf(four)):bigint>
    -- !query 38 output
    0      100     100
    1      100     200
   @@ -333,7 +333,7 @@ struct<ten:int,CAST(udf(cast(count(1) as string)) AS 
BIGINT):bigint,sum(CAST(udf
    select ten, count(udf(four)), udf(sum(DISTINCT four)) from onek
    group by ten order by ten
    -- !query 39 schema
   -struct<ten:int,count(CAST(udf(cast(four as string)) AS 
INT)):bigint,CAST(udf(cast(sum(distinct cast(four as bigint)) as string)) AS 
BIGINT):bigint>
   +struct<ten:int,count(udf(four)):bigint,udf(sum(distinct cast(four as 
bigint))):bigint>
    -- !query 39 output
    0      100     2
    1      100     4
   @@ -352,7 +352,7 @@ select ten, udf(sum(distinct four)) from onek a
    group by ten
    having exists (select 1 from onek b where udf(sum(distinct a.four)) = 
b.four)
    -- !query 40 schema
   -struct<ten:int,CAST(udf(cast(sum(distinct cast(four as bigint)) as string)) 
AS BIGINT):bigint>
   +struct<ten:int,udf(sum(distinct cast(four as bigint))):bigint>
    -- !query 40 output
    0      2
    2      2
   @@ -372,7 +372,7 @@ struct<>
    org.apache.spark.sql.AnalysisException
   
    Aggregate/Window/Generate expressions are not valid in where clause of the 
query.
   -Expression in where clause: [(sum(DISTINCT CAST((outer() + b.`four`) AS 
BIGINT)) = CAST(CAST(udf(cast(four as string)) AS INT) AS BIGINT))]
   +Expression in where clause: [(sum(DISTINCT CAST((outer() + b.`four`) AS 
BIGINT)) = CAST(udf(four) AS BIGINT))]
    Invalid expressions: [sum(DISTINCT CAST((outer() + b.`four`) AS BIGINT))];
   ```
   
   </p>
   </details>
   
   ## How was this patch tested?
   
   Manually tested.
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to