[GitHub] [spark] cloud-fan commented on a diff in pull request #37320: [SPARK-39819][SQL] DS V2 aggregate push down can work with Top N or Paging (Sort with group expressions)

2022-08-05 Thread GitBox


cloud-fan commented on code in PR #37320:
URL: https://github.com/apache/spark/pull/37320#discussion_r938796458


##
sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala:
##
@@ -864,6 +851,254 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
 checkAnswer(df2, Seq(Row(2, "david", 10000.00)))
   }
 
+  test("scan with aggregate push-down and top N push-down") {
+val df1 = spark.read
+  .table("h2.test.employee")
+  .groupBy("DEPT").sum("SALARY")
+  .orderBy("DEPT")
+  .limit(1)
+checkSortRemoved(df1)
+checkLimitRemoved(df1)
+checkPushedInfo(df1,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: [DEPT]",
+  "PushedFilters: []",
+  "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST] LIMIT 1")
+checkAnswer(df1, Seq(Row(1, 19000.00)))
+
+val df2 = spark.read
+  .table("h2.test.employee")
+  .select($"DEPT".as("my_dept"), $"SALARY")
+  .groupBy("my_dept").sum("SALARY")
+  .orderBy("my_dept")
+  .limit(1)
+checkSortRemoved(df2)
+checkLimitRemoved(df2)
+checkPushedInfo(df2,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: [DEPT]",
+  "PushedFilters: []",
+  "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST] LIMIT 1")
+checkAnswer(df2, Seq(Row(1, 19000.00)))
+
+val df3 = spark.read
+  .table("h2.test.employee")
+  .select($"SALARY",
+when(($"SALARY" > 8000).and($"SALARY" < 10000), $"salary").otherwise(0).as("key"))
+  .groupBy("key").sum("SALARY")
+  .orderBy("key")
+  .limit(1)
+checkSortRemoved(df3)
+checkLimitRemoved(df3)
+checkPushedInfo(df3,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: " +
+"[CASE WHEN (SALARY > 8000.00) AND (SALARY < 10000.00) THEN SALARY ELSE 0.00 END]",
+  "PushedFilters: []",
+  "PushedTopN: ORDER BY [" +
+"CASE WHEN (SALARY > 8000.00) AND (SALARY < 10000.00) THEN SALARY ELSE 0.00 END " +
+"ASC NULLS FIRST] LIMIT 1")
+checkAnswer(df3, Seq(Row(0, 44000.00)))
+
+val df4 = spark.read
+  .table("h2.test.employee")
+  .groupBy("DEPT", "IS_MANAGER").sum("SALARY")
+  .orderBy("DEPT", "IS_MANAGER")
+  .limit(1)
+checkSortRemoved(df4)
+checkLimitRemoved(df4)
+checkPushedInfo(df4,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: [DEPT, IS_MANAGER]",
+  "PushedFilters: []",
+  "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST, IS_MANAGER ASC NULLS FIRST] LIMIT 1")
+checkAnswer(df4, Seq(Row(1, false, 9000.00)))
+
+val df5 = spark.read
+  .table("h2.test.employee")
+  .select($"DEPT".as("my_dept"), $"IS_MANAGER".as("my_manager"), $"SALARY")
+  .groupBy("my_dept", "my_manager").sum("SALARY")
+  .orderBy("my_dept", "my_manager")
+  .limit(1)
+checkSortRemoved(df5)
+checkLimitRemoved(df5)
+checkPushedInfo(df5,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: [DEPT, IS_MANAGER]",
+  "PushedFilters: []",
+  "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST, IS_MANAGER ASC NULLS FIRST] LIMIT 1")
+checkAnswer(df5, Seq(Row(1, false, 9000.00)))
+
+val df6 = spark.read
+  .table("h2.test.employee")
+  .select($"SALARY", $"IS_MANAGER",
+when(($"SALARY" > 8000).and($"SALARY" < 10000), $"salary").otherwise(0).as("key"))
+  .groupBy("key", "IS_MANAGER").sum("SALARY")
+  .orderBy("key", "IS_MANAGER")
+  .limit(1)
+checkSortRemoved(df6)
+checkLimitRemoved(df6)
+checkPushedInfo(df6,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: " +
+"[CASE WHEN (SALARY > 8000.00) AND (SALARY < 10000.00) THEN SALARY ELSE 0.00 END, " +
+"IS_MANAGER]",
+  "PushedFilters: []",
+  "PushedTopN: ORDER BY [" +
+"CASE WHEN (SALARY > 8000.00) AND (SALARY < 10000.00) THEN SALARY ELSE 0.00 END " +
+"ASC NULLS FIRST, IS_MANAGER ASC NULLS FIRST] LIMIT 1")
+checkAnswer(df6, Seq(Row(0.00, false, 12000.00)))
+
+val df7 = spark.read
+  .table("h2.test.employee")
+  .select($"DEPT", $"SALARY")
+  .groupBy("dept").agg(sum("SALARY"))
+  .orderBy(sum("SALARY"))
+  .limit(1)
+checkSortRemoved(df7, false)
+checkLimitRemoved(df7, false)
+checkPushedInfo(df7,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: [DEPT]",
+  "PushedFilters: []")
+checkAnswer(df7, Seq(Row(6, 12000.00)))
+
+val df8 = spark.read
+  .table("h2.test.employee")
+  .select($"DEPT", $"SALARY")
+  .groupBy("dept").agg(sum("SALARY").as("total"))
+  .orderBy("total")
+  .limit(1)
+checkSortRemoved(df8, false)
+checkLimitRemoved(df8, false)
+checkPushedInfo(df8,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: [DEPT]",
+  "PushedFilters: []")
+checkAnswer(df8, Seq(Row(6, 12000.00)))
+  }
+
+ 

[GitHub] [spark] cloud-fan commented on a diff in pull request #37320: [SPARK-39819][SQL] DS V2 aggregate push down can work with Top N or Paging (Sort with group expressions)

2022-08-05 Thread GitBox


cloud-fan commented on code in PR #37320:
URL: https://github.com/apache/spark/pull/37320#discussion_r938794884


##
sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala:
##
@@ -864,6 +851,254 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
 checkAnswer(df2, Seq(Row(2, "david", 10000.00)))
   }
 
+  test("scan with aggregate push-down and top N push-down") {
+val df1 = spark.read
+  .table("h2.test.employee")
+  .groupBy("DEPT").sum("SALARY")
+  .orderBy("DEPT")
+  .limit(1)
+checkSortRemoved(df1)
+checkLimitRemoved(df1)
+checkPushedInfo(df1,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: [DEPT]",
+  "PushedFilters: []",
+  "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST] LIMIT 1")
+checkAnswer(df1, Seq(Row(1, 19000.00)))
+
+val df2 = spark.read
+  .table("h2.test.employee")
+  .select($"DEPT".as("my_dept"), $"SALARY")
+  .groupBy("my_dept").sum("SALARY")
+  .orderBy("my_dept")
+  .limit(1)
+checkSortRemoved(df2)
+checkLimitRemoved(df2)
+checkPushedInfo(df2,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: [DEPT]",
+  "PushedFilters: []",
+  "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST] LIMIT 1")
+checkAnswer(df2, Seq(Row(1, 19000.00)))
+
+val df3 = spark.read
+  .table("h2.test.employee")
+  .select($"SALARY",
+when(($"SALARY" > 8000).and($"SALARY" < 10000), $"salary").otherwise(0).as("key"))
+  .groupBy("key").sum("SALARY")
+  .orderBy("key")
+  .limit(1)
+checkSortRemoved(df3)
+checkLimitRemoved(df3)
+checkPushedInfo(df3,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: " +
+"[CASE WHEN (SALARY > 8000.00) AND (SALARY < 10000.00) THEN SALARY ELSE 0.00 END]",
+  "PushedFilters: []",
+  "PushedTopN: ORDER BY [" +
+"CASE WHEN (SALARY > 8000.00) AND (SALARY < 10000.00) THEN SALARY ELSE 0.00 END " +
+"ASC NULLS FIRST] LIMIT 1")
+checkAnswer(df3, Seq(Row(0, 44000.00)))
+
+val df4 = spark.read
+  .table("h2.test.employee")
+  .groupBy("DEPT", "IS_MANAGER").sum("SALARY")
+  .orderBy("DEPT", "IS_MANAGER")
+  .limit(1)
+checkSortRemoved(df4)
+checkLimitRemoved(df4)
+checkPushedInfo(df4,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: [DEPT, IS_MANAGER]",
+  "PushedFilters: []",
+  "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST, IS_MANAGER ASC NULLS FIRST] LIMIT 1")
+checkAnswer(df4, Seq(Row(1, false, 9000.00)))
+
+val df5 = spark.read
+  .table("h2.test.employee")
+  .select($"DEPT".as("my_dept"), $"IS_MANAGER".as("my_manager"), $"SALARY")
+  .groupBy("my_dept", "my_manager").sum("SALARY")
+  .orderBy("my_dept", "my_manager")
+  .limit(1)
+checkSortRemoved(df5)
+checkLimitRemoved(df5)
+checkPushedInfo(df5,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: [DEPT, IS_MANAGER]",
+  "PushedFilters: []",
+  "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST, IS_MANAGER ASC NULLS FIRST] LIMIT 1")
+checkAnswer(df5, Seq(Row(1, false, 9000.00)))
+
+val df6 = spark.read
+  .table("h2.test.employee")
+  .select($"SALARY", $"IS_MANAGER",
+when(($"SALARY" > 8000).and($"SALARY" < 10000), $"salary").otherwise(0).as("key"))
+  .groupBy("key", "IS_MANAGER").sum("SALARY")
+  .orderBy("key", "IS_MANAGER")
+  .limit(1)
+checkSortRemoved(df6)
+checkLimitRemoved(df6)
+checkPushedInfo(df6,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: " +
+"[CASE WHEN (SALARY > 8000.00) AND (SALARY < 10000.00) THEN SALARY ELSE 0.00 END, " +
+"IS_MANAGER]",
+  "PushedFilters: []",
+  "PushedTopN: ORDER BY [" +
+"CASE WHEN (SALARY > 8000.00) AND (SALARY < 10000.00) THEN SALARY ELSE 0.00 END " +
+"ASC NULLS FIRST, IS_MANAGER ASC NULLS FIRST] LIMIT 1")
+checkAnswer(df6, Seq(Row(0.00, false, 12000.00)))
+
+val df7 = spark.read
+  .table("h2.test.employee")
+  .select($"DEPT", $"SALARY")
+  .groupBy("dept").agg(sum("SALARY"))
+  .orderBy(sum("SALARY"))

Review Comment:
   We can translate aggregate expressions now, so why does this test still not trigger top-N pushdown?
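
   For context, a minimal sketch of the difference between the two orderings (assumes a `spark` session, `import org.apache.spark.sql.functions.sum`, and the suite's `h2.test.employee` fixture; an illustration only, not code from this PR):
   ```
   val agg = spark.read.table("h2.test.employee").groupBy("DEPT").agg(sum("SALARY"))
   // Ordering by the grouping column: the SortOrder is a plain attribute of the
   // aggregate's output, so it maps directly onto the pushed-down scan column.
   val byGroupCol = agg.orderBy("DEPT")
   // Ordering by sum("SALARY"): the SortOrder wraps a fresh aggregate expression
   // that is not an output attribute, so it must first be rewritten onto the
   // pushed SUM(SALARY) column before top-N pushdown can apply.
   val byAggExpr = agg.orderBy(sum("SALARY"))
   ```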



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a diff in pull request #37320: [SPARK-39819][SQL] DS V2 aggregate push down can work with Top N or Paging (Sort with group expressions)

2022-08-05 Thread GitBox


cloud-fan commented on code in PR #37320:
URL: https://github.com/apache/spark/pull/37320#discussion_r938794073


##
sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala:
##
@@ -864,6 +851,254 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
 checkAnswer(df2, Seq(Row(2, "david", 10000.00)))
   }
 
+  test("scan with aggregate push-down and top N push-down") {
+val df1 = spark.read
+  .table("h2.test.employee")
+  .groupBy("DEPT").sum("SALARY")
+  .orderBy("DEPT")
+  .limit(1)
+checkSortRemoved(df1)
+checkLimitRemoved(df1)
+checkPushedInfo(df1,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: [DEPT]",
+  "PushedFilters: []",
+  "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST] LIMIT 1")
+checkAnswer(df1, Seq(Row(1, 19000.00)))
+
+val df2 = spark.read
+  .table("h2.test.employee")
+  .select($"DEPT".as("my_dept"), $"SALARY")
+  .groupBy("my_dept").sum("SALARY")
+  .orderBy("my_dept")
+  .limit(1)
+checkSortRemoved(df2)
+checkLimitRemoved(df2)
+checkPushedInfo(df2,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: [DEPT]",
+  "PushedFilters: []",
+  "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST] LIMIT 1")
+checkAnswer(df2, Seq(Row(1, 19000.00)))
+
+val df3 = spark.read
+  .table("h2.test.employee")
+  .select($"SALARY",
+when(($"SALARY" > 8000).and($"SALARY" < 10000), $"salary").otherwise(0).as("key"))
+  .groupBy("key").sum("SALARY")
+  .orderBy("key")
+  .limit(1)
+checkSortRemoved(df3)
+checkLimitRemoved(df3)
+checkPushedInfo(df3,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: " +
+"[CASE WHEN (SALARY > 8000.00) AND (SALARY < 10000.00) THEN SALARY ELSE 0.00 END]",
+  "PushedFilters: []",
+  "PushedTopN: ORDER BY [" +
+"CASE WHEN (SALARY > 8000.00) AND (SALARY < 10000.00) THEN SALARY ELSE 0.00 END " +
+"ASC NULLS FIRST] LIMIT 1")
+checkAnswer(df3, Seq(Row(0, 44000.00)))
+
+val df4 = spark.read
+  .table("h2.test.employee")
+  .groupBy("DEPT", "IS_MANAGER").sum("SALARY")
+  .orderBy("DEPT", "IS_MANAGER")
+  .limit(1)
+checkSortRemoved(df4)
+checkLimitRemoved(df4)
+checkPushedInfo(df4,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: [DEPT, IS_MANAGER]",
+  "PushedFilters: []",
+  "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST, IS_MANAGER ASC NULLS FIRST] LIMIT 1")
+checkAnswer(df4, Seq(Row(1, false, 9000.00)))
+
+val df5 = spark.read
+  .table("h2.test.employee")
+  .select($"DEPT".as("my_dept"), $"IS_MANAGER".as("my_manager"), $"SALARY")
+  .groupBy("my_dept", "my_manager").sum("SALARY")
+  .orderBy("my_dept", "my_manager")
+  .limit(1)
+checkSortRemoved(df5)
+checkLimitRemoved(df5)
+checkPushedInfo(df5,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: [DEPT, IS_MANAGER]",
+  "PushedFilters: []",
+  "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST, IS_MANAGER ASC NULLS FIRST] LIMIT 1")
+checkAnswer(df5, Seq(Row(1, false, 9000.00)))
+
+val df6 = spark.read
+  .table("h2.test.employee")
+  .select($"SALARY", $"IS_MANAGER",
+when(($"SALARY" > 8000).and($"SALARY" < 10000), $"salary").otherwise(0).as("key"))
+  .groupBy("key", "IS_MANAGER").sum("SALARY")
+  .orderBy("key", "IS_MANAGER")
+  .limit(1)
+checkSortRemoved(df6)
+checkLimitRemoved(df6)
+checkPushedInfo(df6,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: " +
+"[CASE WHEN (SALARY > 8000.00) AND (SALARY < 10000.00) THEN SALARY ELSE 0.00 END, " +
+"IS_MANAGER]",
+  "PushedFilters: []",
+  "PushedTopN: ORDER BY [" +
+"CASE WHEN (SALARY > 8000.00) AND (SALARY < 10000.00) THEN SALARY ELSE 0.00 END " +
+"ASC NULLS FIRST, IS_MANAGER ASC NULLS FIRST] LIMIT 1")
+checkAnswer(df6, Seq(Row(0.00, false, 12000.00)))
+
+val df7 = spark.read
+  .table("h2.test.employee")
+  .select($"DEPT", $"SALARY")
+  .groupBy("dept").agg(sum("SALARY"))
+  .orderBy(sum("SALARY"))

Review Comment:
   how about
   ```
   .groupBy("dept").agg(sum("SALARY").as("sum_salary"))
   .orderBy("sum_salary")
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a diff in pull request #37320: [SPARK-39819][SQL] DS V2 aggregate push down can work with Top N or Paging (Sort with group expressions)

2022-08-05 Thread GitBox


cloud-fan commented on code in PR #37320:
URL: https://github.com/apache/spark/pull/37320#discussion_r938792943


##
sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala:
##
@@ -864,6 +851,253 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
 checkAnswer(df2, Seq(Row(2, "david", 10000.00)))
   }
 
+  test("scan with aggregate push-down and top N push-down") {
+val df1 = spark.read
+  .table("h2.test.employee")
+  .groupBy("DEPT").sum("SALARY")
+  .orderBy("DEPT")
+  .limit(1)
+checkSortRemoved(df1)
+checkLimitRemoved(df1)
+checkPushedInfo(df1,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: [DEPT]",
+  "PushedFilters: []",
+  "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST] LIMIT 1")
+checkAnswer(df1, Seq(Row(1, 19000.00)))
+
+val df2 = spark.read
+  .table("h2.test.employee")
+  .select($"DEPT".as("my_dept"), $"SALARY")
+  .groupBy("my_dept").sum("SALARY")
+  .orderBy("my_dept")
+  .limit(1)
+checkSortRemoved(df2)
+checkLimitRemoved(df2)
+checkPushedInfo(df2,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: [DEPT]",
+  "PushedFilters: []",
+  "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST] LIMIT 1")
+checkAnswer(df2, Seq(Row(1, 19000.00)))
+
+val df3 = spark.read
+  .table("h2.test.employee")
+  .select($"SALARY",
+when(($"SALARY" > 8000).and($"SALARY" < 10000), $"salary").otherwise(0).as("key"))
+  .groupBy("key").sum("SALARY")
+  .orderBy("key")
+  .limit(1)
+checkSortRemoved(df3)
+checkLimitRemoved(df3)
+checkPushedInfo(df3,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: " +
+"[CASE WHEN (SALARY > 8000.00) AND (SALARY < 10000.00) THEN SALARY ELSE 0.00 END]",
+  "PushedFilters: []",
+  "PushedTopN: ORDER BY [" +
+"CASE WHEN (SALARY > 8000.00) AND (SALARY < 10000.00) THEN SALARY ELSE 0.00 END " +
+"ASC NULLS FIRST] LIMIT 1")
+checkAnswer(df3, Seq(Row(0, 44000.00)))
+
+val df4 = spark.read
+  .table("h2.test.employee")
+  .groupBy("dept").sum("SALARY")
+  .orderBy($"dept".gt(1))
+  .limit(1)
+checkSortRemoved(df4)
+checkLimitRemoved(df4)
+checkPushedInfo(df4,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: [DEPT]",
+  "PushedFilters: []",
+  "PushedTopN: ORDER BY [DEPT > 1 ASC NULLS FIRST] LIMIT 1")
+checkAnswer(df4, Seq(Row(1, 19000.00)))
+
+val df5 = spark.read
+  .table("h2.test.employee")
+  .groupBy("DEPT", "IS_MANAGER").sum("SALARY")
+  .orderBy("DEPT", "IS_MANAGER")
+  .limit(1)
+checkSortRemoved(df5)
+checkLimitRemoved(df5)
+checkPushedInfo(df5,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: [DEPT, IS_MANAGER]",
+  "PushedFilters: []",
+  "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST, IS_MANAGER ASC NULLS FIRST] LIMIT 1")
+checkAnswer(df5, Seq(Row(1, false, 9000.00)))
+
+val df6 = spark.read
+  .table("h2.test.employee")
+  .select($"SALARY", $"IS_MANAGER",
+when(($"SALARY" > 8000).and($"SALARY" < 10000), $"salary").otherwise(0).as("key"))

Review Comment:
   let's use cast instead of case when, which is simpler and easier to review.
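
   One possible shape for that simplification (a sketch of the suggestion, not the committed test; the cast-based key and `import org.apache.spark.sql.functions._` are assumptions):
   ```
   val df6 = spark.read
     .table("h2.test.employee")
     // a cast is simpler to review than the CASE WHEN grouping key
     .select($"SALARY", $"IS_MANAGER", $"DEPT".cast("string").as("key"))
     .groupBy("key", "IS_MANAGER").sum("SALARY")
     .orderBy("key", "IS_MANAGER")
     .limit(1)
   ```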



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a diff in pull request #37320: [SPARK-39819][SQL] DS V2 aggregate push down can work with Top N or Paging (Sort with group expressions)

2022-08-05 Thread GitBox


cloud-fan commented on code in PR #37320:
URL: https://github.com/apache/spark/pull/37320#discussion_r938792433


##
sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala:
##
@@ -864,6 +851,253 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
 checkAnswer(df2, Seq(Row(2, "david", 10000.00)))
   }
 
+  test("scan with aggregate push-down and top N push-down") {
+val df1 = spark.read
+  .table("h2.test.employee")
+  .groupBy("DEPT").sum("SALARY")
+  .orderBy("DEPT")
+  .limit(1)
+checkSortRemoved(df1)
+checkLimitRemoved(df1)
+checkPushedInfo(df1,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: [DEPT]",
+  "PushedFilters: []",
+  "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST] LIMIT 1")
+checkAnswer(df1, Seq(Row(1, 19000.00)))
+
+val df2 = spark.read
+  .table("h2.test.employee")
+  .select($"DEPT".as("my_dept"), $"SALARY")
+  .groupBy("my_dept").sum("SALARY")
+  .orderBy("my_dept")
+  .limit(1)
+checkSortRemoved(df2)
+checkLimitRemoved(df2)
+checkPushedInfo(df2,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: [DEPT]",
+  "PushedFilters: []",
+  "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST] LIMIT 1")
+checkAnswer(df2, Seq(Row(1, 19000.00)))
+
+val df3 = spark.read
+  .table("h2.test.employee")
+  .select($"SALARY",
+when(($"SALARY" > 8000).and($"SALARY" < 10000), $"salary").otherwise(0).as("key"))
+  .groupBy("key").sum("SALARY")
+  .orderBy("key")
+  .limit(1)
+checkSortRemoved(df3)
+checkLimitRemoved(df3)
+checkPushedInfo(df3,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: " +
+"[CASE WHEN (SALARY > 8000.00) AND (SALARY < 10000.00) THEN SALARY ELSE 0.00 END]",
+  "PushedFilters: []",
+  "PushedTopN: ORDER BY [" +
+"CASE WHEN (SALARY > 8000.00) AND (SALARY < 10000.00) THEN SALARY ELSE 0.00 END " +
+"ASC NULLS FIRST] LIMIT 1")
+checkAnswer(df3, Seq(Row(0, 44000.00)))
+
+val df4 = spark.read
+  .table("h2.test.employee")
+  .groupBy("dept").sum("SALARY")
+  .orderBy($"dept".gt(1))

Review Comment:
   let's use cast now. group by a predicate is super weird.
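
   i.e. something along these lines (a sketch of the suggested change; the committed test may differ):
   ```
   val df4 = spark.read
     .table("h2.test.employee")
     .groupBy("dept").sum("SALARY")
     // order by a cast of the grouping column instead of the predicate dept > 1
     .orderBy($"dept".cast("string"))
     .limit(1)
   ```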



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a diff in pull request #37320: [SPARK-39819][SQL] DS V2 aggregate push down can work with Top N or Paging (Sort with group expressions)

2022-08-05 Thread GitBox


cloud-fan commented on code in PR #37320:
URL: https://github.com/apache/spark/pull/37320#discussion_r938791655


##
sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala:
##
@@ -864,6 +851,253 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
 checkAnswer(df2, Seq(Row(2, "david", 10000.00)))
   }
 
+  test("scan with aggregate push-down and top N push-down") {
+val df1 = spark.read
+  .table("h2.test.employee")
+  .groupBy("DEPT").sum("SALARY")
+  .orderBy("DEPT")
+  .limit(1)
+checkSortRemoved(df1)
+checkLimitRemoved(df1)
+checkPushedInfo(df1,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: [DEPT]",
+  "PushedFilters: []",
+  "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST] LIMIT 1")
+checkAnswer(df1, Seq(Row(1, 19000.00)))
+
+val df2 = spark.read
+  .table("h2.test.employee")
+  .select($"DEPT".as("my_dept"), $"SALARY")

Review Comment:
   let's add a cast instead of just a simple alias
   ```
   .select($"DEPT".cast("string").as("my_dept"), $"SALARY")
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a diff in pull request #37320: [SPARK-39819][SQL] DS V2 aggregate push down can work with Top N or Paging (Sort with group expressions)

2022-08-05 Thread GitBox


cloud-fan commented on code in PR #37320:
URL: https://github.com/apache/spark/pull/37320#discussion_r938792037


##
sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala:
##
@@ -864,6 +851,253 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
 checkAnswer(df2, Seq(Row(2, "david", 10000.00)))
   }
 
+  test("scan with aggregate push-down and top N push-down") {
+val df1 = spark.read
+  .table("h2.test.employee")
+  .groupBy("DEPT").sum("SALARY")
+  .orderBy("DEPT")
+  .limit(1)
+checkSortRemoved(df1)
+checkLimitRemoved(df1)
+checkPushedInfo(df1,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: [DEPT]",
+  "PushedFilters: []",
+  "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST] LIMIT 1")
+checkAnswer(df1, Seq(Row(1, 19000.00)))
+
+val df2 = spark.read
+  .table("h2.test.employee")
+  .select($"DEPT".as("my_dept"), $"SALARY")
+  .groupBy("my_dept").sum("SALARY")
+  .orderBy("my_dept")
+  .limit(1)
+checkSortRemoved(df2)
+checkLimitRemoved(df2)
+checkPushedInfo(df2,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: [DEPT]",
+  "PushedFilters: []",
+  "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST] LIMIT 1")
+checkAnswer(df2, Seq(Row(1, 19000.00)))
+
+val df3 = spark.read
+  .table("h2.test.employee")
+  .select($"SALARY",
+when(($"SALARY" > 8000).and($"SALARY" < 10000), $"salary").otherwise(0).as("key"))

Review Comment:
   we can remove this test after you address https://github.com/apache/spark/pull/37320/files#r938791655



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a diff in pull request #37320: [SPARK-39819][SQL] DS V2 aggregate push down can work with Top N or Paging (Sort with group expressions)

2022-08-05 Thread GitBox


cloud-fan commented on code in PR #37320:
URL: https://github.com/apache/spark/pull/37320#discussion_r938790245


##
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala:
##
@@ -408,16 +411,27 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
   }
   (operation, isPushed && !isPartiallyPushed)
 case s @ Sort(order, _, operation @ ScanOperation(project, filter, sHolder: ScanBuilderHolder))
-// Without building the Scan, we do not know the resulting column names after aggregate
-// push-down, and thus can't push down Top-N which needs to know the ordering column names.
-// TODO: we can support simple cases like GROUP BY columns directly and ORDER BY the same
-//   columns, which we know the resulting column names: the original table columns.
-if sHolder.pushedAggregate.isEmpty && filter.isEmpty &&
-  CollapseProject.canCollapseExpressions(order, project, alwaysInline = true) =>
+  if filter.isEmpty &&
+CollapseProject.canCollapseExpressions(order, project, alwaysInline = true) =>
   val aliasMap = getAliasMap(project)
-  val newOrder = order.map(replaceAlias(_, aliasMap)).asInstanceOf[Seq[SortOrder]]
+  val aliasReplacedOrder = order.map(replaceAlias(_, aliasMap))
+  val newOrder = if (sHolder.pushedAggregate.isDefined) {
+// Without building the Scan, Aggregate push-down give the expected output starts with
+// `group_col_` or `agg_func_`. When Aggregate push-down working with Sort for top n
+// push-down, we need replace these expected output with the origin expressions.

Review Comment:
   ```
   // `ScanBuilderHolder` has different output columns after aggregate pushdown. Here we
   // replace the attributes in ordering expressions with the original table output columns.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a diff in pull request #37320: [SPARK-39819][SQL] DS V2 aggregate push down can work with Top N or Paging (Sort with group expressions)

2022-08-05 Thread GitBox


cloud-fan commented on code in PR #37320:
URL: https://github.com/apache/spark/pull/37320#discussion_r938790445


##
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala:
##
@@ -408,16 +411,27 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
   }
   (operation, isPushed && !isPartiallyPushed)
 case s @ Sort(order, _, operation @ ScanOperation(project, filter, sHolder: ScanBuilderHolder))
-// Without building the Scan, we do not know the resulting column names after aggregate
-// push-down, and thus can't push down Top-N which needs to know the ordering column names.
-// TODO: we can support simple cases like GROUP BY columns directly and ORDER BY the same
-//   columns, which we know the resulting column names: the original table columns.
-if sHolder.pushedAggregate.isEmpty && filter.isEmpty &&
-  CollapseProject.canCollapseExpressions(order, project, alwaysInline = true) =>
+  if filter.isEmpty &&
+CollapseProject.canCollapseExpressions(order, project, alwaysInline = true) =>
   val aliasMap = getAliasMap(project)
-  val newOrder = order.map(replaceAlias(_, aliasMap)).asInstanceOf[Seq[SortOrder]]
+  val aliasReplacedOrder = order.map(replaceAlias(_, aliasMap))
+  val newOrder = if (sHolder.pushedAggregate.isDefined) {
+// Without building the Scan, Aggregate push-down give the expected output starts with
+// `group_col_` or `agg_func_`. When Aggregate push-down working with Sort for top n
+// push-down, we need replace these expected output with the origin expressions.
+aliasReplacedOrder.map {
+  _.transform {
+case a: Attribute => sHolder.pushedAggOutputMap.getOrElse(a, a)
+  }.asInstanceOf[SortOrder]
+}
+  } else {
+aliasReplacedOrder.asInstanceOf[Seq[SortOrder]]
+  }
   val normalizedOrders = DataSourceStrategy.normalizeExprs(
 newOrder, sHolder.relation.output).asInstanceOf[Seq[SortOrder]]
+  // Because V2ExpressionBuilder can't translate aggregate functions, so we can't

Review Comment:
   we can remove this TODO now.



##
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala:
##
@@ -408,16 +411,27 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
   }
   (operation, isPushed && !isPartiallyPushed)
 case s @ Sort(order, _, operation @ ScanOperation(project, filter, sHolder: ScanBuilderHolder))
-// Without building the Scan, we do not know the resulting column names after aggregate
-// push-down, and thus can't push down Top-N which needs to know the ordering column names.
-// TODO: we can support simple cases like GROUP BY columns directly and ORDER BY the same
-//   columns, which we know the resulting column names: the original table columns.
-if sHolder.pushedAggregate.isEmpty && filter.isEmpty &&
-  CollapseProject.canCollapseExpressions(order, project, alwaysInline = true) =>
+  if filter.isEmpty &&
+CollapseProject.canCollapseExpressions(order, project, alwaysInline = true) =>
   val aliasMap = getAliasMap(project)
-  val newOrder = order.map(replaceAlias(_, aliasMap)).asInstanceOf[Seq[SortOrder]]
+  val aliasReplacedOrder = order.map(replaceAlias(_, aliasMap))
+  val newOrder = if (sHolder.pushedAggregate.isDefined) {
+// Without building the Scan, Aggregate push-down give the expected output starts with
+// `group_col_` or `agg_func_`. When Aggregate push-down working with Sort for top n
+// push-down, we need replace these expected output with the origin expressions.
+aliasReplacedOrder.map {
+  _.transform {
+case a: Attribute => sHolder.pushedAggOutputMap.getOrElse(a, a)
+  }.asInstanceOf[SortOrder]
+}
+  } else {
+aliasReplacedOrder.asInstanceOf[Seq[SortOrder]]
+  }
   val normalizedOrders = DataSourceStrategy.normalizeExprs(
 newOrder, sHolder.relation.output).asInstanceOf[Seq[SortOrder]]
+  // Because V2ExpressionBuilder can't translate aggregate functions, so we can't

Review Comment:
   and let's add tests for it.
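
   A plausible shape for such a test (hypothetical; the exact `PushedTopN` output would need to be confirmed once aggregate expressions are translatable):
   ```
   val df = spark.read
     .table("h2.test.employee")
     .groupBy("DEPT").agg(sum("SALARY").as("total"))
     .orderBy("total")
     .limit(1)
   // Expected once the TODO is resolved: checkSortRemoved(df) and
   // checkLimitRemoved(df) pass, with a PushedTopN entry ordering by SUM(SALARY).
   ```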



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a diff in pull request #37320: [SPARK-39819][SQL] DS V2 aggregate push down can work with Top N or Paging (Sort with group expressions)

2022-08-01 Thread GitBox


cloud-fan commented on code in PR #37320:
URL: https://github.com/apache/spark/pull/37320#discussion_r935119522


##
sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala:
##
@@ -864,6 +851,255 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
 checkAnswer(df2, Seq(Row(2, "david", 10000.00)))
   }
 
+  test("scan with aggregate push-down and top N push-down") {
+val df1 = spark.read
+  .table("h2.test.employee")
+  .groupBy("DEPT").sum("SALARY")
+  .orderBy("DEPT")
+  .limit(1)
+checkSortRemoved(df1)
+checkLimitRemoved(df1)
+checkPushedInfo(df1,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: [DEPT]",
+  "PushedFilters: []",
+  "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST] LIMIT 1")
+checkAnswer(df1, Seq(Row(1, 19000.00)))
+
+val df2 = spark.read
+  .table("h2.test.employee")
+  .select($"DEPT".as("my_dept"), $"SALARY")
+  .groupBy("my_dept").sum("SALARY")
+  .orderBy("my_dept")
+  .limit(1)
+checkSortRemoved(df2)
+checkLimitRemoved(df2)
+checkPushedInfo(df2,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: [DEPT]",
+  "PushedFilters: []",
+  "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST] LIMIT 1")
+checkAnswer(df2, Seq(Row(1, 19000.00)))
+
+val df3 = spark.read
+  .table("h2.test.employee")
+  .select($"SALARY",
+when(($"SALARY" > 8000).and($"SALARY" < 10000), $"salary").otherwise(0).as("key"))
+  .groupBy("key").sum("SALARY")
+  .orderBy("key")
+  .limit(1)
+checkSortRemoved(df3)
+checkLimitRemoved(df3)
+checkPushedInfo(df3,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: " +
+"[CASE WHEN (SALARY > 8000.00) AND (SALARY < 10000.00) THEN SALARY ELSE 0.00 END]",
+  "PushedFilters: []",
+  "PushedTopN: ORDER BY [" +
+"CASE WHEN (SALARY > 8000.00) AND (SALARY < 10000.00) THEN SALARY ELSE 0.00 END " +
+"ASC NULLS FIRST] LIMIT 1")
+checkAnswer(df3, Seq(Row(0, 44000.00)))
+
+withSQLConf(SQLConf.ANSI_ENABLED.key -> "true") {
+  val df4 = spark.read
+.table("h2.test.employee")
+.groupBy("dept").sum("SALARY")
+.orderBy($"dept" + 100)

Review Comment:
   BTW, can we make the grouping key an expression? like this https://github.com/apache/spark/pull/37320/files#diff-1496378d9e7817c45c962f1af48e5e765cb475bd01d58edec118d98225e02ef3R887
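
   For example (a sketch with illustrative names, not taken from the linked diff):
   ```
   val df = spark.read
     .table("h2.test.employee")
     // the grouping key is itself an expression, not a bare column
     .groupBy(($"dept" + 100).as("shifted_dept")).sum("SALARY")
     .orderBy($"shifted_dept" + 100)
     .limit(1)
   ```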



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a diff in pull request #37320: [SPARK-39819][SQL] DS V2 aggregate push down can work with Top N or Paging (Sort with group expressions)

2022-08-01 Thread GitBox


cloud-fan commented on code in PR #37320:
URL: https://github.com/apache/spark/pull/37320#discussion_r935119295


##
sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala:
##
@@ -864,6 +851,255 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
 checkAnswer(df2, Seq(Row(2, "david", 10000.00)))
   }
 
+  test("scan with aggregate push-down and top N push-down") {
+val df1 = spark.read
+  .table("h2.test.employee")
+  .groupBy("DEPT").sum("SALARY")
+  .orderBy("DEPT")
+  .limit(1)
+checkSortRemoved(df1)
+checkLimitRemoved(df1)
+checkPushedInfo(df1,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: [DEPT]",
+  "PushedFilters: []",
+  "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST] LIMIT 1")
+checkAnswer(df1, Seq(Row(1, 19000.00)))
+
+val df2 = spark.read
+  .table("h2.test.employee")
+  .select($"DEPT".as("my_dept"), $"SALARY")
+  .groupBy("my_dept").sum("SALARY")
+  .orderBy("my_dept")
+  .limit(1)
+checkSortRemoved(df2)
+checkLimitRemoved(df2)
+checkPushedInfo(df2,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: [DEPT]",
+  "PushedFilters: []",
+  "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST] LIMIT 1")
+checkAnswer(df2, Seq(Row(1, 19000.00)))
+
+val df3 = spark.read
+  .table("h2.test.employee")
+  .select($"SALARY",
+when(($"SALARY" > 8000).and($"SALARY" < 10000), $"salary").otherwise(0).as("key"))
+  .groupBy("key").sum("SALARY")
+  .orderBy("key")
+  .limit(1)
+checkSortRemoved(df3)
+checkLimitRemoved(df3)
+checkPushedInfo(df3,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: " +
+"[CASE WHEN (SALARY > 8000.00) AND (SALARY < 10000.00) THEN SALARY ELSE 0.00 END]",
+  "PushedFilters: []",
+  "PushedTopN: ORDER BY [" +
+"CASE WHEN (SALARY > 8000.00) AND (SALARY < 10000.00) THEN SALARY ELSE 0.00 END " +
+"ASC NULLS FIRST] LIMIT 1")
+checkAnswer(df3, Seq(Row(0, 44000.00)))
+
+withSQLConf(SQLConf.ANSI_ENABLED.key -> "true") {
+  val df4 = spark.read
+.table("h2.test.employee")
+.groupBy("dept").sum("SALARY")
+.orderBy($"dept" + 100)

Review Comment:
   can we use some functions that don't need ansi mode to simplify the test? e.g. `log(10, dept)`
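
   i.e. something like (a sketch; `log(base, col)` is org.apache.spark.sql.functions.log and has no ANSI-dependent behavior):
   ```
   import org.apache.spark.sql.functions.log

   val df4 = spark.read
     .table("h2.test.employee")
     .groupBy("dept").sum("SALARY")
     .orderBy(log(10.0, $"dept"))  // no ANSI mode needed, unlike dept + 100
     .limit(1)
   ```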



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a diff in pull request #37320: [SPARK-39819][SQL] DS V2 aggregate push down can work with Top N or Paging (Sort with group expressions)

2022-08-01 Thread GitBox


cloud-fan commented on code in PR #37320:
URL: https://github.com/apache/spark/pull/37320#discussion_r935064814


##
sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala:
##
@@ -864,6 +851,254 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
 checkAnswer(df2, Seq(Row(2, "david", 10000.00)))
   }
 
+  test("scan with aggregate push-down and top N push-down") {
+val df1 = spark.read
+  .table("h2.test.employee")
+  .groupBy("DEPT").sum("SALARY")
+  .orderBy("DEPT")
+  .limit(1)
+checkSortRemoved(df1)
+checkLimitRemoved(df1)
+checkPushedInfo(df1,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: [DEPT]",
+  "PushedFilters: []",
+  "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST] LIMIT 1")
+checkAnswer(df1, Seq(Row(1, 19000.00)))
+
+val df2 = spark.read
+  .table("h2.test.employee")
+  .select($"DEPT".as("my_dept"), $"SALARY")
+  .groupBy("my_dept").sum("SALARY")
+  .orderBy("my_dept")
+  .limit(1)
+checkSortRemoved(df2)
+checkLimitRemoved(df2)
+checkPushedInfo(df2,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: [DEPT]",
+  "PushedFilters: []",
+  "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST] LIMIT 1")
+checkAnswer(df2, Seq(Row(1, 19000.00)))
+
+val df3 = spark.read
+  .table("h2.test.employee")
+  .select($"SALARY",
+when(($"SALARY" > 8000).and($"SALARY" < 10000), $"salary").otherwise(0).as("key"))
+  .groupBy("key").sum("SALARY")
+  .orderBy("key")

Review Comment:
   can we also test `.orderBy($"key" + 100)`?
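
   i.e. extending the df3 shape above (a sketch; assumes the suite's `functions._` imports):
   ```
   val df = spark.read
     .table("h2.test.employee")
     .select($"SALARY",
       when(($"SALARY" > 8000).and($"SALARY" < 10000), $"SALARY").otherwise(0).as("key"))
     .groupBy("key").sum("SALARY")
     .orderBy($"key" + 100)  // order by an expression over the grouping key
     .limit(1)
   ```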



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a diff in pull request #37320: [SPARK-39819][SQL] DS V2 aggregate push down can work with Top N or Paging (Sort with group expressions)

2022-08-01 Thread GitBox


cloud-fan commented on code in PR #37320:
URL: https://github.com/apache/spark/pull/37320#discussion_r935064627


##
sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala:
##
@@ -864,6 +851,254 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
 checkAnswer(df2, Seq(Row(2, "david", 10000.00)))
   }
 
+  test("scan with aggregate push-down and top N push-down") {
+val df1 = spark.read
+  .table("h2.test.employee")
+  .groupBy("DEPT").sum("SALARY")
+  .orderBy("DEPT")
+  .limit(1)
+checkSortRemoved(df1)
+checkLimitRemoved(df1)
+checkPushedInfo(df1,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: [DEPT]",
+  "PushedFilters: []",
+  "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST] LIMIT 1")
+checkAnswer(df1, Seq(Row(1, 19000.00)))
+
+val df2 = spark.read
+  .table("h2.test.employee")
+  .select($"DEPT".as("my_dept"), $"SALARY")
+  .groupBy("my_dept").sum("SALARY")
+  .orderBy("my_dept")
+  .limit(1)
+checkSortRemoved(df2)
+checkLimitRemoved(df2)
+checkPushedInfo(df2,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: [DEPT]",
+  "PushedFilters: []",
+  "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST] LIMIT 1")
+checkAnswer(df2, Seq(Row(1, 19000.00)))
+
+val df3 = spark.read
+  .table("h2.test.employee")
+  .select($"SALARY",
+when(($"SALARY" > 8000).and($"SALARY" < 10000), $"salary").otherwise(0).as("key"))
+  .groupBy("key").sum("SALARY")
+  .orderBy("key")
+  .limit(1)
+checkSortRemoved(df3)
+checkLimitRemoved(df3)
+checkPushedInfo(df3,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: " +
+"[CASE WHEN (SALARY > 8000.00) AND (SALARY < 10000.00) THEN SALARY ELSE 0.00 END]",
+  "PushedFilters: []",
+  "PushedTopN: ORDER BY [" +
+"CASE WHEN (SALARY > 8000.00) AND (SALARY < 10000.00) THEN SALARY ELSE 0.00 END " +
+"ASC NULLS FIRST] LIMIT 1")
+checkAnswer(df3, Seq(Row(0, 44000.00)))
+
+val df4 = spark.read
+  .table("h2.test.employee")
+  .groupBy("DEPT", "IS_MANAGER").sum("SALARY")
+  .orderBy("DEPT", "IS_MANAGER")
+  .limit(1)
+checkSortRemoved(df4)
+checkLimitRemoved(df4)
+checkPushedInfo(df4,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: [DEPT, IS_MANAGER]",
+  "PushedFilters: []",
+  "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST, IS_MANAGER ASC NULLS FIRST] LIMIT 1")
+checkAnswer(df4, Seq(Row(1, false, 9000.00)))
+
+val df5 = spark.read
+  .table("h2.test.employee")
+  .select($"DEPT".as("my_dept"), $"IS_MANAGER".as("my_manager"), $"SALARY")
+  .groupBy("my_dept", "my_manager").sum("SALARY")
+  .orderBy("my_dept", "my_manager")
+  .limit(1)
+checkSortRemoved(df5)
+checkLimitRemoved(df5)
+checkPushedInfo(df5,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: [DEPT, IS_MANAGER]",
+  "PushedFilters: []",
+  "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST, IS_MANAGER ASC NULLS FIRST] LIMIT 1")
+checkAnswer(df5, Seq(Row(1, false, 9000.00)))
+
+val df6 = spark.read
+  .table("h2.test.employee")
+  .select($"SALARY", $"IS_MANAGER",
+when(($"SALARY" > 8000).and($"SALARY" < 10000), $"salary").otherwise(0).as("key"))
+  .groupBy("key", "IS_MANAGER").sum("SALARY")
+  .orderBy("key", "IS_MANAGER")
+  .limit(1)
+checkSortRemoved(df6)
+checkLimitRemoved(df6)
+checkPushedInfo(df6,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: " +
+"[CASE WHEN (SALARY > 8000.00) AND (SALARY < 10000.00) THEN SALARY ELSE 0.00 END, " +
+"IS_MANAGER]",
+  "PushedFilters: []",
+  "PushedTopN: ORDER BY [" +
+"CASE WHEN (SALARY > 8000.00) AND (SALARY < 10000.00) THEN SALARY ELSE 0.00 END " +
+"ASC NULLS FIRST, IS_MANAGER ASC NULLS FIRST] LIMIT 1")
+checkAnswer(df6, Seq(Row(0.00, false, 12000.00)))
+
+val df7 = spark.read
+  .table("h2.test.employee")
+  .select($"DEPT", $"SALARY")
+  .groupBy("dept").agg(sum("SALARY"))
+  .orderBy(sum("SALARY"))
+  .limit(1)
+checkSortRemoved(df7, false)
+checkLimitRemoved(df7, false)
+checkPushedInfo(df7,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: [DEPT]",
+  "PushedFilters: []")
+checkAnswer(df7, Seq(Row(6, 12000.00)))
+
+val df8 = spark.read
+  .table("h2.test.employee")
+  .select($"DEPT", $"SALARY")
+  .groupBy("dept").agg(sum("SALARY").as("total"))
+  .orderBy("total")
+  .limit(1)
+checkSortRemoved(df8, false)
+checkLimitRemoved(df8, false)
+checkPushedInfo(df8,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: [DEPT]",
+  "PushedFilters: []")
+checkAnswer(df8, Seq(Row(6, 12000.00)))
+  }
+
+ 

[GitHub] [spark] cloud-fan commented on a diff in pull request #37320: [SPARK-39819][SQL] DS V2 aggregate push down can work with Top N or Paging (Sort with group expressions)

2022-08-01 Thread GitBox


cloud-fan commented on code in PR #37320:
URL: https://github.com/apache/spark/pull/37320#discussion_r935064235


##
sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala:
##
@@ -864,6 +851,254 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
 checkAnswer(df2, Seq(Row(2, "david", 10000.00)))
   }
 
+  test("scan with aggregate push-down and top N push-down") {
+val df1 = spark.read
+  .table("h2.test.employee")
+  .groupBy("DEPT").sum("SALARY")
+  .orderBy("DEPT")
+  .limit(1)
+checkSortRemoved(df1)
+checkLimitRemoved(df1)
+checkPushedInfo(df1,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: [DEPT]",
+  "PushedFilters: []",
+  "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST] LIMIT 1")
+checkAnswer(df1, Seq(Row(1, 19000.00)))
+
+val df2 = spark.read
+  .table("h2.test.employee")
+  .select($"DEPT".as("my_dept"), $"SALARY")
+  .groupBy("my_dept").sum("SALARY")
+  .orderBy("my_dept")
+  .limit(1)
+checkSortRemoved(df2)
+checkLimitRemoved(df2)
+checkPushedInfo(df2,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: [DEPT]",
+  "PushedFilters: []",
+  "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST] LIMIT 1")
+checkAnswer(df2, Seq(Row(1, 19000.00)))
+
+val df3 = spark.read
+  .table("h2.test.employee")
+  .select($"SALARY",
+when(($"SALARY" > 8000).and($"SALARY" < 10000), $"salary").otherwise(0).as("key"))
+  .groupBy("key").sum("SALARY")
+  .orderBy("key")
+  .limit(1)
+checkSortRemoved(df3)
+checkLimitRemoved(df3)
+checkPushedInfo(df3,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: " +
+"[CASE WHEN (SALARY > 8000.00) AND (SALARY < 10000.00) THEN SALARY ELSE 0.00 END]",
+  "PushedFilters: []",
+  "PushedTopN: ORDER BY [" +
+"CASE WHEN (SALARY > 8000.00) AND (SALARY < 10000.00) THEN SALARY ELSE 0.00 END " +
+"ASC NULLS FIRST] LIMIT 1")
+checkAnswer(df3, Seq(Row(0, 44000.00)))
+
+val df4 = spark.read
+  .table("h2.test.employee")
+  .groupBy("DEPT", "IS_MANAGER").sum("SALARY")
+  .orderBy("DEPT", "IS_MANAGER")
+  .limit(1)
+checkSortRemoved(df4)
+checkLimitRemoved(df4)
+checkPushedInfo(df4,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: [DEPT, IS_MANAGER]",
+  "PushedFilters: []",
+  "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST, IS_MANAGER ASC NULLS FIRST] LIMIT 1")
+checkAnswer(df4, Seq(Row(1, false, 9000.00)))
+
+val df5 = spark.read
+  .table("h2.test.employee")
+  .select($"DEPT".as("my_dept"), $"IS_MANAGER".as("my_manager"), $"SALARY")
+  .groupBy("my_dept", "my_manager").sum("SALARY")
+  .orderBy("my_dept", "my_manager")
+  .limit(1)
+checkSortRemoved(df5)
+checkLimitRemoved(df5)
+checkPushedInfo(df5,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: [DEPT, IS_MANAGER]",
+  "PushedFilters: []",
+  "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST, IS_MANAGER ASC NULLS FIRST] LIMIT 1")
+checkAnswer(df5, Seq(Row(1, false, 9000.00)))
+
+val df6 = spark.read
+  .table("h2.test.employee")
+  .select($"SALARY", $"IS_MANAGER",
+when(($"SALARY" > 8000).and($"SALARY" < 10000), $"salary").otherwise(0).as("key"))
+  .groupBy("key", "IS_MANAGER").sum("SALARY")
+  .orderBy("key", "IS_MANAGER")
+  .limit(1)
+checkSortRemoved(df6)
+checkLimitRemoved(df6)
+checkPushedInfo(df6,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: " +
+"[CASE WHEN (SALARY > 8000.00) AND (SALARY < 10000.00) THEN SALARY ELSE 0.00 END, " +
+"IS_MANAGER]",
+  "PushedFilters: []",
+  "PushedTopN: ORDER BY [" +
+"CASE WHEN (SALARY > 8000.00) AND (SALARY < 10000.00) THEN SALARY ELSE 0.00 END " +
+"ASC NULLS FIRST, IS_MANAGER ASC NULLS FIRST] LIMIT 1")
+checkAnswer(df6, Seq(Row(0.00, false, 12000.00)))
+
+val df7 = spark.read
+  .table("h2.test.employee")
+  .select($"DEPT", $"SALARY")
+  .groupBy("dept").agg(sum("SALARY"))
+  .orderBy(sum("SALARY"))
+  .limit(1)
+checkSortRemoved(df7, false)
+checkLimitRemoved(df7, false)
+checkPushedInfo(df7,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: [DEPT]",
+  "PushedFilters: []")
+checkAnswer(df7, Seq(Row(6, 12000.00)))
+
+val df8 = spark.read
+  .table("h2.test.employee")
+  .select($"DEPT", $"SALARY")
+  .groupBy("dept").agg(sum("SALARY").as("total"))

Review Comment:
   ditto



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please 

[GitHub] [spark] cloud-fan commented on a diff in pull request #37320: [SPARK-39819][SQL] DS V2 aggregate push down can work with Top N or Paging (Sort with group expressions)

2022-08-01 Thread GitBox


cloud-fan commented on code in PR #37320:
URL: https://github.com/apache/spark/pull/37320#discussion_r935063965


##
sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala:
##
@@ -864,6 +851,254 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
 checkAnswer(df2, Seq(Row(2, "david", 10000.00)))
   }
 
+  test("scan with aggregate push-down and top N push-down") {
+val df1 = spark.read
+  .table("h2.test.employee")
+  .groupBy("DEPT").sum("SALARY")
+  .orderBy("DEPT")
+  .limit(1)
+checkSortRemoved(df1)
+checkLimitRemoved(df1)
+checkPushedInfo(df1,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: [DEPT]",
+  "PushedFilters: []",
+  "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST] LIMIT 1")
+checkAnswer(df1, Seq(Row(1, 19000.00)))
+
+val df2 = spark.read
+  .table("h2.test.employee")
+  .select($"DEPT".as("my_dept"), $"SALARY")
+  .groupBy("my_dept").sum("SALARY")
+  .orderBy("my_dept")
+  .limit(1)
+checkSortRemoved(df2)
+checkLimitRemoved(df2)
+checkPushedInfo(df2,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: [DEPT]",
+  "PushedFilters: []",
+  "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST] LIMIT 1")
+checkAnswer(df2, Seq(Row(1, 19000.00)))
+
+val df3 = spark.read
+  .table("h2.test.employee")
+  .select($"SALARY",
+when(($"SALARY" > 8000).and($"SALARY" < 10000), $"salary").otherwise(0).as("key"))
+  .groupBy("key").sum("SALARY")
+  .orderBy("key")
+  .limit(1)
+checkSortRemoved(df3)
+checkLimitRemoved(df3)
+checkPushedInfo(df3,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: " +
+"[CASE WHEN (SALARY > 8000.00) AND (SALARY < 10000.00) THEN SALARY ELSE 0.00 END]",
+  "PushedFilters: []",
+  "PushedTopN: ORDER BY [" +
+"CASE WHEN (SALARY > 8000.00) AND (SALARY < 10000.00) THEN SALARY ELSE 0.00 END " +
+"ASC NULLS FIRST] LIMIT 1")
+checkAnswer(df3, Seq(Row(0, 44000.00)))
+
+val df4 = spark.read
+  .table("h2.test.employee")
+  .groupBy("DEPT", "IS_MANAGER").sum("SALARY")
+  .orderBy("DEPT", "IS_MANAGER")
+  .limit(1)
+checkSortRemoved(df4)
+checkLimitRemoved(df4)
+checkPushedInfo(df4,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: [DEPT, IS_MANAGER]",
+  "PushedFilters: []",
+  "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST, IS_MANAGER ASC NULLS FIRST] LIMIT 1")
+checkAnswer(df4, Seq(Row(1, false, 9000.00)))
+
+val df5 = spark.read
+  .table("h2.test.employee")
+  .select($"DEPT".as("my_dept"), $"IS_MANAGER".as("my_manager"), $"SALARY")
+  .groupBy("my_dept", "my_manager").sum("SALARY")
+  .orderBy("my_dept", "my_manager")
+  .limit(1)
+checkSortRemoved(df5)
+checkLimitRemoved(df5)
+checkPushedInfo(df5,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: [DEPT, IS_MANAGER]",
+  "PushedFilters: []",
+  "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST, IS_MANAGER ASC NULLS FIRST] LIMIT 1")
+checkAnswer(df5, Seq(Row(1, false, 9000.00)))
+
+val df6 = spark.read
+  .table("h2.test.employee")
+  .select($"SALARY", $"IS_MANAGER",
+when(($"SALARY" > 8000).and($"SALARY" < 10000), $"salary").otherwise(0).as("key"))
+  .groupBy("key", "IS_MANAGER").sum("SALARY")
+  .orderBy("key", "IS_MANAGER")
+  .limit(1)
+checkSortRemoved(df6)
+checkLimitRemoved(df6)
+checkPushedInfo(df6,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: " +
+"[CASE WHEN (SALARY > 8000.00) AND (SALARY < 10000.00) THEN SALARY ELSE 0.00 END, " +
+"IS_MANAGER]",
+  "PushedFilters: []",
+  "PushedTopN: ORDER BY [" +
+"CASE WHEN (SALARY > 8000.00) AND (SALARY < 10000.00) THEN SALARY ELSE 0.00 END " +
+"ASC NULLS FIRST, IS_MANAGER ASC NULLS FIRST] LIMIT 1")
+checkAnswer(df6, Seq(Row(0.00, false, 12000.00)))
+
+val df7 = spark.read
+  .table("h2.test.employee")
+  .select($"DEPT", $"SALARY")
+  .groupBy("dept").agg(sum("SALARY"))
+  .orderBy(sum("SALARY"))

Review Comment:
   can we add some comments to explain why top n can't be pushed here?
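
   A possible comment to add (a hedged sketch; the stated reason should match the implementation):
   ```
   // Top N cannot be pushed down here: the ordering expression is the aggregate
   // function sum("SALARY") itself rather than a grouping column, and the rule
   // currently only rewrites ordering attributes that map back to the original
   // table columns, so the Sort and the Limit stay in Spark.
   ```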



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a diff in pull request #37320: [SPARK-39819][SQL] DS V2 aggregate push down can work with Top N or Paging (Sort with group expressions)

2022-08-01 Thread GitBox


cloud-fan commented on code in PR #37320:
URL: https://github.com/apache/spark/pull/37320#discussion_r935063573


##
sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala:
##
@@ -864,6 +851,254 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
 checkAnswer(df2, Seq(Row(2, "david", 10000.00)))
   }
 
+  test("scan with aggregate push-down and top N push-down") {
+val df1 = spark.read
+  .table("h2.test.employee")
+  .groupBy("DEPT").sum("SALARY")
+  .orderBy("DEPT")
+  .limit(1)
+checkSortRemoved(df1)
+checkLimitRemoved(df1)
+checkPushedInfo(df1,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: [DEPT]",
+  "PushedFilters: []",
+  "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST] LIMIT 1")
+checkAnswer(df1, Seq(Row(1, 19000.00)))
+
+val df2 = spark.read
+  .table("h2.test.employee")
+  .select($"DEPT".as("my_dept"), $"SALARY")
+  .groupBy("my_dept").sum("SALARY")
+  .orderBy("my_dept")
+  .limit(1)
+checkSortRemoved(df2)
+checkLimitRemoved(df2)
+checkPushedInfo(df2,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: [DEPT]",
+  "PushedFilters: []",
+  "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST] LIMIT 1")
+checkAnswer(df2, Seq(Row(1, 19000.00)))
+
+val df3 = spark.read
+  .table("h2.test.employee")
+  .select($"SALARY",
+when(($"SALARY" > 8000).and($"SALARY" < 10000), $"salary").otherwise(0).as("key"))
+  .groupBy("key").sum("SALARY")
+  .orderBy("key")
+  .limit(1)
+checkSortRemoved(df3)
+checkLimitRemoved(df3)
+checkPushedInfo(df3,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: " +
+"[CASE WHEN (SALARY > 8000.00) AND (SALARY < 10000.00) THEN SALARY ELSE 0.00 END]",
+  "PushedFilters: []",
+  "PushedTopN: ORDER BY [" +
+"CASE WHEN (SALARY > 8000.00) AND (SALARY < 10000.00) THEN SALARY ELSE 0.00 END " +
+"ASC NULLS FIRST] LIMIT 1")
+checkAnswer(df3, Seq(Row(0, 44000.00)))
+
+val df4 = spark.read
+  .table("h2.test.employee")
+  .groupBy("DEPT", "IS_MANAGER").sum("SALARY")
+  .orderBy("DEPT", "IS_MANAGER")
+  .limit(1)
+checkSortRemoved(df4)
+checkLimitRemoved(df4)
+checkPushedInfo(df4,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: [DEPT, IS_MANAGER]",
+  "PushedFilters: []",
+  "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST, IS_MANAGER ASC NULLS FIRST] LIMIT 1")
+checkAnswer(df4, Seq(Row(1, false, 9000.00)))
+
+val df5 = spark.read
+  .table("h2.test.employee")
+  .select($"DEPT".as("my_dept"), $"IS_MANAGER".as("my_manager"), $"SALARY")

Review Comment:
   We already have tests to verify the alias; we don't need to test it again with 2 columns.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a diff in pull request #37320: [SPARK-39819][SQL] DS V2 aggregate push down can work with Top N or Paging (Sort with group expressions)

2022-08-01 Thread GitBox


cloud-fan commented on code in PR #37320:
URL: https://github.com/apache/spark/pull/37320#discussion_r935061143


##
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala:
##
@@ -410,12 +413,21 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
 case s @ Sort(order, _, operation @ ScanOperation(project, filter, sHolder: ScanBuilderHolder))
 // Without building the Scan, we do not know the resulting column names after aggregate
 // push-down, and thus can't push down Top-N which needs to know the ordering column names.
-// TODO: we can support simple cases like GROUP BY columns directly and ORDER BY the same
-//   columns, which we know the resulting column names: the original table columns.
-if sHolder.pushedAggregate.isEmpty && filter.isEmpty &&
+// In particular, we push down the simple cases like GROUP BY expressions directly and
+// ORDER BY the same expressions, which we know the original table columns.
+if filter.isEmpty &&
   CollapseProject.canCollapseExpressions(order, project, alwaysInline = true) =>
   val aliasMap = getAliasMap(project)
-  val newOrder = order.map(replaceAlias(_, aliasMap)).asInstanceOf[Seq[SortOrder]]
+  val aliasReplacedOrder = order.map(replaceAlias(_, aliasMap))
+  val newOrder = if (sHolder.pushedAggregate.isDefined) {
+aliasReplacedOrder.map {

Review Comment:
   let's add some comments here.
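
   For reference, a minimal sketch of how this branch could read once commented
   (illustrative only: `pushedAggOutputMap` follows the rename suggested
   elsewhere in this review, and the comment wording is not the merged text):
   ```
   val newOrder = if (sHolder.pushedAggregate.isDefined) {
     // After aggregate push-down the scan output is the aggregate result, so
     // ordering attributes that refer to GROUP BY output columns must be
     // mapped back to the pushed group expressions before pushing the Top-N.
     aliasReplacedOrder.map {
       _.transform {
         case a: Attribute => sHolder.pushedAggOutputMap.getOrElse(a, a)
       }.asInstanceOf[SortOrder]
     }
   } else {
     aliasReplacedOrder.asInstanceOf[Seq[SortOrder]]
   }
   ```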






[GitHub] [spark] cloud-fan commented on a diff in pull request #37320: [SPARK-39819][SQL] DS V2 aggregate push down can work with Top N or Paging (Sort with group expressions)

2022-08-01 Thread GitBox


cloud-fan commented on code in PR #37320:
URL: https://github.com/apache/spark/pull/37320#discussion_r935060915


##
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala:
##
@@ -410,12 +413,21 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] 
with PredicateHelper {
 case s @ Sort(order, _, operation @ ScanOperation(project, filter, 
sHolder: ScanBuilderHolder))
 // Without building the Scan, we do not know the resulting column 
names after aggregate
 // push-down, and thus can't push down Top-N which needs to know the 
ordering column names.
-// TODO: we can support simple cases like GROUP BY columns directly 
and ORDER BY the same
-//   columns, which we know the resulting column names: the 
original table columns.
-if sHolder.pushedAggregate.isEmpty && filter.isEmpty &&
+// In particular, we push down the simple cases like GROUP BY 
expressions directly and
+// ORDER BY the same expressions, which we know the original table 
columns.

Review Comment:
   I think we can remove all these comments here.
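
   (The old TODO described exactly the case this change implements, so any
   leftover wording here would just be stale.)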






[GitHub] [spark] cloud-fan commented on a diff in pull request #37320: [SPARK-39819][SQL] DS V2 aggregate push down can work with Top N or Paging (Sort with group expressions)

2022-07-29 Thread GitBox


cloud-fan commented on code in PR #37320:
URL: https://github.com/apache/spark/pull/37320#discussion_r933157882


##
sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala:
##
@@ -811,6 +800,244 @@ class JDBCV2Suite extends QueryTest with 
SharedSparkSession with ExplainSuiteHel
 checkAnswer(df2, Seq(Row(2, "david", 10000.00)))
   }
 
+  test("scan with aggregate push-down and top N push-down") {
+val df1 = spark.read
+  .table("h2.test.employee")
+  .groupBy("DEPT").sum("SALARY")
+  .orderBy("DEPT")
+  .limit(1)
+checkSortRemoved(df1)
+checkLimitRemoved(df1)
+checkPushedInfo(df1,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: [DEPT]",
+  "PushedFilters: []",
+  "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST] LIMIT 1")
+checkAnswer(df1, Seq(Row(1, 19000.00)))
+
+val df2 = sql(
+  """
+|SELECT dept AS my_dept, SUM(SALARY) FROM h2.test.employee
+|GROUP BY dept
+|ORDER BY my_dept
+|LIMIT 1
+|""".stripMargin)
+checkSortRemoved(df2)
+checkLimitRemoved(df2)
+checkPushedInfo(df2,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: [DEPT]",
+  "PushedFilters: []",
+  "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST] LIMIT 1")
+checkAnswer(df2, Seq(Row(1, 19000.00)))
+
+val df3 = spark.read
+  .table("h2.test.employee")
+  .select($"SALARY",
+when(($"SALARY" > 8000).and($"SALARY" < 1), 
$"salary").otherwise(0).as("key"))
+  .groupBy("key").sum("SALARY")
+  .orderBy("key")
+  .limit(1)
+checkSortRemoved(df3)
+checkLimitRemoved(df3)
+checkPushedInfo(df3,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: " +
+"[CASE WHEN (SALARY > 8000.00) AND (SALARY < 1.00) THEN SALARY 
ELSE 0.00 END]",
+  "PushedFilters: []",
+  "PushedTopN: ORDER BY [" +
+"CASE WHEN (SALARY > 8000.00) AND (SALARY < 1.00) THEN SALARY ELSE 
0.00 END " +
+"ASC NULLS FIRST] LIMIT 1")
+checkAnswer(df3, Seq(Row(0, 44000.00)))
+
+val df4 = spark.read
+  .table("h2.test.employee")
+  .groupBy("DEPT", "IS_MANAGER").sum("SALARY")
+  .orderBy("DEPT", "IS_MANAGER")
+  .limit(1)
+checkSortRemoved(df4)
+checkLimitRemoved(df4)
+checkPushedInfo(df4,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: [DEPT, IS_MANAGER]",
+  "PushedFilters: []",
+  "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST, IS_MANAGER ASC NULLS FIRST] 
LIMIT 1")
+checkAnswer(df4, Seq(Row(1, false, 9000.00)))
+
+val df5 = sql(
+  """
+|SELECT dept AS my_dept, is_manager AS my_manager, SUM(SALARY) FROM 
h2.test.employee
+|GROUP BY dept, my_manager
+|ORDER BY my_dept, my_manager
+|LIMIT 1
+|""".stripMargin)
+checkSortRemoved(df5)
+checkLimitRemoved(df5)
+checkPushedInfo(df5,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: [DEPT, IS_MANAGER]",
+  "PushedFilters: []",
+  "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST, IS_MANAGER ASC NULLS FIRST] 
LIMIT 1")
+checkAnswer(df5, Seq(Row(1, false, 9000.00)))
+
+val df6 = spark.read
+  .table("h2.test.employee")
+  .select($"SALARY", $"IS_MANAGER",
+when(($"SALARY" > 8000).and($"SALARY" < 1), 
$"salary").otherwise(0).as("key"))
+  .groupBy("key", "IS_MANAGER").sum("SALARY")
+  .orderBy("key", "IS_MANAGER")
+  .limit(1)
+checkSortRemoved(df6)
+checkLimitRemoved(df6)
+checkPushedInfo(df6,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: " +
+"[CASE WHEN (SALARY > 8000.00) AND (SALARY < 1.00) THEN SALARY 
ELSE 0.00 END, " +
+"IS_MANAGER]",
+  "PushedFilters: []",
+  "PushedTopN: ORDER BY [" +
+"CASE WHEN (SALARY > 8000.00) AND (SALARY < 1.00) THEN SALARY ELSE 
0.00 END " +
+"ASC NULLS FIRST, IS_MANAGER ASC NULLS FIRST] LIMIT 1")
+checkAnswer(df6, Seq(Row(0.00, false, 12000.00)))
+
+val df7 = sql(
+  """
+|SELECT dept, SUM(SALARY) FROM h2.test.employee
+|GROUP BY dept
+|ORDER BY SUM(SALARY)
+|LIMIT 1
+|""".stripMargin)
+checkSortRemoved(df7, false)
+checkLimitRemoved(df7, false)
+checkPushedInfo(df7,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: [DEPT]",
+  "PushedFilters: []")
+checkAnswer(df7, Seq(Row(6, 12000.00)))
+
+val df8 = sql(
+  """
+|SELECT dept, SUM(SALARY) AS total FROM h2.test.employee
+|GROUP BY dept
+|ORDER BY total
+|LIMIT 1
+|""".stripMargin)
+checkSortRemoved(df8, false)
+checkLimitRemoved(df8, false)
+checkPushedInfo(df8,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: [DEPT]",
+  "PushedFilters: []")
+checkAnswer(df8, Seq(Row(6, 12000.00)))
+  }
+
+  
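
The df7 and df8 cases above are the negative checks: `checkSortRemoved(df7, false)`
expects Sort and Limit to stay in the Spark plan because the ordering key is the
aggregate result rather than a group expression. A minimal sketch of the contrast,
reusing this suite's h2 catalog:

```
// Top-N pushed: the ORDER BY key is a GROUP BY expression, so its result
// column name is known without building the scan.
sql("SELECT dept, SUM(SALARY) FROM h2.test.employee GROUP BY dept ORDER BY dept LIMIT 1")

// Top-N not pushed: the ORDER BY key is the aggregate itself, whose output
// column name is unknown before the scan is built, so Sort/Limit remain in Spark.
sql("SELECT dept, SUM(SALARY) FROM h2.test.employee GROUP BY dept ORDER BY SUM(SALARY) LIMIT 1")
```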

[GitHub] [spark] cloud-fan commented on a diff in pull request #37320: [SPARK-39819][SQL] DS V2 aggregate push down can work with Top N or Paging (Sort with group expressions)

2022-07-29 Thread GitBox


cloud-fan commented on code in PR #37320:
URL: https://github.com/apache/spark/pull/37320#discussion_r933157287


##
sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala:
##
@@ -811,6 +800,244 @@ class JDBCV2Suite extends QueryTest with 
SharedSparkSession with ExplainSuiteHel
 checkAnswer(df2, Seq(Row(2, "david", 10000.00)))
   }
 
+  test("scan with aggregate push-down and top N push-down") {
+val df1 = spark.read
+  .table("h2.test.employee")
+  .groupBy("DEPT").sum("SALARY")
+  .orderBy("DEPT")
+  .limit(1)
+checkSortRemoved(df1)
+checkLimitRemoved(df1)
+checkPushedInfo(df1,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: [DEPT]",
+  "PushedFilters: []",
+  "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST] LIMIT 1")
+checkAnswer(df1, Seq(Row(1, 19000.00)))
+
+val df2 = sql(
+  """
+|SELECT dept AS my_dept, SUM(SALARY) FROM h2.test.employee
+|GROUP BY dept
+|ORDER BY my_dept
+|LIMIT 1
+|""".stripMargin)
+checkSortRemoved(df2)
+checkLimitRemoved(df2)
+checkPushedInfo(df2,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: [DEPT]",
+  "PushedFilters: []",
+  "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST] LIMIT 1")
+checkAnswer(df2, Seq(Row(1, 19000.00)))
+
+val df3 = spark.read
+  .table("h2.test.employee")
+  .select($"SALARY",
+when(($"SALARY" > 8000).and($"SALARY" < 1), 
$"salary").otherwise(0).as("key"))
+  .groupBy("key").sum("SALARY")
+  .orderBy("key")
+  .limit(1)
+checkSortRemoved(df3)
+checkLimitRemoved(df3)
+checkPushedInfo(df3,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: " +
+"[CASE WHEN (SALARY > 8000.00) AND (SALARY < 1.00) THEN SALARY 
ELSE 0.00 END]",
+  "PushedFilters: []",
+  "PushedTopN: ORDER BY [" +
+"CASE WHEN (SALARY > 8000.00) AND (SALARY < 1.00) THEN SALARY ELSE 
0.00 END " +
+"ASC NULLS FIRST] LIMIT 1")
+checkAnswer(df3, Seq(Row(0, 44000.00)))
+
+val df4 = spark.read
+  .table("h2.test.employee")
+  .groupBy("DEPT", "IS_MANAGER").sum("SALARY")
+  .orderBy("DEPT", "IS_MANAGER")
+  .limit(1)
+checkSortRemoved(df4)
+checkLimitRemoved(df4)
+checkPushedInfo(df4,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: [DEPT, IS_MANAGER]",
+  "PushedFilters: []",
+  "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST, IS_MANAGER ASC NULLS FIRST] 
LIMIT 1")
+checkAnswer(df4, Seq(Row(1, false, 9000.00)))
+
+val df5 = sql(
+  """
+|SELECT dept AS my_dept, is_manager AS my_manager, SUM(SALARY) FROM 
h2.test.employee
+|GROUP BY dept, my_manager
+|ORDER BY my_dept, my_manager
+|LIMIT 1
+|""".stripMargin)
+checkSortRemoved(df5)
+checkLimitRemoved(df5)
+checkPushedInfo(df5,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: [DEPT, IS_MANAGER]",
+  "PushedFilters: []",
+  "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST, IS_MANAGER ASC NULLS FIRST] 
LIMIT 1")
+checkAnswer(df5, Seq(Row(1, false, 9000.00)))
+
+val df6 = spark.read
+  .table("h2.test.employee")
+  .select($"SALARY", $"IS_MANAGER",
+when(($"SALARY" > 8000).and($"SALARY" < 1), 
$"salary").otherwise(0).as("key"))
+  .groupBy("key", "IS_MANAGER").sum("SALARY")
+  .orderBy("key", "IS_MANAGER")
+  .limit(1)
+checkSortRemoved(df6)
+checkLimitRemoved(df6)
+checkPushedInfo(df6,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: " +
+"[CASE WHEN (SALARY > 8000.00) AND (SALARY < 1.00) THEN SALARY 
ELSE 0.00 END, " +
+"IS_MANAGER]",
+  "PushedFilters: []",
+  "PushedTopN: ORDER BY [" +
+"CASE WHEN (SALARY > 8000.00) AND (SALARY < 1.00) THEN SALARY ELSE 
0.00 END " +
+"ASC NULLS FIRST, IS_MANAGER ASC NULLS FIRST] LIMIT 1")
+checkAnswer(df6, Seq(Row(0.00, false, 12000.00)))
+
+val df7 = sql(
+  """
+|SELECT dept, SUM(SALARY) FROM h2.test.employee
+|GROUP BY dept
+|ORDER BY SUM(SALARY)
+|LIMIT 1
+|""".stripMargin)
+checkSortRemoved(df7, false)
+checkLimitRemoved(df7, false)
+checkPushedInfo(df7,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: [DEPT]",
+  "PushedFilters: []")
+checkAnswer(df7, Seq(Row(6, 12000.00)))
+
+val df8 = sql(
+  """
+|SELECT dept, SUM(SALARY) AS total FROM h2.test.employee
+|GROUP BY dept
+|ORDER BY total
+|LIMIT 1
+|""".stripMargin)
+checkSortRemoved(df8, false)
+checkLimitRemoved(df8, false)
+checkPushedInfo(df8,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: [DEPT]",
+  "PushedFilters: []")
+checkAnswer(df8, Seq(Row(6, 12000.00)))
+  }
+
+  

[GitHub] [spark] cloud-fan commented on a diff in pull request #37320: [SPARK-39819][SQL] DS V2 aggregate push down can work with Top N or Paging (Sort with group expressions)

2022-07-28 Thread GitBox


cloud-fan commented on code in PR #37320:
URL: https://github.com/apache/spark/pull/37320#discussion_r932186273


##
sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala:
##
@@ -811,6 +800,244 @@ class JDBCV2Suite extends QueryTest with 
SharedSparkSession with ExplainSuiteHel
 checkAnswer(df2, Seq(Row(2, "david", 10000.00)))
   }
 
+  test("scan with aggregate push-down and top N push-down") {
+val df1 = spark.read
+  .table("h2.test.employee")
+  .groupBy("DEPT").sum("SALARY")
+  .orderBy("DEPT")
+  .limit(1)
+checkSortRemoved(df1)
+checkLimitRemoved(df1)
+checkPushedInfo(df1,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: [DEPT]",
+  "PushedFilters: []",
+  "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST] LIMIT 1")
+checkAnswer(df1, Seq(Row(1, 19000.00)))
+
+val df2 = sql(
+  """
+|SELECT dept AS my_dept, SUM(SALARY) FROM h2.test.employee
+|GROUP BY dept
+|ORDER BY my_dept
+|LIMIT 1
+|""".stripMargin)
+checkSortRemoved(df2)
+checkLimitRemoved(df2)
+checkPushedInfo(df2,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: [DEPT]",
+  "PushedFilters: []",
+  "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST] LIMIT 1")
+checkAnswer(df2, Seq(Row(1, 19000.00)))
+
+val df3 = spark.read
+  .table("h2.test.employee")
+  .select($"SALARY",
+when(($"SALARY" > 8000).and($"SALARY" < 1), 
$"salary").otherwise(0).as("key"))
+  .groupBy("key").sum("SALARY")
+  .orderBy("key")
+  .limit(1)
+checkSortRemoved(df3)
+checkLimitRemoved(df3)
+checkPushedInfo(df3,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: " +
+"[CASE WHEN (SALARY > 8000.00) AND (SALARY < 1.00) THEN SALARY 
ELSE 0.00 END]",
+  "PushedFilters: []",
+  "PushedTopN: ORDER BY [" +
+"CASE WHEN (SALARY > 8000.00) AND (SALARY < 1.00) THEN SALARY ELSE 
0.00 END " +
+"ASC NULLS FIRST] LIMIT 1")
+checkAnswer(df3, Seq(Row(0, 44000.00)))
+
+val df4 = spark.read
+  .table("h2.test.employee")
+  .groupBy("DEPT", "IS_MANAGER").sum("SALARY")
+  .orderBy("DEPT", "IS_MANAGER")
+  .limit(1)
+checkSortRemoved(df4)
+checkLimitRemoved(df4)
+checkPushedInfo(df4,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: [DEPT, IS_MANAGER]",
+  "PushedFilters: []",
+  "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST, IS_MANAGER ASC NULLS FIRST] 
LIMIT 1")
+checkAnswer(df4, Seq(Row(1, false, 9000.00)))
+
+val df5 = sql(
+  """
+|SELECT dept AS my_dept, is_manager AS my_manager, SUM(SALARY) FROM 
h2.test.employee
+|GROUP BY dept, my_manager
+|ORDER BY my_dept, my_manager
+|LIMIT 1
+|""".stripMargin)
+checkSortRemoved(df5)
+checkLimitRemoved(df5)
+checkPushedInfo(df5,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: [DEPT, IS_MANAGER]",
+  "PushedFilters: []",
+  "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST, IS_MANAGER ASC NULLS FIRST] 
LIMIT 1")
+checkAnswer(df5, Seq(Row(1, false, 9000.00)))
+
+val df6 = spark.read
+  .table("h2.test.employee")
+  .select($"SALARY", $"IS_MANAGER",
+when(($"SALARY" > 8000).and($"SALARY" < 1), 
$"salary").otherwise(0).as("key"))
+  .groupBy("key", "IS_MANAGER").sum("SALARY")
+  .orderBy("key", "IS_MANAGER")
+  .limit(1)
+checkSortRemoved(df6)
+checkLimitRemoved(df6)
+checkPushedInfo(df6,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: " +
+"[CASE WHEN (SALARY > 8000.00) AND (SALARY < 1.00) THEN SALARY 
ELSE 0.00 END, " +
+"IS_MANAGER]",
+  "PushedFilters: []",
+  "PushedTopN: ORDER BY [" +
+"CASE WHEN (SALARY > 8000.00) AND (SALARY < 1.00) THEN SALARY ELSE 
0.00 END " +
+"ASC NULLS FIRST, IS_MANAGER ASC NULLS FIRST] LIMIT 1")
+checkAnswer(df6, Seq(Row(0.00, false, 12000.00)))
+
+val df7 = sql(
+  """
+|SELECT dept, SUM(SALARY) FROM h2.test.employee
+|GROUP BY dept
+|ORDER BY SUM(SALARY)
+|LIMIT 1
+|""".stripMargin)
+checkSortRemoved(df7, false)
+checkLimitRemoved(df7, false)
+checkPushedInfo(df7,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: [DEPT]",
+  "PushedFilters: []")
+checkAnswer(df7, Seq(Row(6, 12000.00)))
+
+val df8 = sql(
+  """
+|SELECT dept, SUM(SALARY) AS total FROM h2.test.employee
+|GROUP BY dept
+|ORDER BY total
+|LIMIT 1
+|""".stripMargin)
+checkSortRemoved(df8, false)
+checkLimitRemoved(df8, false)
+checkPushedInfo(df8,
+  "PushedAggregates: [SUM(SALARY)]",
+  "PushedGroupByExpressions: [DEPT]",
+  "PushedFilters: []")
+checkAnswer(df8, Seq(Row(6, 12000.00)))
+  }
+
+  

[GitHub] [spark] cloud-fan commented on a diff in pull request #37320: [SPARK-39819][SQL] DS V2 aggregate push down can work with Top N or Paging (Sort with group expressions)

2022-07-28 Thread GitBox


cloud-fan commented on code in PR #37320:
URL: https://github.com/apache/spark/pull/37320#discussion_r932183215


##
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala:
##
@@ -410,12 +413,24 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] 
with PredicateHelper {
 case s @ Sort(order, _, operation @ ScanOperation(project, filter, 
sHolder: ScanBuilderHolder))
 // Without building the Scan, we do not know the resulting column 
names after aggregate
 // push-down, and thus can't push down Top-N which needs to know the 
ordering column names.
-// TODO: we can support simple cases like GROUP BY columns directly 
and ORDER BY the same
-//   columns, which we know the resulting column names: the 
original table columns.
-if sHolder.pushedAggregate.isEmpty && filter.isEmpty &&
+// In particular, we push down the simple cases like GROUP BY 
expressions directly and
+// ORDER BY the same expressions, which we know the original table 
columns.
+if filter.isEmpty &&
   CollapseProject.canCollapseExpressions(order, project, alwaysInline 
= true) =>
   val aliasMap = getAliasMap(project)
-  val newOrder = order.map(replaceAlias(_, 
aliasMap)).asInstanceOf[Seq[SortOrder]]
+  def findGroupExprForSortOrder(sortOrder: SortOrder): SortOrder = 
sortOrder match {

Review Comment:
   I think we can just do something similar with `replaceAlias`
   ```
   sortOrder transform {
 case a: Attribute => sHolder.pushedAggOutputMap.getOrElse(a, a)
   }
   ```
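
   A tree-wide `transform` also reaches attribute references nested deeper in
   the ordering expression (e.g. inside a pushed CASE WHEN group key), which a
   top-level pattern match like `findGroupExprForSortOrder` would have to
   enumerate case by case, and it mirrors how `replaceAlias` already rewrites
   projections.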






[GitHub] [spark] cloud-fan commented on a diff in pull request #37320: [SPARK-39819][SQL] DS V2 aggregate push down can work with Top N or Paging (Sort with group expressions)

2022-07-28 Thread GitBox


cloud-fan commented on code in PR #37320:
URL: https://github.com/apache/spark/pull/37320#discussion_r932182709


##
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala:
##
@@ -545,6 +560,9 @@ case class ScanBuilderHolder(
   var pushedPredicates: Seq[Predicate] = Seq.empty[Predicate]
 
   var pushedAggregate: Option[Aggregation] = None
+
+  var pushedAggregateExpectedOutputMap: Map[AttributeReference, Expression] =

Review Comment:
   ```suggestion
 var pushedAggOutputMap: Map[AttributeReference, Expression] =
   ```






[GitHub] [spark] cloud-fan commented on a diff in pull request #37320: [SPARK-39819][SQL] DS V2 aggregate push down can work with Top N or Paging (Sort with group expressions)

2022-07-28 Thread GitBox


cloud-fan commented on code in PR #37320:
URL: https://github.com/apache/spark/pull/37320#discussion_r932181859


##
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala:
##
@@ -545,6 +560,9 @@ case class ScanBuilderHolder(
   var pushedPredicates: Seq[Predicate] = Seq.empty[Predicate]
 
   var pushedAggregate: Option[Aggregation] = None
+
+  var pushedAggregateExpectedOutputMap: Map[AttributeReference, Expression] =

Review Comment:
   We can use `AttributeMap`
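
   For reference, a sketch of the suggested declaration (Catalyst's
   `AttributeMap` keys entries by `exprId`, so lookups still succeed after
   attributes are re-instantiated during plan transformations; the
   `AttributeMap.empty` factory is assumed here):
   ```
   import org.apache.spark.sql.catalyst.expressions.{AttributeMap, Expression}

   // Maps the aggregate's output attributes back to the pushed group expressions.
   var pushedAggOutputMap: AttributeMap[Expression] = AttributeMap.empty[Expression]
   ```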


