andylam-db commented on code in PR #43341:
URL: https://github.com/apache/spark/pull/43341#discussion_r1355919290


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala:
##########
@@ -360,8 +360,31 @@ object PullupCorrelatedPredicates extends Rule[LogicalPlan] with PredicateHelper
     plan.transformExpressionsWithPruning(_.containsPattern(PLAN_EXPRESSION)) {
       case ScalarSubquery(sub, children, exprId, conditions, hint, mayHaveCountBugOld)
         if children.nonEmpty =>
-        val (newPlan, newCond) = decorrelate(sub, plan)
-        val mayHaveCountBug = if (mayHaveCountBugOld.isEmpty) {
+
+        def mayHaveCountBugAgg(a: Aggregate): Boolean = {
+          a.groupingExpressions.isEmpty && a.aggregateExpressions.exists(_.exists {
+            case a: AggregateExpression => a.aggregateFunction.defaultResult.isDefined
+            case _ => false
+          })
+        }
+
+        // We want to handle count bug for scalar subqueries, except for the cases where the
+        // subquery is a simple top level Aggregate which can have a count bug (note: the below
+        // logic also takes into account nested COUNTs). This is because for these cases, we don't
+        // want to introduce redundant left outer joins in [[DecorrelateInnerQuery]], when the
+        // necessary left outer join will be added in [[RewriteCorrelatedScalarSubquery]].
+        val shouldHandleCountBug = !(sub match {

Review Comment:
   Should we guard this behavior change behind a config flag?
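
   To make the question concrete, here is a minimal sketch of what such a gate could look like. It uses toy stand-ins rather than the real Catalyst classes, so `AggCall`, `Alias`, `ColumnRef`, `Relation`, the toy `Aggregate` and the `CountBugGate` object are all illustrative assumptions. The arms of the `sub match` are also assumed, since the hunk above is cut off right after `sub match {`. `newHandlingEnabled` stands in for a config lookup and `legacyValue` for whatever the rule decided before this change; neither name comes from the PR.

```scala
// Toy stand-ins for Catalyst nodes (assumption: these are NOT the real Spark classes).
sealed trait Expr {
  def children: Seq[Expr]
  // True if this expression or any of its descendants satisfies p.
  def exists(p: Expr => Boolean): Boolean = p(this) || children.exists(_.exists(p))
}
// defaultResult models the value an aggregate function returns on empty input:
// Some(0) for COUNT, None for functions that return NULL such as SUM.
final case class AggCall(name: String, defaultResult: Option[Any]) extends Expr {
  def children: Seq[Expr] = Nil
}
final case class Alias(child: Expr, name: String) extends Expr {
  def children: Seq[Expr] = Seq(child)
}
final case class ColumnRef(name: String) extends Expr {
  def children: Seq[Expr] = Nil
}

sealed trait Plan
final case class Relation(name: String) extends Plan
final case class Aggregate(grouping: Seq[Expr], aggregates: Seq[Expr], child: Plan) extends Plan

object CountBugGate {
  // Same shape as mayHaveCountBugAgg in the diff: a global aggregate (no grouping)
  // whose output contains a function with a non-NULL result on empty input.
  def mayHaveCountBugAgg(a: Aggregate): Boolean =
    a.grouping.isEmpty && a.aggregates.exists(_.exists {
      case c: AggCall => c.defaultResult.isDefined
      case _ => false
    })

  // Assumed match arms: skip the extra handling only for a top-level Aggregate
  // that can itself exhibit the count bug.
  def shouldHandleCountBug(sub: Plan): Boolean = !(sub match {
    case a: Aggregate => mayHaveCountBugAgg(a)
    case _ => false
  })

  // Hypothetical gate: use the new decision only when the flag is on,
  // otherwise fall back to the caller-supplied legacy decision.
  def gated(sub: Plan, newHandlingEnabled: Boolean, legacyValue: Boolean): Boolean =
    if (newHandlingEnabled) shouldHandleCountBug(sub) else legacyValue
}
```

   With a gate of this shape, turning the flag off would restore the previous decision without reverting the patch, which is the usual reason to put an optimizer behavior change behind a config.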



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

