cloud-fan commented on a change in pull request #25258: [SPARK-19712][SQL] Move 
subquery rewrite to beginning of optimizer
URL: https://github.com/apache/spark/pull/25258#discussion_r307640441
 
 

 ##########
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ##########
 @@ -543,12 +543,47 @@ object PushProjectionThroughUnion extends 
Rule[LogicalPlan] with PredicateHelper
  * remove the Project p2 in the following pattern:
  *
  *   p1 @ Project(_, Filter(_, p2 @ Project(_, child))) if 
p2.outputSet.subsetOf(p2.inputSet)
+ *   p1 @ Project(_, j @ Join(p2 @ Project(_, child), _, LeftSemiOrAnti(_), _))
  *
  * p2 is usually inserted by this rule and useless, p1 could prune the columns 
anyway.
  */
 object ColumnPruning extends Rule[LogicalPlan] {
 
-  def apply(plan: LogicalPlan): LogicalPlan = removeProjectBeforeFilter(plan 
transform {
+  def apply(plan: LogicalPlan): LogicalPlan = 
removeProjectBeforeFilter(FinalColumnPruning(plan))
+
+  /**
+   * The Project before Filter or LeftSemi/LeftAnti not necessary but conflict 
with
+   * PushPredicatesThroughProject, so remove it. Since the Projects have been 
added
+   * top-down, we need to remove in bottom-up order, otherwise lower Projects 
can be missed.
+   *
+   * While removing the projects below a self join, we should ensure that the 
plan remains
+   * valid after removing the project. The project node could have been added 
to de-duplicate
+   * the attributes and thus we need to check for this case before removing 
the project node.
+   */
+  private def removeProjectBeforeFilter(plan: LogicalPlan): LogicalPlan = plan 
transformUp {
+    case p1 @ Project(_, f @ Filter(_, p2 @ Project(_, child)))
+      if p2.outputSet.subsetOf(child.outputSet) =>
+      p1.copy(child = f.copy(child = child))
+
+    case p1 @ Project(_, j @ Join(p2 @ Project(_, child), right, 
LeftSemiOrAnti(_), _, _))
+      if p2.outputSet.subsetOf(child.outputSet) &&
+        child.outputSet.intersect(right.outputSet).isEmpty =>
+      p1.copy(child = j.copy(left = child))
+  }
+}
+
+/**
+ * Attempts to eliminate the reading of unneeded columns from the query plan.
+ *
+ * Since adding Project before Filter conflicts with 
PushPredicatesThroughProject, this rule will
+ * remove the Project p2 in the following pattern:
+ *
+ *   p1 @ Project(_, Filter(_, p2 @ Project(_, child))) if 
p2.outputSet.subsetOf(p2.inputSet)
+ *
+ * p2 is usually inserted by this rule and useless, p1 could prune the columns 
anyway.
+ */
+object FinalColumnPruning extends Rule[LogicalPlan] {
 
 Review comment:
   Why do we need to separate the column pruning rule?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org

Reply via email to