Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/9819#discussion_r47409455 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala --- @@ -736,15 +691,156 @@ private[execution] final class UnboundedFollowingWindowFunctionFrame( // Only recalculate and update when the buffer changes. if (bufferUpdated) { - evaluatePrepared(buffer, inputIndex, buffer.length) - fill(target, outputIndex) + processor.initialize(input.size) + processor.update(input, inputIndex, input.size) + processor.evaluate(target) } // Move to the next row. outputIndex += 1 } +} + +/** + * This class prepares and manages the processing of a number of aggregate functions. + * + * This implementation only supports evaluation in [[Complete]] mode. This is enough for + * Window processing. + * + * Processing of distinct aggregates is currently not supported. + * + * The implementation is split into an object which takes care of construction, and a the actual + * processor class. Construction might be expensive and could be separated into a 'driver' and a + * 'executor' part. + */ +private[execution] object AggregateProcessor { + def apply(functions: Array[Expression], + ordinal: Int, + inputAttributes: Seq[Attribute], + newMutableProjection: (Seq[Expression], Seq[Attribute]) => () => MutableProjection): + AggregateProcessor = { + val aggBufferAttributes = mutable.Buffer.empty[AttributeReference] + val initialValues = mutable.Buffer.empty[Expression] + val updateExpressions = mutable.Buffer.empty[Expression] + val evaluateExpressions = mutable.Buffer.fill[Expression](ordinal)(NoOp) + val imperatives = mutable.Buffer.empty[ImperativeAggregate] + + // Create and add a size reference to SizeBasedWindowFunction. 
+ var sizeOrdinal = -1 + var size: BoundReference = null + val addSize = (f: Expression) => f match { + case wf: SizeBasedWindowFunction => + if (size == null) { + sizeOrdinal = aggBufferAttributes.size + size = BoundReference(sizeOrdinal, IntegerType, false) + aggBufferAttributes += wf.size + initialValues += NoOp + updateExpressions += NoOp + } + wf.withSize(size) + case e => e + } + + // Add an AggregateFunction to the AggregateProcessor. + val addToProcessor = (f: Expression) => f match { + case agg: DeclarativeAggregate => + aggBufferAttributes ++= agg.aggBufferAttributes + initialValues ++= agg.initialValues + updateExpressions ++= agg.updateExpressions + evaluateExpressions += agg.evaluateExpression + case agg: ImperativeAggregate => + val offset = aggBufferAttributes.size + val imperative = BindReferences.bindReference(agg + .withNewInputAggBufferOffset(offset) + .withNewMutableAggBufferOffset(offset), + inputAttributes) + imperatives += imperative + aggBufferAttributes ++= imperative.aggBufferAttributes + val noOps = Seq.fill(imperative.aggBufferAttributes.size)(NoOp) + initialValues ++= noOps + updateExpressions ++= noOps + evaluateExpressions += imperative + case other => + sys.error(s"Unsupported Aggregate Function: $f") + } + + // Process the functions. + functions.foreach(addSize.andThen(addToProcessor)) + + // Create the projections. + val initialProjection = newMutableProjection(initialValues, Nil)() + val updateProjection = newMutableProjection( + updateExpressions, + aggBufferAttributes ++ inputAttributes)() + val evaluateProjection = newMutableProjection( + evaluateExpressions, + aggBufferAttributes)() --- End diff -- It could fail to bind for ImperativeAggregate
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes to enable it, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org