[GitHub] [spark] bersprockets commented on a diff in pull request #37825: [SPARK-40382][SQL] Group distinct aggregate expressions by semantically equivalent children in `RewriteDistinctAggregates`

2022-10-11 Thread GitBox


bersprockets commented on code in PR #37825:
URL: https://github.com/apache/spark/pull/37825#discussion_r992871693


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala:
##
@@ -254,7 +254,9 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] {
 
   // Setup unique distinct aggregate children.
   val distinctAggChildren = distinctAggGroups.keySet.flatten.toSeq.distinct
-  val distinctAggChildAttrMap = distinctAggChildren.map(expressionAttributePair)
+  val distinctAggChildAttrMap = distinctAggChildren.map { e =>
+    e.canonicalized -> AttributeReference(e.sql, e.dataType, nullable = true)()

Review Comment:
   `expressionAttributePair` is used in two other places, though, for regular 
aggregate children and filter expressions where the key does not need to be 
canonicalized.
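
   To illustrate the idea in the diff above, here is a toy sketch (hypothetical stand-ins replace Spark's `AttributeReference` and `Expression.canonicalized`, so it runs without Catalyst): keying the child-to-attribute map by a canonical form lets superficially different but semantically equivalent children resolve to the same attribute.

   ```scala
   // Toy model, NOT Spark's implementation: expressions are strings, and a
   // hypothetical canonicalized() that sorts the operands of "+" stands in
   // for Expression.canonicalized.
   object CanonicalKeySketch {
     final case class Attr(name: String) // stand-in for AttributeReference

     def canonicalized(e: String): String =
       e.split("\\s*\\+\\s*").sorted.mkString(" + ")

     // Build the child -> attribute map keyed by canonical form, mirroring
     // the diff's distinctAggChildAttrMap.
     def buildAttrMap(children: Seq[String]): Map[String, Attr] =
       children.map(e => canonicalized(e) -> Attr(e)).toMap
   }
   ```

   With this keying, a lookup for `"c1 + 1"` via `canonicalized` lands on the same entry as `"1 + c1"`, which is exactly why the key must not be canonicalized for the other two call sites of `expressionAttributePair`.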



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] bersprockets commented on a diff in pull request #37825: [SPARK-40382][SQL] Group distinct aggregate expressions by semantically equivalent children in `RewriteDistinctAggregates`

2022-10-01 Thread GitBox


bersprockets commented on code in PR #37825:
URL: https://github.com/apache/spark/pull/37825#discussion_r985123478


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala:
##
@@ -402,7 +405,28 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] {
   }
   Aggregate(groupByAttrs, patchedAggExpressions, firstAggregate)
 } else {
-  a
+  // We may have one distinct group only because we grouped using ExpressionSet.
+  // To prevent SparkStrategies from complaining during sanity check, we need to check whether
+  // the original list of aggregate expressions had multiple distinct groups and, if so,
+  // patch that list so we have only one distinct group.

Review Comment:
   >Shall we use ExpressionSet to fix issues in SparkStrategies as well?
   
   Looking...






[GitHub] [spark] bersprockets commented on a diff in pull request #37825: [SPARK-40382][SQL] Group distinct aggregate expressions by semantically equivalent children in `RewriteDistinctAggregates`

2022-09-26 Thread GitBox


bersprockets commented on code in PR #37825:
URL: https://github.com/apache/spark/pull/37825#discussion_r980641159


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala:
##
@@ -218,9 +218,16 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] {
 val aggExpressions = collectAggregateExprs(a)
 val distinctAggs = aggExpressions.filter(_.isDistinct)
 
+val funcChildren = distinctAggs.flatMap { e =>
+  e.aggregateFunction.children.filter(!_.foldable)
+}
+val funcChildrenLookup = funcChildren.map { e =>
+  (e, funcChildren.find(fc => e.semanticEquals(fc)).getOrElse(e))
+}.toMap
+
 // Extract distinct aggregate expressions.
 val distinctAggGroups = aggExpressions.filter(_.isDistinct).groupBy { e =>

Review Comment:
   Not sure if this is what you were hinting at, but for all maps related to 
distinct aggregation children, the code now uses `ExpressionSet` as a key. That 
way look-ups shouldn't care about superficial differences: the code never makes 
a lookup using an original child (...for the distinct aggregations. It still 
uses original children for regular aggregations).
   
   >Then it's pretty easy to get back the original expressions, by 
ExpressionSet.toSeq.
   
   By using `ExpressionSet` as the key to `distinctAggChildAttrLookup`, 
hopefully I don't need the originals at all.
   
   Which is a good thing, since `ExpressionSet` is lossy when it comes to the 
originals, for example:
   
   ```
   select count(distinct 1 + c1, c1 + 1), count(distinct c2 + 1, c2 + 2) from df;
   ```
   This creates the following grouping keys for `distinctAggGroups`:
   ```
   Set((1 + c1#106))
   Set((c2#107 + 1), (c2#107 + 2))
   ```
   `c1#106 + 1` is lost because of the way `ExpressionSet#add` works (it just 
ignores a new expression that is semantically equivalent to anything in 
`baseSet`).
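   
   The lossy behavior above can be sketched outside Spark. This is a toy model only: strings stand in for Catalyst expressions, and a hypothetical canonicalize that sorts the operands of `+` stands in for semantic equivalence; neither is the actual `ExpressionSet` implementation.
   
   ```scala
   // Mimics the dedup effect of ExpressionSet#add: a new element that is
   // "semantically equivalent" to one already kept is silently dropped,
   // so the second spelling of an expression never survives.
   object LossyGroupingSketch {
     def canonicalize(e: String): String =
       e.split("\\s*\\+\\s*").sorted.mkString(" + ")
   
     def dedup(children: Seq[String]): Seq[String] =
       children.foldLeft(Seq.empty[String]) { (kept, e) =>
         if (kept.exists(k => canonicalize(k) == canonicalize(e))) kept
         else kept :+ e
       }
   }
   ```
   
   Here `dedup(Seq("1 + c1", "c1 + 1"))` keeps only `"1 + c1"` (the analogue of `c1#106 + 1` being lost), while `dedup(Seq("c2 + 1", "c2 + 2"))` keeps both, matching the grouping keys shown above.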
   
   






[GitHub] [spark] bersprockets commented on a diff in pull request #37825: [SPARK-40382][SQL] Group distinct aggregate expressions by semantically equivalent children in `RewriteDistinctAggregates`

2022-09-25 Thread GitBox


bersprockets commented on code in PR #37825:
URL: https://github.com/apache/spark/pull/37825#discussion_r979489753


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala:
##
@@ -213,7 +213,16 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] {
 case a: Aggregate if mayNeedtoRewrite(a) => rewrite(a)
   }
 
-  def rewrite(a: Aggregate): Aggregate = {
+  def rewrite(aOrig: Aggregate): Aggregate = {
+// Make children of distinct aggregations the same if they are only
+// different due to superficial reasons, e.g.:
+//   "1 + col1" vs "col1 + 1", both should become "1 + col1"
+// or
+//   "col1" vs "Col1", both should become "col1"
+// This could potentially reduce the number of distinct
+// aggregate groups, and therefore reduce the number of
+// projections in Expand (or eliminate the need for Expand)
+val a = reduceDistinctAggregateGroups(aOrig)

Review Comment:
   I made the change to use `ExpressionSet` and also commented on some of the 
issues.
   
   I still prefer 'sanitizing' each original function child to use the first 
semantically equivalent child, in essence creating a new set of "original" 
children, as it bypasses some complexities (in particular the one where we may 
lose some of the original children as keys when we group by `ExpressionSet`).






[GitHub] [spark] bersprockets commented on a diff in pull request #37825: [SPARK-40382][SQL] Group distinct aggregate expressions by semantically equivalent children in `RewriteDistinctAggregates`

2022-09-25 Thread GitBox


bersprockets commented on code in PR #37825:
URL: https://github.com/apache/spark/pull/37825#discussion_r979487585


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala:
##
@@ -291,7 +298,8 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] {
 val naf = if (af.children.forall(_.foldable)) {
   af
 } else {
-  patchAggregateFunctionChildren(af) { x =>
+  patchAggregateFunctionChildren(af) { x1 =>
+    val x = funcChildrenLookup.getOrElse(x1, x1)

Review Comment:
   Here's one of the complications, and my solution is somewhat brittle.
   
   When grouping by `ExpressionSet`, in the case where there are superficially 
different children, we don't get all of the original children in the keys of 
`distinctAggGroups`. This is because multiple `ExpressionSet`s may have the 
same baseSet but different originals, and `groupBy` chooses only one 
`ExpressionSet` to represent the group's key (which is what we want: we want 
`groupBy` to group by semantically equivalent children).
   
   However, because `distinctAggGroups` is missing some original children in 
its keys, `distinctAggChildAttrLookup` is also missing some original children 
in its keys.
   
   To bridge this gap, I used `funcChildrenLookup`. This data structure maps 
each original function child to the first semantically equivalent original 
function child. `funcChildrenLookup` will translate the original function child 
into the key (hopefully) expected by `distinctAggChildAttrLookup`. The 
brittleness is this: this code depends, at the very least, on which 
`ExpressionSet` is chosen by `groupBy` as the winner.
   
   In the [first version](https://github.com/apache/spark/compare/master...bersprockets:spark:rewritedistinct_issue_orig?expand=1) 
of my PR, I modified the Aggregate (if needed) so there are no superficially 
different function children, thus there is no complexity when performing the 
groupings and the patching. I find it a bit more straightforward to reason about.
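   
   The "first semantically equivalent original child" mapping described here can be sketched with a toy model (strings stand in for Catalyst expressions and a hypothetical semanticEquals replaces the real one; this is not the actual Catalyst API):
   
   ```scala
   // Mirrors the funcChildrenLookup idea: map every child to the FIRST
   // semantically equivalent child seen, producing a stable representative
   // per semantic class that matches the keys groupBy would have kept.
   object FuncChildrenLookupSketch {
     def semanticEquals(a: String, b: String): Boolean =
       a.split("\\s*\\+\\s*").sorted.sameElements(b.split("\\s*\\+\\s*").sorted)
   
     def buildLookup(children: Seq[String]): Map[String, String] =
       children.map { e =>
         e -> children.find(fc => semanticEquals(e, fc)).getOrElse(e)
       }.toMap
   }
   ```
   
   For `Seq("1 + c1", "c1 + 1", "c2 + 1")`, both spellings of the first child map to `"1 + c1"` while `"c2 + 1"` maps to itself, which is what bridges lookups from an original child to the representative key.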
   



##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala:
##
@@ -402,7 +410,21 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] {
   }
   Aggregate(groupByAttrs, patchedAggExpressions, firstAggregate)
 } else {
-  a
+  // We may have one distinct group only because we grouped using ExpressionSet.
+  // To prevent SparkStrategies from complaining during sanity check, we need to check whether
+  // the original list of aggregate expressions had multiple distinct groups and, if so,
+  // patch that list so we have only one distinct group.
+  if (funcChildrenLookup.keySet.size > funcChildrenLookup.values.toSet.size) {
+    val patchedAggExpressions = a.aggregateExpressions.map { e =>

Review Comment:
   This is the second complexity.






[GitHub] [spark] bersprockets commented on a diff in pull request #37825: [SPARK-40382][SQL] Group distinct aggregate expressions by semantically equivalent children in `RewriteDistinctAggregates`

2022-09-24 Thread GitBox


bersprockets commented on code in PR #37825:
URL: https://github.com/apache/spark/pull/37825#discussion_r979267219


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala:
##
@@ -213,7 +213,16 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] {
 case a: Aggregate if mayNeedtoRewrite(a) => rewrite(a)
   }
 
-  def rewrite(a: Aggregate): Aggregate = {
+  def rewrite(aOrig: Aggregate): Aggregate = {
+// Make children of distinct aggregations the same if they are only
+// different due to superficial reasons, e.g.:
+//   "1 + col1" vs "col1 + 1", both should become "1 + col1"
+// or
+//   "col1" vs "Col1", both should become "col1"
+// This could potentially reduce the number of distinct
+// aggregate groups, and therefore reduce the number of
+// projections in Expand (or eliminate the need for Expand)
+val a = reduceDistinctAggregateGroups(aOrig)

Review Comment:
   Thanks! I am working on it, just working through some small complications.






[GitHub] [spark] bersprockets commented on a diff in pull request #37825: [SPARK-40382][SQL] Group distinct aggregate expressions by semantically equivalent children in `RewriteDistinctAggregates`

2022-09-11 Thread GitBox


bersprockets commented on code in PR #37825:
URL: https://github.com/apache/spark/pull/37825#discussion_r967860756


##
sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala:
##
@@ -1451,6 +1451,22 @@ class DataFrameAggregateSuite extends QueryTest
 val df = Seq(1).toDF("id").groupBy(Stream($"id" + 1, $"id" + 2): _*).sum("id")
 checkAnswer(df, Row(2, 3, 1))
   }
+
+  test("SPARK-40382: All distinct aggregation children are semantically equivalent")

Review Comment:
   This test succeeds without the changes to RewriteDistinctAggregates. It's 
just a sanity test to check that the grouping by semantic equivalence doesn't 
break this case.


