GitHub user yhuai commented on a diff in the pull request:
https://github.com/apache/spark/pull/10228#discussion_r47439996
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala
---
@@ -33,64 +33,46 @@ import scala.collection.mutable.ArrayBuffer
* is used to generate result.
*/
abstract class AggregationIterator(
- groupingKeyAttributes: Seq[Attribute],
- valueAttributes: Seq[Attribute],
- nonCompleteAggregateExpressions: Seq[AggregateExpression],
- nonCompleteAggregateAttributes: Seq[Attribute],
- completeAggregateExpressions: Seq[AggregateExpression],
- completeAggregateAttributes: Seq[Attribute],
+ groupingExpressions: Seq[NamedExpression],
+ inputAttributes: Seq[Attribute],
+ aggregateExpressions: Seq[AggregateExpression],
+ aggregateAttributes: Seq[Attribute],
initialInputBufferOffset: Int,
resultExpressions: Seq[NamedExpression],
- newMutableProjection: (Seq[Expression], Seq[Attribute]) => (() =>
MutableProjection),
- outputsUnsafeRows: Boolean)
- extends Iterator[InternalRow] with Logging {
+ newMutableProjection: (Seq[Expression], Seq[Attribute]) => (() =>
MutableProjection))
+ extends Iterator[UnsafeRow] with Logging {
///////////////////////////////////////////////////////////////////////////
// Initializing functions.
///////////////////////////////////////////////////////////////////////////
- // An Seq of all AggregateExpressions.
- // It is important that all AggregateExpressions with the mode Partial,
PartialMerge or Final
- // are at the beginning of the allAggregateExpressions.
- protected val allAggregateExpressions =
- nonCompleteAggregateExpressions ++ completeAggregateExpressions
-
- require(
- allAggregateExpressions.map(_.mode).distinct.length <= 2,
- s"$allAggregateExpressions are not supported becuase they have more
than 2 distinct modes.")
-
- /**
- * The distinct modes of AggregateExpressions. Right now, we can handle
the following mode:
- * - Partial-only: all AggregateExpressions have the mode of Partial;
- * - PartialMerge-only: all AggregateExpressions have the mode of
PartialMerge);
- * - Final-only: all AggregateExpressions have the mode of Final;
- * - Final-Complete: some AggregateExpressions have the mode of Final
and
- * others have the mode of Complete;
- * - Complete-only: nonCompleteAggregateExpressions is empty and we
have AggregateExpressions
- * with mode Complete in completeAggregateExpressions; and
- * - Grouping-only: there is no AggregateExpression.
- */
- protected val aggregationMode: (Option[AggregateMode],
Option[AggregateMode]) =
- nonCompleteAggregateExpressions.map(_.mode).distinct.headOption ->
- completeAggregateExpressions.map(_.mode).distinct.headOption
+ {
+ val modes = aggregateExpressions.map(_.mode).distinct.toSet
+ require(modes.size <= 2,
+ s"$aggregateExpressions are not supported because they have more
than 2 distinct modes.")
+ require(modes.subsetOf(Set(Partial, PartialMerge)) ||
modes.subsetOf(Set(Final, Complete)),
+ s"$aggregateExpressions can't have Partial/PartialMerge and
Final/Complete in the same time.")
+ }
// Initialize all AggregateFunctions by binding references if necessary,
// and set inputBufferOffset and mutableBufferOffset.
- protected val allAggregateFunctions: Array[AggregateFunction] = {
+ protected def initializeAggregateFunctions(
+ expressions: Seq[AggregateExpression],
+ startingInputBufferOffset: Int): Array[AggregateFunction] = {
--- End diff --
Nit: please fix the formatting here.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org