peter-toth commented on a change in pull request #31913:
URL: https://github.com/apache/spark/pull/31913#discussion_r611788588
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
##########
@@ -655,12 +666,76 @@ case class Aggregate(
}
}
+ private def expandGroupingReferences(e: Expression): Expression = {
+ e match {
+ case _: AggregateExpression => e
+ case _ if PythonUDF.isGroupedAggPandasUDF(e) => e
+ case g: GroupingExprRef => groupingExpressions(g.ordinal)
+ case _ => e.mapChildren(expandGroupingReferences)
+ }
+ }
+
+ lazy val aggregateExpressionsWithoutGroupingRefs = {
+
aggregateExpressions.map(expandGroupingReferences(_).asInstanceOf[NamedExpression])
+ }
+
override lazy val validConstraints: ExpressionSet = {
val nonAgg =
aggregateExpressions.filter(_.find(_.isInstanceOf[AggregateExpression]).isEmpty)
getAllValidConstraints(nonAgg)
}
}
+object Aggregate {
+ private def collectComplexGroupingExpressions(groupingExpressions:
Seq[Expression]) = {
+ groupingExpressions.zipWithIndex
+ .foldLeft(mutable.Map.empty[Expression, (Expression, Int)]) {
+ case (m, (ge, i)) =>
+ if (ge.deterministic && !ge.foldable && ge.children.nonEmpty &&
Review comment:
Indeed, `PullOutNondeterministic` is taking care of them. Fixed in
https://github.com/apache/spark/pull/31913/commits/06224449062e2f1e97a419adf44de45dce9eeb6d.
`GroupingExprRef` is not needed when it would refer to a simple `Attribute`.
The purpose of `GroupingExprRef`s is to wrap expressions that could change during
optimization, and I don't think a simple attribute could change.
Also, `GroupingExprRef` is not a `NamedExpression` now. If we wanted to
allow `GroupingExprRef` to refer to a simple attribute in aggregate expressions,
then it would either need to implement `NamedExpression` or be wrapped in
a new `Alias`.
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
##########
@@ -795,7 +795,7 @@ object NullPropagation extends Rule[LogicalPlan] {
*/
object FoldablePropagation extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = {
- CleanupAliases(propagateFoldables(plan)._1)
+
EnforceGroupingReferencesInAggregates(CleanupAliases(propagateFoldables(plan)._1))
Review comment:
Fixed in
https://github.com/apache/spark/pull/31913/commits/06224449062e2f1e97a419adf44de45dce9eeb6d.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]