jchen5 commented on code in PR #43341:
URL: https://github.com/apache/spark/pull/43341#discussion_r1361237972
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala:
##########
@@ -360,8 +360,40 @@ object PullupCorrelatedPredicates extends
Rule[LogicalPlan] with PredicateHelper
plan.transformExpressionsWithPruning(_.containsPattern(PLAN_EXPRESSION)) {
case ScalarSubquery(sub, children, exprId, conditions, hint,
mayHaveCountBugOld)
if children.nonEmpty =>
- val (newPlan, newCond) = decorrelate(sub, plan)
- val mayHaveCountBug = if (mayHaveCountBugOld.isEmpty) {
+
+ def mayHaveCountBugAgg(a: Aggregate): Boolean = {
+ a.groupingExpressions.isEmpty &&
a.aggregateExpressions.exists(_.exists {
+ case a: AggregateExpression =>
a.aggregateFunction.defaultResult.isDefined
+ case _ => false
+ })
+ }
+
+ // The below logic controls handling count bug for scalar subqueries in
+ // [[DecorrelateInnerQuery]], and if we don't handle it here, we
handle it in
+ // [[RewriteCorrelatedScalarSubquery#constructLeftJoins]]. Note that
handling it in
+ // [[DecorrelateInnerQuery]] is always correct, and turning it off to
handle it in
+ // constructLeftJoins is an optimization, so that additional,
redundant left outer joins are
+ // not introduced.
+ val handleCountBugInDecorrelate = !conf.getConf(
+ SQLConf.LEGACY_SCALAR_SUBQUERY_COUNT_BUG_HANDLING) && !(sub match {
+ // Handle count bug only if there exists lower level Aggs with count
bugs. It does not
+ // matter if the top level agg is count bug vulnerable or not,
because:
+ // 1. If the top level agg is count bug vulnerable, it can be
handled in
+ // constructLeftJoins, unless there are lower aggs that are count
bug vulnerable.
+ // E.g. COUNT(COUNT + COUNT)
+ // 2. If the top level agg is not count bug vulnerable, it can be
count bug vulnerable if
+ // there are lower aggs that are count bug vulnerable. E.g.
SUM(COUNT)
+ case agg: Aggregate => !agg.child.exists {
+ case lowerAgg: Aggregate => mayHaveCountBugAgg(lowerAgg)
+ case _ => false
+ }
+ case _ => false
+ })
+ val (newPlan, newCond) = decorrelate(sub, plan,
handleCountBugInDecorrelate)
+ val mayHaveCountBug = if (handleCountBugInDecorrelate) {
+ // Count bug was already handled in the above decorrelate function
call.
+ false
+ } else if (mayHaveCountBugOld.isEmpty) {
Review Comment:
Ah, I see the confusion. What I meant was to move the check for the else case
(where mayHaveCountBugOld is non-empty) to the front — in that case we just copy
the old mayHaveCountBug value over. I don't think that should cause any
functional difference, right?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]