wangyum commented on a change in pull request #31691:
URL: https://github.com/apache/spark/pull/31691#discussion_r585209573



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
##########
@@ -613,6 +613,15 @@ object LimitPushDown extends Rule[LogicalPlan] {
         case _ => join
       }
       LocalLimit(exp, newJoin)
+
+    case LocalLimit(limitExpr @ IntegerLiteral(limitVal),
+        window @ Window(Seq(Alias(WindowExpression(_: RankLike | _: RowNumber,
+            WindowSpecDefinition(Nil, orderSpec,
+                SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow))), _)), _, _, child))
+        if child.maxRows.forall(_ > limitVal) =>
+      LocalLimit(
+        limitExpr = limitExpr,
+        child = window.copy(child = Limit(limitExpr, Sort(orderSpec, true, child))))

Review comment:
       The current logic is to sort first and then limit:
   ```
   == Physical Plan ==
   AdaptiveSparkPlan isFinalPlan=false
   +- Window [row_number() windowspecdefinition(a#10L ASC NULLS FIRST, b#11L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rowId#8], [a#10L ASC NULLS FIRST, b#11L ASC NULLS FIRST]
      +- TakeOrderedAndProject(limit=5, orderBy=[a#10L ASC NULLS FIRST,b#11L ASC NULLS FIRST], output=[a#10L,b#11L])
         +- FileScan parquet default.t1[a#10L,b#11L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/yumwang/opensource/spark/spark-warehouse/org.apache.spark...., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:bigint,b:bigint>
   ```
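   For reference, a query of roughly this shape should hit the new case. This is a sketch only; the table and column names are assumptions reconstructed from the plan above:
   ```scala
   // Hypothetical repro: a rank-like window function with an empty
   // PARTITION BY clause, directly under a LIMIT.
   val df = spark.sql(
     """SELECT a, b,
       |       row_number() OVER (ORDER BY a, b) AS rowId
       |FROM t1
       |LIMIT 5""".stripMargin)
   df.explain()
   ```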
   If we remove the `Sort`, the logic is limit first and then sort:
   ```
   == Physical Plan ==
   AdaptiveSparkPlan isFinalPlan=false
   +- Window [row_number() windowspecdefinition(a#10L ASC NULLS FIRST, b#11L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rowId#8], [a#10L ASC NULLS FIRST, b#11L ASC NULLS FIRST]
      +- Sort [a#10L ASC NULLS FIRST, b#11L ASC NULLS FIRST], false, 0
         +- GlobalLimit 5
            +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#30]
               +- LocalLimit 5
                  +- FileScan parquet default.t1[a#10L,b#11L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/yumwang/opensource/spark/spark-warehouse/org.apache.spark...., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:bigint,b:bigint>
   ```
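   The two orderings are not interchangeable: sorting first and then limiting deterministically keeps the top rows under the window's ordering, while limiting first keeps an arbitrary subset and only sorts it afterwards. A minimal sketch with plain Scala collections (hypothetical data, for intuition only, not the Spark API):
   ```scala
   // Hypothetical (a, b) rows; order of arrival is arbitrary.
   val rows = Seq((3L, 1L), (1L, 2L), (5L, 3L), (2L, 4L), (4L, 5L), (0L, 6L))

   // Sort first, then limit: deterministically the 5 smallest rows by (a, b).
   val sortThenLimit = rows.sortBy(identity).take(5)

   // Limit first, then sort: whichever 5 rows happened to come first,
   // merely sorted afterwards.
   val limitThenSort = rows.take(5).sortBy(identity)

   // The results can diverge, so dropping the Sort changes which rows
   // feed the Window operator.
   assert(sortThenLimit != limitThenSort)
   ```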



