cloud-fan commented on a change in pull request #31691:
URL: https://github.com/apache/spark/pull/31691#discussion_r586161122
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
##########
@@ -613,6 +613,18 @@ object LimitPushDown extends Rule[LogicalPlan] {
case _ => join
}
LocalLimit(exp, newJoin)
+
+  // Adding an extra Limit below Window when there is only one RankLike/RowNumber
+  // window function and the partitionSpec is empty.
+  case LocalLimit(limitExpr @ IntegerLiteral(limitVal),
+      window @ Window(Seq(Alias(WindowExpression(_: RankLike | _: RowNumber,
+        WindowSpecDefinition(Nil, orderSpec,
+          SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow))), _)),
+        _, _, child))
+      if child.maxRows.forall(_ > limitVal) =>
+ // Sort is needed here because we need global sort.
Review comment:
Does this mean we will do a local sort of the input partitions? What if
the input partitions are skewed and one large partition takes a long time
to sort locally? It's possible that the partition spec is not skewed, in
which case it would be better to do the local sort on the shuffled window
partitions instead.
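For context, a small sketch of why the pushed-down limit is safe here (plain Python, not Spark code; the partition lists and helper names are illustrative assumptions, not part of the PR): with an empty partitionSpec, `rank()`/`row_number()` runs over a single global ordering, so only the first `limit` rows under that ordering can survive a `LIMIT` above the Window. Each input partition can therefore be pruned to its local top-`limit` rows before the global sort, which is what the extra Limit below the Window buys.

```python
def local_top_k(partitions, limit):
    """Prune each input partition to its locally sorted top-`limit` rows.

    This models the extra Limit pushed below the Window: the local sort
    plus take(limit) happens per input partition before any shuffle.
    """
    return [sorted(p)[:limit] for p in partitions]


def global_top_k_with_row_number(partitions, limit):
    """Merge the pruned partitions, globally sort, and assign row_number().

    Because row numbers follow the global order, rows pruned locally could
    never have received a row_number <= limit, so the result matches the
    unoptimized plan.
    """
    merged = sorted(x for p in local_top_k(partitions, limit) for x in p)[:limit]
    return [(value, i + 1) for i, value in enumerate(merged)]
```

The reviewer's concern maps onto `local_top_k`: each call to `sorted(p)` is a per-partition local sort, so one very large (skewed) input partition still pays the full sort cost before any pruning helps.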
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]