Github user hvanhovell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/7057#discussion_r34180117
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala ---
    @@ -37,443 +67,615 @@ case class Window(
         child: SparkPlan)
       extends UnaryNode {
     
    -  override def output: Seq[Attribute] =
    -    (projectList ++ windowExpression).map(_.toAttribute)
    +  override def output: Seq[Attribute] = projectList ++ 
windowExpression.map(_.toAttribute)
     
    -  override def requiredChildDistribution: Seq[Distribution] =
    +  override def requiredChildDistribution: Seq[Distribution] = {
         if (windowSpec.partitionSpec.isEmpty) {
    -      // This operator will be very expensive.
    +      // Only show warning when the number of bytes is larger than 100 MB?
    +      logWarning("No Partition Defined for Window operation! Moving all 
data to a single "
    +        + "partition, this can cause serious performance degradation.")
           AllTuples :: Nil
    -    } else {
    -      ClusteredDistribution(windowSpec.partitionSpec) :: Nil
    -    }
    -
    -  // Since window functions are adding columns to the input rows, the 
child's outputPartitioning
    -  // is preserved.
    -  override def outputPartitioning: Partitioning = child.outputPartitioning
    -
    -  override def requiredChildOrdering: Seq[Seq[SortOrder]] = {
    -    // The required child ordering has two parts.
    -    // The first part is the expressions in the partition specification.
    -    // We add these expressions to the required ordering to make sure 
input rows are grouped
    -    // based on the partition specification. So, we only need to process a 
single partition
    -    // at a time.
    -    // The second part is the expressions specified in the ORDER BY cluase.
    -    // Basically, we first use sort to group rows based on partition 
specifications and then sort
    -    // Rows in a group based on the order specification.
    -    (windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ 
windowSpec.orderSpec) :: Nil
    +    } else ClusteredDistribution(windowSpec.partitionSpec) :: Nil
       }
     
    -  // Since window functions basically add columns to input rows, this 
operator
    -  // will not change the ordering of input rows.
    +  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
    +    Seq(windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ 
windowSpec.orderSpec)
    +
       override def outputOrdering: Seq[SortOrder] = child.outputOrdering
     
    -  case class ComputedWindow(
    -    unbound: WindowExpression,
    -    windowFunction: WindowFunction,
    -    resultAttribute: AttributeReference)
    -
    -  // A list of window functions that need to be computed for each group.
    -  private[this] val computedWindowExpressions = windowExpression.flatMap { 
window =>
    -    window.collect {
    -      case w: WindowExpression =>
    -        ComputedWindow(
    -          w,
    -          BindReferences.bindReference(w.windowFunction, child.output),
    -          AttributeReference(s"windowResult:$w", w.dataType, w.nullable)())
    +  @transient
    +  private[this] lazy val (factories, projection, columns) = {
    +    // Helper method for creating bound ordering objects.
    +    def createBoundOrdering(frameType: FrameType, offset: Int) = frameType 
match {
    +      case RangeFrame =>
    +        // Use the entire order expression when the offset is 0.
    +        val (exprs, current, bound) = if (offset == 0) {
    +          val exprs = windowSpec.orderSpec.map(_.child)
    +          val projection = newMutableProjection(exprs, child.output)
    +          (windowSpec.orderSpec, projection(), projection())
    +        }
    +        // Use only the first order expression when the offset is non-null.
    +        else {
    +          val sortExpr = windowSpec.orderSpec.head
    +          val expr = sortExpr.child
    +          val boundExpr = Add(expr, Cast(Literal.create(offset, 
IntegerType), expr.dataType))
    +          val current = newMutableProjection(expr :: Nil, child.output)()
    +          val bound = newMutableProjection(boundExpr :: Nil, 
child.output)()
    +          (sortExpr :: Nil, current, bound)
    +        }
    +        // Construct the ordering.
    +        val (sortExprs, schema) = exprs.zipWithIndex.map { case (e, i) =>
    +          val ref = AttributeReference(s"c_$i", e.dataType, e.nullable)()
    +          (SortOrder(ref, e.direction), ref)
    +        }.unzip
    +        val ordering = newOrdering(sortExprs, schema)
    +        RangeBoundOrdering(ordering, current, bound)
    +      case RowFrame => RowBoundOrdering(offset)
         }
    -  }.toArray
    -
    -  private[this] val windowFrame =
    -    windowSpec.frameSpecification.asInstanceOf[SpecifiedWindowFrame]
     
    -  // Create window functions.
    -  private[this] def windowFunctions(): Array[WindowFunction] = {
    -    val functions = new 
Array[WindowFunction](computedWindowExpressions.length)
    -    var i = 0
    -    while (i < computedWindowExpressions.length) {
    -      functions(i) = 
computedWindowExpressions(i).windowFunction.newInstance()
    -      functions(i).init()
    -      i += 1
    +    // Collect all window expressions
    +    val windowExprs = windowExpression.flatMap {
    +      _.collect {
    +        case e: WindowExpression => e
    +      }
         }
    -    functions
    -  }
    -
    -  // The schema of the result of all window function evaluations
    -  private[this] val computedSchema = 
computedWindowExpressions.map(_.resultAttribute)
     
    -  private[this] val computedResultMap =
    -    computedWindowExpressions.map { w => w.unbound -> w.resultAttribute 
}.toMap
    +    // Group the window expression by their processing frame.
    +    val groupedWindowExprs = 
windowExprs.groupBy(_.windowSpec.frameSpecification)
    +
    +    // Create factories and collect unbound expressions for each frame.
    +    val factories = mutable.Buffer.empty[WindowFunctionFrame]
    +    val unboundExpressions = mutable.Buffer.empty[Expression]
    +    groupedWindowExprs.foreach {
    +      case (frame, unboundFrameExpressions) =>
    +        // Register the ordinal.
    +        val ordinal = unboundExpressions.size
    +
    +        // Track the unbound expressions
    +        unboundExpressions ++= unboundFrameExpressions
    +
    +        // Bind the expressions.
    +        val functions = unboundFrameExpressions.map { e =>
    +          BindReferences.bindReference(e.windowFunction, child.output)
    +        }.toArray
    +
    +        // Create the frame processors.
    +        val factory = frame match {
    +          // Growing Frame.
    +          case SpecifiedWindowFrame(frameType, UnboundedPreceding, 
FrameBoundaryExtractor(high)) =>
    +            val uBoundOrdering = createBoundOrdering(frameType, high)
    +            new UnboundedPrecedingWindowFunctionFrame(ordinal, functions, 
uBoundOrdering)
    +          // Shrinking Frame.
    +          case SpecifiedWindowFrame(frameType, 
FrameBoundaryExtractor(low), UnboundedFollowing) =>
    +            val lBoundOrdering = createBoundOrdering(frameType, low)
    +            new UnboundedFollowingWindowFunctionFrame(ordinal, functions, 
lBoundOrdering)
    +          // Moving Frame.
    +          case SpecifiedWindowFrame(frameType,
    +              FrameBoundaryExtractor(low), FrameBoundaryExtractor(high)) =>
    +            val lBoundOrdering = createBoundOrdering(frameType, low)
    +            val uBoundOrdering = createBoundOrdering(frameType, high)
    +            new SlidingWindowFunctionFrame(ordinal, functions, 
lBoundOrdering, uBoundOrdering)
    +          // Entire Partition Frame.
    +          case SpecifiedWindowFrame(_, UnboundedPreceding, 
UnboundedFollowing) =>
    +            new UnboundedWindowFunctionFrame(ordinal, functions)
    +          // Error
    +          case fr =>
    +            sys.error(s"Unsupported Frame $fr for expressions: 
$unboundFrameExpressions")
    +        }
    +        factories += factory
    +    }
     
    -  private[this] val windowExpressionResult = windowExpression.map { window 
=>
    -    window.transform {
    -      case w: WindowExpression if computedResultMap.contains(w) => 
computedResultMap(w)
    +    // Create the schema projection.
    +    val unboundToAttr = unboundExpressions.map {
    +      e => (e, AttributeReference(s"aggResult:$e", e.dataType, 
e.nullable)())
         }
    +    val unboundToAttrMap = unboundToAttr.toMap
    +    val patchedWindowExpression = 
windowExpression.map(_.transform(unboundToAttrMap))
    +    val projection = newMutableProjection(
    +      projectList ++ patchedWindowExpression,
    +      child.output ++ unboundToAttr.map(_._2))
    +
    +    // Done
    +    (factories.toArray, projection, unboundExpressions.size)
       }
     
       protected override def doExecute(): RDD[InternalRow] = {
    -    child.execute().mapPartitions { iter =>
    +    child.execute().mapPartitions { stream =>
           new Iterator[InternalRow] {
    -
    -        // Although input rows are grouped based on 
windowSpec.partitionSpec, we need to
    -        // know when we have a new partition.
    -        // This is to manually construct an ordering that can be used to 
compare rows.
    -        // TODO: We may want to have a newOrdering that takes 
BoundReferences.
    -        // So, we can take advantave of code gen.
    -        private val partitionOrdering: Ordering[InternalRow] =
    -          RowOrdering.forSchema(windowSpec.partitionSpec.map(_.dataType))
    -
    -        // This is used to project expressions for the partition 
specification.
    -        protected val partitionGenerator =
    -          newMutableProjection(windowSpec.partitionSpec, child.output)()
    -
    -        // This is ued to project expressions for the order specification.
    -        protected val rowOrderGenerator =
    -          newMutableProjection(windowSpec.orderSpec.map(_.child), 
child.output)()
    -
    -        // The position of next output row in the inputRowBuffer.
    -        var rowPosition: Int = 0
    -        // The number of buffered rows in the inputRowBuffer (the size of 
the current partition).
    -        var partitionSize: Int = 0
    -        // The buffer used to buffer rows in a partition.
    -        var inputRowBuffer: CompactBuffer[InternalRow] = _
    -        // The partition key of the current partition.
    -        var currentPartitionKey: InternalRow = _
    -        // The partition key of next partition.
    -        var nextPartitionKey: InternalRow = _
    -        // The first row of next partition.
    -        var firstRowInNextPartition: InternalRow = _
    -        // Indicates if this partition is the last one in the iter.
    -        var lastPartition: Boolean = false
    -
    -        def createBoundaryEvaluator(): () => Unit = {
    -          def findPhysicalBoundary(
    -              boundary: FrameBoundary): () => Int = boundary match {
    -            case UnboundedPreceding => () => 0
    -            case UnboundedFollowing => () => partitionSize - 1
    -            case CurrentRow => () => rowPosition
    -            case ValuePreceding(value) =>
    -              () =>
    -                val newPosition = rowPosition - value
    -                if (newPosition > 0) newPosition else 0
    -            case ValueFollowing(value) =>
    -              () =>
    -                val newPosition = rowPosition + value
    -                if (newPosition < partitionSize) newPosition else 
partitionSize - 1
    -          }
    -
    -          def findLogicalBoundary(
    -              boundary: FrameBoundary,
    -              searchDirection: Int,
    -              evaluator: Expression,
    -              joinedRow: JoinedRow): () => Int = boundary match {
    -            case UnboundedPreceding => () => 0
    -            case UnboundedFollowing => () => partitionSize - 1
    -            case other =>
    -              () => {
    -                // CurrentRow, ValuePreceding, or ValueFollowing.
    -                var newPosition = rowPosition + searchDirection
    -                var stopSearch = false
    -                // rowOrderGenerator is a mutable projection.
    -                // We need to make a copy of the returned by 
rowOrderGenerator since we will
    -                // compare searched row with this currentOrderByValue.
    -                val currentOrderByValue = 
rowOrderGenerator(inputRowBuffer(rowPosition)).copy()
    -                while (newPosition >= 0 && newPosition < partitionSize && 
!stopSearch) {
    -                  val r = rowOrderGenerator(inputRowBuffer(newPosition))
    -                  stopSearch =
    -                    !(evaluator.eval(joinedRow(currentOrderByValue, 
r)).asInstanceOf[Boolean])
    -                  if (!stopSearch) {
    -                    newPosition += searchDirection
    -                  }
    -                }
    -                newPosition -= searchDirection
    -
    -                if (newPosition < 0) {
    -                  0
    -                } else if (newPosition >= partitionSize) {
    -                  partitionSize - 1
    -                } else {
    -                  newPosition
    -                }
    -              }
    -          }
    -
    -          windowFrame.frameType match {
    -            case RowFrame =>
    -              val findStart = findPhysicalBoundary(windowFrame.frameStart)
    -              val findEnd = findPhysicalBoundary(windowFrame.frameEnd)
    -              () => {
    -                frameStart = findStart()
    -                frameEnd = findEnd()
    -              }
    -            case RangeFrame =>
    -              val joinedRowForBoundaryEvaluation: JoinedRow = new 
JoinedRow()
    -              val orderByExpr = windowSpec.orderSpec.head
    -              val currentRowExpr =
    -                BoundReference(0, orderByExpr.dataType, 
orderByExpr.nullable)
    -              val examedRowExpr =
    -                BoundReference(1, orderByExpr.dataType, 
orderByExpr.nullable)
    -              val differenceExpr = Abs(Subtract(currentRowExpr, 
examedRowExpr))
    -
    -              val frameStartEvaluator = windowFrame.frameStart match {
    -                case CurrentRow => EqualTo(currentRowExpr, examedRowExpr)
    -                case ValuePreceding(value) =>
    -                  LessThanOrEqual(differenceExpr, Cast(Literal(value), 
orderByExpr.dataType))
    -                case ValueFollowing(value) =>
    -                  GreaterThanOrEqual(differenceExpr, Cast(Literal(value), 
orderByExpr.dataType))
    -                case o => Literal(true) // This is just a dummy 
expression, we will not use it.
    -              }
    -
    -              val frameEndEvaluator = windowFrame.frameEnd match {
    -                case CurrentRow => EqualTo(currentRowExpr, examedRowExpr)
    -                case ValuePreceding(value) =>
    -                  GreaterThanOrEqual(differenceExpr, Cast(Literal(value), 
orderByExpr.dataType))
    -                case ValueFollowing(value) =>
    -                  LessThanOrEqual(differenceExpr, Cast(Literal(value), 
orderByExpr.dataType))
    -                case o => Literal(true) // This is just a dummy 
expression, we will not use it.
    -              }
    -
    -              val findStart =
    -                findLogicalBoundary(
    -                  boundary = windowFrame.frameStart,
    -                  searchDirection = -1,
    -                  evaluator = frameStartEvaluator,
    -                  joinedRow = joinedRowForBoundaryEvaluation)
    -              val findEnd =
    -                findLogicalBoundary(
    -                  boundary = windowFrame.frameEnd,
    -                  searchDirection = 1,
    -                  evaluator = frameEndEvaluator,
    -                  joinedRow = joinedRowForBoundaryEvaluation)
    -              () => {
    -                frameStart = findStart()
    -                frameEnd = findEnd()
    -              }
    +        // Get all relevant projections.
    +        val result = projection()
    +        val grouping = newProjection(windowSpec.partitionSpec, 
child.output)
    +
    +        // Manage the stream and the grouping.
    +        var nextRow: InternalRow = EmptyRow
    +        var nextGroup: InternalRow = EmptyRow
    +        var nextRowAvailable: Boolean = false
    +        private[this] def fetchNextRow() {
    +          nextRowAvailable = stream.hasNext
    +          if (nextRowAvailable) {
    +            nextRow = stream.next()
    +            nextGroup = grouping(nextRow)
    +          } else {
    +            nextRow = EmptyRow
    +            nextGroup = EmptyRow
               }
             }
    +        fetchNextRow()
    +
    +        // Manage the current partition.
    +        var rows: CompactBuffer[InternalRow] = _
    +        val frames: Array[WindowFunctionFrame] = factories.map(_.copy)
    +        val frameCount = frames.length
    +        private[this] def fetchNextPartition() {
    +          // Collect all the rows in the current partition.
    +          val currentGroup = nextGroup
    +          rows = new CompactBuffer
    +          while (nextRowAvailable && nextGroup == currentGroup) {
    +            rows += nextRow.copy()
    +            fetchNextRow()
    +          }
     
    -        val boundaryEvaluator = createBoundaryEvaluator()
    -        // Indicates if we the specified window frame requires us to 
maintain a sliding frame
    -        // (e.g. RANGES BETWEEN 1 PRECEDING AND CURRENT ROW) or the window 
frame
    -        // is the entire partition (e.g. ROWS BETWEEN UNBOUNDED PRECEDING 
AND UNBOUNDED FOLLOWING).
    -        val requireUpdateFrame: Boolean = {
    -          def requireUpdateBoundary(boundary: FrameBoundary): Boolean = 
boundary match {
    -            case UnboundedPreceding => false
    -            case UnboundedFollowing => false
    -            case _ => true
    +          // Setup the frames.
    +          var i = 0
    +          while (i < frameCount) {
    +            frames(i).prepare(rows)
    +            i += 1
               }
     
    -          requireUpdateBoundary(windowFrame.frameStart) ||
    -            requireUpdateBoundary(windowFrame.frameEnd)
    +          // Setup iteration
    +          rowIndex = 0
    +          rowsSize = rows.size
             }
    -        // The start position of the current frame in the partition.
    -        var frameStart: Int = 0
    -        // The end position of the current frame in the partition.
    -        var frameEnd: Int = -1
    -        // Window functions.
    -        val functions: Array[WindowFunction] = windowFunctions()
    -        // Buffers used to store input parameters for window functions. 
Because we may need to
    -        // maintain a sliding frame, we use this buffer to avoid evaluate 
the parameters from
    -        // the same row multiple times.
    -        val windowFunctionParameterBuffers: Array[util.LinkedList[AnyRef]] 
=
    -          functions.map(_ => new util.LinkedList[AnyRef]())
    -
    -        // The projection used to generate the final result rows of this 
operator.
    -        private[this] val resultProjection =
    -          newMutableProjection(
    -            projectList ++ windowExpressionResult,
    -            projectList ++ computedSchema)()
    -
    -        // The row used to hold results of window functions.
    -        private[this] val windowExpressionResultRow =
    -          new GenericMutableRow(computedSchema.length)
    -
    -        private[this] val joinedRow = new JoinedRow6
    -
    -        // Initialize this iterator.
    -        initialize()
    -
    -        private def initialize(): Unit = {
    -          if (iter.hasNext) {
    -            val currentRow = iter.next().copy()
    -            // partitionGenerator is a mutable projection. Since we need 
to track nextPartitionKey,
    -            // we are making a copy of the returned partitionKey at here.
    -            nextPartitionKey = partitionGenerator(currentRow).copy()
    -            firstRowInNextPartition = currentRow
    +
    +        // Iteration
    +        var rowIndex = 0
    +        var rowsSize = 0
    +        def hasNext: Boolean = {
    +          if (nextRowAvailable && rowIndex >= rowsSize) {
                 fetchNextPartition()
    -          } else {
    -            // The iter is an empty one. So, we set all of the following 
variables
    -            // to make sure hasNext will return false.
    -            lastPartition = true
    -            rowPosition = 0
    -            partitionSize = 0
               }
    +          rowIndex < rowsSize
             }
     
    -        // Indicates if we will have new output row.
    -        override final def hasNext: Boolean = {
    -          !lastPartition || (rowPosition < partitionSize)
    -        }
    -
    -        override final def next(): InternalRow = {
    +        val join = new JoinedRow6
    +        val windowFunctionResult = new GenericMutableRow(columns)
    +        def next(): InternalRow = {
               if (hasNext) {
    -            if (rowPosition == partitionSize) {
    -              // All rows of this buffer have been consumed.
    -              // We will move to next partition.
    -              fetchNextPartition()
    -            }
    -            // Get the input row for the current output row.
    -            val inputRow = inputRowBuffer(rowPosition)
    -            // Get all results of the window functions for this output row.
    +            // Get the results for the window frames.
                 var i = 0
    -            while (i < functions.length) {
    -              windowExpressionResultRow.update(i, 
functions(i).get(rowPosition))
    +            while (i < frameCount) {
    +              frames(i).write(windowFunctionResult)
                   i += 1
                 }
     
    -            // Construct the output row.
    -            val outputRow = resultProjection(joinedRow(inputRow, 
windowExpressionResultRow))
    -            // We will move to the next one.
    -            rowPosition += 1
    -            if (requireUpdateFrame && rowPosition < partitionSize) {
    -              // If we need to maintain a sliding frame and
    -              // we will still work on this partition when next is called 
next time, do the update.
    -              updateFrame()
    -            }
    +            // 'Merge' the input row with the window function result
    +            join(rows(rowIndex), windowFunctionResult)
    +            rowIndex += 1
     
    -            // Return the output row.
    -            outputRow
    -          } else {
    -            // no more result
    -            throw new NoSuchElementException
    -          }
    +            // Return the projection.
    +            result(join)
    +          } else throw new NoSuchElementException
             }
    +      }
    +    }
    +  }
    +}
     
    -        // Fetch the next partition.
    -        private def fetchNextPartition(): Unit = {
    -          // Create a new buffer for input rows.
    -          inputRowBuffer = new CompactBuffer[InternalRow]()
    -          // We already have the first row for this partition
    -          // (recorded in firstRowInNextPartition). Add it back.
    -          inputRowBuffer += firstRowInNextPartition
    -          // Set the current partition key.
    -          currentPartitionKey = nextPartitionKey
    -          // Now, we will start to find all rows belonging to this 
partition.
    -          // Create a variable to track if we see the next partition.
    -          var findNextPartition = false
    -          // The search will stop when we see the next partition or there 
is no
    -          // input row left in the iter.
    -          while (iter.hasNext && !findNextPartition) {
    -            // Make a copy of the input row since we will put it in the 
buffer.
    -            val currentRow = iter.next().copy()
    -            // Get the partition key based on the partition specification.
    -            // For the below compare method, we do not need to make a copy 
of partitionKey.
    -            val partitionKey = partitionGenerator(currentRow)
    -            // Check if the current row belongs the current input row.
    -            val comparing = partitionOrdering.compare(currentPartitionKey, 
partitionKey)
    -            if (comparing == 0) {
    -              // This row is still in the current partition.
    -              inputRowBuffer += currentRow
    -            } else {
    -              // The current input row is in a different partition.
    -              findNextPartition = true
    -              // partitionGenerator is a mutable projection.
    -              // Since we need to track nextPartitionKey and we determine 
that it should be set
    -              // as partitionKey, we are making a copy of the partitionKey 
at here.
    -              nextPartitionKey = partitionKey.copy()
    -              firstRowInNextPartition = currentRow
    -            }
    -          }
    +/**
    + * Function for comparing boundary values.
    + */
    +private[execution] abstract class BoundOrdering {
    +  def compare(input: Seq[InternalRow], inputIndex: Int, outputIndex: Int): 
Int
    +}
     
    -          // We have not seen a new partition. It means that there is no 
new row in the
    -          // iter. The current partition is the last partition of the iter.
    -          if (!findNextPartition) {
    -            lastPartition = true
    -          }
    +/**
    + * Compare the input index to the bound of the output index.
    + */
    +private[execution] final case class RowBoundOrdering(offset: Int) extends 
BoundOrdering {
    +  override def compare(input: Seq[InternalRow], inputIndex: Int, 
outputIndex: Int): Int =
    +    inputIndex - (outputIndex + offset)
    +}
     
    -          // We have got all rows for the current partition.
    -          // Set rowPosition to 0 (the next output row will be based on 
the first
    -          // input row of this partition).
    -          rowPosition = 0
    -          // The size of this partition.
    -          partitionSize = inputRowBuffer.size
    -          // Reset all parameter buffers of window functions.
    -          var i = 0
    -          while (i < windowFunctionParameterBuffers.length) {
    -            windowFunctionParameterBuffers(i).clear()
    -            i += 1
    -          }
    -          frameStart = 0
    -          frameEnd = -1
    -          // Create the first window frame for this partition.
    -          // If we do not need to maintain a sliding frame, this frame will
    -          // have the entire partition.
    -          updateFrame()
    -        }
    +/**
    + * Compare the value of the input index to the value bound of the output 
index.
    + */
    +private[execution] final case class RangeBoundOrdering(
    +    ordering: Ordering[InternalRow],
    +    current: Projection,
    +    bound: Projection) extends BoundOrdering {
    +  override def compare(input: Seq[InternalRow], inputIndex: Int, 
outputIndex: Int): Int =
    +    ordering.compare(current(input(inputIndex)), bound(input(outputIndex)))
    +}
     
    -        /** The function used to maintain the sliding frame. */
    -        private def updateFrame(): Unit = {
    -          // Based on the difference between the new frame and old frame,
    -          // updates the buffers holding input parameters of window 
functions.
    -          // We will start to prepare input parameters starting from the 
row
    -          // indicated by offset in the input row buffer.
    -          def updateWindowFunctionParameterBuffers(
    -              numToRemove: Int,
    -              numToAdd: Int,
    -              offset: Int): Unit = {
    -            // First, remove unneeded entries from the head of every 
buffer.
    -            var i = 0
    -            while (i < numToRemove) {
    -              var j = 0
    -              while (j < windowFunctionParameterBuffers.length) {
    -                windowFunctionParameterBuffers(j).remove()
    -                j += 1
    -              }
    -              i += 1
    -            }
    -            // Then, add needed entries to the tail of every buffer.
    -            i = 0
    -            while (i < numToAdd) {
    -              var j = 0
    -              while (j < windowFunctionParameterBuffers.length) {
    -                // Ask the function to prepare the input parameters.
    -                val parameters = 
functions(j).prepareInputParameters(inputRowBuffer(i + offset))
    -                windowFunctionParameterBuffers(j).add(parameters)
    -                j += 1
    -              }
    -              i += 1
    -            }
    -          }
    +/**
    + * A window function calculates the results of a number of window 
functions for a window frame.
    + * Before use a frame must be prepared by passing it all the rows in the 
current partition. After
    + * preparation the update method can be called to fill the output rows.
    + *
    + * TODO How to improve performance? A few thoughts:
    + * - Window functions are expensive due to its distribution and ordering 
requirements.
    + * Unfortunately it is up to the Spark engine to solve this. Improvements 
in the form of project
    + * Tungsten are on the way.
    + * - The window frame processing bit can be improved though. But before we 
start doing that we
    + * need to see how much of the time and resources are spent on 
partitioning and ordering, and
    + * how much time and resources are spent processing the partitions. There 
are a couple ways to
    + * improve on the current situation:
    + * - Reduce memory footprint by performing streaming calculations. This 
can only be done when
    + * there are no Unbound/Unbounded Following calculations present.
    + * - Use Tungsten style memory usage.
    + * - Use code generation in general, and use the approach to aggregation 
taken in the
    + *   GeneratedAggregate class in specific.
    + */
    +private[execution] abstract class WindowFunctionFrame {
    +  /**
    +   * Create a fresh thread safe copy of the frame.
    +   *
    +   * @return the copied frame.
    +   */
    +  def copy: WindowFunctionFrame
    +
    +  /**
    +   * Prepare the frame for calculating the results for a partition.
    +   *
    +   * @param rows to calculate the frame results for.
    +   */
    +  def prepare(rows: Seq[InternalRow]): Unit
    +
    +  /**
    +   * Write the result for the current row to the given target row.
    +   *
    +   * @param target row to write the result for the current row to.
    +   */
    +  def write(target: GenericMutableRow): Unit
    +}
     
    -          // Record the current frame start point and end point before
    -          // we update them.
    -          val previousFrameStart = frameStart
    -          val previousFrameEnd = frameEnd
    -          boundaryEvaluator()
    -          updateWindowFunctionParameterBuffers(
    -            frameStart - previousFrameStart,
    -            frameEnd - previousFrameEnd,
    -            previousFrameEnd + 1)
    -          // Evaluate the current frame.
    -          evaluateCurrentFrame()
    -        }
    +/**
    + * Base class for dealing with aggregating window function frames.
    + *
    + * @param ordinal of the first column written by this frame.
    + * @param functions to calculate the row values with.
    + */
    +private[execution] abstract class AggregateWindowFunctionFrame(
    +    ordinal: Int,
    +    functions: Array[WindowFunction]) extends WindowFunctionFrame {
    --- End diff --
    
    Yes, the Window operator delegates the actual frame processing to 
specialized AggrgateWindowFunctionFrame instances. This enables the Window 
operator to process all types of frame for the same Partition By/Order By 
clause.
    
    At the moment we do not need the WindowFunctionFrame as a super class. It 
is here to allow no-aggregating frames (e.g. LEAD/LAG offset based frames), 
which will become an issue when we move to Spark UDAFs. We can remove it for 
now.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to