This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 14f2bae208c [SPARK-37527][SQL][FOLLOWUP] Cannot compile COVAR_POP, COVAR_SAMP and CORR in `H2Dialect` if them with `DISTINCT` 14f2bae208c is described below commit 14f2bae208c093dea58e3f947fb660e8345fb256 Author: Jiaan Geng <belie...@163.com> AuthorDate: Thu Jul 7 09:55:45 2022 +0800 [SPARK-37527][SQL][FOLLOWUP] Cannot compile COVAR_POP, COVAR_SAMP and CORR in `H2Dialect` if them with `DISTINCT` ### What changes were proposed in this pull request? https://github.com/apache/spark/pull/35145 compile COVAR_POP, COVAR_SAMP and CORR in H2Dialect. Because H2 does't support COVAR_POP, COVAR_SAMP and CORR works with DISTINCT. So https://github.com/apache/spark/pull/35145 introduces a bug that compile COVAR_POP, COVAR_SAMP and CORR if these aggregate functions with DISTINCT. ### Why are the changes needed? Fix bug that compile COVAR_POP, COVAR_SAMP and CORR if these aggregate functions with DISTINCT. ### Does this PR introduce _any_ user-facing change? 'Yes'. Bug will be fix. ### How was this patch tested? New test cases. Closes #37090 from beliefer/SPARK-37527_followup2. Authored-by: Jiaan Geng <belie...@163.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../org/apache/spark/sql/jdbc/H2Dialect.scala | 15 ++++------ .../org/apache/spark/sql/jdbc/JDBCV2Suite.scala | 34 +++++++++++++++------- 2 files changed, 30 insertions(+), 19 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala index 124cb001b5c..5dfc64d7b6c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala @@ -62,18 +62,15 @@ private[sql] object H2Dialect extends JdbcDialect { assert(f.children().length == 1) val distinct = if (f.isDistinct) "DISTINCT " else "" Some(s"STDDEV_SAMP($distinct${f.children().head})") - case f: GeneralAggregateFunc if f.name() == "COVAR_POP" => + case f: GeneralAggregateFunc if f.name() == "COVAR_POP" && !f.isDistinct => assert(f.children().length == 2) - val distinct = if (f.isDistinct) "DISTINCT " else "" - Some(s"COVAR_POP($distinct${f.children().head}, ${f.children().last})") - case f: GeneralAggregateFunc if f.name() == "COVAR_SAMP" => + Some(s"COVAR_POP(${f.children().head}, ${f.children().last})") + case f: GeneralAggregateFunc if f.name() == "COVAR_SAMP" && !f.isDistinct => assert(f.children().length == 2) - val distinct = if (f.isDistinct) "DISTINCT " else "" - Some(s"COVAR_SAMP($distinct${f.children().head}, ${f.children().last})") - case f: GeneralAggregateFunc if f.name() == "CORR" => + Some(s"COVAR_SAMP(${f.children().head}, ${f.children().last})") + case f: GeneralAggregateFunc if f.name() == "CORR" && !f.isDistinct => assert(f.children().length == 2) - val distinct = if (f.isDistinct) "DISTINCT " else "" - Some(s"CORR($distinct${f.children().head}, ${f.children().last})") + Some(s"CORR(${f.children().head}, ${f.children().last})") case _ => None } ) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala index 108348fbcd3..0a713bdb76c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala @@ -1652,23 +1652,37 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel } test("scan with aggregate push-down: COVAR_POP COVAR_SAMP with filter and group by") { - val df = sql("SELECT COVAR_POP(bonus, bonus), COVAR_SAMP(bonus, bonus)" + + val df1 = sql("SELECT COVAR_POP(bonus, bonus), COVAR_SAMP(bonus, bonus)" + " FROM h2.test.employee WHERE dept > 0 GROUP BY DePt") - checkFiltersRemoved(df) - checkAggregateRemoved(df) - checkPushedInfo(df, "PushedAggregates: [COVAR_POP(BONUS, BONUS), COVAR_SAMP(BONUS, BONUS)], " + + checkFiltersRemoved(df1) + checkAggregateRemoved(df1) + checkPushedInfo(df1, "PushedAggregates: [COVAR_POP(BONUS, BONUS), COVAR_SAMP(BONUS, BONUS)], " + "PushedFilters: [DEPT IS NOT NULL, DEPT > 0], PushedGroupByExpressions: [DEPT]") - checkAnswer(df, Seq(Row(10000d, 20000d), Row(2500d, 5000d), Row(0d, null))) + checkAnswer(df1, Seq(Row(10000d, 20000d), Row(2500d, 5000d), Row(0d, null))) + + val df2 = sql("SELECT COVAR_POP(DISTINCT bonus, bonus), COVAR_SAMP(DISTINCT bonus, bonus)" + + " FROM h2.test.employee WHERE dept > 0 GROUP BY DePt") + checkFiltersRemoved(df2) + checkAggregateRemoved(df2, false) + checkPushedInfo(df2, "PushedFilters: [DEPT IS NOT NULL, DEPT > 0]") + checkAnswer(df2, Seq(Row(10000d, 20000d), Row(2500d, 5000d), Row(0d, null))) } test("scan with aggregate push-down: CORR with filter and group by") { - val df = sql("SELECT CORR(bonus, bonus) FROM h2.test.employee WHERE dept > 0" + + val df1 = sql("SELECT CORR(bonus, bonus) FROM h2.test.employee WHERE dept > 0" + " GROUP BY DePt") - checkFiltersRemoved(df) - checkAggregateRemoved(df) - checkPushedInfo(df, "PushedAggregates: [CORR(BONUS, BONUS)], " + + checkFiltersRemoved(df1) + checkAggregateRemoved(df1) + checkPushedInfo(df1, "PushedAggregates: [CORR(BONUS, BONUS)], " + "PushedFilters: [DEPT IS NOT NULL, DEPT > 0], PushedGroupByExpressions: [DEPT]") - checkAnswer(df, Seq(Row(1d), Row(1d), Row(null))) + checkAnswer(df1, Seq(Row(1d), Row(1d), Row(null))) + + val df2 = sql("SELECT CORR(DISTINCT bonus, bonus) FROM h2.test.employee WHERE dept > 0" + + " GROUP BY DePt") + checkFiltersRemoved(df2) + checkAggregateRemoved(df2, false) + checkPushedInfo(df2, "PushedFilters: [DEPT IS NOT NULL, DEPT > 0]") + checkAnswer(df2, Seq(Row(1d), Row(1d), Row(null))) } test("scan with aggregate push-down: aggregate over alias push down") { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org