Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20988#discussion_r181535823
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
 ---
    @@ -129,35 +151,41 @@ case class OptimizeMetadataOnlyQuery(catalog: 
SessionCatalog) extends Rule[Logic
     
       /**
        * A pattern that finds the partitioned table relation node inside the 
given plan, and returns a
    -   * pair of the partition attributes and the table relation node.
    +   * pair of the partition attributes, partition filters, and the table 
relation node.
        *
        * It keeps traversing down the given plan tree if there is a 
[[Project]] or [[Filter]] with
        * deterministic expressions, and returns result after reaching the 
partitioned table relation
        * node.
        */
    -  object PartitionedRelation {
    -
    -    def unapply(plan: LogicalPlan): Option[(AttributeSet, LogicalPlan)] = 
plan match {
    -      case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, _)
    -        if fsRelation.partitionSchema.nonEmpty =>
    -        val partAttrs = 
getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l)
    -        Some((AttributeSet(partAttrs), l))
    -
    -      case relation: HiveTableRelation if 
relation.tableMeta.partitionColumnNames.nonEmpty =>
    -        val partAttrs = 
getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation)
    -        Some((AttributeSet(partAttrs), relation))
    -
    -      case p @ Project(projectList, child) if 
projectList.forall(_.deterministic) =>
    -        unapply(child).flatMap { case (partAttrs, relation) =>
    -          if (p.references.subsetOf(partAttrs)) Some((p.outputSet, 
relation)) else None
    -        }
    +  object PartitionedRelation extends PredicateHelper {
    +
    +    def unapply(plan: LogicalPlan): Option[(AttributeSet, Seq[Expression], 
LogicalPlan)] = {
    +      plan match {
    +        case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, _)
    +          if fsRelation.partitionSchema.nonEmpty =>
    +          val partAttrs = 
getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l)
    +          Some((AttributeSet(partAttrs), Nil, l))
    +
    +        case relation: HiveTableRelation if 
relation.tableMeta.partitionColumnNames.nonEmpty =>
    +          val partAttrs = 
getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation)
    +          Some((AttributeSet(partAttrs), Nil, relation))
    +
    +        case p @ Project(projectList, child) if 
projectList.forall(_.deterministic) =>
    +          unapply(child).flatMap { case (partAttrs, filters, relation) =>
    +            if (p.references.subsetOf(partAttrs)) Some((p.outputSet, 
filters, relation)) else None
    --- End diff --
    
    I'd propose something top-down like
    ```
    def getPartitionedRelation(
        plan: LogicalPlan,
        predicates: Seq[Expression]): Option[(AttributeSet, Seq[Expression], 
LogicalPlan)] = {
      plan match {
        case Filter(condition, child) if condition.deterministic =>
          getPartitionedRelation(child, predicates ++ 
splitConjunctivePredicates(condition))
       
        case Project(projectList, child) if projectList.forall(_.deterministic) 
=>
          getPartitionedRelation(child, 
predicates.filter(_.references.subsetOf(child.outputSet)))
        
        case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, _) if 
fsRelation.partitionSchema.nonEmpty =>
          val partAttrs = ...
          val partitionFilters = 
predicates.filter(_.references.subsetOf(partAttrs))
          Some(...)
    
        case _ => None
      }
    }
    ```


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org

Reply via email to