This is an automated email from the ASF dual-hosted git repository. yumwang pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push: new 22dae38b5e2 [SPARK-38614][SQL] Don't push down limit through window that's using percent_rank 22dae38b5e2 is described below commit 22dae38b5e27e783732a37b537c80b1202adc288 Author: Bruce Robbins <bersprock...@gmail.com> AuthorDate: Thu Jun 23 08:07:43 2022 +0800 [SPARK-38614][SQL] Don't push down limit through window that's using percent_rank Change `LimitPushDownThroughWindow` so that it no longer supports pushing down a limit through a window using percent_rank. When the limit is pushed below the window, given a query with a limit of _n_ rows and a window whose child produces _m_ rows, percent_rank sees only _n_ rows and labels the _nth_ row as 100% rather than the _mth_ row. This behavior conflicts with Spark 3.1.3, Hive 2.3.9 and Prestodb 0.268. Assume this data: ``` create table t1 stored as parquet as select * from range(101); ``` And also assume this query: ``` select id, percent_rank() over (order by id) as pr from t1 limit 3; ``` With Spark 3.2.1, 3.3.0, and master, the limit is applied before the percent_rank: ``` 0 0.0 1 0.5 2 1.0 ``` With Spark 3.1.3, Hive 2.3.9, and Prestodb 0.268, the limit is applied _after_ percent_rank: Spark 3.1.3: ``` 0 0.0 1 0.01 2 0.02 ``` Hive 2.3.9: ``` 0: jdbc:hive2://localhost:10000> select id, percent_rank() over (order by id) as pr from t1 limit 3; . . . . . . . . . . . . . . . .> . . . . . . . . . . . . . . . .> WARNING: Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. Consider using a different execution engine (i.e. spark, tez) or using Hive 1.X releases. +-----+-------+ | id | pr | +-----+-------+ | 0 | 0.0 | | 1 | 0.01 | | 2 | 0.02 | +-----+-------+ 3 rows selected (4.621 seconds) 0: jdbc:hive2://localhost:10000> ``` Prestodb 0.268: ``` id | pr ----+------ 0 | 0.0 1 | 0.01 2 | 0.02 (3 rows) ``` With this PR, Spark will apply the limit after percent_rank. User-facing change: No (besides changing percent_rank's behavior to be more like Spark 3.1.3, Hive, and Prestodb). 
New unit tests. Closes #36951 from bersprockets/percent_rank_issue. Authored-by: Bruce Robbins <bersprock...@gmail.com> Signed-off-by: Yuming Wang <yumw...@ebay.com> (cherry picked from commit a63ce5676e79f15903e9fd533a26a6c3ec9bf7a8) Signed-off-by: Yuming Wang <yumw...@ebay.com> --- .../sql/catalyst/optimizer/LimitPushDownThroughWindow.scala | 5 +++-- .../optimizer/LimitPushdownThroughWindowSuite.scala | 13 ++++++++++++- .../apache/spark/sql/DataFrameWindowFunctionsSuite.scala | 13 +++++++++++++ 3 files changed, 28 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushDownThroughWindow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushDownThroughWindow.scala index eaea167ee9f..635434741b9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushDownThroughWindow.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushDownThroughWindow.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.catalyst.optimizer -import org.apache.spark.sql.catalyst.expressions.{Alias, CurrentRow, IntegerLiteral, NamedExpression, RankLike, RowFrame, RowNumberLike, SpecifiedWindowFrame, UnboundedPreceding, WindowExpression, WindowSpecDefinition} +import org.apache.spark.sql.catalyst.expressions.{Alias, CurrentRow, DenseRank, IntegerLiteral, NamedExpression, NTile, Rank, RowFrame, RowNumber, SpecifiedWindowFrame, UnboundedPreceding, WindowExpression, WindowSpecDefinition} import org.apache.spark.sql.catalyst.plans.logical.{Limit, LocalLimit, LogicalPlan, Project, Sort, Window} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.trees.TreePattern.{LIMIT, WINDOW} @@ -33,7 +33,8 @@ object LimitPushDownThroughWindow extends Rule[LogicalPlan] { // The window frame of RankLike and RowNumberLike can only be UNBOUNDED PRECEDING to CURRENT ROW. 
private def supportsPushdownThroughWindow( windowExpressions: Seq[NamedExpression]): Boolean = windowExpressions.forall { - case Alias(WindowExpression(_: RankLike | _: RowNumberLike, WindowSpecDefinition(Nil, _, + case Alias(WindowExpression(_: Rank | _: DenseRank | _: NTile | _: RowNumber, + WindowSpecDefinition(Nil, _, SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow))), _) => true case _ => false } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownThroughWindowSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownThroughWindowSuite.scala index f2c1f452d02..b09d10b2601 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownThroughWindowSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownThroughWindowSuite.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.optimizer import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ -import org.apache.spark.sql.catalyst.expressions.{CurrentRow, Rank, RowFrame, RowNumber, SpecifiedWindowFrame, UnboundedPreceding} +import org.apache.spark.sql.catalyst.expressions.{CurrentRow, PercentRank, Rank, RowFrame, RowNumber, SpecifiedWindowFrame, UnboundedPreceding} import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ @@ -187,4 +187,15 @@ class LimitPushdownThroughWindowSuite extends PlanTest { Optimize.execute(originalQuery.analyze), WithoutOptimize.execute(originalQuery.analyze)) } + + test("SPARK-38614: Should not push through percent_rank window function") { + val originalQuery = testRelation + .select(a, b, c, + windowExpr(new PercentRank(), windowSpec(Nil, c.desc :: Nil, windowFrame)).as("rn")) + .limit(2) + + comparePlans( + Optimize.execute(originalQuery.analyze), + 
WithoutOptimize.execute(originalQuery.analyze)) + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala index 70b0150fc5b..78bb3180ff1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala @@ -1114,4 +1114,17 @@ class DataFrameWindowFunctionsSuite extends QueryTest Row("a", 1, "x", "x"), Row("b", 0, null, null))) } + + test("SPARK-38614: percent_rank should apply before limit") { + val df = Seq.tabulate(101)(identity).toDF("id") + val w = Window.orderBy("id") + checkAnswer( + df.select($"id", percent_rank().over(w)).limit(3), + Seq( + Row(0, 0.0d), + Row(1, 0.01d), + Row(2, 0.02d) + ) + ) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org