andylam-db commented on code in PR #43341:
URL: https://github.com/apache/spark/pull/43341#discussion_r1357469232


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala:
##########
@@ -360,8 +360,40 @@ object PullupCorrelatedPredicates extends Rule[LogicalPlan] with PredicateHelper
     plan.transformExpressionsWithPruning(_.containsPattern(PLAN_EXPRESSION)) {
       case ScalarSubquery(sub, children, exprId, conditions, hint, mayHaveCountBugOld)
         if children.nonEmpty =>
-        val (newPlan, newCond) = decorrelate(sub, plan)
-        val mayHaveCountBug = if (mayHaveCountBugOld.isEmpty) {
+
+        def mayHaveCountBugAgg(a: Aggregate): Boolean = {
+          a.groupingExpressions.isEmpty && a.aggregateExpressions.exists(_.exists {
+            case a: AggregateExpression => a.aggregateFunction.defaultResult.isDefined
+            case _ => false
+          })
+        }
+
+        // The below logic controls handling count bug for scalar subqueries in
+        // [[DecorrelateInnerQuery]], and if we don't handle it here, we handle it in
+        // [[RewriteCorrelatedScalarSubquery#constructLeftJoins]]. Note that handling it in
+        // [[DecorrelateInnerQuery]] is always correct, and turning it off to handle it in
+        // constructLeftJoins is an optimization, so that additional, redundant left outer joins are
+        // not introduced.
+        val handleCountBugInDecorrelate = !conf.getConf(
+          SQLConf.LEGACY_SCALAR_SUBQUERY_COUNT_BUG_HANDLING) && !(sub match {
+          // Handle count bug only if there exists lower level Aggs with count bugs. It does not
+          // matter if the top level agg is count bug vulnerable or not, because:
+          // 1. If the top level agg is count bug vulnerable, it can be handled in
+          // constructLeftJoins, unless there are lower aggs that are count bug vulnerable.
+          // E.g. COUNT(COUNT + COUNT)
+          // 2. If the top level agg is not count bug vulnerable, it can be count bug vulnerable if
+          // there are lower aggs that are count bug vulnerable. E.g. SUM(COUNT)
+          case agg: Aggregate => !agg.child.exists {
+            case lowerAgg: Aggregate => mayHaveCountBugAgg(lowerAgg)
+            case _ => false
+          }
+          case _ => false
+        })
+        val (newPlan, newCond) = decorrelate(sub, plan, handleCountBugInDecorrelate)
+        val mayHaveCountBug = if (handleCountBugInDecorrelate) {
+          // Count bug was already handled in the above decorrelate function call.
+          false
+        } else if (mayHaveCountBugOld.isEmpty) {

Review Comment:
   Maybe the issue is the definition of `mayHaveCountBug`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to