zhengruifeng commented on a change in pull request #32350:
URL: https://github.com/apache/spark/pull/32350#discussion_r627042329
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
##########
@@ -1618,12 +1618,18 @@ object EliminateLimits extends Rule[LogicalPlan] {
   private def canEliminate(limitExpr: Expression, child: LogicalPlan): Boolean = {
     limitExpr.foldable && child.maxRows.exists { _ <= limitExpr.eval().asInstanceOf[Int] }
   }

+  private def canEliminateLocalLimit(localLimitExpr: Expression, child: LogicalPlan): Boolean = {
+    localLimitExpr.foldable &&
+      child.maxRowsPerPartition.exists { _ <= localLimitExpr.eval().asInstanceOf[Int] }
+  }

   def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
     case Limit(l, child) if canEliminate(l, child) =>
       child
     case GlobalLimit(l, child) if canEliminate(l, child) =>
       child
+    case LocalLimit(l, child) if !plan.isStreaming && canEliminateLocalLimit(l, child) =>
Review comment:
> It is not possible that a user's query reaches this optimization path now?

An end user's query should not reach this path, I think. This path only exists so we can add a _similar_ test in `CombiningLimitsSuite`.
> In a streaming case, maxRowsPerPartition can be filled? (we need the condition !plan.isStreaming here?)

`org.apache.spark.sql.streaming.StreamSuite.SPARK-30657: streaming limit optimization from StreamingLocalLimitExec to LocalLimitExec` fails if we do not add this condition.
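To illustrate the elimination logic being discussed, here is a minimal self-contained sketch using simplified stand-in plan nodes. These case classes (`Relation`, `GlobalLimit`, `LocalLimit`) and the `EliminateLimitsSketch` object are assumptions for illustration only, not Spark's actual `LogicalPlan` API; the real rule also handles foldable limit expressions and the streaming check.

```scala
// Simplified plan nodes: each node may know an upper bound on its row counts.
sealed trait Plan {
  def maxRows: Option[Long] = None
  def maxRowsPerPartition: Option[Long] = None
}
case class Relation(rows: Long, rowsPerPartition: Long) extends Plan {
  override def maxRows: Option[Long] = Some(rows)
  override def maxRowsPerPartition: Option[Long] = Some(rowsPerPartition)
}
case class GlobalLimit(limit: Long, child: Plan) extends Plan
case class LocalLimit(limit: Long, child: Plan) extends Plan

object EliminateLimitsSketch {
  // A GlobalLimit is redundant when the child can never produce more rows in total.
  private def canEliminate(limit: Long, child: Plan): Boolean =
    child.maxRows.exists(_ <= limit)

  // A LocalLimit is redundant when no single partition can exceed the limit.
  private def canEliminateLocal(limit: Long, child: Plan): Boolean =
    child.maxRowsPerPartition.exists(_ <= limit)

  def apply(plan: Plan): Plan = plan match {
    case GlobalLimit(l, child) if canEliminate(l, child)      => apply(child)
    case LocalLimit(l, child) if canEliminateLocal(l, child)  => apply(child)
    case other                                                => other
  }
}
```

For example, `LocalLimit(5, Relation(10, 5))` is eliminated because every partition holds at most 5 rows, while `LocalLimit(3, Relation(10, 5))` is kept: a partition could still produce more than 3 rows, so the limit is not a no-op.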
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]