[GitHub] [spark] cloud-fan commented on a diff in pull request #37320: [SPARK-39819][SQL] DS V2 aggregate push down can work with Top N or Paging (Sort with group expressions)
cloud-fan commented on code in PR #37320: URL: https://github.com/apache/spark/pull/37320#discussion_r938796458 ## sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala: ## @@ -864,6 +851,254 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel checkAnswer(df2, Seq(Row(2, "david", 1.00))) } + test("scan with aggregate push-down and top N push-down") { +val df1 = spark.read + .table("h2.test.employee") + .groupBy("DEPT").sum("SALARY") + .orderBy("DEPT") + .limit(1) +checkSortRemoved(df1) +checkLimitRemoved(df1) +checkPushedInfo(df1, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT]", + "PushedFilters: []", + "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST] LIMIT 1") +checkAnswer(df1, Seq(Row(1, 19000.00))) + +val df2 = spark.read + .table("h2.test.employee") + .select($"DEPT".as("my_dept"), $"SALARY") + .groupBy("my_dept").sum("SALARY") + .orderBy("my_dept") + .limit(1) +checkSortRemoved(df2) +checkLimitRemoved(df2) +checkPushedInfo(df2, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT]", + "PushedFilters: []", + "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST] LIMIT 1") +checkAnswer(df2, Seq(Row(1, 19000.00))) + +val df3 = spark.read + .table("h2.test.employee") + .select($"SALARY", +when(($"SALARY" > 8000).and($"SALARY" < 1), $"salary").otherwise(0).as("key")) + .groupBy("key").sum("SALARY") + .orderBy("key") + .limit(1) +checkSortRemoved(df3) +checkLimitRemoved(df3) +checkPushedInfo(df3, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: " + +"[CASE WHEN (SALARY > 8000.00) AND (SALARY < 1.00) THEN SALARY ELSE 0.00 END]", + "PushedFilters: []", + "PushedTopN: ORDER BY [" + +"CASE WHEN (SALARY > 8000.00) AND (SALARY < 1.00) THEN SALARY ELSE 0.00 END " + +"ASC NULLS FIRST] LIMIT 1") +checkAnswer(df3, Seq(Row(0, 44000.00))) + +val df4 = spark.read + .table("h2.test.employee") + .groupBy("DEPT", "IS_MANAGER").sum("SALARY") + .orderBy("DEPT", "IS_MANAGER") + .limit(1) 
+checkSortRemoved(df4) +checkLimitRemoved(df4) +checkPushedInfo(df4, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT, IS_MANAGER]", + "PushedFilters: []", + "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST, IS_MANAGER ASC NULLS FIRST] LIMIT 1") +checkAnswer(df4, Seq(Row(1, false, 9000.00))) + +val df5 = spark.read + .table("h2.test.employee") + .select($"DEPT".as("my_dept"), $"IS_MANAGER".as("my_manager"), $"SALARY") + .groupBy("my_dept", "my_manager").sum("SALARY") + .orderBy("my_dept", "my_manager") + .limit(1) +checkSortRemoved(df5) +checkLimitRemoved(df5) +checkPushedInfo(df5, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT, IS_MANAGER]", + "PushedFilters: []", + "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST, IS_MANAGER ASC NULLS FIRST] LIMIT 1") +checkAnswer(df5, Seq(Row(1, false, 9000.00))) + +val df6 = spark.read + .table("h2.test.employee") + .select($"SALARY", $"IS_MANAGER", +when(($"SALARY" > 8000).and($"SALARY" < 1), $"salary").otherwise(0).as("key")) + .groupBy("key", "IS_MANAGER").sum("SALARY") + .orderBy("key", "IS_MANAGER") + .limit(1) +checkSortRemoved(df6) +checkLimitRemoved(df6) +checkPushedInfo(df6, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: " + +"[CASE WHEN (SALARY > 8000.00) AND (SALARY < 1.00) THEN SALARY ELSE 0.00 END, " + +"IS_MANAGER]", + "PushedFilters: []", + "PushedTopN: ORDER BY [" + +"CASE WHEN (SALARY > 8000.00) AND (SALARY < 1.00) THEN SALARY ELSE 0.00 END " + +"ASC NULLS FIRST, IS_MANAGER ASC NULLS FIRST] LIMIT 1") +checkAnswer(df6, Seq(Row(0.00, false, 12000.00))) + +val df7 = spark.read + .table("h2.test.employee") + .select($"DEPT", $"SALARY") + .groupBy("dept").agg(sum("SALARY")) + .orderBy(sum("SALARY")) + .limit(1) +checkSortRemoved(df7, false) +checkLimitRemoved(df7, false) +checkPushedInfo(df7, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT]", + "PushedFilters: []") +checkAnswer(df7, Seq(Row(6, 12000.00))) + +val df8 = spark.read + 
.table("h2.test.employee") + .select($"DEPT", $"SALARY") + .groupBy("dept").agg(sum("SALARY").as("total")) + .orderBy("total") + .limit(1) +checkSortRemoved(df8, false) +checkLimitRemoved(df8, false) +checkPushedInfo(df8, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT]", + "PushedFilters: []") +checkAnswer(df8, Seq(Row(6, 12000.00))) + } + +
[GitHub] [spark] cloud-fan commented on a diff in pull request #37320: [SPARK-39819][SQL] DS V2 aggregate push down can work with Top N or Paging (Sort with group expressions)
cloud-fan commented on code in PR #37320: URL: https://github.com/apache/spark/pull/37320#discussion_r938794884 ## sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala: ## @@ -864,6 +851,254 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel checkAnswer(df2, Seq(Row(2, "david", 1.00))) } + test("scan with aggregate push-down and top N push-down") { +val df1 = spark.read + .table("h2.test.employee") + .groupBy("DEPT").sum("SALARY") + .orderBy("DEPT") + .limit(1) +checkSortRemoved(df1) +checkLimitRemoved(df1) +checkPushedInfo(df1, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT]", + "PushedFilters: []", + "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST] LIMIT 1") +checkAnswer(df1, Seq(Row(1, 19000.00))) + +val df2 = spark.read + .table("h2.test.employee") + .select($"DEPT".as("my_dept"), $"SALARY") + .groupBy("my_dept").sum("SALARY") + .orderBy("my_dept") + .limit(1) +checkSortRemoved(df2) +checkLimitRemoved(df2) +checkPushedInfo(df2, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT]", + "PushedFilters: []", + "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST] LIMIT 1") +checkAnswer(df2, Seq(Row(1, 19000.00))) + +val df3 = spark.read + .table("h2.test.employee") + .select($"SALARY", +when(($"SALARY" > 8000).and($"SALARY" < 1), $"salary").otherwise(0).as("key")) + .groupBy("key").sum("SALARY") + .orderBy("key") + .limit(1) +checkSortRemoved(df3) +checkLimitRemoved(df3) +checkPushedInfo(df3, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: " + +"[CASE WHEN (SALARY > 8000.00) AND (SALARY < 1.00) THEN SALARY ELSE 0.00 END]", + "PushedFilters: []", + "PushedTopN: ORDER BY [" + +"CASE WHEN (SALARY > 8000.00) AND (SALARY < 1.00) THEN SALARY ELSE 0.00 END " + +"ASC NULLS FIRST] LIMIT 1") +checkAnswer(df3, Seq(Row(0, 44000.00))) + +val df4 = spark.read + .table("h2.test.employee") + .groupBy("DEPT", "IS_MANAGER").sum("SALARY") + .orderBy("DEPT", "IS_MANAGER") + .limit(1) 
+checkSortRemoved(df4) +checkLimitRemoved(df4) +checkPushedInfo(df4, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT, IS_MANAGER]", + "PushedFilters: []", + "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST, IS_MANAGER ASC NULLS FIRST] LIMIT 1") +checkAnswer(df4, Seq(Row(1, false, 9000.00))) + +val df5 = spark.read + .table("h2.test.employee") + .select($"DEPT".as("my_dept"), $"IS_MANAGER".as("my_manager"), $"SALARY") + .groupBy("my_dept", "my_manager").sum("SALARY") + .orderBy("my_dept", "my_manager") + .limit(1) +checkSortRemoved(df5) +checkLimitRemoved(df5) +checkPushedInfo(df5, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT, IS_MANAGER]", + "PushedFilters: []", + "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST, IS_MANAGER ASC NULLS FIRST] LIMIT 1") +checkAnswer(df5, Seq(Row(1, false, 9000.00))) + +val df6 = spark.read + .table("h2.test.employee") + .select($"SALARY", $"IS_MANAGER", +when(($"SALARY" > 8000).and($"SALARY" < 1), $"salary").otherwise(0).as("key")) + .groupBy("key", "IS_MANAGER").sum("SALARY") + .orderBy("key", "IS_MANAGER") + .limit(1) +checkSortRemoved(df6) +checkLimitRemoved(df6) +checkPushedInfo(df6, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: " + +"[CASE WHEN (SALARY > 8000.00) AND (SALARY < 1.00) THEN SALARY ELSE 0.00 END, " + +"IS_MANAGER]", + "PushedFilters: []", + "PushedTopN: ORDER BY [" + +"CASE WHEN (SALARY > 8000.00) AND (SALARY < 1.00) THEN SALARY ELSE 0.00 END " + +"ASC NULLS FIRST, IS_MANAGER ASC NULLS FIRST] LIMIT 1") +checkAnswer(df6, Seq(Row(0.00, false, 12000.00))) + +val df7 = spark.read + .table("h2.test.employee") + .select($"DEPT", $"SALARY") + .groupBy("dept").agg(sum("SALARY")) + .orderBy(sum("SALARY")) Review Comment: We can translate agg expressions now, why this test still can't trigger top-n pushdown? -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] [spark] cloud-fan commented on a diff in pull request #37320: [SPARK-39819][SQL] DS V2 aggregate push down can work with Top N or Paging (Sort with group expressions)
cloud-fan commented on code in PR #37320: URL: https://github.com/apache/spark/pull/37320#discussion_r938794073 ## sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala: ## @@ -864,6 +851,254 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel checkAnswer(df2, Seq(Row(2, "david", 1.00))) } + test("scan with aggregate push-down and top N push-down") { +val df1 = spark.read + .table("h2.test.employee") + .groupBy("DEPT").sum("SALARY") + .orderBy("DEPT") + .limit(1) +checkSortRemoved(df1) +checkLimitRemoved(df1) +checkPushedInfo(df1, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT]", + "PushedFilters: []", + "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST] LIMIT 1") +checkAnswer(df1, Seq(Row(1, 19000.00))) + +val df2 = spark.read + .table("h2.test.employee") + .select($"DEPT".as("my_dept"), $"SALARY") + .groupBy("my_dept").sum("SALARY") + .orderBy("my_dept") + .limit(1) +checkSortRemoved(df2) +checkLimitRemoved(df2) +checkPushedInfo(df2, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT]", + "PushedFilters: []", + "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST] LIMIT 1") +checkAnswer(df2, Seq(Row(1, 19000.00))) + +val df3 = spark.read + .table("h2.test.employee") + .select($"SALARY", +when(($"SALARY" > 8000).and($"SALARY" < 1), $"salary").otherwise(0).as("key")) + .groupBy("key").sum("SALARY") + .orderBy("key") + .limit(1) +checkSortRemoved(df3) +checkLimitRemoved(df3) +checkPushedInfo(df3, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: " + +"[CASE WHEN (SALARY > 8000.00) AND (SALARY < 1.00) THEN SALARY ELSE 0.00 END]", + "PushedFilters: []", + "PushedTopN: ORDER BY [" + +"CASE WHEN (SALARY > 8000.00) AND (SALARY < 1.00) THEN SALARY ELSE 0.00 END " + +"ASC NULLS FIRST] LIMIT 1") +checkAnswer(df3, Seq(Row(0, 44000.00))) + +val df4 = spark.read + .table("h2.test.employee") + .groupBy("DEPT", "IS_MANAGER").sum("SALARY") + .orderBy("DEPT", "IS_MANAGER") + .limit(1) 
+checkSortRemoved(df4) +checkLimitRemoved(df4) +checkPushedInfo(df4, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT, IS_MANAGER]", + "PushedFilters: []", + "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST, IS_MANAGER ASC NULLS FIRST] LIMIT 1") +checkAnswer(df4, Seq(Row(1, false, 9000.00))) + +val df5 = spark.read + .table("h2.test.employee") + .select($"DEPT".as("my_dept"), $"IS_MANAGER".as("my_manager"), $"SALARY") + .groupBy("my_dept", "my_manager").sum("SALARY") + .orderBy("my_dept", "my_manager") + .limit(1) +checkSortRemoved(df5) +checkLimitRemoved(df5) +checkPushedInfo(df5, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT, IS_MANAGER]", + "PushedFilters: []", + "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST, IS_MANAGER ASC NULLS FIRST] LIMIT 1") +checkAnswer(df5, Seq(Row(1, false, 9000.00))) + +val df6 = spark.read + .table("h2.test.employee") + .select($"SALARY", $"IS_MANAGER", +when(($"SALARY" > 8000).and($"SALARY" < 1), $"salary").otherwise(0).as("key")) + .groupBy("key", "IS_MANAGER").sum("SALARY") + .orderBy("key", "IS_MANAGER") + .limit(1) +checkSortRemoved(df6) +checkLimitRemoved(df6) +checkPushedInfo(df6, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: " + +"[CASE WHEN (SALARY > 8000.00) AND (SALARY < 1.00) THEN SALARY ELSE 0.00 END, " + +"IS_MANAGER]", + "PushedFilters: []", + "PushedTopN: ORDER BY [" + +"CASE WHEN (SALARY > 8000.00) AND (SALARY < 1.00) THEN SALARY ELSE 0.00 END " + +"ASC NULLS FIRST, IS_MANAGER ASC NULLS FIRST] LIMIT 1") +checkAnswer(df6, Seq(Row(0.00, false, 12000.00))) + +val df7 = spark.read + .table("h2.test.employee") + .select($"DEPT", $"SALARY") + .groupBy("dept").agg(sum("SALARY")) + .orderBy(sum("SALARY")) Review Comment: how about ``` .groupBy("dept").agg(sum("SALARY").as("sum_salary")) .orderBy("sum_salary") ``` -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] [spark] cloud-fan commented on a diff in pull request #37320: [SPARK-39819][SQL] DS V2 aggregate push down can work with Top N or Paging (Sort with group expressions)
cloud-fan commented on code in PR #37320: URL: https://github.com/apache/spark/pull/37320#discussion_r938792943 ## sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala: ## @@ -864,6 +851,253 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel checkAnswer(df2, Seq(Row(2, "david", 1.00))) } + test("scan with aggregate push-down and top N push-down") { +val df1 = spark.read + .table("h2.test.employee") + .groupBy("DEPT").sum("SALARY") + .orderBy("DEPT") + .limit(1) +checkSortRemoved(df1) +checkLimitRemoved(df1) +checkPushedInfo(df1, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT]", + "PushedFilters: []", + "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST] LIMIT 1") +checkAnswer(df1, Seq(Row(1, 19000.00))) + +val df2 = spark.read + .table("h2.test.employee") + .select($"DEPT".as("my_dept"), $"SALARY") + .groupBy("my_dept").sum("SALARY") + .orderBy("my_dept") + .limit(1) +checkSortRemoved(df2) +checkLimitRemoved(df2) +checkPushedInfo(df2, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT]", + "PushedFilters: []", + "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST] LIMIT 1") +checkAnswer(df2, Seq(Row(1, 19000.00))) + +val df3 = spark.read + .table("h2.test.employee") + .select($"SALARY", +when(($"SALARY" > 8000).and($"SALARY" < 1), $"salary").otherwise(0).as("key")) + .groupBy("key").sum("SALARY") + .orderBy("key") + .limit(1) +checkSortRemoved(df3) +checkLimitRemoved(df3) +checkPushedInfo(df3, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: " + +"[CASE WHEN (SALARY > 8000.00) AND (SALARY < 1.00) THEN SALARY ELSE 0.00 END]", + "PushedFilters: []", + "PushedTopN: ORDER BY [" + +"CASE WHEN (SALARY > 8000.00) AND (SALARY < 1.00) THEN SALARY ELSE 0.00 END " + +"ASC NULLS FIRST] LIMIT 1") +checkAnswer(df3, Seq(Row(0, 44000.00))) + +val df4 = spark.read + .table("h2.test.employee") + .groupBy("dept").sum("SALARY") + .orderBy($"dept".gt(1)) + .limit(1) +checkSortRemoved(df4) 
+checkLimitRemoved(df4) +checkPushedInfo(df4, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT]", + "PushedFilters: []", + "PushedTopN: ORDER BY [DEPT > 1 ASC NULLS FIRST] LIMIT 1") +checkAnswer(df4, Seq(Row(1, 19000.00))) + +val df5 = spark.read + .table("h2.test.employee") + .groupBy("DEPT", "IS_MANAGER").sum("SALARY") + .orderBy("DEPT", "IS_MANAGER") + .limit(1) +checkSortRemoved(df5) +checkLimitRemoved(df5) +checkPushedInfo(df5, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT, IS_MANAGER]", + "PushedFilters: []", + "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST, IS_MANAGER ASC NULLS FIRST] LIMIT 1") +checkAnswer(df5, Seq(Row(1, false, 9000.00))) + +val df6 = spark.read + .table("h2.test.employee") + .select($"SALARY", $"IS_MANAGER", +when(($"SALARY" > 8000).and($"SALARY" < 1), $"salary").otherwise(0).as("key")) Review Comment: let's use case instead of case when, which is simpler and easier to review. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a diff in pull request #37320: [SPARK-39819][SQL] DS V2 aggregate push down can work with Top N or Paging (Sort with group expressions)
cloud-fan commented on code in PR #37320: URL: https://github.com/apache/spark/pull/37320#discussion_r938792433 ## sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala: ## @@ -864,6 +851,253 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel checkAnswer(df2, Seq(Row(2, "david", 1.00))) } + test("scan with aggregate push-down and top N push-down") { +val df1 = spark.read + .table("h2.test.employee") + .groupBy("DEPT").sum("SALARY") + .orderBy("DEPT") + .limit(1) +checkSortRemoved(df1) +checkLimitRemoved(df1) +checkPushedInfo(df1, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT]", + "PushedFilters: []", + "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST] LIMIT 1") +checkAnswer(df1, Seq(Row(1, 19000.00))) + +val df2 = spark.read + .table("h2.test.employee") + .select($"DEPT".as("my_dept"), $"SALARY") + .groupBy("my_dept").sum("SALARY") + .orderBy("my_dept") + .limit(1) +checkSortRemoved(df2) +checkLimitRemoved(df2) +checkPushedInfo(df2, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT]", + "PushedFilters: []", + "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST] LIMIT 1") +checkAnswer(df2, Seq(Row(1, 19000.00))) + +val df3 = spark.read + .table("h2.test.employee") + .select($"SALARY", +when(($"SALARY" > 8000).and($"SALARY" < 1), $"salary").otherwise(0).as("key")) + .groupBy("key").sum("SALARY") + .orderBy("key") + .limit(1) +checkSortRemoved(df3) +checkLimitRemoved(df3) +checkPushedInfo(df3, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: " + +"[CASE WHEN (SALARY > 8000.00) AND (SALARY < 1.00) THEN SALARY ELSE 0.00 END]", + "PushedFilters: []", + "PushedTopN: ORDER BY [" + +"CASE WHEN (SALARY > 8000.00) AND (SALARY < 1.00) THEN SALARY ELSE 0.00 END " + +"ASC NULLS FIRST] LIMIT 1") +checkAnswer(df3, Seq(Row(0, 44000.00))) + +val df4 = spark.read + .table("h2.test.employee") + .groupBy("dept").sum("SALARY") + .orderBy($"dept".gt(1)) Review Comment: let's use cast now. 
group by a predicate is super weird. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a diff in pull request #37320: [SPARK-39819][SQL] DS V2 aggregate push down can work with Top N or Paging (Sort with group expressions)
cloud-fan commented on code in PR #37320: URL: https://github.com/apache/spark/pull/37320#discussion_r938791655 ## sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala: ## @@ -864,6 +851,253 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel checkAnswer(df2, Seq(Row(2, "david", 1.00))) } + test("scan with aggregate push-down and top N push-down") { +val df1 = spark.read + .table("h2.test.employee") + .groupBy("DEPT").sum("SALARY") + .orderBy("DEPT") + .limit(1) +checkSortRemoved(df1) +checkLimitRemoved(df1) +checkPushedInfo(df1, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT]", + "PushedFilters: []", + "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST] LIMIT 1") +checkAnswer(df1, Seq(Row(1, 19000.00))) + +val df2 = spark.read + .table("h2.test.employee") + .select($"DEPT".as("my_dept"), $"SALARY") Review Comment: let's add a cast instead of just a simple alias ``` .select($"DEPT".cast("string").as("my_dept"), $"SALARY") ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a diff in pull request #37320: [SPARK-39819][SQL] DS V2 aggregate push down can work with Top N or Paging (Sort with group expressions)
cloud-fan commented on code in PR #37320: URL: https://github.com/apache/spark/pull/37320#discussion_r938792037 ## sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala: ## @@ -864,6 +851,253 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel checkAnswer(df2, Seq(Row(2, "david", 1.00))) } + test("scan with aggregate push-down and top N push-down") { +val df1 = spark.read + .table("h2.test.employee") + .groupBy("DEPT").sum("SALARY") + .orderBy("DEPT") + .limit(1) +checkSortRemoved(df1) +checkLimitRemoved(df1) +checkPushedInfo(df1, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT]", + "PushedFilters: []", + "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST] LIMIT 1") +checkAnswer(df1, Seq(Row(1, 19000.00))) + +val df2 = spark.read + .table("h2.test.employee") + .select($"DEPT".as("my_dept"), $"SALARY") + .groupBy("my_dept").sum("SALARY") + .orderBy("my_dept") + .limit(1) +checkSortRemoved(df2) +checkLimitRemoved(df2) +checkPushedInfo(df2, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT]", + "PushedFilters: []", + "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST] LIMIT 1") +checkAnswer(df2, Seq(Row(1, 19000.00))) + +val df3 = spark.read + .table("h2.test.employee") + .select($"SALARY", +when(($"SALARY" > 8000).and($"SALARY" < 1), $"salary").otherwise(0).as("key")) Review Comment: we can remove this test after you address https://github.com/apache/spark/pull/37320/files#r938791655 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a diff in pull request #37320: [SPARK-39819][SQL] DS V2 aggregate push down can work with Top N or Paging (Sort with group expressions)
cloud-fan commented on code in PR #37320: URL: https://github.com/apache/spark/pull/37320#discussion_r938790245 ## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala: ## @@ -408,16 +411,27 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper { } (operation, isPushed && !isPartiallyPushed) case s @ Sort(order, _, operation @ ScanOperation(project, filter, sHolder: ScanBuilderHolder)) -// Without building the Scan, we do not know the resulting column names after aggregate -// push-down, and thus can't push down Top-N which needs to know the ordering column names. -// TODO: we can support simple cases like GROUP BY columns directly and ORDER BY the same -// columns, which we know the resulting column names: the original table columns. -if sHolder.pushedAggregate.isEmpty && filter.isEmpty && - CollapseProject.canCollapseExpressions(order, project, alwaysInline = true) => + if filter.isEmpty && +CollapseProject.canCollapseExpressions(order, project, alwaysInline = true) => val aliasMap = getAliasMap(project) - val newOrder = order.map(replaceAlias(_, aliasMap)).asInstanceOf[Seq[SortOrder]] + val aliasReplacedOrder = order.map(replaceAlias(_, aliasMap)) + val newOrder = if (sHolder.pushedAggregate.isDefined) { +// Without building the Scan, Aggregate push-down give the expected output starts with +// `group_col_` or `agg_func_`. When Aggregate push-down working with Sort for top n +// push-down, we need replace these expected output with the origin expressions. Review Comment: ``` // `ScanBuilderHolder` has different output columns after aggregate pushdown. Here we // replace the attributes in ordering expressions with the original table output columns. ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] [spark] cloud-fan commented on a diff in pull request #37320: [SPARK-39819][SQL] DS V2 aggregate push down can work with Top N or Paging (Sort with group expressions)
cloud-fan commented on code in PR #37320: URL: https://github.com/apache/spark/pull/37320#discussion_r938790445 ## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala: ## @@ -408,16 +411,27 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper { } (operation, isPushed && !isPartiallyPushed) case s @ Sort(order, _, operation @ ScanOperation(project, filter, sHolder: ScanBuilderHolder)) -// Without building the Scan, we do not know the resulting column names after aggregate -// push-down, and thus can't push down Top-N which needs to know the ordering column names. -// TODO: we can support simple cases like GROUP BY columns directly and ORDER BY the same -// columns, which we know the resulting column names: the original table columns. -if sHolder.pushedAggregate.isEmpty && filter.isEmpty && - CollapseProject.canCollapseExpressions(order, project, alwaysInline = true) => + if filter.isEmpty && +CollapseProject.canCollapseExpressions(order, project, alwaysInline = true) => val aliasMap = getAliasMap(project) - val newOrder = order.map(replaceAlias(_, aliasMap)).asInstanceOf[Seq[SortOrder]] + val aliasReplacedOrder = order.map(replaceAlias(_, aliasMap)) + val newOrder = if (sHolder.pushedAggregate.isDefined) { +// Without building the Scan, Aggregate push-down give the expected output starts with +// `group_col_` or `agg_func_`. When Aggregate push-down working with Sort for top n +// push-down, we need replace these expected output with the origin expressions. 
+aliasReplacedOrder.map { + _.transform { +case a: Attribute => sHolder.pushedAggOutputMap.getOrElse(a, a) + }.asInstanceOf[SortOrder] +} + } else { +aliasReplacedOrder.asInstanceOf[Seq[SortOrder]] + } val normalizedOrders = DataSourceStrategy.normalizeExprs( newOrder, sHolder.relation.output).asInstanceOf[Seq[SortOrder]] + // Because V2ExpressionBuilder can't translate aggregate functions, so we can't Review Comment: we can remove this TODO now. ## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala: ## @@ -408,16 +411,27 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper { } (operation, isPushed && !isPartiallyPushed) case s @ Sort(order, _, operation @ ScanOperation(project, filter, sHolder: ScanBuilderHolder)) -// Without building the Scan, we do not know the resulting column names after aggregate -// push-down, and thus can't push down Top-N which needs to know the ordering column names. -// TODO: we can support simple cases like GROUP BY columns directly and ORDER BY the same -// columns, which we know the resulting column names: the original table columns. -if sHolder.pushedAggregate.isEmpty && filter.isEmpty && - CollapseProject.canCollapseExpressions(order, project, alwaysInline = true) => + if filter.isEmpty && +CollapseProject.canCollapseExpressions(order, project, alwaysInline = true) => val aliasMap = getAliasMap(project) - val newOrder = order.map(replaceAlias(_, aliasMap)).asInstanceOf[Seq[SortOrder]] + val aliasReplacedOrder = order.map(replaceAlias(_, aliasMap)) + val newOrder = if (sHolder.pushedAggregate.isDefined) { +// Without building the Scan, Aggregate push-down give the expected output starts with +// `group_col_` or `agg_func_`. When Aggregate push-down working with Sort for top n +// push-down, we need replace these expected output with the origin expressions. 
+aliasReplacedOrder.map { + _.transform { +case a: Attribute => sHolder.pushedAggOutputMap.getOrElse(a, a) + }.asInstanceOf[SortOrder] +} + } else { +aliasReplacedOrder.asInstanceOf[Seq[SortOrder]] + } val normalizedOrders = DataSourceStrategy.normalizeExprs( newOrder, sHolder.relation.output).asInstanceOf[Seq[SortOrder]] + // Because V2ExpressionBuilder can't translate aggregate functions, so we can't Review Comment: and let's add tests for it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a diff in pull request #37320: [SPARK-39819][SQL] DS V2 aggregate push down can work with Top N or Paging (Sort with group expressions)
cloud-fan commented on code in PR #37320: URL: https://github.com/apache/spark/pull/37320#discussion_r935119522 ## sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala: ## @@ -864,6 +851,255 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel checkAnswer(df2, Seq(Row(2, "david", 1.00))) } + test("scan with aggregate push-down and top N push-down") { +val df1 = spark.read + .table("h2.test.employee") + .groupBy("DEPT").sum("SALARY") + .orderBy("DEPT") + .limit(1) +checkSortRemoved(df1) +checkLimitRemoved(df1) +checkPushedInfo(df1, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT]", + "PushedFilters: []", + "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST] LIMIT 1") +checkAnswer(df1, Seq(Row(1, 19000.00))) + +val df2 = spark.read + .table("h2.test.employee") + .select($"DEPT".as("my_dept"), $"SALARY") + .groupBy("my_dept").sum("SALARY") + .orderBy("my_dept") + .limit(1) +checkSortRemoved(df2) +checkLimitRemoved(df2) +checkPushedInfo(df2, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT]", + "PushedFilters: []", + "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST] LIMIT 1") +checkAnswer(df2, Seq(Row(1, 19000.00))) + +val df3 = spark.read + .table("h2.test.employee") + .select($"SALARY", +when(($"SALARY" > 8000).and($"SALARY" < 1), $"salary").otherwise(0).as("key")) + .groupBy("key").sum("SALARY") + .orderBy("key") + .limit(1) +checkSortRemoved(df3) +checkLimitRemoved(df3) +checkPushedInfo(df3, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: " + +"[CASE WHEN (SALARY > 8000.00) AND (SALARY < 1.00) THEN SALARY ELSE 0.00 END]", + "PushedFilters: []", + "PushedTopN: ORDER BY [" + +"CASE WHEN (SALARY > 8000.00) AND (SALARY < 1.00) THEN SALARY ELSE 0.00 END " + +"ASC NULLS FIRST] LIMIT 1") +checkAnswer(df3, Seq(Row(0, 44000.00))) + +withSQLConf(SQLConf.ANSI_ENABLED.key -> "true") { + val df4 = spark.read +.table("h2.test.employee") +.groupBy("dept").sum("SALARY") 
+.orderBy($"dept" + 100) Review Comment: BTW, can we make the grouping key an expression? like this https://github.com/apache/spark/pull/37320/files#diff-1496378d9e7817c45c962f1af48e5e765cb475bd01d58edec118d98225e02ef3R887 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a diff in pull request #37320: [SPARK-39819][SQL] DS V2 aggregate push down can work with Top N or Paging (Sort with group expressions)
cloud-fan commented on code in PR #37320: URL: https://github.com/apache/spark/pull/37320#discussion_r935119295 ## sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala: ## @@ -864,6 +851,255 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel checkAnswer(df2, Seq(Row(2, "david", 1.00))) } + test("scan with aggregate push-down and top N push-down") { +val df1 = spark.read + .table("h2.test.employee") + .groupBy("DEPT").sum("SALARY") + .orderBy("DEPT") + .limit(1) +checkSortRemoved(df1) +checkLimitRemoved(df1) +checkPushedInfo(df1, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT]", + "PushedFilters: []", + "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST] LIMIT 1") +checkAnswer(df1, Seq(Row(1, 19000.00))) + +val df2 = spark.read + .table("h2.test.employee") + .select($"DEPT".as("my_dept"), $"SALARY") + .groupBy("my_dept").sum("SALARY") + .orderBy("my_dept") + .limit(1) +checkSortRemoved(df2) +checkLimitRemoved(df2) +checkPushedInfo(df2, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT]", + "PushedFilters: []", + "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST] LIMIT 1") +checkAnswer(df2, Seq(Row(1, 19000.00))) + +val df3 = spark.read + .table("h2.test.employee") + .select($"SALARY", +when(($"SALARY" > 8000).and($"SALARY" < 1), $"salary").otherwise(0).as("key")) + .groupBy("key").sum("SALARY") + .orderBy("key") + .limit(1) +checkSortRemoved(df3) +checkLimitRemoved(df3) +checkPushedInfo(df3, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: " + +"[CASE WHEN (SALARY > 8000.00) AND (SALARY < 1.00) THEN SALARY ELSE 0.00 END]", + "PushedFilters: []", + "PushedTopN: ORDER BY [" + +"CASE WHEN (SALARY > 8000.00) AND (SALARY < 1.00) THEN SALARY ELSE 0.00 END " + +"ASC NULLS FIRST] LIMIT 1") +checkAnswer(df3, Seq(Row(0, 44000.00))) + +withSQLConf(SQLConf.ANSI_ENABLED.key -> "true") { + val df4 = spark.read +.table("h2.test.employee") +.groupBy("dept").sum("SALARY") 
+.orderBy($"dept" + 100) Review Comment: can we use some functions that don't need ansi mode to simplify the test? e.g. `log(10, dept)` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a diff in pull request #37320: [SPARK-39819][SQL] DS V2 aggregate push down can work with Top N or Paging (Sort with group expressions)
cloud-fan commented on code in PR #37320: URL: https://github.com/apache/spark/pull/37320#discussion_r935064814 ## sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala: ## @@ -864,6 +851,254 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel checkAnswer(df2, Seq(Row(2, "david", 1.00))) } + test("scan with aggregate push-down and top N push-down") { +val df1 = spark.read + .table("h2.test.employee") + .groupBy("DEPT").sum("SALARY") + .orderBy("DEPT") + .limit(1) +checkSortRemoved(df1) +checkLimitRemoved(df1) +checkPushedInfo(df1, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT]", + "PushedFilters: []", + "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST] LIMIT 1") +checkAnswer(df1, Seq(Row(1, 19000.00))) + +val df2 = spark.read + .table("h2.test.employee") + .select($"DEPT".as("my_dept"), $"SALARY") + .groupBy("my_dept").sum("SALARY") + .orderBy("my_dept") + .limit(1) +checkSortRemoved(df2) +checkLimitRemoved(df2) +checkPushedInfo(df2, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT]", + "PushedFilters: []", + "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST] LIMIT 1") +checkAnswer(df2, Seq(Row(1, 19000.00))) + +val df3 = spark.read + .table("h2.test.employee") + .select($"SALARY", +when(($"SALARY" > 8000).and($"SALARY" < 1), $"salary").otherwise(0).as("key")) + .groupBy("key").sum("SALARY") + .orderBy("key") Review Comment: can we also test `.orderBy($"key" + 100)`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a diff in pull request #37320: [SPARK-39819][SQL] DS V2 aggregate push down can work with Top N or Paging (Sort with group expressions)
cloud-fan commented on code in PR #37320: URL: https://github.com/apache/spark/pull/37320#discussion_r935064627 ## sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala: ## @@ -864,6 +851,254 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel checkAnswer(df2, Seq(Row(2, "david", 1.00))) } + test("scan with aggregate push-down and top N push-down") { +val df1 = spark.read + .table("h2.test.employee") + .groupBy("DEPT").sum("SALARY") + .orderBy("DEPT") + .limit(1) +checkSortRemoved(df1) +checkLimitRemoved(df1) +checkPushedInfo(df1, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT]", + "PushedFilters: []", + "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST] LIMIT 1") +checkAnswer(df1, Seq(Row(1, 19000.00))) + +val df2 = spark.read + .table("h2.test.employee") + .select($"DEPT".as("my_dept"), $"SALARY") + .groupBy("my_dept").sum("SALARY") + .orderBy("my_dept") + .limit(1) +checkSortRemoved(df2) +checkLimitRemoved(df2) +checkPushedInfo(df2, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT]", + "PushedFilters: []", + "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST] LIMIT 1") +checkAnswer(df2, Seq(Row(1, 19000.00))) + +val df3 = spark.read + .table("h2.test.employee") + .select($"SALARY", +when(($"SALARY" > 8000).and($"SALARY" < 1), $"salary").otherwise(0).as("key")) + .groupBy("key").sum("SALARY") + .orderBy("key") + .limit(1) +checkSortRemoved(df3) +checkLimitRemoved(df3) +checkPushedInfo(df3, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: " + +"[CASE WHEN (SALARY > 8000.00) AND (SALARY < 1.00) THEN SALARY ELSE 0.00 END]", + "PushedFilters: []", + "PushedTopN: ORDER BY [" + +"CASE WHEN (SALARY > 8000.00) AND (SALARY < 1.00) THEN SALARY ELSE 0.00 END " + +"ASC NULLS FIRST] LIMIT 1") +checkAnswer(df3, Seq(Row(0, 44000.00))) + +val df4 = spark.read + .table("h2.test.employee") + .groupBy("DEPT", "IS_MANAGER").sum("SALARY") + .orderBy("DEPT", "IS_MANAGER") + .limit(1) 
+checkSortRemoved(df4) +checkLimitRemoved(df4) +checkPushedInfo(df4, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT, IS_MANAGER]", + "PushedFilters: []", + "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST, IS_MANAGER ASC NULLS FIRST] LIMIT 1") +checkAnswer(df4, Seq(Row(1, false, 9000.00))) + +val df5 = spark.read + .table("h2.test.employee") + .select($"DEPT".as("my_dept"), $"IS_MANAGER".as("my_manager"), $"SALARY") + .groupBy("my_dept", "my_manager").sum("SALARY") + .orderBy("my_dept", "my_manager") + .limit(1) +checkSortRemoved(df5) +checkLimitRemoved(df5) +checkPushedInfo(df5, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT, IS_MANAGER]", + "PushedFilters: []", + "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST, IS_MANAGER ASC NULLS FIRST] LIMIT 1") +checkAnswer(df5, Seq(Row(1, false, 9000.00))) + +val df6 = spark.read + .table("h2.test.employee") + .select($"SALARY", $"IS_MANAGER", +when(($"SALARY" > 8000).and($"SALARY" < 1), $"salary").otherwise(0).as("key")) + .groupBy("key", "IS_MANAGER").sum("SALARY") + .orderBy("key", "IS_MANAGER") + .limit(1) +checkSortRemoved(df6) +checkLimitRemoved(df6) +checkPushedInfo(df6, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: " + +"[CASE WHEN (SALARY > 8000.00) AND (SALARY < 1.00) THEN SALARY ELSE 0.00 END, " + +"IS_MANAGER]", + "PushedFilters: []", + "PushedTopN: ORDER BY [" + +"CASE WHEN (SALARY > 8000.00) AND (SALARY < 1.00) THEN SALARY ELSE 0.00 END " + +"ASC NULLS FIRST, IS_MANAGER ASC NULLS FIRST] LIMIT 1") +checkAnswer(df6, Seq(Row(0.00, false, 12000.00))) + +val df7 = spark.read + .table("h2.test.employee") + .select($"DEPT", $"SALARY") + .groupBy("dept").agg(sum("SALARY")) + .orderBy(sum("SALARY")) + .limit(1) +checkSortRemoved(df7, false) +checkLimitRemoved(df7, false) +checkPushedInfo(df7, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT]", + "PushedFilters: []") +checkAnswer(df7, Seq(Row(6, 12000.00))) + +val df8 = spark.read + 
.table("h2.test.employee") + .select($"DEPT", $"SALARY") + .groupBy("dept").agg(sum("SALARY").as("total")) + .orderBy("total") + .limit(1) +checkSortRemoved(df8, false) +checkLimitRemoved(df8, false) +checkPushedInfo(df8, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT]", + "PushedFilters: []") +checkAnswer(df8, Seq(Row(6, 12000.00))) + } + +
[GitHub] [spark] cloud-fan commented on a diff in pull request #37320: [SPARK-39819][SQL] DS V2 aggregate push down can work with Top N or Paging (Sort with group expressions)
cloud-fan commented on code in PR #37320: URL: https://github.com/apache/spark/pull/37320#discussion_r935064235 ## sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala: ## @@ -864,6 +851,254 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel checkAnswer(df2, Seq(Row(2, "david", 1.00))) } + test("scan with aggregate push-down and top N push-down") { +val df1 = spark.read + .table("h2.test.employee") + .groupBy("DEPT").sum("SALARY") + .orderBy("DEPT") + .limit(1) +checkSortRemoved(df1) +checkLimitRemoved(df1) +checkPushedInfo(df1, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT]", + "PushedFilters: []", + "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST] LIMIT 1") +checkAnswer(df1, Seq(Row(1, 19000.00))) + +val df2 = spark.read + .table("h2.test.employee") + .select($"DEPT".as("my_dept"), $"SALARY") + .groupBy("my_dept").sum("SALARY") + .orderBy("my_dept") + .limit(1) +checkSortRemoved(df2) +checkLimitRemoved(df2) +checkPushedInfo(df2, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT]", + "PushedFilters: []", + "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST] LIMIT 1") +checkAnswer(df2, Seq(Row(1, 19000.00))) + +val df3 = spark.read + .table("h2.test.employee") + .select($"SALARY", +when(($"SALARY" > 8000).and($"SALARY" < 1), $"salary").otherwise(0).as("key")) + .groupBy("key").sum("SALARY") + .orderBy("key") + .limit(1) +checkSortRemoved(df3) +checkLimitRemoved(df3) +checkPushedInfo(df3, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: " + +"[CASE WHEN (SALARY > 8000.00) AND (SALARY < 1.00) THEN SALARY ELSE 0.00 END]", + "PushedFilters: []", + "PushedTopN: ORDER BY [" + +"CASE WHEN (SALARY > 8000.00) AND (SALARY < 1.00) THEN SALARY ELSE 0.00 END " + +"ASC NULLS FIRST] LIMIT 1") +checkAnswer(df3, Seq(Row(0, 44000.00))) + +val df4 = spark.read + .table("h2.test.employee") + .groupBy("DEPT", "IS_MANAGER").sum("SALARY") + .orderBy("DEPT", "IS_MANAGER") + .limit(1) 
+checkSortRemoved(df4) +checkLimitRemoved(df4) +checkPushedInfo(df4, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT, IS_MANAGER]", + "PushedFilters: []", + "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST, IS_MANAGER ASC NULLS FIRST] LIMIT 1") +checkAnswer(df4, Seq(Row(1, false, 9000.00))) + +val df5 = spark.read + .table("h2.test.employee") + .select($"DEPT".as("my_dept"), $"IS_MANAGER".as("my_manager"), $"SALARY") + .groupBy("my_dept", "my_manager").sum("SALARY") + .orderBy("my_dept", "my_manager") + .limit(1) +checkSortRemoved(df5) +checkLimitRemoved(df5) +checkPushedInfo(df5, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT, IS_MANAGER]", + "PushedFilters: []", + "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST, IS_MANAGER ASC NULLS FIRST] LIMIT 1") +checkAnswer(df5, Seq(Row(1, false, 9000.00))) + +val df6 = spark.read + .table("h2.test.employee") + .select($"SALARY", $"IS_MANAGER", +when(($"SALARY" > 8000).and($"SALARY" < 1), $"salary").otherwise(0).as("key")) + .groupBy("key", "IS_MANAGER").sum("SALARY") + .orderBy("key", "IS_MANAGER") + .limit(1) +checkSortRemoved(df6) +checkLimitRemoved(df6) +checkPushedInfo(df6, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: " + +"[CASE WHEN (SALARY > 8000.00) AND (SALARY < 1.00) THEN SALARY ELSE 0.00 END, " + +"IS_MANAGER]", + "PushedFilters: []", + "PushedTopN: ORDER BY [" + +"CASE WHEN (SALARY > 8000.00) AND (SALARY < 1.00) THEN SALARY ELSE 0.00 END " + +"ASC NULLS FIRST, IS_MANAGER ASC NULLS FIRST] LIMIT 1") +checkAnswer(df6, Seq(Row(0.00, false, 12000.00))) + +val df7 = spark.read + .table("h2.test.employee") + .select($"DEPT", $"SALARY") + .groupBy("dept").agg(sum("SALARY")) + .orderBy(sum("SALARY")) + .limit(1) +checkSortRemoved(df7, false) +checkLimitRemoved(df7, false) +checkPushedInfo(df7, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT]", + "PushedFilters: []") +checkAnswer(df7, Seq(Row(6, 12000.00))) + +val df8 = spark.read + 
.table("h2.test.employee") + .select($"DEPT", $"SALARY") + .groupBy("dept").agg(sum("SALARY").as("total")) Review Comment: ditto -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please
[GitHub] [spark] cloud-fan commented on a diff in pull request #37320: [SPARK-39819][SQL] DS V2 aggregate push down can work with Top N or Paging (Sort with group expressions)
cloud-fan commented on code in PR #37320: URL: https://github.com/apache/spark/pull/37320#discussion_r935063965 ## sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala: ## @@ -864,6 +851,254 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel checkAnswer(df2, Seq(Row(2, "david", 1.00))) } + test("scan with aggregate push-down and top N push-down") { +val df1 = spark.read + .table("h2.test.employee") + .groupBy("DEPT").sum("SALARY") + .orderBy("DEPT") + .limit(1) +checkSortRemoved(df1) +checkLimitRemoved(df1) +checkPushedInfo(df1, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT]", + "PushedFilters: []", + "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST] LIMIT 1") +checkAnswer(df1, Seq(Row(1, 19000.00))) + +val df2 = spark.read + .table("h2.test.employee") + .select($"DEPT".as("my_dept"), $"SALARY") + .groupBy("my_dept").sum("SALARY") + .orderBy("my_dept") + .limit(1) +checkSortRemoved(df2) +checkLimitRemoved(df2) +checkPushedInfo(df2, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT]", + "PushedFilters: []", + "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST] LIMIT 1") +checkAnswer(df2, Seq(Row(1, 19000.00))) + +val df3 = spark.read + .table("h2.test.employee") + .select($"SALARY", +when(($"SALARY" > 8000).and($"SALARY" < 1), $"salary").otherwise(0).as("key")) + .groupBy("key").sum("SALARY") + .orderBy("key") + .limit(1) +checkSortRemoved(df3) +checkLimitRemoved(df3) +checkPushedInfo(df3, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: " + +"[CASE WHEN (SALARY > 8000.00) AND (SALARY < 1.00) THEN SALARY ELSE 0.00 END]", + "PushedFilters: []", + "PushedTopN: ORDER BY [" + +"CASE WHEN (SALARY > 8000.00) AND (SALARY < 1.00) THEN SALARY ELSE 0.00 END " + +"ASC NULLS FIRST] LIMIT 1") +checkAnswer(df3, Seq(Row(0, 44000.00))) + +val df4 = spark.read + .table("h2.test.employee") + .groupBy("DEPT", "IS_MANAGER").sum("SALARY") + .orderBy("DEPT", "IS_MANAGER") + .limit(1) 
+checkSortRemoved(df4) +checkLimitRemoved(df4) +checkPushedInfo(df4, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT, IS_MANAGER]", + "PushedFilters: []", + "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST, IS_MANAGER ASC NULLS FIRST] LIMIT 1") +checkAnswer(df4, Seq(Row(1, false, 9000.00))) + +val df5 = spark.read + .table("h2.test.employee") + .select($"DEPT".as("my_dept"), $"IS_MANAGER".as("my_manager"), $"SALARY") + .groupBy("my_dept", "my_manager").sum("SALARY") + .orderBy("my_dept", "my_manager") + .limit(1) +checkSortRemoved(df5) +checkLimitRemoved(df5) +checkPushedInfo(df5, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT, IS_MANAGER]", + "PushedFilters: []", + "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST, IS_MANAGER ASC NULLS FIRST] LIMIT 1") +checkAnswer(df5, Seq(Row(1, false, 9000.00))) + +val df6 = spark.read + .table("h2.test.employee") + .select($"SALARY", $"IS_MANAGER", +when(($"SALARY" > 8000).and($"SALARY" < 1), $"salary").otherwise(0).as("key")) + .groupBy("key", "IS_MANAGER").sum("SALARY") + .orderBy("key", "IS_MANAGER") + .limit(1) +checkSortRemoved(df6) +checkLimitRemoved(df6) +checkPushedInfo(df6, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: " + +"[CASE WHEN (SALARY > 8000.00) AND (SALARY < 1.00) THEN SALARY ELSE 0.00 END, " + +"IS_MANAGER]", + "PushedFilters: []", + "PushedTopN: ORDER BY [" + +"CASE WHEN (SALARY > 8000.00) AND (SALARY < 1.00) THEN SALARY ELSE 0.00 END " + +"ASC NULLS FIRST, IS_MANAGER ASC NULLS FIRST] LIMIT 1") +checkAnswer(df6, Seq(Row(0.00, false, 12000.00))) + +val df7 = spark.read + .table("h2.test.employee") + .select($"DEPT", $"SALARY") + .groupBy("dept").agg(sum("SALARY")) + .orderBy(sum("SALARY")) Review Comment: can we add some comments to explain why top n can't be pushed here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] [spark] cloud-fan commented on a diff in pull request #37320: [SPARK-39819][SQL] DS V2 aggregate push down can work with Top N or Paging (Sort with group expressions)
cloud-fan commented on code in PR #37320: URL: https://github.com/apache/spark/pull/37320#discussion_r935063573 ## sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala: ## @@ -864,6 +851,254 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel checkAnswer(df2, Seq(Row(2, "david", 1.00))) } + test("scan with aggregate push-down and top N push-down") { +val df1 = spark.read + .table("h2.test.employee") + .groupBy("DEPT").sum("SALARY") + .orderBy("DEPT") + .limit(1) +checkSortRemoved(df1) +checkLimitRemoved(df1) +checkPushedInfo(df1, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT]", + "PushedFilters: []", + "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST] LIMIT 1") +checkAnswer(df1, Seq(Row(1, 19000.00))) + +val df2 = spark.read + .table("h2.test.employee") + .select($"DEPT".as("my_dept"), $"SALARY") + .groupBy("my_dept").sum("SALARY") + .orderBy("my_dept") + .limit(1) +checkSortRemoved(df2) +checkLimitRemoved(df2) +checkPushedInfo(df2, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT]", + "PushedFilters: []", + "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST] LIMIT 1") +checkAnswer(df2, Seq(Row(1, 19000.00))) + +val df3 = spark.read + .table("h2.test.employee") + .select($"SALARY", +when(($"SALARY" > 8000).and($"SALARY" < 1), $"salary").otherwise(0).as("key")) + .groupBy("key").sum("SALARY") + .orderBy("key") + .limit(1) +checkSortRemoved(df3) +checkLimitRemoved(df3) +checkPushedInfo(df3, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: " + +"[CASE WHEN (SALARY > 8000.00) AND (SALARY < 1.00) THEN SALARY ELSE 0.00 END]", + "PushedFilters: []", + "PushedTopN: ORDER BY [" + +"CASE WHEN (SALARY > 8000.00) AND (SALARY < 1.00) THEN SALARY ELSE 0.00 END " + +"ASC NULLS FIRST] LIMIT 1") +checkAnswer(df3, Seq(Row(0, 44000.00))) + +val df4 = spark.read + .table("h2.test.employee") + .groupBy("DEPT", "IS_MANAGER").sum("SALARY") + .orderBy("DEPT", "IS_MANAGER") + .limit(1) 
+checkSortRemoved(df4) +checkLimitRemoved(df4) +checkPushedInfo(df4, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT, IS_MANAGER]", + "PushedFilters: []", + "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST, IS_MANAGER ASC NULLS FIRST] LIMIT 1") +checkAnswer(df4, Seq(Row(1, false, 9000.00))) + +val df5 = spark.read + .table("h2.test.employee") + .select($"DEPT".as("my_dept"), $"IS_MANAGER".as("my_manager"), $"SALARY") Review Comment: We already have tests to verify the alias, we don't need to test it again with 2 columns. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a diff in pull request #37320: [SPARK-39819][SQL] DS V2 aggregate push down can work with Top N or Paging (Sort with group expressions)
cloud-fan commented on code in PR #37320: URL: https://github.com/apache/spark/pull/37320#discussion_r935061143 ## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala: ## @@ -410,12 +413,21 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper { case s @ Sort(order, _, operation @ ScanOperation(project, filter, sHolder: ScanBuilderHolder)) // Without building the Scan, we do not know the resulting column names after aggregate // push-down, and thus can't push down Top-N which needs to know the ordering column names. -// TODO: we can support simple cases like GROUP BY columns directly and ORDER BY the same -// columns, which we know the resulting column names: the original table columns. -if sHolder.pushedAggregate.isEmpty && filter.isEmpty && +// In particular, we push down the simple cases like GROUP BY expressions directly and +// ORDER BY the same expressions, which we know the original table columns. +if filter.isEmpty && CollapseProject.canCollapseExpressions(order, project, alwaysInline = true) => val aliasMap = getAliasMap(project) - val newOrder = order.map(replaceAlias(_, aliasMap)).asInstanceOf[Seq[SortOrder]] + val aliasReplacedOrder = order.map(replaceAlias(_, aliasMap)) + val newOrder = if (sHolder.pushedAggregate.isDefined) { +aliasReplacedOrder.map { Review Comment: let's add some comments here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a diff in pull request #37320: [SPARK-39819][SQL] DS V2 aggregate push down can work with Top N or Paging (Sort with group expressions)
cloud-fan commented on code in PR #37320: URL: https://github.com/apache/spark/pull/37320#discussion_r935060915 ## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala: ## @@ -410,12 +413,21 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper { case s @ Sort(order, _, operation @ ScanOperation(project, filter, sHolder: ScanBuilderHolder)) // Without building the Scan, we do not know the resulting column names after aggregate // push-down, and thus can't push down Top-N which needs to know the ordering column names. -// TODO: we can support simple cases like GROUP BY columns directly and ORDER BY the same -// columns, which we know the resulting column names: the original table columns. -if sHolder.pushedAggregate.isEmpty && filter.isEmpty && +// In particular, we push down the simple cases like GROUP BY expressions directly and +// ORDER BY the same expressions, which we know the original table columns. Review Comment: I think we can remove all these comments here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a diff in pull request #37320: [SPARK-39819][SQL] DS V2 aggregate push down can work with Top N or Paging (Sort with group expressions)
cloud-fan commented on code in PR #37320: URL: https://github.com/apache/spark/pull/37320#discussion_r933157882 ## sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala: ## @@ -811,6 +800,244 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel checkAnswer(df2, Seq(Row(2, "david", 1.00))) } + test("scan with aggregate push-down and top N push-down") { +val df1 = spark.read + .table("h2.test.employee") + .groupBy("DEPT").sum("SALARY") + .orderBy("DEPT") + .limit(1) +checkSortRemoved(df1) +checkLimitRemoved(df1) +checkPushedInfo(df1, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT]", + "PushedFilters: []", + "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST] LIMIT 1") +checkAnswer(df1, Seq(Row(1, 19000.00))) + +val df2 = sql( + """ +|SELECT dept AS my_dept, SUM(SALARY) FROM h2.test.employee +|GROUP BY dept +|ORDER BY my_dept +|LIMIT 1 +|""".stripMargin) +checkSortRemoved(df2) +checkLimitRemoved(df2) +checkPushedInfo(df2, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT]", + "PushedFilters: []", + "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST] LIMIT 1") +checkAnswer(df2, Seq(Row(1, 19000.00))) + +val df3 = spark.read + .table("h2.test.employee") + .select($"SALARY", +when(($"SALARY" > 8000).and($"SALARY" < 1), $"salary").otherwise(0).as("key")) + .groupBy("key").sum("SALARY") + .orderBy("key") + .limit(1) +checkSortRemoved(df3) +checkLimitRemoved(df3) +checkPushedInfo(df3, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: " + +"[CASE WHEN (SALARY > 8000.00) AND (SALARY < 1.00) THEN SALARY ELSE 0.00 END]", + "PushedFilters: []", + "PushedTopN: ORDER BY [" + +"CASE WHEN (SALARY > 8000.00) AND (SALARY < 1.00) THEN SALARY ELSE 0.00 END " + +"ASC NULLS FIRST] LIMIT 1") +checkAnswer(df3, Seq(Row(0, 44000.00))) + +val df4 = spark.read + .table("h2.test.employee") + .groupBy("DEPT", "IS_MANAGER").sum("SALARY") + .orderBy("DEPT", "IS_MANAGER") + .limit(1) +checkSortRemoved(df4) 
+checkLimitRemoved(df4) +checkPushedInfo(df4, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT, IS_MANAGER]", + "PushedFilters: []", + "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST, IS_MANAGER ASC NULLS FIRST] LIMIT 1") +checkAnswer(df4, Seq(Row(1, false, 9000.00))) + +val df5 = sql( + """ +|SELECT dept AS my_dept, is_manager AS my_manager, SUM(SALARY) FROM h2.test.employee +|GROUP BY dept, my_manager +|ORDER BY my_dept, my_manager +|LIMIT 1 +|""".stripMargin) +checkSortRemoved(df5) +checkLimitRemoved(df5) +checkPushedInfo(df5, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT, IS_MANAGER]", + "PushedFilters: []", + "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST, IS_MANAGER ASC NULLS FIRST] LIMIT 1") +checkAnswer(df5, Seq(Row(1, false, 9000.00))) + +val df6 = spark.read + .table("h2.test.employee") + .select($"SALARY", $"IS_MANAGER", +when(($"SALARY" > 8000).and($"SALARY" < 1), $"salary").otherwise(0).as("key")) + .groupBy("key", "IS_MANAGER").sum("SALARY") + .orderBy("key", "IS_MANAGER") + .limit(1) +checkSortRemoved(df6) +checkLimitRemoved(df6) +checkPushedInfo(df6, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: " + +"[CASE WHEN (SALARY > 8000.00) AND (SALARY < 1.00) THEN SALARY ELSE 0.00 END, " + +"IS_MANAGER]", + "PushedFilters: []", + "PushedTopN: ORDER BY [" + +"CASE WHEN (SALARY > 8000.00) AND (SALARY < 1.00) THEN SALARY ELSE 0.00 END " + +"ASC NULLS FIRST, IS_MANAGER ASC NULLS FIRST] LIMIT 1") +checkAnswer(df6, Seq(Row(0.00, false, 12000.00))) + +val df7 = sql( + """ +|SELECT dept, SUM(SALARY) FROM h2.test.employee +|GROUP BY dept +|ORDER BY SUM(SALARY) +|LIMIT 1 +|""".stripMargin) +checkSortRemoved(df7, false) +checkLimitRemoved(df7, false) +checkPushedInfo(df7, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT]", + "PushedFilters: []") +checkAnswer(df7, Seq(Row(6, 12000.00))) + +val df8 = sql( + """ +|SELECT dept, SUM(SALARY) AS total FROM h2.test.employee +|GROUP BY dept 
+|ORDER BY total +|LIMIT 1 +|""".stripMargin) +checkSortRemoved(df8, false) +checkLimitRemoved(df8, false) +checkPushedInfo(df8, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT]", + "PushedFilters: []") +checkAnswer(df8, Seq(Row(6, 12000.00))) + } + +
[GitHub] [spark] cloud-fan commented on a diff in pull request #37320: [SPARK-39819][SQL] DS V2 aggregate push down can work with Top N or Paging (Sort with group expressions)
cloud-fan commented on code in PR #37320: URL: https://github.com/apache/spark/pull/37320#discussion_r933157287 ## sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala: ## @@ -811,6 +800,244 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel checkAnswer(df2, Seq(Row(2, "david", 1.00))) } + test("scan with aggregate push-down and top N push-down") { +val df1 = spark.read + .table("h2.test.employee") + .groupBy("DEPT").sum("SALARY") + .orderBy("DEPT") + .limit(1) +checkSortRemoved(df1) +checkLimitRemoved(df1) +checkPushedInfo(df1, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT]", + "PushedFilters: []", + "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST] LIMIT 1") +checkAnswer(df1, Seq(Row(1, 19000.00))) + +val df2 = sql( + """ +|SELECT dept AS my_dept, SUM(SALARY) FROM h2.test.employee +|GROUP BY dept +|ORDER BY my_dept +|LIMIT 1 +|""".stripMargin) +checkSortRemoved(df2) +checkLimitRemoved(df2) +checkPushedInfo(df2, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT]", + "PushedFilters: []", + "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST] LIMIT 1") +checkAnswer(df2, Seq(Row(1, 19000.00))) + +val df3 = spark.read + .table("h2.test.employee") + .select($"SALARY", +when(($"SALARY" > 8000).and($"SALARY" < 1), $"salary").otherwise(0).as("key")) + .groupBy("key").sum("SALARY") + .orderBy("key") + .limit(1) +checkSortRemoved(df3) +checkLimitRemoved(df3) +checkPushedInfo(df3, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: " + +"[CASE WHEN (SALARY > 8000.00) AND (SALARY < 1.00) THEN SALARY ELSE 0.00 END]", + "PushedFilters: []", + "PushedTopN: ORDER BY [" + +"CASE WHEN (SALARY > 8000.00) AND (SALARY < 1.00) THEN SALARY ELSE 0.00 END " + +"ASC NULLS FIRST] LIMIT 1") +checkAnswer(df3, Seq(Row(0, 44000.00))) + +val df4 = spark.read + .table("h2.test.employee") + .groupBy("DEPT", "IS_MANAGER").sum("SALARY") + .orderBy("DEPT", "IS_MANAGER") + .limit(1) +checkSortRemoved(df4) 
+checkLimitRemoved(df4) +checkPushedInfo(df4, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT, IS_MANAGER]", + "PushedFilters: []", + "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST, IS_MANAGER ASC NULLS FIRST] LIMIT 1") +checkAnswer(df4, Seq(Row(1, false, 9000.00))) + +val df5 = sql( + """ +|SELECT dept AS my_dept, is_manager AS my_manager, SUM(SALARY) FROM h2.test.employee +|GROUP BY dept, my_manager +|ORDER BY my_dept, my_manager +|LIMIT 1 +|""".stripMargin) +checkSortRemoved(df5) +checkLimitRemoved(df5) +checkPushedInfo(df5, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT, IS_MANAGER]", + "PushedFilters: []", + "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST, IS_MANAGER ASC NULLS FIRST] LIMIT 1") +checkAnswer(df5, Seq(Row(1, false, 9000.00))) + +val df6 = spark.read + .table("h2.test.employee") + .select($"SALARY", $"IS_MANAGER", +when(($"SALARY" > 8000).and($"SALARY" < 1), $"salary").otherwise(0).as("key")) + .groupBy("key", "IS_MANAGER").sum("SALARY") + .orderBy("key", "IS_MANAGER") + .limit(1) +checkSortRemoved(df6) +checkLimitRemoved(df6) +checkPushedInfo(df6, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: " + +"[CASE WHEN (SALARY > 8000.00) AND (SALARY < 1.00) THEN SALARY ELSE 0.00 END, " + +"IS_MANAGER]", + "PushedFilters: []", + "PushedTopN: ORDER BY [" + +"CASE WHEN (SALARY > 8000.00) AND (SALARY < 1.00) THEN SALARY ELSE 0.00 END " + +"ASC NULLS FIRST, IS_MANAGER ASC NULLS FIRST] LIMIT 1") +checkAnswer(df6, Seq(Row(0.00, false, 12000.00))) + +val df7 = sql( + """ +|SELECT dept, SUM(SALARY) FROM h2.test.employee +|GROUP BY dept +|ORDER BY SUM(SALARY) +|LIMIT 1 +|""".stripMargin) +checkSortRemoved(df7, false) +checkLimitRemoved(df7, false) +checkPushedInfo(df7, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT]", + "PushedFilters: []") +checkAnswer(df7, Seq(Row(6, 12000.00))) + +val df8 = sql( + """ +|SELECT dept, SUM(SALARY) AS total FROM h2.test.employee +|GROUP BY dept 
+|ORDER BY total +|LIMIT 1 +|""".stripMargin) +checkSortRemoved(df8, false) +checkLimitRemoved(df8, false) +checkPushedInfo(df8, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT]", + "PushedFilters: []") +checkAnswer(df8, Seq(Row(6, 12000.00))) + } + +
[GitHub] [spark] cloud-fan commented on a diff in pull request #37320: [SPARK-39819][SQL] DS V2 aggregate push down can work with Top N or Paging (Sort with group expressions)
cloud-fan commented on code in PR #37320: URL: https://github.com/apache/spark/pull/37320#discussion_r932186273 ## sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala: ## @@ -811,6 +800,244 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel checkAnswer(df2, Seq(Row(2, "david", 1.00))) } + test("scan with aggregate push-down and top N push-down") { +val df1 = spark.read + .table("h2.test.employee") + .groupBy("DEPT").sum("SALARY") + .orderBy("DEPT") + .limit(1) +checkSortRemoved(df1) +checkLimitRemoved(df1) +checkPushedInfo(df1, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT]", + "PushedFilters: []", + "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST] LIMIT 1") +checkAnswer(df1, Seq(Row(1, 19000.00))) + +val df2 = sql( + """ +|SELECT dept AS my_dept, SUM(SALARY) FROM h2.test.employee +|GROUP BY dept +|ORDER BY my_dept +|LIMIT 1 +|""".stripMargin) +checkSortRemoved(df2) +checkLimitRemoved(df2) +checkPushedInfo(df2, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT]", + "PushedFilters: []", + "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST] LIMIT 1") +checkAnswer(df2, Seq(Row(1, 19000.00))) + +val df3 = spark.read + .table("h2.test.employee") + .select($"SALARY", +when(($"SALARY" > 8000).and($"SALARY" < 1), $"salary").otherwise(0).as("key")) + .groupBy("key").sum("SALARY") + .orderBy("key") + .limit(1) +checkSortRemoved(df3) +checkLimitRemoved(df3) +checkPushedInfo(df3, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: " + +"[CASE WHEN (SALARY > 8000.00) AND (SALARY < 1.00) THEN SALARY ELSE 0.00 END]", + "PushedFilters: []", + "PushedTopN: ORDER BY [" + +"CASE WHEN (SALARY > 8000.00) AND (SALARY < 1.00) THEN SALARY ELSE 0.00 END " + +"ASC NULLS FIRST] LIMIT 1") +checkAnswer(df3, Seq(Row(0, 44000.00))) + +val df4 = spark.read + .table("h2.test.employee") + .groupBy("DEPT", "IS_MANAGER").sum("SALARY") + .orderBy("DEPT", "IS_MANAGER") + .limit(1) +checkSortRemoved(df4) 
+checkLimitRemoved(df4) +checkPushedInfo(df4, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT, IS_MANAGER]", + "PushedFilters: []", + "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST, IS_MANAGER ASC NULLS FIRST] LIMIT 1") +checkAnswer(df4, Seq(Row(1, false, 9000.00))) + +val df5 = sql( + """ +|SELECT dept AS my_dept, is_manager AS my_manager, SUM(SALARY) FROM h2.test.employee +|GROUP BY dept, my_manager +|ORDER BY my_dept, my_manager +|LIMIT 1 +|""".stripMargin) +checkSortRemoved(df5) +checkLimitRemoved(df5) +checkPushedInfo(df5, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT, IS_MANAGER]", + "PushedFilters: []", + "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST, IS_MANAGER ASC NULLS FIRST] LIMIT 1") +checkAnswer(df5, Seq(Row(1, false, 9000.00))) + +val df6 = spark.read + .table("h2.test.employee") + .select($"SALARY", $"IS_MANAGER", +when(($"SALARY" > 8000).and($"SALARY" < 1), $"salary").otherwise(0).as("key")) + .groupBy("key", "IS_MANAGER").sum("SALARY") + .orderBy("key", "IS_MANAGER") + .limit(1) +checkSortRemoved(df6) +checkLimitRemoved(df6) +checkPushedInfo(df6, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: " + +"[CASE WHEN (SALARY > 8000.00) AND (SALARY < 1.00) THEN SALARY ELSE 0.00 END, " + +"IS_MANAGER]", + "PushedFilters: []", + "PushedTopN: ORDER BY [" + +"CASE WHEN (SALARY > 8000.00) AND (SALARY < 1.00) THEN SALARY ELSE 0.00 END " + +"ASC NULLS FIRST, IS_MANAGER ASC NULLS FIRST] LIMIT 1") +checkAnswer(df6, Seq(Row(0.00, false, 12000.00))) + +val df7 = sql( + """ +|SELECT dept, SUM(SALARY) FROM h2.test.employee +|GROUP BY dept +|ORDER BY SUM(SALARY) +|LIMIT 1 +|""".stripMargin) +checkSortRemoved(df7, false) +checkLimitRemoved(df7, false) +checkPushedInfo(df7, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT]", + "PushedFilters: []") +checkAnswer(df7, Seq(Row(6, 12000.00))) + +val df8 = sql( + """ +|SELECT dept, SUM(SALARY) AS total FROM h2.test.employee +|GROUP BY dept 
+|ORDER BY total +|LIMIT 1 +|""".stripMargin) +checkSortRemoved(df8, false) +checkLimitRemoved(df8, false) +checkPushedInfo(df8, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT]", + "PushedFilters: []") +checkAnswer(df8, Seq(Row(6, 12000.00))) + } + +
[GitHub] [spark] cloud-fan commented on a diff in pull request #37320: [SPARK-39819][SQL] DS V2 aggregate push down can work with Top N or Paging (Sort with group expressions)
cloud-fan commented on code in PR #37320: URL: https://github.com/apache/spark/pull/37320#discussion_r932183215 ## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala: ## @@ -410,12 +413,24 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper { case s @ Sort(order, _, operation @ ScanOperation(project, filter, sHolder: ScanBuilderHolder)) // Without building the Scan, we do not know the resulting column names after aggregate // push-down, and thus can't push down Top-N which needs to know the ordering column names. -// TODO: we can support simple cases like GROUP BY columns directly and ORDER BY the same -// columns, which we know the resulting column names: the original table columns. -if sHolder.pushedAggregate.isEmpty && filter.isEmpty && +// In particular, we push down the simple cases like GROUP BY expressions directly and +// ORDER BY the same expressions, which we know the original table columns. +if filter.isEmpty && CollapseProject.canCollapseExpressions(order, project, alwaysInline = true) => val aliasMap = getAliasMap(project) - val newOrder = order.map(replaceAlias(_, aliasMap)).asInstanceOf[Seq[SortOrder]] + def findGroupExprForSortOrder(sortOrder: SortOrder): SortOrder = sortOrder match { Review Comment: I think we can just do something similar with `replaceAlias` ``` sortOrder transform { case a: Attribute => sHolder.pushedAggOutputMap.getOrElse(a, a) } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a diff in pull request #37320: [SPARK-39819][SQL] DS V2 aggregate push down can work with Top N or Paging (Sort with group expressions)
cloud-fan commented on code in PR #37320: URL: https://github.com/apache/spark/pull/37320#discussion_r932182709 ## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala: ## @@ -545,6 +560,9 @@ case class ScanBuilderHolder( var pushedPredicates: Seq[Predicate] = Seq.empty[Predicate] var pushedAggregate: Option[Aggregation] = None + + var pushedAggregateExpectedOutputMap: Map[AttributeReference, Expression] = Review Comment: ```suggestion var pushedAggOutputMap: Map[AttributeReference, Expression] = ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] [spark] cloud-fan commented on a diff in pull request #37320: [SPARK-39819][SQL] DS V2 aggregate push down can work with Top N or Paging (Sort with group expressions)
cloud-fan commented on code in PR #37320: URL: https://github.com/apache/spark/pull/37320#discussion_r932181859 ## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala: ## @@ -545,6 +560,9 @@ case class ScanBuilderHolder( var pushedPredicates: Seq[Predicate] = Seq.empty[Predicate] var pushedAggregate: Option[Aggregation] = None + + var pushedAggregateExpectedOutputMap: Map[AttributeReference, Expression] = Review Comment: We can use `AttributeMap` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For additional commands, e-mail: reviews-help@spark.apache.org