andylam-db commented on code in PR #43341:
URL: https://github.com/apache/spark/pull/43341#discussion_r1357456302
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala:
##########
@@ -360,8 +360,40 @@ object PullupCorrelatedPredicates extends Rule[LogicalPlan] with PredicateHelper
     plan.transformExpressionsWithPruning(_.containsPattern(PLAN_EXPRESSION)) {
       case ScalarSubquery(sub, children, exprId, conditions, hint, mayHaveCountBugOld)
         if children.nonEmpty =>
-        val (newPlan, newCond) = decorrelate(sub, plan)
-        val mayHaveCountBug = if (mayHaveCountBugOld.isEmpty) {
+
+        def mayHaveCountBugAgg(a: Aggregate): Boolean = {
+          a.groupingExpressions.isEmpty && a.aggregateExpressions.exists(_.exists {
+            case a: AggregateExpression => a.aggregateFunction.defaultResult.isDefined
+            case _ => false
+          })
+        }
+
+        // The below logic controls handling count bug for scalar subqueries in
+        // [[DecorrelateInnerQuery]], and if we don't handle it here, we handle it in
+        // [[RewriteCorrelatedScalarSubquery#constructLeftJoins]]. Note that handling it in
+        // [[DecorrelateInnerQuery]] is always correct, and turning it off to handle it in
+        // constructLeftJoins is an optimization, so that additional, redundant left outer joins are
+        // not introduced.
+        val handleCountBugInDecorrelate = !conf.getConf(
+          SQLConf.LEGACY_SCALAR_SUBQUERY_COUNT_BUG_HANDLING) && !(sub match {
+          // Handle count bug only if there exists lower level Aggs with count bugs. It does not
+          // matter if the top level agg is count bug vulnerable or not, because:
+          // 1. If the top level agg is count bug vulnerable, it can be handled in
+          // constructLeftJoins, unless there are lower aggs that are count bug vulnerable.
+          // E.g. COUNT(COUNT + COUNT)
+          // 2. If the top level agg is not count bug vulnerable, it can be count bug vulnerable if
+          // there are lower aggs that are count bug vulnerable. E.g. SUM(COUNT)
+          case agg: Aggregate => !agg.child.exists {
+            case lowerAgg: Aggregate => mayHaveCountBugAgg(lowerAgg)
+            case _ => false
+          }
+          case _ => false
+        })
+        val (newPlan, newCond) = decorrelate(sub, plan, handleCountBugInDecorrelate)
+        val mayHaveCountBug = if (handleCountBugInDecorrelate) {
+          // Count bug was already handled in the above decorrelate function call.
+          false
+        } else if (mayHaveCountBugOld.isEmpty) {
Review Comment:
If `decorrelate()` above has already decorrelated the plan and handled the
count bug, we must not set the `mayHaveCountBug` parameter to true. Setting it
to true based on the pre-rewrite plan (which is what happens now) makes
`splitSubquery()` in `constructLeftJoins` fail, because it encounters left
outer joins, which are invalid operators there.

Edit: but yeah, judging from the failing build, it looks like I'm running into
some idempotency issues.
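
For anyone following along who hasn't run into the count bug before, here is a minimal sketch of it in plain Scala collections. It does not use any Spark APIs, and every name in it (`CountBugSketch`, `OuterRow`, `InnerRow`, the three methods) is hypothetical and exists only for illustration. It models a correlated `count(*)` subquery, a naive rewrite into aggregate plus left outer join, and a rewrite that substitutes the aggregate's result on empty input, which is roughly the kind of fix the comment above is talking about.

```scala
// Toy model of the count bug. Not Spark code; all names are made up.
object CountBugSketch {
  final case class OuterRow(k: Int)
  final case class InnerRow(k: Int)

  val outer: Seq[OuterRow] = Seq(OuterRow(1), OuterRow(2))
  val inner: Seq[InnerRow] = Seq(InnerRow(1), InnerRow(1))

  // Reference semantics: evaluate the subquery once per outer row, i.e.
  //   SELECT (SELECT count(*) FROM inner WHERE inner.k = outer.k) FROM outer
  // count(*) over zero matching rows is 0, never NULL.
  def correlated: Seq[Option[Long]] =
    outer.map(o => Option(inner.count(_.k == o.k).toLong))

  // Naive decorrelation: aggregate inner by key, then left outer join.
  // A key with no inner rows has no group, so the join yields NULL (None).
  def naiveDecorrelated: Seq[Option[Long]] = {
    val counts: Map[Int, Long] =
      inner.groupBy(_.k).map { case (k, rows) => k -> rows.size.toLong }
    outer.map(o => counts.get(o.k))
  }

  // Fixed decorrelation: when the join finds no match, fall back to the
  // aggregate's value on empty input (0 for count).
  def fixedDecorrelated: Seq[Option[Long]] = {
    val counts: Map[Int, Long] =
      inner.groupBy(_.k).map { case (k, rows) => k -> rows.size.toLong }
    outer.map(o => Option(counts.getOrElse(o.k, 0L)))
  }

  def main(args: Array[String]): Unit = {
    println(s"correlated:        $correlated")
    println(s"naiveDecorrelated: $naiveDecorrelated")
    println(s"fixedDecorrelated: $fixedDecorrelated")
  }
}
```

In this toy data the outer row with `k = 2` has no matching inner rows, so the naive rewrite returns NULL where the correlated form returns 0. The fallback only needs to be applied once, which is why the flag in the diff has to record where the handling already happened.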