Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9819#discussion_r47409455
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala ---
    @@ -736,15 +691,156 @@ private[execution] final class 
UnboundedFollowingWindowFunctionFrame(
     
         // Only recalculate and update when the buffer changes.
         if (bufferUpdated) {
    -      evaluatePrepared(buffer, inputIndex, buffer.length)
    -      fill(target, outputIndex)
    +      processor.initialize(input.size)
    +      processor.update(input, inputIndex, input.size)
    +      processor.evaluate(target)
         }
     
         // Move to the next row.
         outputIndex += 1
       }
    +}
    +
    +/**
    + * This class prepares and manages the processing of a number of aggregate 
functions.
    + *
    + * This implementation only supports evaluation in [[Complete]] mode. This 
is enough for
    + * Window processing.
    + *
    + * Processing of distinct aggregates is currently not supported.
    + *
    + * The implementation is split into an object which takes care of 
construction, and the actual
    + * processor class. Construction might be expensive and could be separated 
into a 'driver' and a
    + * 'executor' part.
    + */
    +private[execution] object AggregateProcessor {
    +  def apply(functions: Array[Expression],
    +    ordinal: Int,
    +    inputAttributes: Seq[Attribute],
    +    newMutableProjection: (Seq[Expression], Seq[Attribute]) => () => 
MutableProjection):
    +    AggregateProcessor = {
    +    val aggBufferAttributes = mutable.Buffer.empty[AttributeReference]
    +    val initialValues = mutable.Buffer.empty[Expression]
    +    val updateExpressions = mutable.Buffer.empty[Expression]
    +    val evaluateExpressions = 
mutable.Buffer.fill[Expression](ordinal)(NoOp)
    +    val imperatives = mutable.Buffer.empty[ImperativeAggregate]
    +
    +    // Create and add a size reference to SizeBasedWindowFunction.
    +    var sizeOrdinal = -1
    +    var size: BoundReference = null
    +    val addSize = (f: Expression) => f match {
    +      case wf: SizeBasedWindowFunction =>
    +        if (size == null) {
    +          sizeOrdinal = aggBufferAttributes.size
    +          size = BoundReference(sizeOrdinal, IntegerType, false)
    +          aggBufferAttributes += wf.size
    +          initialValues += NoOp
    +          updateExpressions += NoOp
    +        }
    +        wf.withSize(size)
    +      case e => e
    +    }
    +
    +    // Add an AggregateFunction to the AggregateProcessor.
    +    val addToProcessor = (f: Expression) => f match {
    +      case agg: DeclarativeAggregate =>
    +        aggBufferAttributes ++= agg.aggBufferAttributes
    +        initialValues ++= agg.initialValues
    +        updateExpressions ++= agg.updateExpressions
    +        evaluateExpressions += agg.evaluateExpression
    +      case agg: ImperativeAggregate =>
    +        val offset = aggBufferAttributes.size
    +        val imperative = BindReferences.bindReference(agg
    +          .withNewInputAggBufferOffset(offset)
    +          .withNewMutableAggBufferOffset(offset),
    +          inputAttributes)
    +        imperatives += imperative
    +        aggBufferAttributes ++= imperative.aggBufferAttributes
    +        val noOps = Seq.fill(imperative.aggBufferAttributes.size)(NoOp)
    +        initialValues ++= noOps
    +        updateExpressions ++= noOps
    +        evaluateExpressions += imperative
    +      case other =>
    +        sys.error(s"Unsupported Aggregate Function: $f")
    +    }
    +
    +    // Process the functions.
    +    functions.foreach(addSize.andThen(addToProcessor))
    +
    +    // Create the projections.
    +    val initialProjection = newMutableProjection(initialValues, Nil)()
    +    val updateProjection = newMutableProjection(
    +      updateExpressions,
    +      aggBufferAttributes ++ inputAttributes)()
    +    val evaluateProjection = newMutableProjection(
    +      evaluateExpressions,
    +      aggBufferAttributes)()
    --- End diff --
    
    It could fail to bind for ImperativeAggregate


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to