cloud-fan commented on code in PR #56357:
URL: https://github.com/apache/spark/pull/56357#discussion_r3396935930
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/NameScope.scala:
##########
@@ -1481,16 +1517,39 @@ class NameScopeStack(
attribute
}
- groupingAttributeIds match {
- case Some(groupingAttributeIds) =>
- if (groupingAttributeIds.contains(attribute.exprId)) {
+ groupingModifier match {
+ case GroupingModifier.GroupBy(attributeIds) =>
+ if (attributeIds.contains(attribute.exprId)) {
attributeWithUpdatedNullability.markAsAllowAnyAccess()
} else {
attributeWithUpdatedNullability.markAsAggregatedAccessOnly()
}
- case None =>
+ case GroupingModifier.GroupingAnalytics(attributeIds) =>
+ if (attributeIds.contains(attribute.exprId)) {
+ attributeWithUpdatedNullability.markAsAllowAnyAccess()
+ } else {
+ stripIsDuplicateMetadata(attributeWithUpdatedNullability.markAsAggregatedAccessOnly())
Review Comment:
This strips `__is_duplicate` from every non-grouping hidden attribute, but
the tag has a second producer: `DeduplicateUnionChildOutput` also sets it, to
keep duplicate-named union child outputs unambiguous. Stripping those
collaterally could resurrect exactly the `AMBIGUOUS_REFERENCE` the tag exists
to prevent — inside aggregate expressions, where the fixed-point path keeps the
prune (`AttributeSeq.getCandidatesForResolution`). Could
`GroupingModifier.GroupingAnalytics` carry the Expand-passthrough ExprIds so
only those are stripped?
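
A minimal sketch of that shape, using simplified stand-ins rather than the real Catalyst classes. `Attr`, `passthroughIds`, and `adjustHidden` are hypothetical names for illustration only, and the access marking (`markAsAggregatedAccessOnly`) is left out so the example isolates the strip decision:

```scala
// Simplified stand-in for an attribute; isDuplicate models the __is_duplicate tag.
final case class Attr(exprId: Long, name: String, isDuplicate: Boolean = false)

sealed trait GroupingModifier
object GroupingModifier {
  final case class GroupBy(groupingIds: Set[Long]) extends GroupingModifier
  // passthroughIds is the hypothetical extra field: the originals that Expand copies.
  final case class GroupingAnalytics(
      groupingIds: Set[Long],
      passthroughIds: Set[Long]) extends GroupingModifier
}

object StripSketch {
  // Clear the tag only for Expand-passthrough originals; tags set by any
  // other producer (for example union deduplication) are left untouched.
  def adjustHidden(attr: Attr, modifier: GroupingModifier): Attr = modifier match {
    case GroupingModifier.GroupingAnalytics(_, passthroughIds)
        if passthroughIds.contains(attr.exprId) =>
      attr.copy(isDuplicate = false)
    case _ =>
      attr
  }
}
```

With this shape, a hidden attribute tagged by another producer keeps its tag because its id is not in `passthroughIds`.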
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -3088,16 +3088,64 @@ class Analyzer(
exprs: Seq[Expression],
agg: Aggregate): (Seq[NamedExpression], Seq[Expression]) = {
val extraAggExprs = new LinkedHashMap[Expression, NamedExpression]
+ val expandedToOriginalMap = buildExpandedToOriginalAttributeMap(agg)
val transformed = exprs.map { e =>
if (!e.resolved) {
e
} else {
- buildAggExprList(e, agg, extraAggExprs)
+ val rewritten = if (expandedToOriginalMap.nonEmpty) {
+ rewriteExpandedAttributesInAggregates(e, expandedToOriginalMap)
+ } else {
+ e
+ }
+ buildAggExprList(rewritten, agg, extraAggExprs)
}
}
(extraAggExprs.values().asScala.toSeq, transformed)
}
+ /**
+ * For GROUPING SETS / CUBE / ROLLUP, the plan below the Aggregate is
Review Comment:
The shape listed next starts with the Aggregate itself, so it can't be described as "the plan below the Aggregate".
```suggestion
* For GROUPING SETS / CUBE / ROLLUP, the plan shape is
```
##########
sql/core/src/test/scala/org/apache/spark/sql/analysis/resolver/NameScopeSuite.scala:
##########
@@ -1064,6 +1065,44 @@ class NameScopeSuite extends SharedSparkSession {
assert(stack.current.output == Seq(col1Integer))
}
+ test("unaggregatedAccessOnlyIds excludes Expand-copy attributes under aggregate expressions") {
+ val bExpand = AttributeReference(name = "b", dataType = IntegerType)()
+ val bOriginal = AttributeReference(name = "b", dataType = IntegerType)()
+
+ val stack = newNameScopeStack()
+
+ stack.overwriteCurrent(
+ output = Some(Seq(bExpand)),
+ hiddenOutput = Some(Seq(bOriginal))
+ )
+
+ val groupingAttributeIds = new HashSet[ExprId]
+ groupingAttributeIds.add(bExpand.exprId)
+ stack.overwriteOutputAndExtendHiddenOutput(
+ output = Seq(bExpand),
+ groupingModifier = GroupingModifier.GroupingAnalytics(groupingAttributeIds)
+ )
+
+ val insideAggregate = stack.resolveMultipartName(
+ Seq("b"),
+ NameResolutionParameters(
+ canReferenceAggregatedAccessOnlyAttributes = true,
+ canResolveNameByHiddenOutput = true,
+ shouldPreferHiddenOutput = true
+ )
+ )
+ assert(insideAggregate.candidates.size == 1)
+ assert(
+ insideAggregate.candidates.head.asInstanceOf[AttributeReference].exprId == bOriginal.exprId
+ )
+
+ val outsideAggregate = stack.resolveMultipartName(Seq("b"))
Review Comment:
This covers the main-output path, but not bare-reference-via-hidden-output
(`canResolveNameByHiddenOutput = true` with
`canReferenceAggregatedAccessOnlyAttributes = false`) — the path where the
stripped-`__is_duplicate` original relies solely on the `aggregatedAccessOnly`
filter to stay excluded. One more `resolveMultipartName` call here would cover
it.
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -3088,16 +3088,64 @@ class Analyzer(
exprs: Seq[Expression],
agg: Aggregate): (Seq[NamedExpression], Seq[Expression]) = {
val extraAggExprs = new LinkedHashMap[Expression, NamedExpression]
+ val expandedToOriginalMap = buildExpandedToOriginalAttributeMap(agg)
val transformed = exprs.map { e =>
if (!e.resolved) {
e
} else {
- buildAggExprList(e, agg, extraAggExprs)
+ val rewritten = if (expandedToOriginalMap.nonEmpty) {
+ rewriteExpandedAttributesInAggregates(e, expandedToOriginalMap)
+ } else {
+ e
+ }
+ buildAggExprList(rewritten, agg, extraAggExprs)
}
}
(extraAggExprs.values().asScala.toSeq, transformed)
}
+ /**
+ * For GROUPING SETS / CUBE / ROLLUP, the plan below the Aggregate is
+ * Aggregate -> Expand -> Project -> child. The Project introduces aliases for the grouping
+ * columns, and the Expand produces new attributes that replace them. This method builds a
+ * mapping from expanded attribute ExprIds back to the original pre-expansion attributes, so
+ * that aggregate functions in HAVING / ORDER BY can reference the original columns rather than
+ * the expanded ones (which would fail to resolve against the Aggregate's child). Expression
Review Comment:
The expanded attributes do resolve against the Aggregate's child —
`Expand.output` contains them by construction. The actual problem is that they
are NULL for rolled-up groups, producing wrong aggregate values (as this PR's
own test header states).
```suggestion
* the expanded ones (which are NULL for rolled-up groups, yielding wrong results). Expression
```
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/NameScope.scala:
##########
@@ -768,6 +782,14 @@ class NameScope(
candidates
}
+ val filteredCandidates = if (canReferenceAggregatedAccessOnlyAttributes) {
Review Comment:
This filter runs whenever we're resolving inside an aggregate expression —
i.e. on every single-pass query today — even though `unaggregatedAccessOnlyIds`
is almost always empty. The adjacent extractable-candidates filter is guarded
by `nestedFields.nonEmpty`; adding `&& !unaggregatedAccessOnlyIds.isEmpty` here
avoids the per-resolution allocation on the common path.
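
A minimal sketch of the guarded filter, assuming the filter drops candidates whose id is in `unaggregatedAccessOnlyIds` when resolving inside an aggregate expression. `Candidate` and `filterCandidates` are hypothetical stand-ins, not the actual `NameScope` code:

```scala
object FilterGuardSketch {
  // Simplified stand-in for a resolution candidate.
  final case class Candidate(exprId: Long, name: String)

  def filterCandidates(
      candidates: Seq[Candidate],
      canReferenceAggregatedAccessOnlyAttributes: Boolean,
      unaggregatedAccessOnlyIds: Set[Long]): Seq[Candidate] = {
    if (canReferenceAggregatedAccessOnlyAttributes && unaggregatedAccessOnlyIds.nonEmpty) {
      // Uncommon path: grouping analytics registered ids to exclude.
      candidates.filterNot(c => unaggregatedAccessOnlyIds.contains(c.exprId))
    } else {
      // Common path: hand back the same Seq instance, with no new collection.
      candidates
    }
  }
}
```

On the common path the caller gets the original `Seq` back unchanged, which is the allocation the guard is meant to save.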
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/AggregateResolver.scala:
##########
@@ -169,9 +169,15 @@ class AggregateResolver(
scopes.popScope()
}
+ val groupingModifier = if (operatorResolutionContextStack.current.hasGroupingAnalytics) {
Review Comment:
Question for the wiring follow-up: `hasGroupingAnalytics` is propagated
child-to-parent unconditionally on every `OperatorResolutionContextStack.pop()`
(OperatorResolutionContext.scala:142-148), including out of subquery contexts.
Once `GroupingAnalyticsResolver` is wired up, a plain GROUP BY aggregate above
a FROM-subquery (or scalar subquery) containing grouping analytics would take
the `GroupingAnalytics` branch with its own grouping attribute ids. A plain
GROUP BY has no Expand copy/original split, so excluding those ids inside
aggregate expressions removes the only candidates — `SUM(groupingCol)` in ORDER
BY/HAVING above it would fail to resolve. The fixed-point fix doesn't have this
problem: its map is built only from this aggregate's own `Expand` child. Would
it be safer to carry the grouping-analytics fact per-Aggregate (e.g. on
`AggregateResolutionResult`) instead of reading the propagated context flag?
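
A toy model of the concern, under the assumption that `pop()` ORs the child's flag into the parent. `Context`, `ContextStack`, and `AggregateFacts` are hypothetical stand-ins, not the real resolver classes:

```scala
import scala.collection.mutable

object PropagationSketch {
  // Stand-in for an operator resolution context carrying the propagated flag.
  final class Context { var hasGroupingAnalytics: Boolean = false }

  final class ContextStack {
    private val stack = mutable.ArrayBuffer(new Context)
    def current: Context = stack.last
    def push(): Unit = stack += new Context
    def pop(): Unit = {
      val child = stack.remove(stack.length - 1)
      // Assumed propagation rule: the child's flag leaks into the parent on
      // every pop, including pops out of a subquery context.
      current.hasGroupingAnalytics =
        current.hasGroupingAnalytics || child.hasGroupingAnalytics
    }
  }

  // Hypothetical per-Aggregate fact: set only when this aggregate's own child is an Expand.
  final case class AggregateFacts(ownChildIsExpand: Boolean)
}
```

In this model, an outer plain GROUP BY that resolves after popping a subquery context containing grouping analytics still sees `hasGroupingAnalytics == true`, while `AggregateFacts(ownChildIsExpand = false)` describes it correctly.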
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]