maropu commented on a change in pull request #30144:
URL: https://github.com/apache/spark/pull/30144#discussion_r606314039
##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
##########
@@ -632,9 +632,10 @@ class Analyzer(override val catalogManager: CatalogManager)
if (resolvedInfo.nonEmpty) {
val (extraAggExprs, resolvedHavingCond) = resolvedInfo.get
val newChild = h.child match {
- case Aggregate(Seq(gs: GroupingSet), aggregateExpressions, child) =>
+ case Aggregate(
+ GroupingAnalytics(selectedGroupByExprs, _, groupByExprs),
+ aggregateExpressions, child) =>
Review comment:
nit:
```
case Aggregate(GroupingAnalytics(selectedGroupByExprs, _, groupByExprs),
    aggregateExpressions, child) =>
  constructAggregate(
```
##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
##########
@@ -658,16 +659,18 @@ class Analyzer(override val catalogManager: CatalogManager)
// CUBE/ROLLUP/GROUPING SETS. This also replace grouping()/grouping_id() in resolved
// Filter/Sort.
def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsDown {
- case h @ UnresolvedHaving(_, agg @ Aggregate(Seq(gs: GroupingSet), aggregateExpressions, _))
- if agg.childrenResolved && (gs.groupByExprs ++ aggregateExpressions).forall(_.resolved) =>
+ case h @ UnresolvedHaving(_, agg @ Aggregate(
+ GroupingAnalytics(_, _, groupByExprs), aggregateExpressions, _))
+ if agg.childrenResolved && (groupByExprs ++ aggregateExpressions).forall(_.resolved) =>
tryResolveHavingCondition(h)
case a if !a.childrenResolved => a // be sure all of the children are resolved.
// Ensure group by expressions and aggregate expressions have been resolved.
- case Aggregate(Seq(gs: GroupingSet), aggregateExpressions, child)
- if (gs.groupByExprs ++ aggregateExpressions).forall(_.resolved) =>
- constructAggregate(gs.selectedGroupByExprs, gs.groupByExprs, aggregateExpressions, child)
+ case Aggregate(
+ GroupingAnalytics(selectedGroupByExprs, _, groupByExprs),
+ aggregateExpressions, child)
Review comment:
nit: ` case Aggregate(GroupingAnalytics(selectedGroupByExprs, _, groupByExprs), aggExprs, child)`
##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/grouping.scala
##########
@@ -212,3 +212,25 @@ object GroupingID {
if (SQLConf.get.integerGroupingIdEnabled) IntegerType else LongType
}
}
+
+
+object GroupingAnalytics {
+ def unapply(exprs: Seq[Expression])
+ : Option[(Seq[Seq[Expression]], Seq[Seq[Expression]], Seq[Expression])] = {
+ val (groupingSetExprs, others) = exprs.partition(_.isInstanceOf[GroupingSet])
+ if (groupingSetExprs.isEmpty) {
+ None
+ } else {
+ val groupingSets = groupingSetExprs.map(_.asInstanceOf[GroupingSet])
+ val groups = groupingSets.flatMap(_.groupByExprs) ++ others
+ val unMergedSelectedGroupByExprs = groupingSets.map(_.selectedGroupByExprs)
Review comment:
nit: `unMerged` -> `unmerged`?
##########
File path: docs/sql-ref-syntax-qry-select-groupby.md
##########
@@ -23,8 +23,9 @@ license: |
The `GROUP BY` clause is used to group the rows based on a set of specified grouping expressions and compute aggregations on
the group of rows based on one or more specified aggregate functions. Spark also supports advanced aggregations to do multiple
-aggregations for the same input record set via `GROUPING SETS`, `CUBE`, `ROLLUP` clauses.
-When a FILTER clause is attached to an aggregate function, only the matching rows are passed to that function.
+aggregations for the same input record set via `GROUPING SETS`, `CUBE`, `ROLLUP` clauses, also spark support partial grouping
Review comment:
```
the group of rows based on one or more specified aggregate functions. Spark also supports advanced aggregations to do multiple
aggregations for the same input record set via `GROUPING SETS`, `CUBE`, `ROLLUP` clauses.
These grouping analytics clauses can be specified with regular grouping expressions (partial grouping analytics) and
the different grouping analytics clauses can be specified together (concatenated groupings).
When a FILTER clause is attached to an aggregate function, only the matching rows are passed to that function.
```
?
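As background for the terminology in this suggestion: a partial grouping analytics clause mixes regular grouping expressions with CUBE/ROLLUP/GROUPING SETS, and concatenated groupings combine several such clauses by taking the cross product of their selected group-by sets. Here is a minimal standalone Scala sketch of that expansion; it uses strings instead of expressions, and `rollup`/`cube`/`concat` are made-up helpers for illustration, not the Spark API:
```
// Illustrative toy model: a grouping clause is just its list of selected
// group-by sets, e.g. ROLLUP(a, b) -> [[a, b], [a], []].
object ConcatenatedGroupingsDemo {
  def rollup(cols: String*): Seq[Seq[String]] =
    cols.inits.toSeq                                    // all prefixes, longest first
  def cube(cols: String*): Seq[Seq[String]] =
    cols.foldLeft(Seq(Seq.empty[String])) { (acc, c) => // power set of the columns
      acc ++ acc.map(_ :+ c)
    }

  // Concatenated groupings: cross product of the clauses' grouping sets,
  // with the regular (partial) grouping expressions added to every set.
  def concat(regular: Seq[String], clauses: Seq[Seq[Seq[String]]]): Seq[Seq[String]] =
    clauses.foldLeft(Seq(Seq.empty[String])) { (acc, sets) =>
      for (a <- acc; s <- sets) yield a ++ s
    }.map(s => (regular ++ s).distinct)

  def main(args: Array[String]): Unit = {
    // GROUP BY a, ROLLUP(b), CUBE(c) -> 4 grouping sets:
    // List(a, b), List(a, b, c), List(a), List(a, c)
    concat(Seq("a"), Seq(rollup("b"), cube("c"))).foreach(println)
  }
}
```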
##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/grouping.scala
##########
@@ -212,3 +212,25 @@ object GroupingID {
if (SQLConf.get.integerGroupingIdEnabled) IntegerType else LongType
}
}
+
+
Review comment:
nit: remove this blank line.
##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
##########
@@ -658,16 +659,18 @@ class Analyzer(override val catalogManager: CatalogManager)
// CUBE/ROLLUP/GROUPING SETS. This also replace grouping()/grouping_id() in resolved
// Filter/Sort.
def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsDown {
- case h @ UnresolvedHaving(_, agg @ Aggregate(Seq(gs: GroupingSet), aggregateExpressions, _))
- if agg.childrenResolved && (gs.groupByExprs ++ aggregateExpressions).forall(_.resolved) =>
+ case h @ UnresolvedHaving(_, agg @ Aggregate(
+ GroupingAnalytics(_, _, groupByExprs), aggregateExpressions, _))
+ if agg.childrenResolved && (groupByExprs ++ aggregateExpressions).forall(_.resolved) =>
Review comment:
nit:
```
    GroupingAnalytics(_, _, groupByExprs), aggregateExpressions, _))
      if agg.childrenResolved && (groupByExprs ++ aggregateExpressions).forall(_.resolved) =>
```
##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/grouping.scala
##########
@@ -212,3 +212,25 @@ object GroupingID {
if (SQLConf.get.integerGroupingIdEnabled) IntegerType else LongType
}
}
+
+
+object GroupingAnalytics {
+ def unapply(exprs: Seq[Expression])
+ : Option[(Seq[Seq[Expression]], Seq[Seq[Expression]], Seq[Expression])] = {
+ val (groupingSetExprs, others) = exprs.partition(_.isInstanceOf[GroupingSet])
+ if (groupingSetExprs.isEmpty) {
+ None
+ } else {
+ val groupingSets = groupingSetExprs.map(_.asInstanceOf[GroupingSet])
+ val groups = groupingSets.flatMap(_.groupByExprs) ++ others
+ val unMergedSelectedGroupByExprs = groupingSets.map(_.selectedGroupByExprs)
+ val selectedGroupByExprs = unMergedSelectedGroupByExprs.tail
+ .foldRight(unMergedSelectedGroupByExprs.head) { (x, y) =>
+ for (a <- x; b <- y) yield b ++ a
+ }.map { groupByExprs =>
+ (others ++ groupByExprs).distinct
+ }
Review comment:
nit: indents
```
  }.map { groupByExprs =>
    (others ++ groupByExprs).distinct
  }
```
##########
File path: sql/core/src/test/resources/sql-tests/results/group-analytics.sql.out
##########
@@ -1,5 +1,5 @@
-- Automatically generated by SQLQueryTestSuite
--- Number of queries: 37
+-- Number of queries: 45
Review comment:
NOTE: I've checked that the output results are the same as the PostgreSQL ones.
##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/grouping.scala
##########
@@ -212,3 +212,25 @@ object GroupingID {
if (SQLConf.get.integerGroupingIdEnabled) IntegerType else LongType
}
}
+
+
+object GroupingAnalytics {
+ def unapply(exprs: Seq[Expression])
+ : Option[(Seq[Seq[Expression]], Seq[Seq[Expression]], Seq[Expression])] = {
+ val (groupingSetExprs, others) = exprs.partition(_.isInstanceOf[GroupingSet])
+ if (groupingSetExprs.isEmpty) {
+ None
+ } else {
+ val groupingSets = groupingSetExprs.map(_.asInstanceOf[GroupingSet])
+ val groups = groupingSets.flatMap(_.groupByExprs) ++ others
+ val unMergedSelectedGroupByExprs = groupingSets.map(_.selectedGroupByExprs)
+ val selectedGroupByExprs = unMergedSelectedGroupByExprs.tail
+ .foldRight(unMergedSelectedGroupByExprs.head) { (x, y) =>
Review comment:
nit: `foldRight` -> `foldLeft`? (I think most code uses `foldLeft` when either `foldLeft`/`foldRight` would work.)
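For reference, a tiny self-contained sketch (strings instead of `Expression`s, not the Spark code) of the `foldLeft` form, which seeds with the head and combines the tail left to right, producing the same cross product of grouping sets:
```
object FoldLeftDemo {
  def main(args: Array[String]): Unit = {
    // e.g. ROLLUP(a) -> [[a], []] and CUBE(b) -> [[b], []]
    val unmergedSelectedGroupByExprs =
      Seq(Seq(Seq("a"), Seq()), Seq(Seq("b"), Seq()))

    // foldLeft over the tail, seeded with the head, combining left to right.
    val selectedGroupByExprs = unmergedSelectedGroupByExprs.tail
      .foldLeft(unmergedSelectedGroupByExprs.head) { (acc, x) =>
        for (a <- acc; b <- x) yield a ++ b
      }
    // Prints: List(a, b), List(a), List(b), List()
    selectedGroupByExprs.foreach(println)
  }
}
```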
##########
File path: sql/core/src/test/resources/sql-tests/inputs/group-analytics.sql
##########
@@ -59,4 +59,12 @@ SELECT course, year FROM courseSales GROUP BY CUBE(course, year) ORDER BY groupi
-- Aliases in SELECT could be used in ROLLUP/CUBE/GROUPING SETS
SELECT a + b AS k1, b AS k2, SUM(a - b) FROM testData GROUP BY CUBE(k1, k2);
SELECT a + b AS k, b, SUM(a - b) FROM testData GROUP BY ROLLUP(k, b);
-SELECT a + b, b AS k, SUM(a - b) FROM testData GROUP BY a + b, k GROUPING SETS(k)
+SELECT a + b, b AS k, SUM(a - b) FROM testData GROUP BY a + b, k GROUPING SETS(k);
+
+-- GROUP BY use mixed Separate columns and CUBE/ROLLUP
+SELECT a, b, count(1) FROM testData GROUP BY a, b, CUBE(a, b);
+SELECT a, b, count(1) FROM testData GROUP BY a, b, ROLLUP(a, b);
+SELECT a, b, count(1) FROM testData GROUP BY CUBE(a, b), ROLLUP(a, b);
+SELECT a, b, count(1) FROM testData GROUP BY a, CUBE(a, b), ROLLUP(b);
+SELECT a, b, count(1) FROM testData GROUP BY CUBE(a, b), ROLLUP(a, b) GROUPING SETS(a, b);
Review comment:
It looks like an invalid query.
##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/grouping.scala
##########
@@ -212,3 +212,25 @@ object GroupingID {
if (SQLConf.get.integerGroupingIdEnabled) IntegerType else LongType
}
}
+
+
+object GroupingAnalytics {
+ def unapply(exprs: Seq[Expression])
+ : Option[(Seq[Seq[Expression]], Seq[Seq[Expression]], Seq[Expression])] = {
+ val (groupingSetExprs, others) = exprs.partition(_.isInstanceOf[GroupingSet])
+ if (groupingSetExprs.isEmpty) {
+ None
+ } else {
+ val groupingSets = groupingSetExprs.map(_.asInstanceOf[GroupingSet])
+ val groups = groupingSets.flatMap(_.groupByExprs) ++ others
+ val unMergedSelectedGroupByExprs = groupingSets.map(_.selectedGroupByExprs)
+ val selectedGroupByExprs = unMergedSelectedGroupByExprs.tail
+ .foldRight(unMergedSelectedGroupByExprs.head) { (x, y) =>
+ for (a <- x; b <- y) yield b ++ a
Review comment:
nit: `b ++ a` -> `a ++ b` for a natural order.
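A quick standalone illustration of the difference (toy strings, not the Spark code): with `foldRight` and `b ++ a` as written, the clauses after the first come out reversed inside each grouping set; combining this nit with the `foldLeft` one above keeps the written order:
```
object OperandOrderDemo {
  // Three clauses, each with a single group-by set, to make the order visible.
  val clauses: Seq[Seq[Seq[String]]] = Seq(Seq(Seq("a")), Seq(Seq("b")), Seq(Seq("c")))

  def main(args: Array[String]): Unit = {
    val current = clauses.tail.foldRight(clauses.head) { (x, y) =>
      for (a <- x; b <- y) yield b ++ a
    }
    println(current)   // List(List(a, c, b)) -- "b" and "c" swapped

    val suggested = clauses.tail.foldLeft(clauses.head) { (acc, x) =>
      for (a <- acc; b <- x) yield a ++ b
    }
    println(suggested) // List(List(a, b, c)) -- written order preserved
  }
}
```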
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]