dtenedor commented on code in PR #48529:
URL: https://github.com/apache/spark/pull/48529#discussion_r1821538473
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala:
##########
@@ -5899,7 +5918,80 @@ class AstBuilder extends DataTypeAstBuilder
visitSetOperationImpl(left, plan(ctx.right), all, c.getType)
}.getOrElse(Option(ctx.queryOrganization).map { c =>
withQueryResultClauses(c, withSubqueryAlias(), forPipeOperators = true)
- }.get)))))))
+ }.getOrElse(
+ visitOperatorPipeAggregate(ctx, left)
+ ))))))))
+ }
+
+ private def visitOperatorPipeAggregate(
+ ctx: OperatorPipeRightSideContext, left: LogicalPlan): LogicalPlan = {
+ assert(ctx.AGGREGATE != null)
+ if (ctx.namedExpressionSeq() == null && ctx.aggregationClause() == null) {
+ operationNotAllowed(
+ "The AGGREGATE clause requires a list of aggregate expressions " +
+ "or a list of grouping expressions, or both", ctx)
+ }
+ val aggregateExpressions: Seq[NamedExpression] =
+ Option(ctx.namedExpressionSeq()).map { n: NamedExpressionSeqContext =>
+ visitNamedExpressionSeq(n).map {
+ case (e: NamedExpression, _) => e
+ case (e: Expression, aliasFunc) => UnresolvedAlias(e, aliasFunc)
+ }
+ }.getOrElse(Seq.empty)
+ Option(ctx.aggregationClause()).map { c: AggregationClauseContext =>
+ withAggregationClause(c, aggregateExpressions, left,
allowNamedGroupingExpressions = true)
+ match {
+ case a: Aggregate =>
+ // GROUP BY ALL, GROUP BY CUBE, GROUPING_ID, GROUPING SETS, and
GROUP BY ROLLUP are not
+ // supported yet.
+ def error(s: String): Unit =
+ throw
QueryParsingErrors.pipeOperatorAggregateUnsupportedCaseError(s, c)
+ a.groupingExpressions match {
+ case Seq(key: UnresolvedAttribute) if key.equalsIgnoreCase("ALL")
=>
+ error("GROUP BY ALL")
+ case _ =>
+ }
+ def visit(e: Expression): Unit = {
+ e match {
+ case _: Cube => error("GROUP BY CUBE")
+ case _: GroupingSets => error("GROUPING SETS")
+ case _: Rollup => error("GROUP BY ROLLUP")
+ case f: UnresolvedFunction if f.arguments.length == 1 &&
f.nameParts.length == 1 =>
+ Seq("GROUPING", "GROUPING_ID").foreach { name =>
+ if (f.nameParts.head.equalsIgnoreCase(name)) error(name)
+ }
+ case _: WindowSpec => error("window functions")
+ case _ =>
+ }
+ e.children.foreach(visit)
+ }
+ a.groupingExpressions.foreach(visit)
+ a.aggregateExpressions.foreach(visit)
+ // Non-aggregate expressions are not allowed in place of aggregate
functions, even if they
+ // appear separately in the GROUP BY clause.
+ val groupingExpressionSet = a.groupingExpressions.toSet
+ a.aggregateExpressions.foreach { e: Expression =>
+ if (groupingExpressionSet.contains(e)) {
Review Comment:
No problem. I just removed the restriction entirely for now, we don't
necessarily have to include it here. It was present in ZetaSQL [1]. I added a
couple of test cases exercising this.
[1]
https://github.com/google/zetasql/blob/194cd32b5d766d60e3ca442651d792c7fe54ea74/zetasql/analyzer/testdata/pipe_aggregate.test#L724C8-L724C12
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]