Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/11256#discussion_r53561584
--- Diff:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
---
@@ -300,97 +300,71 @@ object SetOperationPushDown extends Rule[LogicalPlan] with PredicateHelper {
*/
object ColumnPruning extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
- case a @ Aggregate(_, _, e @ Expand(projects, output, child))
- if (e.outputSet -- a.references).nonEmpty =>
- val newOutput = output.filter(a.references.contains(_))
- val newProjects = projects.map { proj =>
- proj.zip(output).filter { case (e, a) =>
+ // Prunes the unused columns from the project list of Project/Aggregate/Window/Expand
+ case p @ Project(_, p2: Project) if (p2.outputSet --
p.references).nonEmpty =>
+ p.copy(child = p2.copy(projectList =
p2.projectList.filter(p.references.contains)))
+ case p @ Project(_, a: Aggregate) if (a.outputSet --
p.references).nonEmpty =>
+ p.copy(
+ child = a.copy(aggregateExpressions =
a.aggregateExpressions.filter(p.references.contains)))
+ case p @ Project(_, w: Window) if (w.outputSet --
p.references).nonEmpty =>
+ p.copy(child = w.copy(
+ projectList = w.projectList.filter(p.references.contains),
+ windowExpressions =
w.windowExpressions.filter(p.references.contains)))
+ case a @ Project(_, e @ Expand(_, _, grandChild)) if (e.outputSet --
a.references).nonEmpty =>
+ val newOutput = e.output.filter(a.references.contains(_))
+ val newProjects = e.projections.map { proj =>
+ proj.zip(e.output).filter { case (e, a) =>
newOutput.contains(a)
}.unzip._1
}
- a.copy(child = Expand(newProjects, newOutput, child))
+ a.copy(child = Expand(newProjects, newOutput, grandChild))
+ // TODO: support some logical plan for Dataset
- case a @ Aggregate(_, _, e @ Expand(_, _, child))
- if (child.outputSet -- e.references -- a.references).nonEmpty =>
- a.copy(child = e.copy(child = prunedChild(child, e.references ++
a.references)))
-
- // Eliminate attributes that are not needed to calculate the specified
aggregates.
+ // Prunes the unused columns from the child of Aggregate/Window/Expand/Generate
case a @ Aggregate(_, _, child) if (child.outputSet --
a.references).nonEmpty =>
- a.copy(child = Project(a.references.toSeq, child))
-
- // Eliminate attributes that are not needed to calculate the Generate.
+ a.copy(child = prunedChild(child, a.references))
+ case w @ Window(_, _, _, _, child) if (child.outputSet --
w.references).nonEmpty =>
+ w.copy(child = prunedChild(child, w.references))
+ case e @ Expand(_, _, child) if (child.outputSet --
e.references).nonEmpty =>
+ e.copy(child = prunedChild(child, e.references))
case g: Generate if !g.join && (g.child.outputSet --
g.references).nonEmpty =>
- g.copy(child = Project(g.references.toSeq, g.child))
+ g.copy(child = prunedChild(g.child, g.references))
+ // Turn off `join` for Generate if no column from its child is used
case p @ Project(_, g: Generate) if g.join &&
p.references.subsetOf(g.generatedSet) =>
p.copy(child = g.copy(join = false))
- case p @ Project(projectList, g: Generate) if g.join =>
- val neededChildOutput = p.references -- g.generatorOutput ++
g.references
- if (neededChildOutput == g.child.outputSet) {
- p
- } else {
- Project(projectList, g.copy(child =
Project(neededChildOutput.toSeq, g.child)))
- }
-
- case p @ Project(projectList, a @ Aggregate(groupingExpressions,
aggregateExpressions, child))
- if (a.outputSet -- p.references).nonEmpty =>
- Project(
- projectList,
- Aggregate(
- groupingExpressions,
- aggregateExpressions.filter(e => p.references.contains(e)),
- child))
-
- // Eliminate unneeded attributes from either side of a Join.
- case Project(projectList, Join(left, right, joinType, condition)) =>
- // Collect the list of all references required either above or to
evaluate the condition.
- val allReferences: AttributeSet =
- AttributeSet(
- projectList.flatMap(_.references.iterator)) ++
- condition.map(_.references).getOrElse(AttributeSet(Seq.empty))
-
- /** Applies a projection only when the child is producing
unnecessary attributes */
- def pruneJoinChild(c: LogicalPlan): LogicalPlan = prunedChild(c,
allReferences)
-
- Project(projectList, Join(pruneJoinChild(left),
pruneJoinChild(right), joinType, condition))
-
// Eliminate unneeded attributes from right side of a LeftSemiJoin.
- case Join(left, right, LeftSemi, condition) =>
- // Collect the list of all references required to evaluate the
condition.
- val allReferences: AttributeSet =
- condition.map(_.references).getOrElse(AttributeSet(Seq.empty))
-
- Join(left, prunedChild(right, allReferences), LeftSemi, condition)
-
- // Push down project through limit, so that we may have chance to push
it further.
- case Project(projectList, Limit(exp, child)) =>
- Limit(exp, Project(projectList, child))
+ case j @ Join(left, right, LeftSemi, condition) =>
+ j.copy(right = prunedChild(right, j.references))
- // Push down project if possible when the child is sort.
- case p @ Project(projectList, s @ Sort(_, _, grandChild)) =>
- if (s.references.subsetOf(p.outputSet)) {
- s.copy(child = Project(projectList, grandChild))
+ // Eliminate no-op Projects
+ case p @ Project(projectList, child) if child.outputSet == p.outputSet
=> child
+
+ // all the columns will be used to compare, so we can't prune them
+ case p @ Project(_, _: SetOperation) => p
+ case p @ Project(_, _: Distinct) => p
+
+ // Can't prune the columns on LeafNode
+ case p @ Project(_, l: LeafNode) => p
+
+ // for all other logical plans that inherit the output from their children
+ case p @ Project(_, child) =>
+ val allAttributes = child.children.flatMap(_.outputSet).toSet
+ val required = child.references ++ p.references
+ if ((allAttributes -- required).nonEmpty) {
+ val newChildren = child.children.map(c => prunedChild(c, required))
+ p.copy(child = child.withNewChildren(newChildren))
} else {
- val neededReferences = s.references ++ p.references
- if (neededReferences == grandChild.outputSet) {
- // No column we can prune, return the original plan.
- p
- } else {
- // Do not use neededReferences.toSeq directly, should respect
grandChild's output order.
- val newProjectList =
grandChild.output.filter(neededReferences.contains)
- p.copy(child = s.copy(child = Project(newProjectList,
grandChild)))
- }
+ p
}
-
- // Eliminate no-op Projects
- case Project(projectList, child) if child.output == projectList =>
child
}
/** Applies a projection only when the child is producing unnecessary
attributes */
private def prunedChild(c: LogicalPlan, allReferences: AttributeSet) =
if ((c.outputSet --
allReferences.filter(c.outputSet.contains)).nonEmpty) {
- Project(allReferences.filter(c.outputSet.contains).toSeq, c)
+ val proj =
allReferences.filter(c.outputSet.contains).toSeq.sortBy(_.name)
--- End diff --
Why do we need to sort the projected attributes by name here (`sortBy(_.name)`)?
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]