[GitHub] [spark] bersprockets commented on a diff in pull request #37825: [SPARK-40382][SQL] Group distinct aggregate expressions by semantically equivalent children in `RewriteDistinctAggregates`
bersprockets commented on code in PR #37825:
URL: https://github.com/apache/spark/pull/37825#discussion_r992871693

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala:

```diff
@@ -254,7 +254,9 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] {
     // Setup unique distinct aggregate children.
     val distinctAggChildren = distinctAggGroups.keySet.flatten.toSeq.distinct
-    val distinctAggChildAttrMap = distinctAggChildren.map(expressionAttributePair)
+    val distinctAggChildAttrMap = distinctAggChildren.map { e =>
+      e.canonicalized -> AttributeReference(e.sql, e.dataType, nullable = true)()
```

Review Comment:

`expressionAttributePair` is used in two other places, though, for regular aggregate children and filter expressions, where the key does not need to be canonicalized.

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
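The effect of keying the attribute map by `canonicalized` can be sketched with a toy expression model (an illustrative stand-in, not Catalyst's actual `Expression` API): superficially different spellings such as `1 + c1` and `c1 + 1` collapse to a single map entry that either spelling can look up.

```scala
object CanonicalKeyDemo {
  // Toy expression ADT; `canonicalized` orders the operands of the
  // commutative Add so equivalent spellings share one canonical form.
  sealed trait Expr {
    def canonicalized: Expr = this match {
      case Add(l, r) =>
        val (cl, cr) = (l.canonicalized, r.canonicalized)
        if (cl.toString <= cr.toString) Add(cl, cr) else Add(cr, cl)
      case other => other
    }
  }
  case class Col(name: String) extends Expr
  case class Lit(v: Int) extends Expr
  case class Add(l: Expr, r: Expr) extends Expr

  // Two superficially different children...
  val children = Seq(Add(Lit(1), Col("c1")), Add(Col("c1"), Lit(1)))

  // ...keyed by canonical form, in the spirit of the patched
  // distinctAggChildAttrMap, yield a single shared entry.
  val attrMap: Map[Expr, String] =
    children.map(e => e.canonicalized -> s"attr_for(${e.canonicalized})").toMap
}
```

Both spellings resolve to the same key, so the map ends up with one entry instead of two.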
bersprockets commented on code in PR #37825:
URL: https://github.com/apache/spark/pull/37825#discussion_r985123478

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala:

```diff
@@ -402,7 +405,28 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] {
       }
       Aggregate(groupByAttrs, patchedAggExpressions, firstAggregate)
     } else {
-      a
+      // We may have one distinct group only because we grouped using ExpressionSet.
+      // To prevent SparkStrategies from complaining during sanity check, we need to check whether
+      // the original list of aggregate expressions had multiple distinct groups and, if so,
+      // patch that list so we have only one distinct group.
```

Review Comment:

> Shall we use ExpressionSet to fix issues in SparkStrategies as well?

Looking...
bersprockets commented on code in PR #37825:
URL: https://github.com/apache/spark/pull/37825#discussion_r980641159

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala:

```diff
@@ -218,9 +218,16 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] {
     val aggExpressions = collectAggregateExprs(a)
     val distinctAggs = aggExpressions.filter(_.isDistinct)
+    val funcChildren = distinctAggs.flatMap { e =>
+      e.aggregateFunction.children.filter(!_.foldable)
+    }
+    val funcChildrenLookup = funcChildren.map { e =>
+      (e, funcChildren.find(fc => e.semanticEquals(fc)).getOrElse(e))
+    }.toMap
+
     // Extract distinct aggregate expressions.
     val distinctAggGroups = aggExpressions.filter(_.isDistinct).groupBy { e =>
```

Review Comment:

Not sure if this is what you were hinting at, but for all maps related to distinct aggregation children, the code now uses `ExpressionSet` as a key. That way, look-ups shouldn't care about superficial differences: the code never makes a lookup using an original child (...for the distinct aggregations. It still uses original children for regular aggregations).

> Then it's pretty easy to get back the original expressions, by ExpressionSet.toSeq.

By using `ExpressionSet` as the key to `distinctAggChildAttrLookup`, hopefully I don't need the originals at all. Which is a good thing, since `ExpressionSet` is lossy when it comes to the originals. For example:
```
select count(distinct 1 + c1, c1 + 1), count(distinct c2 + 1, c2 + 2) from df;
```
This creates the following grouping keys for `distinctAggGroups`:
```
Set((1 + c1#106))
Set((c2#107 + 1), (c2#107 + 2))
```
`c1#106 + 1` is lost because of the way `ExpressionSet#add` works (it simply ignores a new expression that is semantically equivalent to anything already in `baseSet`).
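The `ExpressionSet#add` lossiness described above can be illustrated with a minimal stand-in (a toy set keyed on a canonical string form, not the real Catalyst class): once a semantically equivalent expression is in `baseSet`, later originals are silently dropped.

```scala
object LossySetDemo {
  // Toy canonical form: sort the operands of "a + b" so order doesn't matter.
  def canonical(e: String): String = e.split(" \\+ ").sorted.mkString(" + ")

  // Mimics the behavior attributed to ExpressionSet#add above: a new
  // expression semantically equivalent to anything already in baseSet
  // is simply ignored, so its original spelling is never recorded.
  final case class SemanticSet(baseSet: Set[String] = Set.empty,
                               originals: Vector[String] = Vector.empty) {
    def add(e: String): SemanticSet =
      if (baseSet.contains(canonical(e))) this
      else SemanticSet(baseSet + canonical(e), originals :+ e)
  }

  // "c1 + 1" is lost, just as c1#106 + 1 is lost in the example above.
  val s = SemanticSet().add("1 + c1").add("c1 + 1").add("c2 + 2")
}
```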
bersprockets commented on code in PR #37825:
URL: https://github.com/apache/spark/pull/37825#discussion_r979489753

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala:

```diff
@@ -213,7 +213,16 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] {
     case a: Aggregate if mayNeedtoRewrite(a) => rewrite(a)
   }

-  def rewrite(a: Aggregate): Aggregate = {
+  def rewrite(aOrig: Aggregate): Aggregate = {
+    // Make children of distinct aggregations the same if they are only
+    // different due to superficial reasons, e.g.:
+    // "1 + col1" vs "col1 + 1", both should become "1 + col1"
+    // or
+    // "col1" vs "Col1", both should become "col1"
+    // This could potentially reduce the number of distinct
+    // aggregate groups, and therefore reduce the number of
+    // projections in Expand (or eliminate the need for Expand)
+    val a = reduceDistinctAggregateGroups(aOrig)
```

Review Comment:

I made the change to use `ExpressionSet` and also commented on some of the issues.

I still prefer 'sanitizing' each original function child to use the first semantically equivalent child, in essence creating a new set of "original" children, as it bypasses some complexities (in particular the one where we may lose some of the original children as keys when we group by `ExpressionSet`).
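The 'sanitizing' approach preferred here can be sketched as follows (a simplified model using strings in place of Catalyst expressions, with a toy `semanticEquals`): every function child is rewritten to the first semantically equivalent child, so downstream grouping and lookups see one consistent spelling.

```scala
object SanitizeDemo {
  // Toy semantic equivalence: commutative "+" and case-insensitive columns,
  // standing in for Catalyst's Expression.semanticEquals.
  private def canonical(e: String): String =
    e.toLowerCase.split(" \\+ ").sorted.mkString(" + ")
  def semanticEquals(a: String, b: String): Boolean = canonical(a) == canonical(b)

  val funcChildren = Seq("1 + col1", "col1 + 1", "Col1", "col1", "c2 + 1")

  // Map each child to the first semantically equivalent child, mirroring
  // the funcChildrenLookup construction quoted in the diff above.
  val funcChildrenLookup: Map[String, String] = funcChildren.map { e =>
    e -> funcChildren.find(fc => semanticEquals(e, fc)).getOrElse(e)
  }.toMap
}
```

After this pass, `col1 + 1` is represented by `1 + col1` and `col1` by `Col1` (the first-seen spelling wins), so grouping the sanitized children never has to reconcile superficially different keys.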
bersprockets commented on code in PR #37825:
URL: https://github.com/apache/spark/pull/37825#discussion_r979487585

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala:

```diff
@@ -291,7 +298,8 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] {
         val naf = if (af.children.forall(_.foldable)) {
           af
         } else {
-          patchAggregateFunctionChildren(af) { x =>
+          patchAggregateFunctionChildren(af) { x1 =>
+            val x = funcChildrenLookup.getOrElse(x1, x1)
```

Review Comment:

Here's one of the complications, and my solution is somewhat brittle.

When grouping by `ExpressionSet`, in the case where there are superficially different children, we don't get all of the original children in the keys of `distinctAggGroups`. This is because multiple `ExpressionSet`s may have the same `baseSet` but different originals, and `groupBy` chooses only one `ExpressionSet` to represent the group's key (which is what we want: we want `groupBy` to group by semantically equivalent children). However, because `distinctAggGroups` is missing some original children in its keys, `distinctAggChildAttrLookup` is also missing some original children in its keys.

To bridge this gap, I used `funcChildrenLookup`. This data structure maps each original function child to the first semantically equivalent original function child. `funcChildrenLookup` will translate the original function child into the key (hopefully) expected by `distinctAggChildAttrLookup`.

The brittleness is this: the code depends, at the very least, on which `ExpressionSet` is chosen by `groupBy` as the winner.

In the [first version](https://github.com/apache/spark/compare/master...bersprockets:spark:rewritedistinct_issue_orig?expand=1) of my PR, I modified the Aggregate (if needed) so there are no superficially different function children, thus there is no complexity when performing the groupings and the patching. I find it a bit more straightforward to reason about.

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala:

```diff
@@ -402,7 +410,21 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] {
       }
       Aggregate(groupByAttrs, patchedAggExpressions, firstAggregate)
     } else {
-      a
+      // We may have one distinct group only because we grouped using ExpressionSet.
+      // To prevent SparkStrategies from complaining during sanity check, we need to check whether
+      // the original list of aggregate expressions had multiple distinct groups and, if so,
+      // patch that list so we have only one distinct group.
+      if (funcChildrenLookup.keySet.size > funcChildrenLookup.values.toSet.size) {
+        val patchedAggExpressions = a.aggregateExpressions.map { e =>
```

Review Comment:

This is the second complexity.
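The guard in this second hunk reduces to a one-line property, sketched below with plain strings in place of Catalyst expressions (an illustrative model, not the PR's exact code): if the lookup has more distinct keys than distinct values, at least two originals collapsed into one representative, so the aggregate expression list needs patching.

```scala
object NeedsPatchDemo {
  // A lookup where two superficially different originals collapsed into one
  // representative, versus one where nothing collapsed.
  val collapsed = Map("1 + c1" -> "1 + c1", "c1 + 1" -> "1 + c1")
  val untouched = Map("c1" -> "c1", "c2" -> "c2")

  // Mirrors: funcChildrenLookup.keySet.size > funcChildrenLookup.values.toSet.size
  def needsPatching(lookup: Map[String, String]): Boolean =
    lookup.keySet.size > lookup.values.toSet.size
}
```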
bersprockets commented on code in PR #37825:
URL: https://github.com/apache/spark/pull/37825#discussion_r979267219

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala:

```diff
@@ -213,7 +213,16 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] {
     case a: Aggregate if mayNeedtoRewrite(a) => rewrite(a)
   }

-  def rewrite(a: Aggregate): Aggregate = {
+  def rewrite(aOrig: Aggregate): Aggregate = {
+    // Make children of distinct aggregations the same if they are only
+    // different due to superficial reasons, e.g.:
+    // "1 + col1" vs "col1 + 1", both should become "1 + col1"
+    // or
+    // "col1" vs "Col1", both should become "col1"
+    // This could potentially reduce the number of distinct
+    // aggregate groups, and therefore reduce the number of
+    // projections in Expand (or eliminate the need for Expand)
+    val a = reduceDistinctAggregateGroups(aOrig)
```

Review Comment:

Thanks! I am working on it, just working through some small complications.
bersprockets commented on code in PR #37825:
URL: https://github.com/apache/spark/pull/37825#discussion_r967860756

## sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala:

```diff
@@ -1451,6 +1451,22 @@ class DataFrameAggregateSuite extends QueryTest
     val df = Seq(1).toDF("id").groupBy(Stream($"id" + 1, $"id" + 2): _*).sum("id")
     checkAnswer(df, Row(2, 3, 1))
   }
+
+  test("SPARK-40382: All distinct aggregation children are semantically equivalent") {
```

Review Comment:

This test succeeds without the changes to RewriteDistinctAggregates. It's just a sanity test to check that grouping by semantic equivalence doesn't break this case.