cloud-fan commented on a change in pull request #28501:
URL: https://github.com/apache/spark/pull/28501#discussion_r425125012
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
##########
@@ -595,8 +601,73 @@ class Analyzer(
}
}
- // This require transformUp to replace grouping()/grouping_id() in
resolved Filter/Sort
- def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp {
+ private def tryResolveHavingCondition(
+ a: UnresolvedHaving, havingCond: Expression, agg: LogicalPlan):
LogicalPlan = {
+ val aggForResolving = agg match {
+ // For CUBE/ROLLUP expressions, to avoid resolving repeatedly, here we
delete them from
+ // groupingExpressions for condition resolving.
+ case a @ Aggregate(Seq(c @ Cube(groupByExprs)), _, _) =>
+ a.copy(groupingExpressions = groupByExprs)
+ case a @ Aggregate(Seq(r @ Rollup(groupByExprs)), _, _) =>
+ a.copy(groupingExpressions = groupByExprs)
+ case g: GroupingSets =>
+ Aggregate(
+ getFinalGroupByExpressions(g.selectedGroupByExprs, g.groupByExprs),
+ g.aggregations, g.child)
+ }
+ // Try resolving the condition of the filter as though it is in the
aggregate clause
+ val (extraAggExprs, resolvedHavingCond) =
+ ResolveAggregateFunctions.resolveFilterCondInAggregate(havingCond,
aggForResolving)
+
+ // Push the aggregate expressions into the aggregate (if any).
+ if (extraAggExprs.nonEmpty) {
+ val newChild = agg match {
+ case Aggregate(Seq(c @ Cube(groupByExprs)), aggregateExpressions,
child) =>
+ constructAggregate(
+ cubeExprs(groupByExprs), groupByExprs, aggregateExpressions ++
extraAggExprs, child)
+ case Aggregate(Seq(r @ Rollup(groupByExprs)), aggregateExpressions,
child) =>
+ constructAggregate(
+ rollupExprs(groupByExprs), groupByExprs, aggregateExpressions ++
extraAggExprs, child)
+ case x: GroupingSets =>
+ constructAggregate(
+ x.selectedGroupByExprs, x.groupByExprs, x.aggregations ++
extraAggExprs, x.child)
+ }
+
+ // Since the exprId of extraAggExprs will be changed in the
constructed aggregate, while the
+ // aggregateExpressions keeps the input order, here we build an
exprMap to resolve the
+ // condition again.
+ val exprMap = extraAggExprs.zip(
+ newChild.asInstanceOf[Aggregate].aggregateExpressions.takeRight(
+ extraAggExprs.length)).toMap
+ val newCond = resolvedHavingCond.get.transform {
+ case ne: NamedExpression if exprMap.contains(ne) => exprMap(ne)
+ }
+ Project(newChild.output.dropRight(extraAggExprs.length),
+ Filter(newCond, newChild))
+ } else {
+ a
+ }
+ }
+
+ // This requires transformDown to resolve the having condition when
generating the
+ // aggregate node for CUBE/ROLLUP/GROUPING SETS. This also replaces
grouping()/grouping_id()
+ // in resolved Filter/Sort.
+ def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsDown {
+ case a @ UnresolvedHaving(
+ havingCondition, agg @ Aggregate(Seq(c @ Cube(groupByExprs)),
aggregateExpressions, _))
+ if agg.childrenResolved &&
!havingCondition.isInstanceOf[SubqueryExpression]
Review comment:
do we still need `!havingCondition.isInstanceOf[SubqueryExpression]`?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]