Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11121#discussion_r52243601
  
    --- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
    @@ -130,6 +131,48 @@ object EliminateSerialization extends 
Rule[LogicalPlan] {
     }
     
     /**
    + * Pushes down [[LocalLimit]] beneath UNION ALL and beneath the streamed 
inputs of outer joins.
    + */
    +object LimitPushDown extends Rule[LogicalPlan] {
    +
    +  private def stripGlobalLimitIfPresent(plan: LogicalPlan): LogicalPlan = {
    +    plan match {
    +      case GlobalLimit(expr, child) => child
    +      case _ => plan
    +    }
    +  }
    +
    +  private def maybePushLimit(limitExp: Expression, plan: LogicalPlan): 
LogicalPlan = {
    +    (limitExp, plan.maxRows) match {
    +      case (IntegerLiteral(maxRow), Some(IntegerLiteral(childMaxRows))) if 
maxRow < childMaxRows =>
    +        LocalLimit(limitExp, stripGlobalLimitIfPresent(plan))
    +      case (_, None) =>
    +        LocalLimit(limitExp, stripGlobalLimitIfPresent(plan))
    +      case _ => plan
    +    }
    +  }
    +
    +  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    +    // Adding extra Limits below UNION ALL for children which are not 
Limit or do not have Limit
    +    // descendants whose maxRow is larger. This heuristic is valid 
assuming there does not exist any
    +    // Limit push-down rule that is unable to infer the value of maxRows.
    +    // Note: right now Union means UNION ALL, which does not de-duplicate 
rows, so it is safe to
    +    // pushdown Limit through it. Once we add UNION DISTINCT, however, we 
will not be able to
    +    // pushdown Limit.
    +    case LocalLimit(exp, Union(children)) =>
    +      LocalLimit(exp, Union(children.map(maybePushLimit(exp, _))))
    +    case LocalLimit(exp, join @ Join(left, right, joinType, condition)) =>
    +      joinType match {
    +        case RightOuter => join.copy(right = maybePushLimit(exp, right))
    +        case LeftOuter => join.copy(left = maybePushLimit(exp, left))
    +        case FullOuter =>
    +          join.copy(left = maybePushLimit(exp, left), right = 
maybePushLimit(exp, right))
    --- End diff --
    
    Great! Please also update the description of this PR. Thanks!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to have it, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to