cloud-fan commented on code in PR #49202:
URL: https://github.com/apache/spark/pull/49202#discussion_r1897208517
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala:
##########
@@ -1978,6 +1988,62 @@ object PushPredicateThroughNonJoin extends
Rule[LogicalPlan] with PredicateHelpe
case _ => false
}
}
+
+ /**
+ * Use [[With]] to rewrite conditions containing attributes that are not cheap and are consumed
+ * multiple times. Each predicate generates at most one [[With]]. To make it easier to merge
+ * these [[With]]s later, the same CommonExpressionDef ids are reused across different [[With]]s.
+ */
+ private def rewriteConditionByWith(
+ cond: Seq[Expression],
+ aliasMap: AttributeMap[Alias]): Seq[Expression] = {
+ if (!SQLConf.get.getConf(SQLConf.ALWAYS_INLINE_COMMON_EXPR)) {
+ val canRewriteConf = cond.filter(canRewriteByWith)
+ if (canRewriteConf.nonEmpty) {
+ val replaceWithMap = canRewriteConf.reduce(And)
+ .collect { case a: Attribute => a }
+ .groupBy(identity)
+ .transform((_, v) => v.size)
+ .filter(m => aliasMap.contains(m._1) && m._2 > 1)
+ .map(m => m._1 -> trimAliases(aliasMap.getOrElse(m._1, m._1)))
+ .filter(m => !CollapseProject.isCheap(m._2))
+ val defsMap = AttributeMap(replaceWithMap.map(m => m._1 ->
CommonExpressionDef(m._2)))
+ val refsMap = AttributeMap(defsMap.map(m => m._1 -> new
CommonExpressionRef(m._2)))
+ cond.map(rewriteByWith(_, defsMap, refsMap))
+ } else cond
+ } else cond
+ }
+
+ private def rewriteConditionByWith(
+ cond: Expression,
+ aliasMap: AttributeMap[Alias]): Expression = {
+ if (!SQLConf.get.getConf(SQLConf.ALWAYS_INLINE_COMMON_EXPR)) {
+ rewriteConditionByWith(splitConjunctivePredicates(cond),
aliasMap).reduce(And)
+ } else cond
+ }
+
+ // With does not support inline subquery
+ private def canRewriteByWith(expr: Expression): Boolean = {
+ !expr.containsPattern(PLAN_EXPRESSION)
+ }
+
+ private def rewriteByWith(
+ expr: Expression,
+ defsMap: AttributeMap[CommonExpressionDef],
+ refsMap: AttributeMap[CommonExpressionRef]): Expression = {
+ if (!canRewriteByWith(expr)) {
+ return expr
+ }
+ val defs = mutable.HashSet.empty[CommonExpressionDef]
+ val replaced = expr.transform {
+ case a: Attribute if refsMap.contains(a) =>
+ defs.add(defsMap.get(a).get)
Review Comment:
```suggestion
defs.add(defsMap(a))
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]