Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14452#discussion_r94001153
  
    --- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
    @@ -1167,3 +1173,211 @@ object RemoveRepetitionFromGroupExpressions extends 
Rule[LogicalPlan] {
           a.copy(groupingExpressions = newGrouping)
       }
     }
    +
    +/**
    + * Optimizes the logical plans wrapped in SubqueryAlias and operators on 
them.
    + * The SubqueryAlias which are remaining in optimization phase are common 
subqueries,
    + * i.e., they are duplicate in the whole query plan. The logical plans 
wrapped in
    + * SubqueryAlias will be executed individually later. However, some 
operators such as
    + * Project and Filter can be optimized with the wrapped logical plans. 
Thus, this rule
    + * considers the optimization of the wrapped logical plans and operators 
on SubqueryAlias.
    + */
    +case class OptimizeCommonSubqueries(optimizer: Optimizer)
    +    extends Rule[LogicalPlan] with PredicateHelper {
    +  // Optimized the subqueries which all have a Project parent node and the 
same results.
    +  private def optimizeProjectWithSubqueries(
    +      plan: LogicalPlan,
    +      keyPlan: LogicalPlan,
    +      subqueries: ArrayBuffer[LogicalPlan]): LogicalPlan = {
    +    plan transform {
    +      case p @ Project(pList, s @ SubqueryAlias(alias, subquery, v, true))
    +          if s.sameResult(keyPlan) =>
    +        val pListForAll: Seq[NamedExpression] = subqueries.flatMap { case 
Project(pList, child) =>
    +          val rewrites = buildRewrites(child, subquery)
    +          pList.map(pushToOtherPlan(_, rewrites))
    +        }
    +
    +        val newSubquery = Project(pListForAll, subquery)
    +        val optimized = optimizer.execute(newSubquery)
    +        // Check if any optimization is performed.
    +        if (optimized.sameResult(newSubquery)) {
    +          // No optimization happens. Let's keep original subquery.
    +          p
    +        } else {
    +          Project(pList.map(_.toAttribute), SubqueryAlias(alias, 
newSubquery, v, true))
    +        }
    +    }
    +  }
    +
    +  /**
    +   * Maps Attributes from the source side to the corresponding Attribute 
on the target side.
    +   */
    +  private def buildRewrites(source: LogicalPlan, target: LogicalPlan): 
AttributeMap[Attribute] = {
    +    assert(source.output.size == target.output.size)
    +    AttributeMap(source.output.zip(target.output))
    +  }
    +
    +  /**
    +   * Rewrites an expression so that it can be pushed to another 
LogicalPlan.
    +   */
    +  private def pushToOtherPlan[A <: Expression](e: A, rewrites: 
AttributeMap[Attribute]) = {
    +    val result = e transformUp {
    +      case a: Attribute => rewrites.get(a).getOrElse(a)
    +    }
    +
    +    // We must promise the compiler that we did not discard the names in 
the case of project
    +    // expressions.  This is safe since the only transformation is from 
Attribute => Attribute.
    +    result.asInstanceOf[A]
    +  }
    +
    +  private def optimizeFilterWithSubqueries(
    +      plan: LogicalPlan,
    +      keyPlan: LogicalPlan,
    +      subqueries: ArrayBuffer[LogicalPlan]): LogicalPlan = {
    +    var pushdownConds = 
splitConjunctivePredicates(subqueries(0).asInstanceOf[Filter].condition)
    +    subqueries.tail.foreach {
    +      case Filter(otherCond, child) =>
    +        val rewrites = buildRewrites(child, 
subqueries(0).asInstanceOf[Filter].child)
    +        // We can't simply push down all conditions from other Filter by 
concatenating them with
    --- End diff --
    
    This part has been extracted into a separate PR, #15558, and can be removed
    from this PR if that one is merged.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to