agubichev commented on code in PR #45125:
URL: https://github.com/apache/spark/pull/45125#discussion_r1513262881


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala:
##########
@@ -328,6 +328,31 @@ abstract class Optimizer(catalogManager: CatalogManager)
       // Do not optimize DPP subquery, as it was created from optimized plan and we should not
       // optimize it again, to save optimization time and avoid breaking broadcast/subquery reuse.
       case d: DynamicPruningSubquery => d
+      case s @ ScalarSubquery(a @ Aggregate(group, _, child), _, _, _, _, mayHaveCountBug)
+        if conf.getConf(SQLConf.DECORRELATE_SUBQUERY_PREVENT_CONSTANT_FOLDING_FOR_COUNT_BUG) &&
+          mayHaveCountBug.nonEmpty && mayHaveCountBug.get =>
+        // This is a subquery with an aggregate that may suffer from a COUNT bug.
+        // Detailed COUNT bug detection is done at a later stage (e.g. in
+        // RewriteCorrelatedScalarSubquery).
+        // Make sure that the output plan always has the same aggregate node
+        // (i.e., it is not being constant folded).
+        // Note that this does not limit optimization opportunities for the subquery: after
+        // decorrelation is done, the subquery's body becomes part of the main plan and all
+        // optimization rules are applied again.
+        val groupRefs = group.flatMap(x => x.references)
+        val projectOverAggregateChild = Project(groupRefs ++ a.references.toSeq, child)
+        val optimizedPlan = Optimizer.this.execute(Subquery.fromExpression(
+          s.withNewPlan(projectOverAggregateChild)))
+        assert(optimizedPlan.isInstanceOf[Subquery])
+        val optimizedInput = optimizedPlan.asInstanceOf[Subquery].child
+
+        assert(optimizedInput.output.size == projectOverAggregateChild.output.size)
+        val updatedProjectList = projectOverAggregateChild.output.zip(optimizedInput.output).map {
+          case (oldAttr, newAttr) => Alias(newAttr, newAttr.name)(exprId = oldAttr.exprId)

Review Comment:
   We are preserving the Aggregate and optimizing only its input via
Optimizer.execute(). This Project is meant to ensure that the Aggregate still
gets attributes with the expected expression IDs once its input has been
optimized. The "normal" subquery optimization path does not treat aggregates
specially and therefore does not need it.
   I added a comment to clarify this.
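
   To illustrate the re-aliasing idea outside of Catalyst, here is a minimal
sketch. It assumes the optimized input may expose the same columns under
different expression IDs. `Attr`, `Alias`, and `realias` are toy stand-ins
invented for this example, not Spark's classes; only the zip-and-alias shape
mirrors the diff above.

   ```scala
   // Toy model (NOT Spark's classes): an attribute is a name plus an expression ID.
   final case class Attr(name: String, exprId: Long)

   // Toy stand-in for an alias: exposes `child` under a chosen name and expression ID.
   final case class Alias(child: Attr, name: String, exprId: Long) {
     def toAttribute: Attr = Attr(name, exprId)
   }

   object RealiasSketch {
     // Pair old and new outputs by position. Each alias reads the new attribute
     // but publishes it under the old attribute's expression ID, so a parent
     // node that refers to the old IDs still resolves.
     def realias(oldOutput: Seq[Attr], newOutput: Seq[Attr]): Seq[Alias] = {
       require(oldOutput.size == newOutput.size, "output arity must be unchanged")
       oldOutput.zip(newOutput).map { case (oldAttr, newAttr) =>
         Alias(newAttr, newAttr.name, oldAttr.exprId)
       }
     }

     def main(args: Array[String]): Unit = {
       val before = Seq(Attr("a", 1L), Attr("b", 2L))     // IDs the parent expects
       val after = Seq(Attr("a", 101L), Attr("b", 102L))  // hypothetical IDs after optimization
       val projectList = realias(before, after)
       // The projected attributes carry the original IDs again.
       println(projectList.map(_.toAttribute))
     }
   }
   ```

   The positional zip is only safe because the output arity is checked first,
which is the role the size assertion plays in the diff.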



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

