bersprockets commented on code in PR #37825:
URL: https://github.com/apache/spark/pull/37825#discussion_r979487585
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala:
##########
@@ -291,7 +298,8 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] {
val naf = if (af.children.forall(_.foldable)) {
af
} else {
- patchAggregateFunctionChildren(af) { x =>
+ patchAggregateFunctionChildren(af) { x1 =>
+ val x = funcChildrenLookup.getOrElse(x1, x1)
Review Comment:
Here's one of the complications, and my solution is somewhat brittle.
When grouping by `ExpressionSet`, in the case where there are superficially
different children, we don't get all of the original children in the keys of
`distinctAggGroups`. This is because multiple `ExpressionSet`s may have the
same baseSet but different originals, and `groupBy` chooses only one
`ExpressionSet` to represent the group's key (which is what we want: we want
`groupBy` to group by semantically equivalent children).
However, because `distinctAggGroups` is missing some original children in
its keys, `distinctAggChildAttrLookup` is also missing some original children
in its keys.
To bridge this gap, I used `funcChildrenLookup`. This data structure maps
each original function child to the first semantically equivalent original
function child. `funcChildrenLookup` will translate the original function child
into the key (hopefully) expected by `distinctAggChildAttrLookup`. The
brittleness is this: this code depends, at the very least, on which
`ExpressionSet` is chosen by `groupBy` as the winner.
In the [first
version](https://github.com/apache/spark/compare/master...bersprockets:spark:rewritedistinct_issue_orig?expand=1)
of my PR, I modified the Aggregate (if needed) so there are no superficially
different function children, so there is no added complexity when performing the
groupings and the patching. I find that approach a bit more straightforward to reason about.
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala:
##########
@@ -402,7 +410,21 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] {
}
Aggregate(groupByAttrs, patchedAggExpressions, firstAggregate)
} else {
- a
+ // We may have one distinct group only because we grouped using ExpressionSet.
+ // To prevent SparkStrategies from complaining during sanity check, we need to check whether
+ // the original list of aggregate expressions had multiple distinct groups and, if so,
+ // patch that list so we have only one distinct group.
+ if (funcChildrenLookup.keySet.size > funcChildrenLookup.values.toSet.size) {
+ val patchedAggExpressions = a.aggregateExpressions.map { e =>
Review Comment:
This is the second complexity.
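The size comparison in the diff can be illustrated with a small, hypothetical sketch (toy strings again, not Spark expressions): when superficially different children have collapsed into one group, at least two keys of the lookup share a value, so the key set is strictly larger than the distinct value set, and the aggregate expressions still need patching through the lookup.

```scala
// Illustrative only: "A" and "a" collapsed into one group represented by "A".
object DistinctGroupCheckDemo {
  def main(args: Array[String]): Unit = {
    val funcChildrenLookup = Map("A" -> "A", "a" -> "A", "b" -> "b")

    // 3 keys but only 2 distinct values => some children were collapsed,
    // which is exactly the condition tested in the diff above.
    val collapsed =
      funcChildrenLookup.keySet.size > funcChildrenLookup.values.toSet.size
    assert(collapsed)

    // Patching: rewrite every function child through the lookup so only one
    // spelling of each semantically equivalent child survives.
    val aggChildren = Seq("A", "a", "b")
    val patched = aggChildren.map(funcChildrenLookup)
    assert(patched == Seq("A", "A", "b"))
  }
}
```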
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]