Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11427#discussion_r57001834
  
    --- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
    @@ -404,69 +404,57 @@ object ColumnPruning extends Rule[LogicalPlan] {
     object CollapseProject extends Rule[LogicalPlan] {
     
       def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
    -    case p @ Project(projectList1, Project(projectList2, child)) =>
    -      // Create a map of Aliases to their values from the child projection.
    -      // e.g., 'SELECT ... FROM (SELECT a + b AS c, d ...)' produces Map(c 
-> Alias(a + b, c)).
    -      val aliasMap = AttributeMap(projectList2.collect {
    -        case a: Alias => (a.toAttribute, a)
    -      })
    -
    -      // We only collapse these two Projects if their overlapped 
expressions are all
    -      // deterministic.
    -      val hasNondeterministic = projectList1.exists(_.collect {
    -        case a: Attribute if aliasMap.contains(a) => aliasMap(a).child
    -      }.exists(!_.deterministic))
    -
    -      if (hasNondeterministic) {
    -        p
    +    case p1 @ Project(_, p2: Project) =>
    +      if (hasNondeterministic(p1.projectList, p2.projectList)) {
    +        p1
           } else {
    -        // Substitute any attributes that are produced by the child 
projection, so that we safely
    -        // eliminate it.
    -        // e.g., 'SELECT c + 1 FROM (SELECT a + b AS C ...' produces 
'SELECT a + b + 1 ...'
    -        // TODO: Fix TransformBase to avoid the cast below.
    -        val substitutedProjection = projectList1.map(_.transform {
    -          case a: Attribute => aliasMap.getOrElse(a, a)
    -        }).asInstanceOf[Seq[NamedExpression]]
    -        // collapse 2 projects may introduce unnecessary Aliases, trim 
them here.
    -        val cleanedProjection = substitutedProjection.map(p =>
    -          
CleanupAliases.trimNonTopLevelAliases(p).asInstanceOf[NamedExpression]
    -        )
    -        Project(cleanedProjection, child)
    +        p2.copy(projectList = buildCleanedProjectList(p1.projectList, 
p2.projectList))
           }
    -
    -    // TODO Eliminate duplicate code
    -    // This clause is identical to the one above except that the inner 
operator is an `Aggregate`
    -    // rather than a `Project`.
    -    case p @ Project(projectList1, agg @ Aggregate(_, projectList2, 
child)) =>
    -      // Create a map of Aliases to their values from the child projection.
    -      // e.g., 'SELECT ... FROM (SELECT a + b AS c, d ...)' produces Map(c 
-> Alias(a + b, c)).
    -      val aliasMap = AttributeMap(projectList2.collect {
    -        case a: Alias => (a.toAttribute, a)
    -      })
    -
    -      // We only collapse these two Projects if their overlapped 
expressions are all
    -      // deterministic.
    -      val hasNondeterministic = projectList1.exists(_.collect {
    -        case a: Attribute if aliasMap.contains(a) => aliasMap(a).child
    -      }.exists(!_.deterministic))
    -
    -      if (hasNondeterministic) {
    +    case p @ Project(_, agg: Aggregate) =>
    +      if (hasNondeterministic(p.projectList, agg.aggregateExpressions)) {
             p
           } else {
    -        // Substitute any attributes that are produced by the child 
projection, so that we safely
    -        // eliminate it.
    -        // e.g., 'SELECT c + 1 FROM (SELECT a + b AS C ...' produces 
'SELECT a + b + 1 ...'
    -        // TODO: Fix TransformBase to avoid the cast below.
    -        val substitutedProjection = projectList1.map(_.transform {
    -          case a: Attribute => aliasMap.getOrElse(a, a)
    -        }).asInstanceOf[Seq[NamedExpression]]
    -        // collapse 2 projects may introduce unnecessary Aliases, trim 
them here.
    -        val cleanedProjection = substitutedProjection.map(p =>
    -          
CleanupAliases.trimNonTopLevelAliases(p).asInstanceOf[NamedExpression]
    -        )
    -        agg.copy(aggregateExpressions = cleanedProjection)
    +        agg.copy(aggregateExpressions = buildCleanedProjectList(
    +          p.projectList, agg.aggregateExpressions))
           }
       }
    +
    +  private def buildCleanedProjectList(
    +      projectList1: Seq[NamedExpression],
    +      projectList2: Seq[NamedExpression]): Seq[NamedExpression] = {
    +    // Create a map of Aliases to their values from the child projection.
    +    // e.g., 'SELECT ... FROM (SELECT a + b AS c, d ...)' produces Map(c 
-> Alias(a + b, c)).
    +    val aliasMap = AttributeMap(projectList2.collect {
    +      case a: Alias => (a.toAttribute, a)
    +    })
    +
    +    // Substitute any attributes that are produced by the child 
projection, so that we safely
    +    // eliminate it.
    +    // e.g., 'SELECT c + 1 FROM (SELECT a + b AS C ...' produces 'SELECT a 
+ b + 1 ...'
    +    val substitutedProjection = projectList1.map(_.transform {
    +      case a: Attribute => aliasMap.getOrElse(a, a)
    +    })
    +    // collapse 2 projects may introduce unnecessary Aliases, trim them 
here.
    +    substitutedProjection.map(p =>
    +      
CleanupAliases.trimNonTopLevelAliases(p).asInstanceOf[NamedExpression]
    +    )
    +  }
    +
    +  private def hasNondeterministic(
    +      projectList1: Seq[NamedExpression],
    +      projectList2: Seq[NamedExpression]): Boolean = {
    +    // Create a map of Aliases to their values from the child projection.
    +    // e.g., 'SELECT ... FROM (SELECT a + b AS c, d ...)' produces Map(c 
-> Alias(a + b, c)).
    +    val aliasMap = AttributeMap(projectList2.collect {
    +      case a: Alias => (a.toAttribute, a)
    +    })
    +
    +    // We only collapse these two Projects if their overlapped expressions 
