c21 commented on a change in pull request #31691:
URL: https://github.com/apache/spark/pull/31691#discussion_r585100057
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
##########
@@ -613,6 +613,15 @@ object LimitPushDown extends Rule[LogicalPlan] {
case _ => join
}
LocalLimit(exp, newJoin)
+
+ case LocalLimit(limitExpr @ IntegerLiteral(limitVal),
Review comment:
nit: shall we update the comment at L554 and here, similar to what we have
for `UNION ALL` and `JOIN`?
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
##########
@@ -613,6 +613,15 @@ object LimitPushDown extends Rule[LogicalPlan] {
case _ => join
}
LocalLimit(exp, newJoin)
+
+ case LocalLimit(limitExpr @ IntegerLiteral(limitVal),
+ window @ Window(Seq(Alias(WindowExpression(_: RankLike | _: RowNumber,
+ WindowSpecDefinition(Nil, orderSpec,
+ SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow))), _)), _, _, child))
+ if child.maxRows.forall( _ > limitVal) =>
+ LocalLimit(
Review comment:
Do we still need the `LocalLimit` here? We already restrict the window
expression to `RankLike` or `RowNumber`, so we know the number of rows does
not change before and after the window, right?
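To illustrate the row-count argument, here is a minimal sketch in plain Scala collections, not Catalyst. It assumes a hypothetical, simplified model of a window with no PARTITION BY that computes `row_number()` and `rank()` ordered by a single `Int` key; `WindowRowCountSketch`, `Out` and `rowNumberAndRank` are illustrative names, not Spark APIs.

```scala
// Hypothetical, simplified model of a Window operator with no PARTITION BY
// that computes row_number() and rank() ordered by a single Int key.
// This is NOT Spark's implementation; it only illustrates row counts.
object WindowRowCountSketch {
  final case class Out(key: Int, rowNumber: Int, rank: Int)

  def rowNumberAndRank(input: Seq[Int]): Seq[Out] = {
    val sorted = input.sorted
    sorted.zipWithIndex.map { case (key, i) =>
      // rank() = 1 + number of rows whose key sorts strictly before this one;
      // in a sorted sequence that is the index of the first equal key.
      val rank = sorted.indexWhere(_ == key) + 1
      Out(key, i + 1, rank)
    }
  }
}

// One output row per input row, so the window never adds or drops rows:
// rowNumberAndRank(Seq(3, 1, 3, 2))
//   == Seq(Out(1, 1, 1), Out(2, 2, 2), Out(3, 3, 3), Out(3, 4, 3))
```

Because each input row yields exactly one output row, a limit of `n` applied below such a window already bounds its output to `n` rows in this model.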
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
##########
@@ -613,6 +613,15 @@ object LimitPushDown extends Rule[LogicalPlan] {
case _ => join
}
LocalLimit(exp, newJoin)
+
+ case LocalLimit(limitExpr @ IntegerLiteral(limitVal),
+ window @ Window(Seq(Alias(WindowExpression(_: RankLike | _: RowNumber,
+ WindowSpecDefinition(Nil, orderSpec,
+ SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow))), _)), _, _, child))
+ if child.maxRows.forall( _ > limitVal) =>
+ LocalLimit(
+ limitExpr = limitExpr,
+ child = window.copy(child = Limit(limitExpr, Sort(orderSpec, true, child))))
Review comment:
I'm wondering why we need an extra `Sort` here. Shouldn't the physical plan
rule `EnsureRequirements` add the sort between window and limit?
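For reference, a minimal sketch in plain Scala collections (not Catalyst) of the plan shapes being discussed, assuming a hypothetical model in which a non-partitioned `row_number()` window emits rows in ordering-key order. `LimitPushdownSketch` and its methods are illustrative names only. In this model, the limit placed below the window has to see the child sorted by the window's ordering, otherwise it keeps an arbitrary prefix of rows.

```scala
// Hypothetical model of LocalLimit/Window/Limit/Sort using plain collections.
// Not Spark code; the window is modeled as row_number() OVER (ORDER BY key).
object LimitPushdownSketch {
  final case class Out(key: Int, rowNumber: Int)

  // Window with no PARTITION BY: sort by key, then number rows from 1.
  def windowRowNumber(input: Seq[Int]): Seq[Out] =
    input.sorted.zipWithIndex.map { case (k, i) => Out(k, i + 1) }

  // Original shape: LocalLimit(n, Window(child)).
  def limitAboveWindow(input: Seq[Int], n: Int): Seq[Out] =
    windowRowNumber(input).take(n)

  // Rewritten shape from the diff: LocalLimit(n, Window(Limit(n, Sort(child)))).
  // The outer take(n) is a no-op here because the window preserves row count.
  def limitBelowSortedWindow(input: Seq[Int], n: Int): Seq[Out] =
    windowRowNumber(input.sorted.take(n)).take(n)

  // Same rewrite without the Sort: the limit keeps whichever n rows come first.
  def limitBelowUnsortedWindow(input: Seq[Int], n: Int): Seq[Out] =
    windowRowNumber(input.take(n)).take(n)
}
```

With `input = Seq(5, 1, 4, 2, 3)` and `n = 2`, the first two functions both return `Seq(Out(1, 1), Out(2, 2))`, while the unsorted variant returns `Seq(Out(1, 1), Out(5, 2))`. Whether the physical sort added for the window could be reused instead is exactly the question above; the sketch only shows where ordering has to hold relative to the pushed-down limit.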
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
##########
@@ -613,6 +613,15 @@ object LimitPushDown extends Rule[LogicalPlan] {
case _ => join
}
LocalLimit(exp, newJoin)
+
+ case LocalLimit(limitExpr @ IntegerLiteral(limitVal),
+ window @ Window(Seq(Alias(WindowExpression(_: RankLike | _: RowNumber,
+ WindowSpecDefinition(Nil, orderSpec,
+ SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow))), _)), _, _, child))
+ if child.maxRows.forall( _ > limitVal) =>
Review comment:
nit: there is an unnecessary space in `( _`.