zml1206 commented on code in PR #49202:
URL: https://github.com/apache/spark/pull/49202#discussion_r1897364136


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala:
##########
@@ -1978,6 +1988,62 @@ object PushPredicateThroughNonJoin extends 
Rule[LogicalPlan] with PredicateHelpe
       case _ => false
     }
   }
+
+  /**
+   * Use [[With]] to rewrite condition which contains attribute that are not 
cheap and be consumed
+   * multiple times. Each predicate generates one or 0 With. For facilitates 
subsequent merge
+   * [[With]], use the same CommonExpressionDef ids for different [[With]].
+   */
+  private def rewriteConditionByWith(
+      cond: Seq[Expression],
+      aliasMap: AttributeMap[Alias]): Seq[Expression] = {
+    if (!SQLConf.get.getConf(SQLConf.ALWAYS_INLINE_COMMON_EXPR)) {
+      val canRewriteConf = cond.filter(canRewriteByWith)
+      if (canRewriteConf.nonEmpty) {
+        val replaceWithMap = canRewriteConf.reduce(And)
+          .collect { case a: Attribute => a }
+          .groupBy(identity)
+          .transform((_, v) => v.size)
+          .filter(m => aliasMap.contains(m._1) && m._2 > 1)
+          .map(m => m._1 -> trimAliases(aliasMap.getOrElse(m._1, m._1)))
+          .filter(m => !CollapseProject.isCheap(m._2))
+        val defsMap = AttributeMap(replaceWithMap.map(m => m._1 -> 
CommonExpressionDef(m._2)))
+        val refsMap = AttributeMap(defsMap.map(m => m._1 -> new 
CommonExpressionRef(m._2)))
+        cond.map(rewriteByWith(_, defsMap, refsMap))
+      } else cond
+    } else cond
+  }
+
+  private def rewriteConditionByWith(
+      cond: Expression,
+      aliasMap: AttributeMap[Alias]): Expression = {
+    if (!SQLConf.get.getConf(SQLConf.ALWAYS_INLINE_COMMON_EXPR)) {
+      rewriteConditionByWith(splitConjunctivePredicates(cond), 
aliasMap).reduce(And)
+    } else cond
+  }
+
+  // With does not support inline subquery
+  private def canRewriteByWith(expr: Expression): Boolean = {
+    !expr.containsPattern(PLAN_EXPRESSION)

Review Comment:
   Both subquery rewriting and predicate pushdown run in the same batch, 
"operator optimization before inferring filters". Predicate pushdown may 
therefore cause a SubqueryExpression to contain common expression refs, and 
the subquery rewriting rules then cannot replace those common refs.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to