are all
    +    // deterministic.
    +    projectList1.exists(_.collect {
    +      case a: Attribute if aliasMap.contains(a) => aliasMap(a).child
    +    }.exists(!_.deterministic))
    +  }
    --- End diff --
    
    Further refactored your code a little bit:
    
    ```scala
    // Builds a lookup from each attribute introduced by an Alias in `projectList` to that Alias.
    // e.g., the list of 'SELECT a + b AS c, d ...' produces Map(c -> Alias(a + b, c)).
    def collectAliases(projectList: Seq[NamedExpression]): AttributeMap[Alias] = {
      val aliasPairs = projectList.collect { case alias: Alias => (alias.toAttribute, alias) }
      AttributeMap(aliasPairs)
    }
    
    // Returns true if any expression in `upper` references an attribute that `lower` defines
    // through an Alias whose aliased child expression is non-deterministic. The two operators
    // must not be collapsed in that case.
    def haveCommonNonDeterministicOutput(
        upper: Seq[NamedExpression], lower: Seq[NamedExpression]): Boolean = {
      val aliases = collectAliases(lower)
      // The inner `exists` must be applied to the expressions collected from each upper
      // expression, i.e. inside the outer predicate; applying it to the result of the outer
      // `exists` (a Boolean) does not compile.
      upper.exists(_.collect {
        case a: Attribute if aliases.contains(a) => aliases(a).child
      }.exists(!_.deterministic))
    }
    
    // Rewrites the `upper` project list so that it no longer depends on aliases defined in
    // `lower`, allowing the two operators to be merged.
    // e.g., 'SELECT c + 1 FROM (SELECT a + b AS c ...' produces 'SELECT a + b + 1 ...'
    def buildCleanedProjectList(
        upper: Seq[NamedExpression],
        lower: Seq[NamedExpression]): Seq[NamedExpression] = {
      val aliases = collectAliases(lower)
    
      upper.map { expr =>
        // Replace every attribute produced by a lower alias with the alias itself.
        val inlined = expr.transform {
          case attr: Attribute => aliases.getOrElse(attr, attr)
        }
        // Inlining may leave Aliases nested below the top level; trim them.
        CleanupAliases.trimNonTopLevelAliases(inlined).asInstanceOf[NamedExpression]
      }
    }
    ```
    
    And those inline comments need some rewording as they are now moved to 
different contexts.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to