Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18993#discussion_r133941801
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
    @@ -63,29 +63,24 @@ abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
        */
       object SpecialLimits extends Strategy {
         override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    -      case logical.ReturnAnswer(rootPlan) => rootPlan match {
    -        case logical.Limit(IntegerLiteral(limit), logical.Sort(order, 
true, child)) =>
    -          execution.TakeOrderedAndProjectExec(limit, order, child.output, 
planLater(child)) :: Nil
    -        case logical.Limit(
    -            IntegerLiteral(limit),
    -            logical.Project(projectList, logical.Sort(order, true, 
child))) =>
    -          execution.TakeOrderedAndProjectExec(
    -            limit, order, projectList, planLater(child)) :: Nil
    -        case logical.Limit(IntegerLiteral(limit), child) =>
    -          // Normally wrapping child with `LocalLimitExec` here is a 
no-op, because
    -          // `CollectLimitExec.executeCollect` will call 
`LocalLimitExec.executeTake`, which
    -          // calls `child.executeTake`. If child supports whole stage 
codegen, adding this
    -          // `LocalLimitExec` can stop the processing of whole stage 
codegen and trigger the
    -          // resource releasing work, after we consume `limit` rows.
    -          execution.CollectLimitExec(limit, LocalLimitExec(limit, 
planLater(child))) :: Nil
    +      case ReturnAnswer(rootPlan) => rootPlan match {
    +        case Limit(IntegerLiteral(limit), Sort(order, true, child)) =>
    +          TakeOrderedAndProjectExec(limit, order, child.output, 
planLater(child)) :: Nil
    +        case Limit(IntegerLiteral(limit), Project(projectList, Sort(order, 
true, child))) =>
    +          TakeOrderedAndProjectExec(limit, order, projectList, 
planLater(child)) :: Nil
    +        case Limit(IntegerLiteral(limit), child) =>
    +          // With whole stage codegen, Spark releases resources only when 
all the output data of the
    +          // query plan are consumed. It's possible that 
`CollectLimitExec` only consumes a little
    +          // data from child plan and finishes the query without releasing 
resources. Here we wrap
    +          // the child plan with `LocalLimitExec`, to stop the processing 
of whole stage codegen and
    +          // trigger the resource releasing work, after we consume `limit` 
rows.
    --- End diff --
    
    Comments updated.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to enable it, or if the feature is enabled but not working,
please contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to