bersprockets opened a new pull request #35537:
URL: https://github.com/apache/spark/pull/35537
### What changes were proposed in this pull request?
Change `PullOutGroupingExpressions` to eagerly iterate over
`groupingExpressions` when building `complexGroupingExpressionMap`.
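A minimal sketch of the eager-iteration idea, using a hypothetical toy model of the rule: plain strings stand in for grouping expressions, any string containing `+` counts as "complex", and `complexMap` stands in for `complexGroupingExpressionMap`. The names `ToyPullOut`, `pullOutLazy`, and `pullOutEager`, and the use of `toIndexedSeq` to force evaluation, are assumptions for illustration. This is not necessarily how the patch itself is written.

```scala
import scala.collection.mutable

// Hypothetical toy model of the rule; not Spark's actual code.
object ToyPullOut {
  // Treat any expression containing '+' as "complex".
  private def isComplex(e: String): Boolean = e.contains("+")

  // Replace a complex expression with a generated alias, recording it in complexMap
  // (a side effect, like the one in the closure passed to map() in the rule).
  private def rewrite(complexMap: mutable.Map[String, String])(e: String): String =
    if (isComplex(e)) complexMap.getOrElseUpdate(e, s"_groupingexpression${complexMap.size}")
    else e

  // Side effects happen inside map(); with a Stream input only the head's
  // closure has run by the time complexMap is read.
  def pullOutLazy(grouping: Seq[String]): (Seq[String], Map[String, String]) = {
    val complexMap = mutable.LinkedHashMap.empty[String, String]
    val newGrouping = grouping.map(e => rewrite(complexMap)(e))
    (newGrouping, complexMap.toMap) // snapshot taken before the Stream is forced
  }

  // Same logic, but the input is first copied into a strict collection,
  // so every closure runs before complexMap is read.
  def pullOutEager(grouping: Seq[String]): (Seq[String], Map[String, String]) = {
    val complexMap = mutable.LinkedHashMap.empty[String, String]
    val newGrouping = grouping.toIndexedSeq.map(e => rewrite(complexMap)(e))
    (newGrouping, complexMap.toMap)
  }
}
```

Passing `Stream("id + 1", "id + 2")` to `pullOutLazy` yields a map with a single entry, while `pullOutEager` records both aliases. With a strict input such as a `List`, the two versions behave the same.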
### Why are the changes needed?
Consider this query:
```
Seq(1).toDF("id").groupBy(Stream($"id" + 1, $"id" + 2): _*).sum("id").show(false)
```
It fails with
```
java.lang.IllegalStateException: Couldn't find _groupingexpression#24 in [id#4,_groupingexpression#23]
    at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:80)
    at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:73)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:83)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:425)
    at org.apache.spark.sql.catalyst.expressions.BindReferences$.bindReference(BoundAttribute.scala:73)
    at org.apache.spark.sql.catalyst.expressions.BindReferences$.$anonfun$bindReferences$1(BoundAttribute.scala:94)
    at scala.collection.immutable.Stream.$anonfun$map$1(Stream.scala:418)
    at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1173)
    at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1163)
    at scala.collection.immutable.Stream.$anonfun$map$1(Stream.scala:418)
    at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1173)
    at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1163)
    at scala.collection.immutable.Stream.foreach(Stream.scala:534)
    at scala.collection.TraversableOnce.count(TraversableOnce.scala:152)
    at scala.collection.TraversableOnce.count$(TraversableOnce.scala:145)
    at scala.collection.AbstractTraversable.count(Traversable.scala:108)
    at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.createCode(GenerateUnsafeProjection.scala:293)
    at org.apache.spark.sql.execution.aggregate.HashAggregateExec.doConsumeWithKeys(HashAggregateExec.scala:623)
    ... etc ...
```
When `HashAggregateExec` attempts to bind the references in the group-by
expressions, the attribute `_groupingexpression#24` is missing from the child
`ProjectExec`'s output.
The cause is how `PullOutGroupingExpressions` populates
`complexGroupingExpressionMap` when it decides which grouping expressions to
move from the `Aggregate` node into a `Project` node. `PullOutGroupingExpressions`
iterates over `groupingExpressions` with a map operation and updates
`complexGroupingExpressionMap` inside the closure passed to `map()`. However, if
`groupingExpressions` is a `Stream`, the map operation is evaluated lazily:
`Stream.map` applies the closure to the head immediately but defers the tail, so
the map isn't fully completed until `ComputeCurrentTime` calls
`transformAllExpressionsWithPruning`, long after `PullOutGroupingExpressions`
has finished. Therefore, when `PullOutGroupingExpressions` creates the `Project`
node, `complexGroupingExpressionMap` is not fully populated. As a result, the
`Project` node is missing all but the first complex grouping expression.
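The head-strict, tail-lazy behavior of `Stream.map` described above can be observed directly with a counter in the closure. This is a standalone sketch; the object name `StreamMapLaziness` is hypothetical, and the counter plays the role of the side effect on `complexGroupingExpressionMap`.

```scala
// Hypothetical demo; shows when a side-effecting map() closure actually runs on a Stream.
object StreamMapLaziness {
  def demo(): (Int, Int) = {
    var calls = 0
    // Stream.map evaluates the closure for the head now and defers the tail.
    val mapped = Stream(1, 2, 3).map { x => calls += 1; x * 10 }
    val beforeForce = calls // only the head has been mapped at this point
    mapped.toList           // forcing the Stream runs the remaining closures
    (beforeForce, calls)
  }
}
```

`StreamMapLaziness.demo()` returns `(1, 3)`: one closure call before the Stream is forced and three after, which matches the symptom of only the first complex grouping expression being recorded.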
### Does this PR introduce _any_ user-facing change?
No, other than that the above query now works.
### How was this patch tested?
New unit test.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]