andylam-db commented on code in PR #43341:
URL: https://github.com/apache/spark/pull/43341#discussion_r1355919290
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala:
##########
@@ -360,8 +360,31 @@ object PullupCorrelatedPredicates extends Rule[LogicalPlan] with PredicateHelper
plan.transformExpressionsWithPruning(_.containsPattern(PLAN_EXPRESSION)) {
       case ScalarSubquery(sub, children, exprId, conditions, hint, mayHaveCountBugOld)
         if children.nonEmpty =>
- val (newPlan, newCond) = decorrelate(sub, plan)
- val mayHaveCountBug = if (mayHaveCountBugOld.isEmpty) {
+
+        def mayHaveCountBugAgg(a: Aggregate): Boolean = {
+          a.groupingExpressions.isEmpty && a.aggregateExpressions.exists(_.exists {
+            case a: AggregateExpression => a.aggregateFunction.defaultResult.isDefined
+            case _ => false
+          })
+        }
+
+        // We want to handle count bug for scalar subqueries, except for the cases where the
+        // subquery is a simple top level Aggregate which can have a count bug (note: the below
+        // logic also takes into account nested COUNTs). This is because for these cases, we don't
+        // want to introduce redundant left outer joins in [[DecorrelateInnerQuery]], when the
+        // necessary left outer join will be added in [[RewriteCorrelatedScalarSubquery]].
+        val shouldHandleCountBug = !(sub match {
Review Comment:
Should we flag this with a config?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]