This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 14f2bae208c [SPARK-37527][SQL][FOLLOWUP] Cannot compile COVAR_POP, 
COVAR_SAMP and CORR in `H2Dialect` if them with `DISTINCT`
14f2bae208c is described below

commit 14f2bae208c093dea58e3f947fb660e8345fb256
Author: Jiaan Geng <belie...@163.com>
AuthorDate: Thu Jul 7 09:55:45 2022 +0800

    [SPARK-37527][SQL][FOLLOWUP] Cannot compile COVAR_POP, COVAR_SAMP and CORR 
in `H2Dialect` if them with `DISTINCT`
    
    ### What changes were proposed in this pull request?
    https://github.com/apache/spark/pull/35145 compiles COVAR_POP, COVAR_SAMP
    and CORR in H2Dialect.
    However, H2 doesn't support COVAR_POP, COVAR_SAMP or CORR with DISTINCT,
    so https://github.com/apache/spark/pull/35145 introduced a bug: it compiles
    COVAR_POP, COVAR_SAMP and CORR even when these aggregate functions use DISTINCT.
    This follow-up compiles them for H2 only when DISTINCT is absent; the DISTINCT
    variants are no longer pushed down and are evaluated by Spark instead.
    
    ### Why are the changes needed?
    Fix a bug where COVAR_POP, COVAR_SAMP and CORR were compiled for H2 even
    when used with DISTINCT, which H2 does not support.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes.
    The bug is fixed: COVAR_POP, COVAR_SAMP and CORR with DISTINCT are no longer
    pushed down to H2 as aggregates.
    
    ### How was this patch tested?
    New test cases in JDBCV2Suite.
    
    Closes #37090 from beliefer/SPARK-37527_followup2.
    
    Authored-by: Jiaan Geng <belie...@163.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../org/apache/spark/sql/jdbc/H2Dialect.scala      | 15 ++++------
 .../org/apache/spark/sql/jdbc/JDBCV2Suite.scala    | 34 +++++++++++++++-------
 2 files changed, 30 insertions(+), 19 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
index 124cb001b5c..5dfc64d7b6c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
@@ -62,18 +62,15 @@ private[sql] object H2Dialect extends JdbcDialect {
           assert(f.children().length == 1)
           val distinct = if (f.isDistinct) "DISTINCT " else ""
           Some(s"STDDEV_SAMP($distinct${f.children().head})")
-        case f: GeneralAggregateFunc if f.name() == "COVAR_POP" =>
+        case f: GeneralAggregateFunc if f.name() == "COVAR_POP" && 
!f.isDistinct =>
           assert(f.children().length == 2)
-          val distinct = if (f.isDistinct) "DISTINCT " else ""
-          Some(s"COVAR_POP($distinct${f.children().head}, 
${f.children().last})")
-        case f: GeneralAggregateFunc if f.name() == "COVAR_SAMP" =>
+          Some(s"COVAR_POP(${f.children().head}, ${f.children().last})")
+        case f: GeneralAggregateFunc if f.name() == "COVAR_SAMP" && 
!f.isDistinct =>
           assert(f.children().length == 2)
-          val distinct = if (f.isDistinct) "DISTINCT " else ""
-          Some(s"COVAR_SAMP($distinct${f.children().head}, 
${f.children().last})")
-        case f: GeneralAggregateFunc if f.name() == "CORR" =>
+          Some(s"COVAR_SAMP(${f.children().head}, ${f.children().last})")
+        case f: GeneralAggregateFunc if f.name() == "CORR" && !f.isDistinct =>
           assert(f.children().length == 2)
-          val distinct = if (f.isDistinct) "DISTINCT " else ""
-          Some(s"CORR($distinct${f.children().head}, ${f.children().last})")
+          Some(s"CORR(${f.children().head}, ${f.children().last})")
         case _ => None
       }
     )
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
index 108348fbcd3..0a713bdb76c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
@@ -1652,23 +1652,37 @@ class JDBCV2Suite extends QueryTest with 
SharedSparkSession with ExplainSuiteHel
   }
 
   test("scan with aggregate push-down: COVAR_POP COVAR_SAMP with filter and 
group by") {
-    val df = sql("SELECT COVAR_POP(bonus, bonus), COVAR_SAMP(bonus, bonus)" +
+    val df1 = sql("SELECT COVAR_POP(bonus, bonus), COVAR_SAMP(bonus, bonus)" +
       " FROM h2.test.employee WHERE dept > 0 GROUP BY DePt")
-    checkFiltersRemoved(df)
-    checkAggregateRemoved(df)
-    checkPushedInfo(df, "PushedAggregates: [COVAR_POP(BONUS, BONUS), 
COVAR_SAMP(BONUS, BONUS)], " +
+    checkFiltersRemoved(df1)
+    checkAggregateRemoved(df1)
+    checkPushedInfo(df1, "PushedAggregates: [COVAR_POP(BONUS, BONUS), 
COVAR_SAMP(BONUS, BONUS)], " +
       "PushedFilters: [DEPT IS NOT NULL, DEPT > 0], PushedGroupByExpressions: 
[DEPT]")
-    checkAnswer(df, Seq(Row(10000d, 20000d), Row(2500d, 5000d), Row(0d, null)))
+    checkAnswer(df1, Seq(Row(10000d, 20000d), Row(2500d, 5000d), Row(0d, 
null)))
+
+    val df2 = sql("SELECT COVAR_POP(DISTINCT bonus, bonus), 
COVAR_SAMP(DISTINCT bonus, bonus)" +
+      " FROM h2.test.employee WHERE dept > 0 GROUP BY DePt")
+    checkFiltersRemoved(df2)
+    checkAggregateRemoved(df2, false)
+    checkPushedInfo(df2, "PushedFilters: [DEPT IS NOT NULL, DEPT > 0]")
+    checkAnswer(df2, Seq(Row(10000d, 20000d), Row(2500d, 5000d), Row(0d, 
null)))
   }
 
   test("scan with aggregate push-down: CORR with filter and group by") {
-    val df = sql("SELECT CORR(bonus, bonus) FROM h2.test.employee WHERE dept > 
0" +
+    val df1 = sql("SELECT CORR(bonus, bonus) FROM h2.test.employee WHERE dept 
> 0" +
       " GROUP BY DePt")
-    checkFiltersRemoved(df)
-    checkAggregateRemoved(df)
-    checkPushedInfo(df, "PushedAggregates: [CORR(BONUS, BONUS)], " +
+    checkFiltersRemoved(df1)
+    checkAggregateRemoved(df1)
+    checkPushedInfo(df1, "PushedAggregates: [CORR(BONUS, BONUS)], " +
       "PushedFilters: [DEPT IS NOT NULL, DEPT > 0], PushedGroupByExpressions: 
[DEPT]")
-    checkAnswer(df, Seq(Row(1d), Row(1d), Row(null)))
+    checkAnswer(df1, Seq(Row(1d), Row(1d), Row(null)))
+
+    val df2 = sql("SELECT CORR(DISTINCT bonus, bonus) FROM h2.test.employee 
WHERE dept > 0" +
+      " GROUP BY DePt")
+    checkFiltersRemoved(df2)
+    checkAggregateRemoved(df2, false)
+    checkPushedInfo(df2, "PushedFilters: [DEPT IS NOT NULL, DEPT > 0]")
+    checkAnswer(df2, Seq(Row(1d), Row(1d), Row(null)))
   }
 
   test("scan with aggregate push-down: aggregate over alias push down") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to