cloud-fan commented on a change in pull request #33070:
URL: https://github.com/apache/spark/pull/33070#discussion_r666405208
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala
##########
@@ -428,7 +451,132 @@ object DecorrelateInnerQuery extends PredicateHelper {
groupingExpressions = newGroupingExpr ++ referencesToAdd,
aggregateExpressions = newAggExpr ++ referencesToAdd,
child = newChild)
- (newAggregate, joinCond, outerReferenceMap)
+
+ // Preserving domain attributes over an Aggregate with an empty
grouping expression
+ // is subject to the "COUNT bug" that can lead to wrong answers:
+ //
+ // Suppose the original query is:
+ // SELECT a, (SELECT COUNT(*) cnt FROM t2 WHERE t1.a = t2.c)
FROM t1
+ //
+ // Decorrelated plan:
+ // Project [a, scalar-subquery [a = c]]
+ // : +- Aggregate [c] [count(*) AS cnt, c]
+ // : +- Relation [c, d]
+ // +- Relation [a, b]
+ //
+ // After rewrite:
+ // Project [a, cnt]
+ // +- Join LeftOuter (a = c)
+ // :- Relation [a, b]
+ // +- Aggregate [c] [count(*) AS cnt, c]
+ // +- Relation [c, d]
+ //
+ // T1 T2 T2' (GROUP BY c)
+ // +---+---+ +---+---+ +---+-----+
+ // | a | b | | c | d | | c | cnt |
+ // +---+---+ +---+---+ +---+-----+
+ // | 0 | 1 | | 0 | 2 | | 0 | 2 |
+ // | 1 | 2 | | 0 | 3 | +---+-----+
+ // +---+---+ +---+---+
+ //
+ // T1 nested loop join T2 T1 left outer join T2'
+ // on (a = c): on (a = c):
+ // +---+-----+ +---+------+
+ // | a | cnt | | a | cnt |
+ // +---+-----+ +---+------+
+ // | 0 | 2 | | 0 | 2 |
+ // | 1 | 0 | <--- correct | 1 | null | <--- wrong result
+ // +---+-----+ +---+------+
+ //
+ // If an aggregate is subject to the COUNT bug:
+ // 1) add a column `true AS alwaysTrue` to the result of the
aggregate
+ // 2) insert a left outer domain join between the outer query and
this aggregate
+ // 3) rewrite the original aggregate's output column using the
default value of the
+ // aggregate function and the alwaysTrue column.
+ //
+ // For example, T1 left outer join T2' with `alwaysTrue` marker:
+ // +---+------+------------+--------------------------------+
+ // | a | cnt | alwaysTrue | if(isnull(alwaysTrue), 0, cnt) |
+ // +---+------+------------+--------------------------------+
+ // | 0 | 2 | true | 2 |
+ // | 1 | null | null | 0 |
<--- correct result
+ // +---+------+------------+--------------------------------+
+ if (groupingExpressions.isEmpty && handleCountBug) {
+ // Evaluate the aggregate expressions with zero tuples.
+ val resultMap =
RewriteCorrelatedScalarSubquery.evalAggregateOnZeroTups(newAggregate)
+ val alwaysTrue = Alias(Literal.TrueLiteral, "alwaysTrue")()
+ val alwaysTrueRef = alwaysTrue.toAttribute.withNullability(true)
+ val expressions = ArrayBuffer.empty[NamedExpression]
+ // Create new aliases for aggregate expressions that have
non-null default
+ // values and reconstruct the output with the `alwaysTrue`
marker.
+ val projectList = newAggregate.aggregateExpressions.map { a =>
+ resultMap.get(a.exprId) match {
+ // Aggregate expression is not subject to the count bug.
+ case Some(Literal(null, _)) | None =>
+ expressions += a
+ // The attribute is nullable since it is from the
right-hand side of a
+ // left outer join.
+ a.toAttribute.withNullability(true)
+ case Some(default) =>
+ val newAttr = a.newInstance()
Review comment:
what if we don't call `.newInstance()`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]