Github user rxin commented on a diff in the pull request:
https://github.com/apache/spark/pull/15484#discussion_r84583794
--- Diff:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
---
@@ -255,98 +265,125 @@ class Analyzer(
expr transform {
case e: GroupingID =>
if (e.groupByExprs.isEmpty || e.groupByExprs == groupByExprs) {
- gid
+ Alias(gid, toPrettySQL(e))()
} else {
throw new AnalysisException(
s"Columns of grouping_id (${e.groupByExprs.mkString(",")})
does not match " +
s"grouping columns (${groupByExprs.mkString(",")})")
}
- case Grouping(col: Expression) =>
+ case e @ Grouping(col: Expression) =>
val idx = groupByExprs.indexOf(col)
if (idx >= 0) {
- Cast(BitwiseAnd(ShiftRight(gid, Literal(groupByExprs.length -
1 - idx)),
- Literal(1)), ByteType)
+ Alias(Cast(BitwiseAnd(ShiftRight(gid,
Literal(groupByExprs.length - 1 - idx)),
+ Literal(1)), ByteType), toPrettySQL(e))()
} else {
throw new AnalysisException(s"Column of grouping ($col) can't
be found " +
s"in grouping columns ${groupByExprs.mkString(",")}")
}
}
}
- // This require transformUp to replace grouping()/grouping_id() in
resolved Filter/Sort
- def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
- case a if !a.childrenResolved => a // be sure all of the children
are resolved.
- case p if p.expressions.exists(hasGroupingAttribute) =>
- failAnalysis(
- s"${VirtualColumn.hiveGroupingIdName} is deprecated; use
grouping_id() instead")
-
- case Aggregate(Seq(c @ Cube(groupByExprs)), aggregateExpressions,
child) =>
- GroupingSets(bitmasks(c), groupByExprs, child,
aggregateExpressions)
- case Aggregate(Seq(r @ Rollup(groupByExprs)), aggregateExpressions,
child) =>
- GroupingSets(bitmasks(r), groupByExprs, child,
aggregateExpressions)
+ /*
+ * Create new alias for all group by expressions for `Expand` operator.
+ */
+ private def constructGroupByAlias(groupByExprs: Seq[Expression]):
Seq[Alias] = {
+ groupByExprs.map {
+ case e: NamedExpression => Alias(e, e.name)()
+ case other => Alias(other, other.toString)()
+ }
+ }
- // Ensure all the expressions have been resolved.
- case x: GroupingSets if x.expressions.forall(_.resolved) =>
- val gid = AttributeReference(VirtualColumn.groupingIdName,
IntegerType, false)()
-
- // Expand works by setting grouping expressions to null as
determined by the bitmasks. To
- // prevent these null values from being used in an aggregate
instead of the original value
- // we need to create new aliases for all group by expressions that
will only be used for
- // the intended purpose.
- val groupByAliases: Seq[Alias] = x.groupByExprs.map {
- case e: NamedExpression => Alias(e, e.name)()
- case other => Alias(other, other.toString)()
+ /*
+ * Construct [[Expand]] operator with grouping sets.
+ */
+ private def constructExpand(
+ selectedGroupByExprs: Seq[Seq[Expression]],
+ child: LogicalPlan,
+ groupByAliases: Seq[Alias],
+ gid: Attribute): LogicalPlan = {
+ // Change the nullability of group by aliases if necessary. For
example, if we have
+ // GROUPING SETS ((a,b), a), we do not need to change the
nullability of a, but we
+ // should change the nullability of b to be TRUE.
+ // TODO: For Cube/Rollup just set nullability to be `true`.
+ val expandedAttributes = groupByAliases.zipWithIndex.map { case (a,
idx) =>
--- End diff --
idx is not used?
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]