cloud-fan commented on code in PR #46034:
URL: https://github.com/apache/spark/pull/46034#discussion_r1565727826


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteWithExpression.scala:
##########
@@ -21,36 +21,57 @@ import scala.collection.mutable
 
 import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, PlanHelper, Project}
+import org.apache.spark.sql.catalyst.planning.PhysicalAggregation
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan, PlanHelper, Project}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.TreePattern.{COMMON_EXPR_REF, WITH_EXPRESSION}
 
 /**
 * Rewrites the `With` expressions by adding a `Project` to pre-evaluate the common expressions, or
  * just inline them if they are cheap.
  *
+ * Since this rule can introduce new `Project` operators, it is advised to run [[CollapseProject]]
+ * after this rule.
+ *
 * Note: For now we only use `With` in a few `RuntimeReplaceable` expressions. If we expand its
  *       usage, we should support aggregate/window functions as well.
  */
 object RewriteWithExpression extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = {
-    plan.transformDownWithSubqueriesAndPruning(_.containsPattern(WITH_EXPRESSION)) {
+    plan.transformUpWithSubqueriesAndPruning(_.containsPattern(WITH_EXPRESSION)) {
+      case p @ PhysicalAggregation(
+          groupingExpressions, aggregateExpressions, resultExpressions, child)
+          if p.expressions.exists(_.containsPattern(WITH_EXPRESSION)) =>
+        // For aggregates, separate computation of the aggregations themselves from the final
+        // result by moving the final result computation into a projection above. This prevents
+        // this rule from producing an invalid Aggregate operator.
+        // TODO: the names of these aliases will become outdated after the rewrite
+        val aggExprs = aggregateExpressions.map(ae => Alias(ae, ae.toString)(ae.resultId))
+        // Rewrite the projection and the aggregate separately and then piece them together.
+        val agg = Aggregate(groupingExpressions, groupingExpressions ++ aggExprs, child)
+        val rewrittenAgg = applyInternal(agg)
+        val proj = Project(resultExpressions, rewrittenAgg)
+        applyInternal(proj)
       case p if p.expressions.exists(_.containsPattern(WITH_EXPRESSION)) =>
-        val inputPlans = p.children.toArray
-        var newPlan: LogicalPlan = p.mapExpressions { expr =>
-          rewriteWithExprAndInputPlans(expr, inputPlans)
-        }
-        newPlan = newPlan.withNewChildren(inputPlans.toIndexedSeq)
-        // Since we add extra Projects with extra columns to pre-evaluate the common expressions,
-        // the current operator may have extra columns if it inherits the output columns from its
-        // child, and we need to project away the extra columns to keep the plan schema unchanged.
-        assert(p.output.length <= newPlan.output.length)
-        if (p.output.length < newPlan.output.length) {
-          assert(p.outputSet.subsetOf(newPlan.outputSet))
-          Project(p.output, newPlan)
-        } else {
-          newPlan
-        }
+        applyInternal(p)
+    }
+  }
+
+  private def applyInternal(p: LogicalPlan): LogicalPlan = {
+    val inputPlans = p.children.toArray
+    var newPlan: LogicalPlan = p.mapExpressions { expr =>
+      rewriteWithExprAndInputPlans(expr, inputPlans)
+    }
+    newPlan = newPlan.withNewChildren(inputPlans)

Review Comment:
   the children do not change, so why call `withNewChildren`?
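
   For context, a minimal standalone sketch of the pattern in question.
   `rewriteExpr` and `applyInternalSketch` are hypothetical stand-ins for
   `rewriteWithExprAndInputPlans` and `applyInternal`; on this reading, the
   rewrite may replace entries of `inputPlans` in place:

       import org.apache.spark.sql.catalyst.expressions.Expression
       import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

       object WithRewriteSketch {
         // Hypothetical stand-in: rewrites `expr` and may swap entries of
         // `inputPlans` for Projects that pre-evaluate common expressions.
         def rewriteExpr(expr: Expression, inputPlans: Array[LogicalPlan]): Expression = expr

         def applyInternalSketch(p: LogicalPlan): LogicalPlan = {
           val inputPlans = p.children.toArray
           val newPlan: LogicalPlan = p.mapExpressions { expr =>
             rewriteExpr(expr, inputPlans)
           }
           // If no entry of `inputPlans` was replaced above, the children
           // passed here are identical to p.children and this call is
           // effectively a no-op; it only matters when an entry was swapped.
           newPlan.withNewChildren(inputPlans.toIndexedSeq)
         }
       }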



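   For broader context on the surrounding change: the new PhysicalAggregation
   branch splits the final result computation out of the Aggregate before
   rewriting. A rough sketch of the intended plan shapes (column and alias
   names invented for illustration):

       Before the split, a `With` inside the result expression would have to
       be rewritten inside the Aggregate itself:

         Aggregate [k], [k, expensive(sum(v)) AS out]
         +- Relation [k, v]

       After the split, the Aggregate only computes sum(v), and the final
       result expression moves into a Project above it:

         Project [k, expensive(sum_v) AS out]
         +- Aggregate [k], [k, sum(v) AS sum_v]
            +- Relation [k, v]

   applyInternal then rewrites each half separately, so any Project added to
   pre-evaluate common expressions lands above or below the Aggregate rather
   than inside it.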