younggyuchun opened a new pull request #25195: [SPARK-28288][SQL][PYTHON][TESTS] Convert and port 'window.sql' into … URL: https://github.com/apache/spark/pull/25195 ## What changes were proposed in this pull request? This PR adds some tests converted from window.sql to test UDFs. Please see the contribution guide of this umbrella ticket - [SPARK-27921](https://issues.apache.org/jira/browse/SPARK-27921). <details><summary>Diff comparing to 'window.sql'</summary> <p> ```diff ... diff --git a/sql/core/src/test/resources/sql-tests/results/window.sql.out b/sql/core/src/test/resources/sql-tests/results/udf/udf-window.sql.out index 367dc4f513..e556d25f7d 100644 --- a/sql/core/src/test/resources/sql-tests/results/window.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/udf/udf-window.sql.out @@ -21,10 +21,10 @@ struct<> -- !query 1 -SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY val ROWS CURRENT ROW) FROM testData +SELECT udf(val), cate, count(val) OVER(PARTITION BY cate ORDER BY val ROWS CURRENT ROW) FROM testData ORDER BY cate, val -- !query 1 schema -struct<val:int,cate:string,count(val) OVER (PARTITION BY cate ORDER BY val ASC NULLS FIRST ROWS BETWEEN CURRENT ROW AND CURRENT ROW):bigint> +struct<CAST(udf(cast(val as string)) AS INT):int,cate:string,count(val) OVER (PARTITION BY cate ORDER BY val ASC NULLS FIRST ROWS BETWEEN CURRENT ROW AND CURRENT ROW):bigint> -- !query 1 output NULL NULL 0 3 NULL 1 @@ -38,10 +38,10 @@ NULL a 0 -- !query 2 -SELECT val, cate, sum(val) OVER(PARTITION BY cate ORDER BY val +SELECT udf(val), cate, sum(val) OVER(PARTITION BY cate ORDER BY val ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING) FROM testData ORDER BY cate, val -- !query 2 schema -struct<val:int,cate:string,sum(val) OVER (PARTITION BY cate ORDER BY val ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING):bigint> +struct<CAST(udf(cast(val as string)) AS INT):int,cate:string,sum(val) OVER (PARTITION BY cate ORDER BY val ASC NULLS FIRST ROWS 
BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING):bigint> -- !query 2 output NULL NULL 3 3 NULL 3 @@ -55,20 +55,27 @@ NULL a 1 -- !query 3 -SELECT val_long, cate, sum(val_long) OVER(PARTITION BY cate ORDER BY val_long -ROWS BETWEEN CURRENT ROW AND 2147483648 FOLLOWING) FROM testData ORDER BY cate, val_long +SELECT val_long, udf(cate), sum(val_long) OVER(PARTITION BY cate ORDER BY val_long +ROWS BETWEEN CURRENT ROW AND CAST(2147483648 AS int) FOLLOWING) FROM testData ORDER BY cate, val_long -- !query 3 schema -struct<> +struct<val_long:bigint,CAST(udf(cast(cate as string)) AS STRING):string,sum(val_long) OVER (PARTITION BY cate ORDER BY val_long ASC NULLS FIRST ROWS BETWEEN CURRENT ROW AND CAST(2147483648 AS INT) FOLLOWING):bigint> -- !query 3 output -org.apache.spark.sql.AnalysisException -cannot resolve 'ROWS BETWEEN CURRENT ROW AND 2147483648L FOLLOWING' due to data type mismatch: The data type of the upper bound 'bigint' does not match the expected data type 'int'.; line 1 pos 41 +NULL NULL 1 +1 NULL 1 +1 a 2147483654 +1 a 2147483653 +2 a 2147483652 +2147483650 a 2147483650 +NULL b 2147483653 +3 b 2147483653 +2147483650 b 2147483650 -- !query 4 -SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY val RANGE 1 PRECEDING) FROM testData +SELECT udf(val), cate, count(val) OVER(PARTITION BY cate ORDER BY val RANGE 1 PRECEDING) FROM testData ORDER BY cate, val -- !query 4 schema -struct<val:int,cate:string,count(val) OVER (PARTITION BY cate ORDER BY val ASC NULLS FIRST RANGE BETWEEN 1 PRECEDING AND CURRENT ROW):bigint> +struct<CAST(udf(cast(val as string)) AS INT):int,cate:string,count(val) OVER (PARTITION BY cate ORDER BY val ASC NULLS FIRST RANGE BETWEEN 1 PRECEDING AND CURRENT ROW):bigint> -- !query 4 output NULL NULL 0 3 NULL 1 @@ -82,10 +89,10 @@ NULL a 0 -- !query 5 -SELECT val, cate, sum(val) OVER(PARTITION BY cate ORDER BY val +SELECT val, udf(cate), sum(val) OVER(PARTITION BY cate ORDER BY val RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING) FROM testData ORDER 
BY cate, val -- !query 5 schema -struct<val:int,cate:string,sum(val) OVER (PARTITION BY cate ORDER BY val ASC NULLS FIRST RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING):bigint> +struct<val:int,CAST(udf(cast(cate as string)) AS STRING):string,sum(val) OVER (PARTITION BY cate ORDER BY val ASC NULLS FIRST RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING):bigint> -- !query 5 output NULL NULL NULL 3 NULL 3 @@ -99,10 +106,10 @@ NULL a NULL -- !query 6 -SELECT val_long, cate, sum(val_long) OVER(PARTITION BY cate ORDER BY val_long +SELECT val_long, udf(cate), sum(val_long) OVER(PARTITION BY cate ORDER BY val_long RANGE BETWEEN CURRENT ROW AND 2147483648 FOLLOWING) FROM testData ORDER BY cate, val_long -- !query 6 schema -struct<val_long:bigint,cate:string,sum(val_long) OVER (PARTITION BY cate ORDER BY val_long ASC NULLS FIRST RANGE BETWEEN CURRENT ROW AND 2147483648 FOLLOWING):bigint> +struct<val_long:bigint,CAST(udf(cast(cate as string)) AS STRING):string,sum(val_long) OVER (PARTITION BY cate ORDER BY val_long ASC NULLS FIRST RANGE BETWEEN CURRENT ROW AND 2147483648 FOLLOWING):bigint> -- !query 6 output NULL NULL NULL 1 NULL 1 @@ -116,10 +123,10 @@ NULL b NULL -- !query 7 -SELECT val_double, cate, sum(val_double) OVER(PARTITION BY cate ORDER BY val_double +SELECT val_double, udf(cate), sum(val_double) OVER(PARTITION BY cate ORDER BY val_double RANGE BETWEEN CURRENT ROW AND 2.5 FOLLOWING) FROM testData ORDER BY cate, val_double -- !query 7 schema -struct<val_double:double,cate:string,sum(val_double) OVER (PARTITION BY cate ORDER BY val_double ASC NULLS FIRST RANGE BETWEEN CURRENT ROW AND CAST(2.5 AS DOUBLE) FOLLOWING):double> +struct<val_double:double,CAST(udf(cast(cate as string)) AS STRING):string,sum(val_double) OVER (PARTITION BY cate ORDER BY val_double ASC NULLS FIRST RANGE BETWEEN CURRENT ROW AND CAST(2.5 AS DOUBLE) FOLLOWING):double> -- !query 7 output NULL NULL NULL 1.0 NULL 1.0 @@ -133,10 +140,10 @@ NULL NULL NULL -- !query 8 -SELECT val_date, cate, max(val_date) 
OVER(PARTITION BY cate ORDER BY val_date +SELECT val_date, udf(cate), max(val_date) OVER(PARTITION BY cate ORDER BY val_date RANGE BETWEEN CURRENT ROW AND 2 FOLLOWING) FROM testData ORDER BY cate, val_date -- !query 8 schema -struct<val_date:date,cate:string,max(val_date) OVER (PARTITION BY cate ORDER BY val_date ASC NULLS FIRST RANGE BETWEEN CURRENT ROW AND 2 FOLLOWING):date> +struct<val_date:date,CAST(udf(cast(cate as string)) AS STRING):string,max(val_date) OVER (PARTITION BY cate ORDER BY val_date ASC NULLS FIRST RANGE BETWEEN CURRENT ROW AND 2 FOLLOWING):date> -- !query 8 output NULL NULL NULL 2017-08-01 NULL 2017-08-01 @@ -150,11 +157,11 @@ NULL NULL NULL -- !query 9 -SELECT val_timestamp, cate, avg(val_timestamp) OVER(PARTITION BY cate ORDER BY val_timestamp +SELECT val_timestamp, udf(cate), avg(val_timestamp) OVER(PARTITION BY cate ORDER BY val_timestamp RANGE BETWEEN CURRENT ROW AND interval 23 days 4 hours FOLLOWING) FROM testData ORDER BY cate, val_timestamp -- !query 9 schema -struct<val_timestamp:timestamp,cate:string,avg(CAST(val_timestamp AS DOUBLE)) OVER (PARTITION BY cate ORDER BY val_timestamp ASC NULLS FIRST RANGE BETWEEN CURRENT ROW AND interval 3 weeks 2 days 4 hours FOLLOWING):double> +struct<val_timestamp:timestamp,CAST(udf(cast(cate as string)) AS STRING):string,avg(CAST(val_timestamp AS DOUBLE)) OVER (PARTITION BY cate ORDER BY val_timestamp ASC NULLS FIRST RANGE BETWEEN CURRENT ROW AND interval 3 weeks 2 days 4 hours FOLLOWING):double> -- !query 9 output NULL NULL NULL 2017-07-31 17:00:00 NULL 1.5015456E9 @@ -168,10 +175,10 @@ NULL NULL NULL -- !query 10 -SELECT val, cate, sum(val) OVER(PARTITION BY cate ORDER BY val DESC +SELECT val, udf(cate), sum(val) OVER(PARTITION BY cate ORDER BY val DESC RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING) FROM testData ORDER BY cate, val -- !query 10 schema -struct<val:int,cate:string,sum(val) OVER (PARTITION BY cate ORDER BY val DESC NULLS LAST RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING):bigint> 
+struct<val:int,CAST(udf(cast(cate as string)) AS STRING):string,sum(val) OVER (PARTITION BY cate ORDER BY val DESC NULLS LAST RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING):bigint> -- !query 10 output NULL NULL NULL 3 NULL 3 @@ -185,57 +192,57 @@ NULL a NULL -- !query 11 -SELECT val, cate, count(val) OVER(PARTITION BY cate -ROWS BETWEEN UNBOUNDED FOLLOWING AND 1 FOLLOWING) FROM testData ORDER BY cate, val +SELECT udf(val), cate, count(val) OVER(PARTITION BY cate +ROWS BETWEEN UNBOUNDED FOLLOWING AND CAST(1 as int) FOLLOWING) FROM testData ORDER BY cate, val -- !query 11 schema struct<> -- !query 11 output org.apache.spark.sql.AnalysisException -cannot resolve 'ROWS BETWEEN UNBOUNDED FOLLOWING AND 1 FOLLOWING' due to data type mismatch: Window frame upper bound '1' does not follow the lower bound 'unboundedfollowing$()'.; line 1 pos 33 +cannot resolve 'ROWS BETWEEN UNBOUNDED FOLLOWING AND CAST(1 AS INT) FOLLOWING' due to data type mismatch: Window frame upper bound 'cast(1 as int)' does not follow the lower bound 'unboundedfollowing$()'.; line 1 pos 38 -- !query 12 -SELECT val, cate, count(val) OVER(PARTITION BY cate -RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING) FROM testData ORDER BY cate, val +SELECT udf(val), cate, count(val) OVER(PARTITION BY cate +RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING) FROM testData ORDER BY cate, udf(val) -- !query 12 schema struct<> -- !query 12 output org.apache.spark.sql.AnalysisException -cannot resolve '(PARTITION BY testdata.`cate` RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING)' due to data type mismatch: A range window frame cannot be used in an unordered window specification.; line 1 pos 33 +cannot resolve '(PARTITION BY testdata.`cate` RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING)' due to data type mismatch: A range window frame cannot be used in an unordered window specification.; line 1 pos 38 -- !query 13 -SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY val, cate +SELECT udf(val), cate, count(val) OVER(PARTITION BY cate ORDER 
BY val, cate RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING) FROM testData ORDER BY cate, val -- !query 13 schema struct<> -- !query 13 output org.apache.spark.sql.AnalysisException -cannot resolve '(PARTITION BY testdata.`cate` ORDER BY testdata.`val` ASC NULLS FIRST, testdata.`cate` ASC NULLS FIRST RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING)' due to data type mismatch: A range window frame with value boundaries cannot be used in a window specification with multiple order by expressions: val#x ASC NULLS FIRST,cate#x ASC NULLS FIRST; line 1 pos 33 +cannot resolve '(PARTITION BY testdata.`cate` ORDER BY testdata.`val` ASC NULLS FIRST, testdata.`cate` ASC NULLS FIRST RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING)' due to data type mismatch: A range window frame with value boundaries cannot be used in a window specification with multiple order by expressions: val#x ASC NULLS FIRST,cate#x ASC NULLS FIRST; line 1 pos 38 -- !query 14 -SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY current_timestamp +SELECT udf(val), cate, count(val) OVER(PARTITION BY cate ORDER BY current_timestamp RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING) FROM testData ORDER BY cate, val -- !query 14 schema struct<> -- !query 14 output org.apache.spark.sql.AnalysisException -cannot resolve '(PARTITION BY testdata.`cate` ORDER BY current_timestamp() ASC NULLS FIRST RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING)' due to data type mismatch: The data type 'timestamp' used in the order specification does not match the data type 'int' which is used in the range frame.; line 1 pos 33 +cannot resolve '(PARTITION BY testdata.`cate` ORDER BY current_timestamp() ASC NULLS FIRST RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING)' due to data type mismatch: The data type 'timestamp' used in the order specification does not match the data type 'int' which is used in the range frame.; line 1 pos 38 -- !query 15 -SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY val +SELECT udf(val), cate, count(val) OVER(PARTITION 
BY cate ORDER BY val RANGE BETWEEN 1 FOLLOWING AND 1 PRECEDING) FROM testData ORDER BY cate, val -- !query 15 schema struct<> -- !query 15 output org.apache.spark.sql.AnalysisException -cannot resolve 'RANGE BETWEEN 1 FOLLOWING AND 1 PRECEDING' due to data type mismatch: The lower bound of a window frame must be less than or equal to the upper bound; line 1 pos 33 +cannot resolve 'RANGE BETWEEN 1 FOLLOWING AND 1 PRECEDING' due to data type mismatch: The lower bound of a window frame must be less than or equal to the upper bound; line 1 pos 38 -- !query 16 -SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY val +SELECT udf(val), cate, count(val) OVER(PARTITION BY cate ORDER BY val RANGE BETWEEN CURRENT ROW AND current_date PRECEDING) FROM testData ORDER BY cate, val -- !query 16 schema struct<> @@ -245,13 +252,13 @@ org.apache.spark.sql.catalyst.parser.ParseException Frame bound value must be a literal.(line 2, pos 30) == SQL == -SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY val +SELECT udf(val), cate, count(val) OVER(PARTITION BY cate ORDER BY val RANGE BETWEEN CURRENT ROW AND current_date PRECEDING) FROM testData ORDER BY cate, val ------------------------------^^^ -- !query 17 -SELECT val, cate, +SELECT udf(val), cate, max(val) OVER w AS max, min(val) OVER w AS min, min(val) OVER w AS min, @@ -286,7 +293,7 @@ FROM testData WINDOW w AS (PARTITION BY cate ORDER BY val) ORDER BY cate, val -- !query 17 schema -struct<val:int,cate:string,max:int,min:int,min:int,count:bigint,sum:bigint,avg:double,stddev:double,first_value:int,first_value_ignore_null:int,first_value_contain_null:int,last_value:int,last_value_ignore_null:int,last_value_contain_null:int,rank:int,dense_rank:int,cume_dist:double,percent_rank:double,ntile:int,row_number:int,var_pop:double,var_samp:double,approx_count_distinct:bigint,covar_pop:double,corr:double,stddev_samp:double,stddev_pop:double,collect_list:array<int>,collect_set:array<int>,skewness:double,kurtosis:double> 
+struct<CAST(udf(cast(val as string)) AS INT):int,cate:string,max:int,min:int,min:int,count:bigint,sum:bigint,avg:double,stddev:double,first_value:int,first_value_ignore_null:int,first_value_contain_null:int,last_value:int,last_value_ignore_null:int,last_value_contain_null:int,rank:int,dense_rank:int,cume_dist:double,percent_rank:double,ntile:int,row_number:int,var_pop:double,var_samp:double,approx_count_distinct:bigint,covar_pop:double,corr:double,stddev_samp:double,stddev_pop:double,collect_list:array<int>,collect_set:array<int>,skewness:double,kurtosis:double> -- !query 17 output NULL NULL NULL NULL NULL 0 NULL NULL NULL NULL NULL NULL NULL NULL NULL 1 1 0.5 0.0 1 1 NULL NULL 0 NULL NULL NULL NULL [] [] NULL NULL 3 NULL 3 3 3 1 3 3.0 NaN NULL 3 NULL 3 3 3 2 2 1.0 1.0 2 2 0.0 NaN 1 0.0 NaN NaN 0.0 [3] [3] NaN NaN @@ -300,9 +307,9 @@ NULL a NULL NULL NULL 0 NULL NULL NULL NULL NULL NULL NULL NULL NULL 1 1 0.25 0. -- !query 18 -SELECT val, cate, avg(null) OVER(PARTITION BY cate ORDER BY val) FROM testData ORDER BY cate, val +SELECT udf(val), cate, avg(null) OVER(PARTITION BY cate ORDER BY val) FROM testData ORDER BY cate, val -- !query 18 schema -struct<val:int,cate:string,avg(CAST(NULL AS DOUBLE)) OVER (PARTITION BY cate ORDER BY val ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW):double> +struct<CAST(udf(cast(val as string)) AS INT):int,cate:string,avg(CAST(NULL AS DOUBLE)) OVER (PARTITION BY cate ORDER BY val ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW):double> -- !query 18 output NULL NULL NULL 3 NULL NULL @@ -316,18 +323,25 @@ NULL a NULL -- !query 19 -SELECT val, cate, row_number() OVER(PARTITION BY cate) FROM testData ORDER BY cate, val +SELECT udf(val), cate, row_number() OVER(PARTITION BY cate ORDER BY val) FROM testData ORDER BY cate, udf(val) -- !query 19 schema -struct<> +struct<CAST(udf(cast(val as string)) AS INT):int,cate:string,row_number() OVER (PARTITION BY cate ORDER BY val ASC NULLS FIRST ROWS BETWEEN 
UNBOUNDED PRECEDING AND CURRENT ROW):int> -- !query 19 output -org.apache.spark.sql.AnalysisException -Window function row_number() requires window to be ordered, please add ORDER BY clause. For example SELECT row_number()(value_expr) OVER (PARTITION BY window_partition ORDER BY window_ordering) from table; +NULL NULL 1 +3 NULL 2 +NULL a 1 +1 a 2 +1 a 3 +2 a 4 +1 b 1 +2 b 2 +3 b 3 -- !query 20 -SELECT val, cate, sum(val) OVER(), avg(val) OVER() FROM testData ORDER BY cate, val +SELECT udf(val), cate, sum(val) OVER(), avg(val) OVER() FROM testData ORDER BY cate, val -- !query 20 schema -struct<val:int,cate:string,sum(CAST(val AS BIGINT)) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING):bigint,avg(CAST(val AS BIGINT)) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING):double> +struct<CAST(udf(cast(val as string)) AS INT):int,cate:string,sum(CAST(val AS BIGINT)) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING):bigint,avg(CAST(val AS BIGINT)) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING):double> -- !query 20 output NULL NULL 13 1.8571428571428572 3 NULL 13 1.8571428571428572 @@ -341,7 +355,7 @@ NULL a 13 1.8571428571428572 -- !query 21 -SELECT val, cate, +SELECT udf(val), cate, first_value(false) OVER w AS first_value, first_value(true, true) OVER w AS first_value_ignore_null, first_value(false, false) OVER w AS first_value_contain_null, @@ -352,7 +366,7 @@ FROM testData WINDOW w AS () ORDER BY cate, val -- !query 21 schema -struct<val:int,cate:string,first_value:boolean,first_value_ignore_null:boolean,first_value_contain_null:boolean,last_value:boolean,last_value_ignore_null:boolean,last_value_contain_null:boolean> +struct<CAST(udf(cast(val as string)) AS INT):int,cate:string,first_value:boolean,first_value_ignore_null:boolean,first_value_contain_null:boolean,last_value:boolean,last_value_ignore_null:boolean,last_value_contain_null:boolean> -- !query 21 output NULL NULL false true false false true false 3 
NULL false true false false true false @@ -366,12 +380,12 @@ NULL a false true false false true false -- !query 22 -SELECT cate, sum(val) OVER (w) +SELECT udf(cate), sum(val) OVER (w) FROM testData WHERE val is not null WINDOW w AS (PARTITION BY cate ORDER BY val) -- !query 22 schema -struct<cate:string,sum(CAST(val AS BIGINT)) OVER (PARTITION BY cate ORDER BY val ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW):bigint> +struct<CAST(udf(cast(cate as string)) AS STRING):string,sum(CAST(val AS BIGINT)) OVER (PARTITION BY cate ORDER BY val ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW):bigint> -- !query 22 output NULL 3 a 2 ``` </p> </details> ## How was this patch tested? Tested as guided in [SPARK-27921](https://issues.apache.org/jira/browse/SPARK-27921).
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: [email protected]

With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
