wangyum commented on a change in pull request #31739:
URL: https://github.com/apache/spark/pull/31739#discussion_r599153181



##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
##########
@@ -597,31 +623,11 @@ object LimitPushDown extends Rule[LogicalPlan] {
     // pushdown Limit.
     case LocalLimit(exp, u: Union) =>
       LocalLimit(exp, u.copy(children = u.children.map(maybePushLocalLimit(exp, _))))
-    // Add extra limits below JOIN:
-    // 1. For LEFT OUTER and RIGHT OUTER JOIN, we push limits to the left and right sides,
-    //    respectively.
-    // 2. For INNER and CROSS JOIN, we push limits to both the left and right sides if join
-    //    condition is empty.
-    // 3. For LEFT SEMI and LEFT ANTI JOIN, we push limits to the left side if join condition
-    //    is empty.
-    // It's not safe to push limits below FULL OUTER JOIN in the general case without a more
-    // invasive rewrite. We also need to ensure that this limit pushdown rule will not eventually
-    // introduce limits on both sides if it is applied multiple times. Therefore:
-    //   - If one side is already limited, stack another limit on top if the new limit is smaller.
-    //     The redundant limit will be collapsed by the CombineLimits rule.
-    case LocalLimit(exp, join @ Join(left, right, joinType, conditionOpt, _)) =>
-      val newJoin = joinType match {
-        case RightOuter => join.copy(right = maybePushLocalLimit(exp, right))
-        case LeftOuter => join.copy(left = maybePushLocalLimit(exp, left))
-        case _: InnerLike if conditionOpt.isEmpty =>
-          join.copy(
-            left = maybePushLocalLimit(exp, left),
-            right = maybePushLocalLimit(exp, right))
-        case LeftSemi | LeftAnti if conditionOpt.isEmpty =>
-          join.copy(left = maybePushLocalLimit(exp, left))
-        case _ => join
-      }
-      LocalLimit(exp, newJoin)
+
+    case LocalLimit(exp, join: Join) =>
+      LocalLimit(exp, pushLocalLimitThroughJoin(exp, join))
+    case LocalLimit(exp, project @ Project(_, join: Join)) =>

Review comment:
       It will still introduce a useless pushdown even if we only allow a `Join` child:
   ```scala
       case LocalLimit(exp, p: Project) if p.child.isInstanceOf[Join] =>
         LocalLimit(exp, p.copy(child = maybePushLocalLimit(exp, p.child)))
   ```
   For example:
   ```scala
   spark.range(200L).selectExpr("id AS a", "id AS b").write.saveAsTable("t1")
   spark.range(300L).selectExpr("id AS x", "id AS y").write.saveAsTable("t2")
   spark.sql("SELECT 1 FROM t1 INNER JOIN t2 ON a = x limit 10").explain(true)
   ```
   
   ```
   == Optimized Logical Plan ==
   GlobalLimit 10
   +- LocalLimit 10
      +- Project [1 AS 1#20]
         +- LocalLimit 10
            +- Project
               +- Join Inner, (a#16L = x#18L)
                  :- Project [a#16L]
                  :  +- Filter isnotnull(a#16L)
                  :     +- Relation default.t1[a#16L,b#17L] parquet
                  +- Project [x#18L]
                     +- Filter isnotnull(x#18L)
                        +- Relation default.t2[x#18L,y#19L] parquet
   
   == Physical Plan ==
   AdaptiveSparkPlan isFinalPlan=false
   +- CollectLimit 10
      +- Project [1 AS 1#20]
         +- LocalLimit 10
            +- Project
               +- BroadcastHashJoin [a#16L], [x#18L], Inner, BuildLeft, false
                  :- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#68]
                  :  +- Filter isnotnull(a#16L)
                  :     +- FileScan parquet default.t1[a#16L] 
                  +- Filter isnotnull(x#18L)
                     +- FileScan parquet default.t2[x#18L]
   
   ```
   
   Another example is TPC-DS q32:
   
https://github.com/apache/spark/blob/66f5a42ca5d259038f0749ae2b9a04cc2f658880/sql/core/src/test/resources/tpcds/q32.sql#L1-L15
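
   A toy sketch of the guard this suggests (these are NOT Spark's Catalyst classes; `pushThroughJoin`, `rewrite`, and the simplified `Plan` hierarchy are hypothetical stand-ins): only stack a `LocalLimit` under the `Project` when the limit was actually pushed into the join, so an inner join with a non-empty condition is left untouched.
   ```scala
   object LimitPushdownSketch {
     // Minimal plan model, not Catalyst's LogicalPlan.
     sealed trait Plan
     case class Join(joinType: String, condition: Option[String]) extends Plan
     case class Project(child: Plan) extends Plan
     case class LocalLimit(limit: Int, child: Plan) extends Plan

     // Stand-in for maybePushLocalLimit on a join: push only when safe.
     def pushThroughJoin(limit: Int, j: Join): Plan = j.joinType match {
       // No join condition: safe to limit the inputs (modeled as one wrapper here).
       case "INNER" if j.condition.isEmpty => LocalLimit(limit, j)
       case _ => j // condition present: cannot push below the join
     }

     // Guarded rule: insert a LocalLimit under the Project only if the join changed.
     def rewrite(plan: Plan): Plan = plan match {
       case LocalLimit(n, Project(j: Join)) =>
         val pushed = pushThroughJoin(n, j)
         if (pushed eq j) plan // join unchanged: skip the useless LocalLimit
         else LocalLimit(n, Project(pushed))
       case other => other
     }
   }
   ```
   With this guard, `LocalLimit 10 -> Project -> Join Inner (a = x)` comes back unchanged, instead of gaining the extra `LocalLimit 10` seen in the optimized plan above.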
   



