GitHub user maropu commented on a diff in the pull request:
https://github.com/apache/spark/pull/3247#discussion_r27717430
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala ---
@@ -17,181 +17,461 @@
package org.apache.spark.sql.execution
-import java.util.HashMap
-
import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.SparkContext
-import org.apache.spark.rdd.RDD
+
+import org.apache.spark.util.collection.{OpenHashSet, OpenHashMap}
+
import org.apache.spark.sql.catalyst.errors._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical._
-import org.apache.spark.sql.SQLContext
/**
- * :: DeveloperApi ::
- * Groups input data by `groupingExpressions` and computes the
`aggregateExpressions` for each
- * group.
+ * An aggregate that needs to be computed for each row in a group.
*
- * @param partial if true then aggregation is done partially on local data
without shuffling to
- * ensure all values where `groupingExpressions` are equal
are present.
- * @param groupingExpressions expressions that are evaluated to determine
grouping.
- * @param aggregateExpressions expressions that are computed for each
group.
- * @param child the input data source.
+ * @param aggregate AggregateExpression, associated with the function
+ * @param substitution A MutableLiteral used to refer to the result of
this aggregate in the final
+ * output.
*/
-@DeveloperApi
-case class Aggregate(
- partial: Boolean,
- groupingExpressions: Seq[Expression],
- aggregateExpressions: Seq[NamedExpression],
- child: SparkPlan)
- extends UnaryNode {
-
- override def requiredChildDistribution: List[Distribution] = {
- if (partial) {
- UnspecifiedDistribution :: Nil
- } else {
- if (groupingExpressions == Nil) {
- AllTuples :: Nil
- } else {
- ClusteredDistribution(groupingExpressions) :: Nil
- }
- }
+sealed case class AggregateFunctionBind(
+ aggregate: AggregateExpression,
+ substitution: MutableLiteral)
+
+sealed class InputBufferSeens(
+ var input: Row, //
+ var buffer: MutableRow,
+ var seens: Array[OpenHashSet[Any]] = null) {
+ def this() {
+ this(new GenericMutableRow(0), null)
}
- override def output: Seq[Attribute] =
aggregateExpressions.map(_.toAttribute)
+ def withInput(row: Row): InputBufferSeens = {
+ this.input = row
+ this
+ }
- /**
- * An aggregate that needs to be computed for each row in a group.
- *
- * @param unbound Unbound version of this aggregate, used for result
substitution.
- * @param aggregate A bound copy of this aggregate used to create a new
aggregation buffer.
- * @param resultAttribute An attribute used to refer to the result of
this aggregate in the final
- * output.
- */
- case class ComputedAggregate(
- unbound: AggregateExpression,
- aggregate: AggregateExpression,
- resultAttribute: AttributeReference)
-
- /** A list of aggregates that need to be computed for each group. */
- private[this] val computedAggregates = aggregateExpressions.flatMap {
agg =>
- agg.collect {
- case a: AggregateExpression =>
- ComputedAggregate(
- a,
- BindReferences.bindReference(a, child.output),
- AttributeReference(s"aggResult:$a", a.dataType, a.nullable)())
+ def withBuffer(row: MutableRow): InputBufferSeens = {
+ this.buffer = row
+ this
+ }
+
+ def withSeens(seens: Array[OpenHashSet[Any]]): InputBufferSeens = {
+ this.seens = seens
+ this
+ }
+}
+
+sealed trait Aggregate {
+ self: Product =>
+ // HACK: Generators don't correctly preserve their output through
serializations so we grab
+ // out child's output attributes statically here.
+ val childOutput = child.output
+ val isGlobalAggregation = groupingExpressions.isEmpty
+
+ def computedAggregates: Array[AggregateExpression] = {
+ boundProjection.flatMap { expr =>
+ expr.collect {
+ case ae: AggregateExpression => ae
+ }
}
}.toArray
- /** The schema of the result of all aggregate evaluations */
- private[this] val computedSchema =
computedAggregates.map(_.resultAttribute)
+ // This is a hack, instead of relying on the BindReferences for the
aggregation
+ // buffer schema in PostShuffle, we have a strong protocols which
represented as the
+ // BoundReferences in PostShuffle for aggregation buffer.
+ @transient lazy val bufferSchema: Array[AttributeReference] =
+ computedAggregates.zipWithIndex.flatMap { case (ca, idx) =>
+ ca.bufferDataType.zipWithIndex.map { case (dt, i) =>
+ AttributeReference(s"aggr.${idx}_$i", dt)() }
+ }.toArray
- /** Creates a new aggregate buffer for a group. */
- private[this] def newAggregateBuffer(): Array[AggregateFunction] = {
- val buffer = new Array[AggregateFunction](computedAggregates.length)
- var i = 0
- while (i < computedAggregates.length) {
- buffer(i) = computedAggregates(i).aggregate.newInstance()
- i += 1
+ // The tuples of aggregate expressions with information
+ // (AggregateExpression, Aggregate Function, Placeholder of
AggregateExpression result)
+ @transient lazy val aggregateFunctionBinds: Array[AggregateFunctionBind]
= {
+ var pos = 0
+ computedAggregates.map { ae =>
+ ae.initial(mode)
+
+ // we connect all of the aggregation buffers in a single Row,
+ // and "BIND" the attribute references in a Hack way.
+ val bufferDataTypes = ae.bufferDataType
+ ae.initialBoundReference(for (i <- 0 until bufferDataTypes.length)
yield {
+ BoundReference(pos + i, bufferDataTypes(i), true)
+ })
+ pos += bufferDataTypes.length
+
+ AggregateFunctionBind(ae, MutableLiteral(null, ae.dataType))
}
- buffer
}
- /** Named attributes used to substitute grouping attributes into the
final result. */
- private[this] val namedGroups = groupingExpressions.map {
- case ne: NamedExpression => ne -> ne.toAttribute
- case e => e -> Alias(e, s"groupingExpr:$e")().toAttribute
+ @transient lazy val groupByProjection = if (groupingExpressions.isEmpty)
{
+ InterpretedMutableProjection(Nil)
+ } else {
+ new InterpretedMutableProjection(groupingExpressions, childOutput)
+ }
+
+ // Indicate which stage we are running into
+ def mode: Mode
+ // This is provided by SparkPlan
+ def child: SparkPlan
+ // Group By Key Expressions
+ def groupingExpressions: Seq[Expression]
+ // Bounded Projection
+ def boundProjection: Seq[NamedExpression]
+}
+
+sealed trait PreShuffle extends Aggregate {
+ self: Product =>
+
+ def boundProjection: Seq[NamedExpression] = projection.map {
+ case a: Attribute => // Attribute will be converted into BoundReference
+ Alias(
+ BindReferences.bindReference(a: Expression, childOutput),
a.name)(a.exprId, a.qualifiers)
+ case a: NamedExpression => BindReferences.bindReference(a, childOutput)
}
+ // The expression list for output, this is the unbound expressions
+ def projection: Seq[NamedExpression]
+}
+
+sealed trait PostShuffle extends Aggregate {
+ self: Product =>
/**
- * A map of substitutions that are used to insert the aggregate
expressions and grouping
- * expression into the final result expression.
+ * Substituted version of boundProjection expressions which are used to
compute final
+ * output rows given a group and the result of all aggregate
computations.
*/
- private[this] val resultMap =
- (computedAggregates.map { agg => agg.unbound -> agg.resultAttribute }
++ namedGroups).toMap
+ @transient lazy val finalExpressions = {
+ val resultMap = aggregateFunctionBinds.map { ae => ae.aggregate ->
ae.substitution }.toMap
+ boundProjection.map { agg =>
+ agg.transform {
+ case e: AggregateExpression if resultMap.contains(e) =>
resultMap(e)
+ }
+ }
+ }.map(e => {BindReferences.bindReference(e: Expression, childOutput)})
+
+ @transient lazy val finalProjection = new
InterpretedMutableProjection(finalExpressions)
+
+ def aggregateFunctionBinds: Array[AggregateFunctionBind]
+
+ def createIterator(
+ aggregates: Array[AggregateExpression],
+ iterator: Iterator[InputBufferSeens]) = {
+ val substitutions = aggregateFunctionBinds.map(_.substitution)
+
+ new Iterator[Row] {
+ override final def hasNext: Boolean = iterator.hasNext
+
+ override final def next(): Row = {
+ val keybuffer = iterator.next()
+
+ var idx = 0
+ while (idx < aggregates.length) {
+ // substitute the AggregateExpression value
+ substitutions(idx).value =
aggregates(idx).terminate(keybuffer.buffer)
+ idx += 1
+ }
+
+ finalProjection(keybuffer.input)
+ }
+ }
+ }
+}
+
+/**
+ * :: DeveloperApi ::
+ * Groups input data by `groupingExpressions` and computes the
`projection` for each
+ * group.
+ *
+ * @param groupingExpressions expressions that are evaluated to determine
grouping.
+ * @param projection expressions that are computed for each group.
+ * @param namedGroupingAttributes the attributes represent the output of
the groupby expressions
+ * @param child the input data source.
+ */
+@DeveloperApi
+case class AggregatePreShuffle(
+ groupingExpressions: Seq[Expression],
+ projection: Seq[NamedExpression],
+ namedGroupingAttributes: Seq[Attribute],
+ child: SparkPlan)
+ extends UnaryNode with PreShuffle {
+
+ override def requiredChildDistribution = UnspecifiedDistribution :: Nil
+
+ override def output = bufferSchema.map(_.toAttribute) ++
namedGroupingAttributes
+
+ override def mode: Mode = PARTIAL1 // iterate & terminalPartial will be
called
/**
- * Substituted version of aggregateExpressions expressions which are
used to compute final
- * output rows given a group and the result of all aggregate
computations.
+ * Create Iterator for the in-memory hash map.
*/
- private[this] val resultExpressions = aggregateExpressions.map { agg =>
- agg.transform {
- case e: Expression if resultMap.contains(e) => resultMap(e)
+ private[this] def createIterator(
+ functions: Array[AggregateExpression],
+ iterator: Iterator[InputBufferSeens]) = {
+ new Iterator[Row] {
+ private[this] val joinedRow = new JoinedRow
+
+ override final def hasNext: Boolean = iterator.hasNext
+
+ override final def next(): Row = {
+ val keybuffer = iterator.next()
+ var idx = 0
+ while (idx < functions.length) {
+ functions(idx).terminatePartial(keybuffer.buffer)
+ idx += 1
+ }
+
+ joinedRow(keybuffer.buffer, keybuffer.input).copy()
+ }
}
}
- override def execute(): RDD[Row] = attachTree(this, "execute") {
- if (groupingExpressions.isEmpty) {
- child.execute().mapPartitions { iter =>
- val buffer = newAggregateBuffer()
- var currentRow: Row = null
+ override def execute() = attachTree(this, "execute") {
+ child.execute().mapPartitions { iter =>
+ val aggregates = aggregateFunctionBinds.map(_.aggregate)
+
+ if (groupingExpressions.isEmpty) {
+ // without group by keys
+ val buffer = new GenericMutableRow(bufferSchema.length)
+ var idx = 0
+ while (idx < aggregates.length) {
+ val ae = aggregates(idx)
+ ae.reset(buffer)
+ idx += 1
+ }
+
while (iter.hasNext) {
- currentRow = iter.next()
- var i = 0
- while (i < buffer.length) {
- buffer(i).update(currentRow)
- i += 1
+ val currentRow = iter.next()
+ var idx = 0
+ while (idx < aggregates.length) {
+ val ae = aggregates(idx)
+ ae.iterate(ae.eval(currentRow), buffer)
+ idx += 1
}
}
- val resultProjection = new
InterpretedProjection(resultExpressions, computedSchema)
- val aggregateResults = new
GenericMutableRow(computedAggregates.length)
- var i = 0
- while (i < buffer.length) {
- aggregateResults(i) = buffer(i).eval(EmptyRow)
- i += 1
+ createIterator(aggregates, Iterator(new
InputBufferSeens().withBuffer(buffer)))
+ } else {
+ val results = new OpenHashMap[Row, InputBufferSeens]()
+ while (iter.hasNext) {
+ val currentRow = iter.next()
+
+ val keys = groupByProjection(currentRow)
+ results(keys) match {
+ case null =>
+ val buffer = new GenericMutableRow(bufferSchema.length)
+ var idx = 0
+ while (idx < aggregates.length) {
+ val ae = aggregates(idx)
+ val value = ae.eval(currentRow)
+ // TODO distinctLike? We need to store the "seen" for
+ // AggregationExpression that distinctLike=true
+ // This is a trade off between memory & computing
+ ae.reset(buffer)
+ ae.iterate(value, buffer)
+ idx += 1
+ }
+
+ val copies = keys.copy()
+ results(copies) = new InputBufferSeens(copies, buffer)
+ case inputbuffer =>
+ var idx = 0
+ while (idx < aggregates.length) {
+ val ae = aggregates(idx)
+ ae.iterate(ae.eval(currentRow), inputbuffer.buffer)
+ idx += 1
+ }
+
+ }
}
- Iterator(resultProjection(aggregateResults))
+ createIterator(aggregates, results.iterator.map(_._2))
}
- } else {
- child.execute().mapPartitions { iter =>
- val hashTable = new HashMap[Row, Array[AggregateFunction]]
- val groupingProjection = new
InterpretedMutableProjection(groupingExpressions, child.output)
+ }
+ }
+}
+
+case class AggregatePostShuffle(
+ groupingExpressions: Seq[Expression],
+ boundProjection: Seq[NamedExpression],
+ child: SparkPlan) extends UnaryNode with PostShuffle {
+
+ override def output = boundProjection.map(_.toAttribute)
+
+ override def requiredChildDistribution: Seq[Distribution] = if
(groupingExpressions == Nil) {
+ AllTuples :: Nil
+ } else {
+ ClusteredDistribution(groupingExpressions) :: Nil
+ }
+
+ override def mode: Mode = FINAL // merge & terminate will be called
+
+ override def execute() = attachTree(this, "execute") {
+ child.execute().mapPartitions { iter =>
+ val aggregates = aggregateFunctionBinds.map(_.aggregate)
+ if (groupingExpressions.isEmpty) {
+ val buffer = new GenericMutableRow(bufferSchema.length)
+ var idx = 0
+ while (idx < aggregates.length) {
+ val ae = aggregates(idx)
+ ae.reset(buffer)
+ idx += 1
+ }
+
+ while (iter.hasNext) {
+ val currentRow = iter.next()
+
+ var idx = 0
+ while (idx < aggregates.length) {
+ val ae = aggregates(idx)
+ ae.merge(currentRow, buffer)
+ idx += 1
+ }
+ }
- var currentRow: Row = null
+ createIterator(aggregates, Iterator(new
InputBufferSeens().withBuffer(buffer)))
+ } else {
+ val results = new OpenHashMap[Row, InputBufferSeens]()
while (iter.hasNext) {
- currentRow = iter.next()
- val currentGroup = groupingProjection(currentRow)
- var currentBuffer = hashTable.get(currentGroup)
- if (currentBuffer == null) {
- currentBuffer = newAggregateBuffer()
- hashTable.put(currentGroup.copy(), currentBuffer)
+ val currentRow = iter.next()
+ val keys = groupByProjection(currentRow)
+ results(keys) match {
+ case null =>
+ val buffer = new GenericMutableRow(bufferSchema.length)
+ var idx = 0
+ while (idx < aggregates.length) {
+ val ae = aggregates(idx)
+ ae.reset(buffer)
+ ae.merge(currentRow, buffer)
+ idx += 1
+ }
+ results(keys.copy()) = new
InputBufferSeens(currentRow.copy(), buffer)
+ case pair =>
+ var idx = 0
+ while (idx < aggregates.length) {
+ val ae = aggregates(idx)
+ ae.merge(currentRow, pair.buffer)
+ idx += 1
+ }
}
+ }
+
+ createIterator(aggregates, results.iterator.map(_._2))
+ }
+ }
+ }
+}
+
+// TODO Currently even if only a single DISTINCT exists in the aggregate
expressions, we will
+// not do partial aggregation (aggregating before shuffling), all of the
data have to be shuffled
+// to the reduce side and do aggregation directly, this probably causes
the performance regression
+// for Aggregation Function like CountDistinct etc.
+case class DistinctAggregate(
+ groupingExpressions: Seq[Expression],
+ projection: Seq[NamedExpression],
+ child: SparkPlan) extends UnaryNode with PreShuffle with PostShuffle {
+ override def output = boundProjection.map(_.toAttribute)
+
+ override def requiredChildDistribution: Seq[Distribution] = if
(groupingExpressions == Nil) {
+ AllTuples :: Nil
+ } else {
+ ClusteredDistribution(groupingExpressions) :: Nil
+ }
+
+ override def mode: Mode = COMPLETE // iterate() & terminate() will be
called
- var i = 0
- while (i < currentBuffer.length) {
- currentBuffer(i).update(currentRow)
- i += 1
+ override def execute() = attachTree(this, "execute") {
+ child.execute().mapPartitions { iter =>
+ val aggregates = aggregateFunctionBinds.map(_.aggregate)
+ if (groupingExpressions.isEmpty) {
+ val buffer = new GenericMutableRow(bufferSchema.length)
+ // TODO save the memory only for those DISTINCT aggregate
expressions
+ val seens = new
Array[OpenHashSet[Any]](aggregateFunctionBinds.length)
+
+ var idx = 0
+ while (idx < aggregateFunctionBinds.length) {
+ val ae = aggregates(idx)
+ ae.reset(buffer)
+
+ if (ae.distinct) {
+ seens(idx) = new OpenHashSet[Any]()
}
+
+ idx += 1
}
+ val ibs = new
InputBufferSeens().withBuffer(buffer).withSeens(seens)
- new Iterator[Row] {
- private[this] val hashTableIter = hashTable.entrySet().iterator()
- private[this] val aggregateResults = new
GenericMutableRow(computedAggregates.length)
- private[this] val resultProjection =
- new InterpretedMutableProjection(
- resultExpressions, computedSchema ++ namedGroups.map(_._2))
- private[this] val joinedRow = new JoinedRow4
-
- override final def hasNext: Boolean = hashTableIter.hasNext
-
- override final def next(): Row = {
- val currentEntry = hashTableIter.next()
- val currentGroup = currentEntry.getKey
- val currentBuffer = currentEntry.getValue
-
- var i = 0
- while (i < currentBuffer.length) {
- // Evaluating an aggregate buffer returns the result. No
row is required since we
- // already added all rows in the group using update.
- aggregateResults(i) = currentBuffer(i).eval(EmptyRow)
- i += 1
+ while (iter.hasNext) {
+ val currentRow = iter.next()
+
+ var idx = 0
+ while (idx < aggregateFunctionBinds.length) {
+ val ae = aggregates(idx)
+ val value = ae.eval(currentRow)
+
+ if (ae.distinct) {
+ if (value != null && !seens(idx).contains(value)) {
+ ae.iterate(value, buffer)
+ seens(idx).add(value)
+ }
+ } else {
+ ae.iterate(value, buffer)
}
- resultProjection(joinedRow(aggregateResults, currentGroup))
+ idx += 1
}
}
+
+ createIterator(aggregates, Iterator(ibs))
+ } else {
+ val results = new OpenHashMap[Row, InputBufferSeens]()
+
+ while (iter.hasNext) {
+ val currentRow = iter.next()
+
+ val keys = groupByProjection(currentRow)
+ results(keys) match {
+ case null =>
+ val buffer = new GenericMutableRow(bufferSchema.length)
+ // TODO save the memory only for those DISTINCT aggregate
expressions
+ val seens = new
Array[OpenHashSet[Any]](aggregateFunctionBinds.length)
+
+ var idx = 0
+ while (idx < aggregateFunctionBinds.length) {
+ val ae = aggregates(idx)
+ val value = ae.eval(currentRow)
+ ae.reset(buffer)
+ ae.iterate(value, buffer)
+
+ if (ae.distinct) {
+ val seen = new OpenHashSet[Any]()
+ if (value != null) {
+ seen.add(value)
+ }
+ seens.update(idx, seen)
+ }
+
+ idx += 1
+ }
+ results(keys.copy()) = new
InputBufferSeens(currentRow.copy(), buffer, seens)
+
+ case inputBufferSeens =>
+ var idx = 0
+ while (idx < aggregateFunctionBinds.length) {
+ val ae = aggregates(idx)
+ val value = ae.eval(currentRow)
+
+ if (ae.distinct) {
+ if (value != null &&
!inputBufferSeens.seens(idx).contains(value)) {
+ ae.iterate(value, inputBufferSeens.buffer)
+ inputBufferSeens.seens(idx).add(value)
+ }
+ } else {
+ ae.iterate(value, inputBufferSeens.buffer)
+ }
+ idx += 1
+ }
+ }
+ }
+
+ createIterator(aggregates, results.iterator.map(_._2))
--- End diff --
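AggregatePreShuffle, AggregatePostShuffle, and DistinctAggregate contain a lot
of duplicated code. For example, each one repeats the same buffer-reset loop, the
same OpenHashMap-based grouping loop, and the same createIterator call. Can we
merge these three classes into one that is parameterized by only two Mode states?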
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and you would like it, or if the feature is enabled but not working,
please contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]