xumingming commented on code in PR #56777:
URL: https://github.com/apache/spark/pull/56777#discussion_r3479337125
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala:
##########
@@ -127,11 +127,34 @@ object AggUtils {
aggregateExpressions: Seq[AggregateExpression],
resultExpressions: Seq[NamedExpression],
child: SparkPlan): Seq[SparkPlan] = {
- // Check if we can use HashAggregate.
+
+ val groupingAttributes = groupingExpressions.map(_.toAttribute)
+
+ // When partial aggregation is disabled, skip the pre-shuffle partial aggregation and run a
+ // single Complete-mode aggregation after the shuffle. This can improve performance when the
+ // group cardinality is high and the pre-shuffle reduction ratio is low.
+ //
+ // session_window requires MergingSessionsExec (inserted below via mayAppendMergingSessionExec)
+ // to sort and merge overlapping sessions before the final aggregation. The bypass is skipped
+ // when a session_window grouping key is present so that the normal Partial+Merge+Final path
+ // runs and MergingSessionsExec is correctly inserted.
+ val hasSessionWindow = groupingExpressions.exists(_.metadata.contains(SessionWindow.marker))
+ if (child.conf.bypassPartialAggregation && !hasSessionWindow) {
+ val completeAggregateExpressions = aggregateExpressions.map(_.copy(mode = Complete))
+ val completeAggregateAttributes = completeAggregateExpressions.map(_.resultAttribute)
+ val completeAggregate = createAggregate(
+ requiredChildDistributionExpressions = Some(groupingAttributes),
Review Comment:
1. "requiredChildDistributionExpressions = Some(groupingAttributes)" is a
good catch; I will make the change. The current code would still produce the
correct result without it, because the PullOutGroupingExpressions rule extracts
non-attribute grouping expressions before the plan enters
planAggregateWithoutDistinct. The change you suggest still makes the code more
readable and easier to reason about.
2. On the nondeterminism concern: PullOutNondeterministic pulls
nondeterministic grouping expressions into an upstream Project, so they are not
evaluated multiple times.
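
To illustrate the idea behind point 1, here is a toy sketch of how pulling
non-attribute grouping expressions into a Project leaves the aggregate grouping
by plain attributes only. All types and names below (Expr, Attr, Add, Alias,
Plan, PullOutSketch, the "g0"-style alias names) are hypothetical stand-ins and
are not Spark's Catalyst classes or the actual rule implementation; the real
rule also handles pass-through of child output and references inside aggregate
expressions, which this sketch ignores.

```scala
// Toy model only: hypothetical types, NOT Spark's Catalyst classes.
sealed trait Expr
final case class Attr(name: String) extends Expr
final case class Add(left: Expr, right: Expr) extends Expr
final case class Alias(child: Expr, name: String)

sealed trait Plan
final case class Relation(output: Seq[Attr]) extends Plan
final case class Project(aliases: Seq[Alias], child: Plan) extends Plan
final case class Aggregate(grouping: Seq[Expr], child: Plan) extends Plan

object PullOutSketch {
  // Replace every non-attribute grouping expression with an attribute that is
  // computed once in a Project placed directly below the Aggregate.
  def pullOut(agg: Aggregate): Aggregate = {
    val complex = agg.grouping.filterNot(_.isInstanceOf[Attr]).distinct
    if (complex.isEmpty) {
      agg
    } else {
      // Alias names are made up for illustration.
      val aliases = complex.zipWithIndex.map { case (e, i) => Alias(e, s"g$i") }
      val replacement: Map[Expr, Attr] =
        aliases.map(a => a.child -> Attr(a.name)).toMap
      val newGrouping = agg.grouping.map(e => replacement.getOrElse(e, e))
      Aggregate(newGrouping, Project(aliases, agg.child))
    }
  }
}

object PullOutSketchDemo {
  def main(args: Array[String]): Unit = {
    val a = Attr("a")
    val b = Attr("b")
    val before = Aggregate(Seq(Add(a, b), a), Relation(Seq(a, b)))
    val after = PullOutSketch.pullOut(before)
    // After the rewrite, every grouping key is a plain attribute.
    assert(after.grouping.forall(_.isInstanceOf[Attr]))
    println(after)
  }
}
```

In this model, mapping the grouping keys to attributes afterwards is a no-op,
which is why the existing code already behaves correctly; passing
groupingAttributes explicitly just makes that intent visible.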