GitHub user yhuai commented on a diff in the pull request:
https://github.com/apache/spark/pull/9038#discussion_r42071456
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/utils.scala ---
@@ -154,143 +146,150 @@ object Utils {
val aggregateExpressions = functionsWithDistinct ++
functionsWithoutDistinct
val usesTungstenAggregate =
child.sqlContext.conf.unsafeEnabled &&
- aggregateExpressions.forall(
- _.aggregateFunction.isInstanceOf[DeclarativeAggregate]) &&
- supportsTungstenAggregate(
+ TungstenAggregate.supportsAggregate(
groupingExpressions,
aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes))
- // 1. Create an Aggregate Operator for partial aggregations.
- val groupingAttributes = groupingExpressions.map(_.toAttribute)
-
- // It is safe to call head at here since functionsWithDistinct has at
least one
- // AggregateExpression2.
- val distinctColumnExpressions =
- functionsWithDistinct.head.aggregateFunction.children
- val namedDistinctColumnExpressions = distinctColumnExpressions.map {
- case ne: NamedExpression => ne -> ne
- case other =>
- val withAlias = Alias(other, other.toString)()
- other -> withAlias
+ // functionsWithDistinct is guaranteed to be non-empty. Even though it
may contain more than one
+ // DISTINCT aggregate function, all of those functions will have the
same column expression.
+ // For example, it would be valid for functionsWithDistinct to be
+ // [COUNT(DISTINCT foo), MAX(DISTINCT foo)], but [COUNT(DISTINCT bar),
COUNT(DISTINCT foo)] is
+ // disallowed because those two distinct aggregates have different
column expressions.
+ val distinctColumnExpression: Expression = {
+ val allDistinctColumnExpressions =
functionsWithDistinct.head.aggregateFunction.children
+ assert(allDistinctColumnExpressions.length == 1)
+ allDistinctColumnExpressions.head
+ }
+ val namedDistinctColumnExpression: NamedExpression =
distinctColumnExpression match {
+ case ne: NamedExpression => ne
+ case other => Alias(other, other.toString)()
}
- val distinctColumnExpressionMap = namedDistinctColumnExpressions.toMap
- val distinctColumnAttributes =
namedDistinctColumnExpressions.map(_._2.toAttribute)
+ val distinctColumnAttribute: Attribute =
namedDistinctColumnExpression.toAttribute
+ val groupingAttributes = groupingExpressions.map(_.toAttribute)
- val partialAggregateExpressions =
functionsWithoutDistinct.map(_.copy(mode = Partial))
- val partialAggregateAttributes =
-
partialAggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes)
- val partialAggregateGroupingExpressions =
- groupingExpressions ++ namedDistinctColumnExpressions.map(_._2)
- val partialAggregateResult =
+ // 1. Create an Aggregate Operator for partial aggregations.
+ val partialAggregate: SparkPlan = {
+ val partialAggregateExpressions =
functionsWithoutDistinct.map(_.copy(mode = Partial))
+ val partialAggregateAttributes =
+
partialAggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes)
+ // We will group by the original grouping expression, plus an
additional expression for the
+ // DISTINCT column. For example, for AVG(DISTINCT value) GROUP BY
key, the the grouping
+ // expressions will be [value, key].
--- End diff --
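It seems it should be `[key, value]`, since the original grouping expressions come first and the DISTINCT column is appended after them. I will fix it while merging.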
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]