bersprockets commented on code in PR #37825:
URL: https://github.com/apache/spark/pull/37825#discussion_r979487585


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala:
##########
@@ -291,7 +298,8 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] {
             val naf = if (af.children.forall(_.foldable)) {
               af
             } else {
-              patchAggregateFunctionChildren(af) { x =>
+              patchAggregateFunctionChildren(af) { x1 =>
+                val x = funcChildrenLookup.getOrElse(x1, x1)

Review Comment:
   Here's one of the complications, and my solution is somewhat brittle.
   
   When we group by `ExpressionSet` and some function children are only 
superficially different, not all of the original children appear in the keys of 
`distinctAggGroups`. This is because multiple `ExpressionSet`s may have the 
same `baseSet` but different originals, and `groupBy` keeps only one 
`ExpressionSet` as the key for each group (which is what we want: `groupBy` 
should group by semantically equivalent children).
   
   However, because `distinctAggGroups` is missing some original children in 
its keys, `distinctAggChildAttrLookup` is missing those same children in its 
keys.
   
   To bridge this gap, I used `funcChildrenLookup`. This data structure maps 
each original function child to the first semantically equivalent original 
function child, so it translates an original function child into the key 
(hopefully) expected by `distinctAggChildAttrLookup`. The brittleness is this: 
the code depends, at the very least, on which `ExpressionSet` `groupBy` 
chooses as the winner.
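
   The lookup described above can be illustrated with a small standalone 
sketch. It does not use Catalyst: `Expr`, its toy `canonical` method, 
`buildLookup`, and the `attr_1`/`attr_2` strings are all hypothetical stand-ins 
chosen for illustration, and the real rule may build `funcChildrenLookup` 
differently.

```scala
import scala.collection.mutable

// Toy stand-in for a Catalyst expression (an assumption for illustration).
// `canonical` only imitates semantic equality for a commutative "+";
// it is not how Spark canonicalizes expressions.
final case class Expr(text: String) {
  def canonical: String = text.split("\\+").map(_.trim).sorted.mkString("+")
}

object FuncChildrenLookupSketch {
  // Map every original child to the first original child that is
  // semantically equal to it (here: has the same toy canonical form).
  def buildLookup(children: Seq[Expr]): Map[Expr, Expr] = {
    val firstSeen = mutable.LinkedHashMap.empty[String, Expr]
    children.foreach(c => firstSeen.getOrElseUpdate(c.canonical, c))
    children.map(c => c -> firstSeen(c.canonical)).toMap
  }

  def main(args: Array[String]): Unit = {
    val children = Seq(Expr("b + 1"), Expr("1 + b"), Expr("c"))
    val lookup = buildLookup(children)

    // Stand-in for distinctAggChildAttrLookup: keyed by one representative
    // per group, so "1 + b" has no entry of its own.
    val attrLookup = Map(Expr("b + 1") -> "attr_1", Expr("c") -> "attr_2")

    val x1 = Expr("1 + b")
    println(attrLookup.get(x1))      // None: the original child is not a key

    val x = lookup.getOrElse(x1, x1) // translate to the representative first
    println(attrLookup.get(x))       // Some(attr_1)
  }
}
```

   The sketch only works because the representative it picks (the first 
child seen) happens to match the key in the toy `attrLookup`; that coupling is 
the brittleness mentioned above.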
   
   In the [first 
version](https://github.com/apache/spark/compare/master...bersprockets:spark:rewritedistinct_issue_orig?expand=1)
 of my PR, I modified the Aggregate (if needed) so that there were no 
superficially different function children, which avoids this complexity when 
performing the groupings and the patching. I find that approach a bit more 
straightforward to reason about.
   



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala:
##########
@@ -402,7 +410,21 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] {
       }
       Aggregate(groupByAttrs, patchedAggExpressions, firstAggregate)
     } else {
-      a
+      // We may have one distinct group only because we grouped using ExpressionSet.
+      // To prevent SparkStrategies from complaining during sanity check, we need to check whether
+      // the original list of aggregate expressions had multiple distinct groups and, if so,
+      // patch that list so we have only one distinct group.
+      if (funcChildrenLookup.keySet.size > funcChildrenLookup.values.toSet.size) {
+        val patchedAggExpressions = a.aggregateExpressions.map { e =>

Review Comment:
   This is the second complication.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

