Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11256#discussion_r53561584
  
    --- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
    @@ -300,97 +300,71 @@ object SetOperationPushDown extends Rule[LogicalPlan] 
with PredicateHelper {
      */
     object ColumnPruning extends Rule[LogicalPlan] {
       def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    -    case a @ Aggregate(_, _, e @ Expand(projects, output, child))
    -      if (e.outputSet -- a.references).nonEmpty =>
    -      val newOutput = output.filter(a.references.contains(_))
    -      val newProjects = projects.map { proj =>
    -        proj.zip(output).filter { case (e, a) =>
    +    // Prunes the unused columns from project list of 
Project/Aggregate/Window/Expand
    +    case p @ Project(_, p2: Project) if (p2.outputSet -- 
p.references).nonEmpty =>
    +      p.copy(child = p2.copy(projectList = 
p2.projectList.filter(p.references.contains)))
    +    case p @ Project(_, a: Aggregate) if (a.outputSet -- 
p.references).nonEmpty =>
    +      p.copy(
    +        child = a.copy(aggregateExpressions = 
a.aggregateExpressions.filter(p.references.contains)))
    +    case p @ Project(_, w: Window) if (w.outputSet -- 
p.references).nonEmpty =>
    +      p.copy(child = w.copy(
    +        projectList = w.projectList.filter(p.references.contains),
    +        windowExpressions = 
w.windowExpressions.filter(p.references.contains)))
    +    case a @ Project(_, e @ Expand(_, _, grandChild)) if (e.outputSet -- 
a.references).nonEmpty =>
    +      val newOutput = e.output.filter(a.references.contains(_))
    +      val newProjects = e.projections.map { proj =>
    +        proj.zip(e.output).filter { case (e, a) =>
               newOutput.contains(a)
             }.unzip._1
           }
    -      a.copy(child = Expand(newProjects, newOutput, child))
    +      a.copy(child = Expand(newProjects, newOutput, grandChild))
    +    // TODO: support some logical plan for Dataset
     
    -    case a @ Aggregate(_, _, e @ Expand(_, _, child))
    -      if (child.outputSet -- e.references -- a.references).nonEmpty =>
    -      a.copy(child = e.copy(child = prunedChild(child, e.references ++ 
a.references)))
    -
    -    // Eliminate attributes that are not needed to calculate the specified 
aggregates.
    +    // Prunes the unused columns from child of 
Aggregate/Window/Expand/Generate
         case a @ Aggregate(_, _, child) if (child.outputSet -- 
a.references).nonEmpty =>
    -      a.copy(child = Project(a.references.toSeq, child))
    -
    -    // Eliminate attributes that are not needed to calculate the Generate.
    +      a.copy(child = prunedChild(child, a.references))
    +    case w @ Window(_, _, _, _, child) if (child.outputSet -- 
w.references).nonEmpty =>
    +      w.copy(child = prunedChild(child, w.references))
    +    case e @ Expand(_, _, child) if (child.outputSet -- 
e.references).nonEmpty =>
    +      e.copy(child = prunedChild(child, e.references))
         case g: Generate if !g.join && (g.child.outputSet -- 
g.references).nonEmpty =>
    -      g.copy(child = Project(g.references.toSeq, g.child))
    +      g.copy(child = prunedChild(g.child, g.references))
     
    +    // Turn off `join` for Generate if no column from it's child is used
         case p @ Project(_, g: Generate) if g.join && 
p.references.subsetOf(g.generatedSet) =>
           p.copy(child = g.copy(join = false))
     
    -    case p @ Project(projectList, g: Generate) if g.join =>
    -      val neededChildOutput = p.references -- g.generatorOutput ++ 
g.references
    -      if (neededChildOutput == g.child.outputSet) {
    -        p
    -      } else {
    -        Project(projectList, g.copy(child = 
Project(neededChildOutput.toSeq, g.child)))
    -      }
    -
    -    case p @ Project(projectList, a @ Aggregate(groupingExpressions, 
aggregateExpressions, child))
    -        if (a.outputSet -- p.references).nonEmpty =>
    -      Project(
    -        projectList,
    -        Aggregate(
    -          groupingExpressions,
    -          aggregateExpressions.filter(e => p.references.contains(e)),
    -          child))
    -
    -    // Eliminate unneeded attributes from either side of a Join.
    -    case Project(projectList, Join(left, right, joinType, condition)) =>
    -      // Collect the list of all references required either above or to 
evaluate the condition.
    -      val allReferences: AttributeSet =
    -        AttributeSet(
    -          projectList.flatMap(_.references.iterator)) ++
    -          condition.map(_.references).getOrElse(AttributeSet(Seq.empty))
    -
    -      /** Applies a projection only when the child is producing 
unnecessary attributes */
    -      def pruneJoinChild(c: LogicalPlan): LogicalPlan = prunedChild(c, 
allReferences)
    -
    -      Project(projectList, Join(pruneJoinChild(left), 
pruneJoinChild(right), joinType, condition))
    -
         // Eliminate unneeded attributes from right side of a LeftSemiJoin.
    -    case Join(left, right, LeftSemi, condition) =>
    -      // Collect the list of all references required to evaluate the 
condition.
    -      val allReferences: AttributeSet =
    -        condition.map(_.references).getOrElse(AttributeSet(Seq.empty))
    -
    -      Join(left, prunedChild(right, allReferences), LeftSemi, condition)
    -
    -    // Push down project through limit, so that we may have chance to push 
it further.
    -    case Project(projectList, Limit(exp, child)) =>
    -      Limit(exp, Project(projectList, child))
    +    case j @ Join(left, right, LeftSemi, condition) =>
    +      j.copy(right = prunedChild(right, j.references))
     
    -    // Push down project if possible when the child is sort.
    -    case p @ Project(projectList, s @ Sort(_, _, grandChild)) =>
    -      if (s.references.subsetOf(p.outputSet)) {
    -        s.copy(child = Project(projectList, grandChild))
    +    // Eliminate no-op Projects
    +    case p @ Project(projectList, child) if child.outputSet == p.outputSet 
=> child
    +
    +    // all the columns will be used to compare, so we can't prune them
    +    case p @ Project(_, _: SetOperation) => p
    +    case p @ Project(_, _: Distinct) => p
    +
    +    // Can't prune the columns on LeafNode
    +    case p @ Project(_, l: LeafNode) => p
    +
    +    // for all other logical plans that inherits the output from it's 
children
    +    case p @ Project(_, child) =>
    +      val allAttributes = child.children.flatMap(_.outputSet).toSet
    +      val required = child.references ++ p.references
    +      if ((allAttributes -- required).nonEmpty) {
    +        val newChildren = child.children.map(c => prunedChild(c, required))
    +        p.copy(child = child.withNewChildren(newChildren))
           } else {
    -        val neededReferences = s.references ++ p.references
    -        if (neededReferences == grandChild.outputSet) {
    -          // No column we can prune, return the original plan.
    -          p
    -        } else {
    -          // Do not use neededReferences.toSeq directly, should respect 
grandChild's output order.
    -          val newProjectList = 
grandChild.output.filter(neededReferences.contains)
    -          p.copy(child = s.copy(child = Project(newProjectList, 
grandChild)))
    -        }
    +        p
           }
    -
    -    // Eliminate no-op Projects
    -    case Project(projectList, child) if child.output == projectList => 
child
       }
     
       /** Applies a projection only when the child is producing unnecessary 
attributes */
       private def prunedChild(c: LogicalPlan, allReferences: AttributeSet) =
         if ((c.outputSet -- 
allReferences.filter(c.outputSet.contains)).nonEmpty) {
    -      Project(allReferences.filter(c.outputSet.contains).toSeq, c)
    +      val proj = 
allReferences.filter(c.outputSet.contains).toSeq.sortBy(_.name)
    --- End diff --
    
    why do we need to sort it?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to