Github user marmbrus commented on the pull request:

    https://github.com/apache/spark/pull/2345#issuecomment-55178764
  
    Hey @koeninger, thanks for implementing this optimization!
    
    Overall this looks pretty good.  A few minor suggestions:
     - I'm not sure that we want to check the names and qualifiers to find the 
corresponding `Attribute`s.  It is possible that either side could actually 
have tables that are aliased differently and thus the `Attribute`s would have 
different qualifiers.  Instead, I think that it safe to assume that the 
analysis phase has checked name and type matching and just find the 
corresponding `Attribute` by ordering.  I'm thinking something like this:
    
    ```scala
    /**
     * Pushes Project and Filter operations to either side of a Union.
     */
    object UnionPushdown extends Rule[LogicalPlan] {
    
      /**
       * Rewrites an expression so that it can be pushed to the right side of a 
Union operator.
       * This method relies on the fact that the output attributes of a union 
are always equal to the
       * left child's output.
       */
      def pushToRight[A <: Expression](e: A, union: Union): A = {
        assert(union.left.output.size == union.right.output.size)
    
        // Maps Attributes from the left side to the corresponding Attribute on 
the right side.
        val rewrites = AttributeMap(union.left.output.zip(union.right.output))
        val result = e transform {
          case a: Attribute => rewrites(a)
        }
    
        // We must promise the compiler that we did not discard the names in 
the case of project
        // expressions.  This is safe since the only transformation is from 
Attribute => Attribute.
        result.asInstanceOf[A]
      }
    
      def apply(plan: LogicalPlan): LogicalPlan = plan transform {
        // Push down filter into union
        case Filter(condition, u @ Union(left, right)) =>
          Union(
            Filter(condition, left),
            Filter(pushToRight(condition, u), right))
    
        // Push down projection into union
        case Project(projectList, u @ Union(left, right)) =>
          Union(
            Project(projectList, left),
            Project(projectList.map(pushToRight(_, u)), right))
      }
    }
    ```
     - Would also be great to add some tests to `FilterPushdownSuite` and maybe 
create a similar `ColumnPruningSuite`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to