Github user hvanhovell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/7057#discussion_r34175570
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala ---
    @@ -37,443 +67,615 @@ case class Window(
         child: SparkPlan)
       extends UnaryNode {
     
    -  override def output: Seq[Attribute] =
    -    (projectList ++ windowExpression).map(_.toAttribute)
    +  override def output: Seq[Attribute] = projectList ++ 
windowExpression.map(_.toAttribute)
     
    -  override def requiredChildDistribution: Seq[Distribution] =
    +  override def requiredChildDistribution: Seq[Distribution] = {
         if (windowSpec.partitionSpec.isEmpty) {
    -      // This operator will be very expensive.
    +      // Only show warning when the number of bytes is larger than 100 MB?
    +      logWarning("No Partition Defined for Window operation! Moving all 
data to a single "
    +        + "partition, this can cause serious performance degradation.")
           AllTuples :: Nil
    -    } else {
    -      ClusteredDistribution(windowSpec.partitionSpec) :: Nil
    -    }
    -
    -  // Since window functions are adding columns to the input rows, the 
child's outputPartitioning
    -  // is preserved.
    -  override def outputPartitioning: Partitioning = child.outputPartitioning
    -
    -  override def requiredChildOrdering: Seq[Seq[SortOrder]] = {
    -    // The required child ordering has two parts.
    -    // The first part is the expressions in the partition specification.
    -    // We add these expressions to the required ordering to make sure 
input rows are grouped
    -    // based on the partition specification. So, we only need to process a 
single partition
    -    // at a time.
    -    // The second part is the expressions specified in the ORDER BY cluase.
    -    // Basically, we first use sort to group rows based on partition 
specifications and then sort
    -    // Rows in a group based on the order specification.
    -    (windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ 
windowSpec.orderSpec) :: Nil
    +    } else ClusteredDistribution(windowSpec.partitionSpec) :: Nil
       }
     
    -  // Since window functions basically add columns to input rows, this 
operator
    -  // will not change the ordering of input rows.
    +  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
    +    Seq(windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ 
windowSpec.orderSpec)
    +
       override def outputOrdering: Seq[SortOrder] = child.outputOrdering
     
    -  case class ComputedWindow(
    -    unbound: WindowExpression,
    -    windowFunction: WindowFunction,
    -    resultAttribute: AttributeReference)
    -
    -  // A list of window functions that need to be computed for each group.
    -  private[this] val computedWindowExpressions = windowExpression.flatMap { 
window =>
    -    window.collect {
    -      case w: WindowExpression =>
    -        ComputedWindow(
    -          w,
    -          BindReferences.bindReference(w.windowFunction, child.output),
    -          AttributeReference(s"windowResult:$w", w.dataType, w.nullable)())
    +  @transient
    +  private[this] lazy val (factories, projection, columns) = {
    +    // Helper method for creating bound ordering objects.
    +    def createBoundOrdering(frameType: FrameType, offset: Int) = frameType 
match {
    +      case RangeFrame =>
    +        // Use the entire order expression when the offset is 0.
    +        val (exprs, current, bound) = if (offset == 0) {
    +          val exprs = windowSpec.orderSpec.map(_.child)
    +          val projection = newMutableProjection(exprs, child.output)
    +          (windowSpec.orderSpec, projection(), projection())
    +        }
    +        // Use only the first order expression when the offset is non-null.
    +        else {
    +          val sortExpr = windowSpec.orderSpec.head
    +          val expr = sortExpr.child
    +          val boundExpr = Add(expr, Cast(Literal.create(offset, 
IntegerType), expr.dataType))
    +          val current = newMutableProjection(expr :: Nil, child.output)()
    +          val bound = newMutableProjection(boundExpr :: Nil, 
child.output)()
    +          (sortExpr :: Nil, current, bound)
    +        }
    +        // Construct the ordering.
    +        val (sortExprs, schema) = exprs.zipWithIndex.map { case (e, i) =>
    +          val ref = AttributeReference(s"c_$i", e.dataType, e.nullable)()
    +          (SortOrder(ref, e.direction), ref)
    +        }.unzip
    +        val ordering = newOrdering(sortExprs, schema)
    +        RangeBoundOrdering(ordering, current, bound)
    +      case RowFrame => RowBoundOrdering(offset)
         }
    -  }.toArray
    -
    -  private[this] val windowFrame =
    -    windowSpec.frameSpecification.asInstanceOf[SpecifiedWindowFrame]
     
    -  // Create window functions.
    -  private[this] def windowFunctions(): Array[WindowFunction] = {
    -    val functions = new 
Array[WindowFunction](computedWindowExpressions.length)
    -    var i = 0
    -    while (i < computedWindowExpressions.length) {
    -      functions(i) = 
computedWindowExpressions(i).windowFunction.newInstance()
    -      functions(i).init()
    -      i += 1
    +    // Collect all window expressions
    +    val windowExprs = windowExpression.flatMap {
    +      _.collect {
    +        case e: WindowExpression => e
    +      }
         }
    -    functions
    -  }
    -
    -  // The schema of the result of all window function evaluations
    -  private[this] val computedSchema = 
computedWindowExpressions.map(_.resultAttribute)
     
    -  private[this] val computedResultMap =
    -    computedWindowExpressions.map { w => w.unbound -> w.resultAttribute 
}.toMap
    +    // Group the window expression by their processing frame.
    +    val groupedWindowExprs = 
windowExprs.groupBy(_.windowSpec.frameSpecification)
    +
    +    // Create factories and collect unbound expressions for each frame.
    +    val factories = mutable.Buffer.empty[WindowFunctionFrame]
    +    val unboundExpressions = mutable.Buffer.empty[Expression]
    +    groupedWindowExprs.foreach {
    +      case (frame, unboundFrameExpressions) =>
    +        // Register the ordinal.
    +        val ordinal = unboundExpressions.size
    +
    +        // Track the unbound expressions
    +        unboundExpressions ++= unboundFrameExpressions
    +
    +        // Bind the expressions.
    +        val functions = unboundFrameExpressions.map { e =>
    +          BindReferences.bindReference(e.windowFunction, child.output)
    +        }.toArray
    +
    +        // Create the frame processors.
    +        val factory = frame match {
    +          // Growing Frame.
    +          case SpecifiedWindowFrame(frameType, UnboundedPreceding, 
FrameBoundaryExtractor(high)) =>
    +            val uBoundOrdering = createBoundOrdering(frameType, high)
    +            new UnboundedPrecedingWindowFunctionFrame(ordinal, functions, 
uBoundOrdering)
    +          // Shrinking Frame.
    +          case SpecifiedWindowFrame(frameType, 
FrameBoundaryExtractor(low), UnboundedFollowing) =>
    +            val lBoundOrdering = createBoundOrdering(frameType, low)
    +            new UnboundedFollowingWindowFunctionFrame(ordinal, functions, 
lBoundOrdering)
    +          // Moving Frame.
    +          case SpecifiedWindowFrame(frameType,
    +              FrameBoundaryExtractor(low), FrameBoundaryExtractor(high)) =>
    +            val lBoundOrdering = createBoundOrdering(frameType, low)
    +            val uBoundOrdering = createBoundOrdering(frameType, high)
    +            new SlidingWindowFunctionFrame(ordinal, functions, 
lBoundOrdering, uBoundOrdering)
    +          // Entire Partition Frame.
    +          case SpecifiedWindowFrame(_, UnboundedPreceding, 
UnboundedFollowing) =>
    +            new UnboundedWindowFunctionFrame(ordinal, functions)
    +          // Error
    +          case fr =>
    +            sys.error(s"Unsupported Frame $fr for expressions: 
$unboundFrameExpressions")
    +        }
    +        factories += factory
    +    }
     
    -  private[this] val windowExpressionResult = windowExpression.map { window 
=>
    -    window.transform {
    -      case w: WindowExpression if computedResultMap.contains(w) => 
computedResultMap(w)
    +    // Create the schema projection.
    +    val unboundToAttr = unboundExpressions.map {
    +      e => (e, AttributeReference(s"aggResult:$e", e.dataType, 
e.nullable)())
         }
    +    val unboundToAttrMap = unboundToAttr.toMap
    +    val patchedWindowExpression = 
windowExpression.map(_.transform(unboundToAttrMap))
    +    val projection = newMutableProjection(
    +      projectList ++ patchedWindowExpression,
    +      child.output ++ unboundToAttr.map(_._2))
    +
    +    // Done
    +    (factories.toArray, projection, unboundExpressions.size)
       }
     
       protected override def doExecute(): RDD[InternalRow] = {
    -    child.execute().mapPartitions { iter =>
    +    child.execute().mapPartitions { stream =>
           new Iterator[InternalRow] {
    -
    -        // Although input rows are grouped based on 
windowSpec.partitionSpec, we need to
    -        // know when we have a new partition.
    -        // This is to manually construct an ordering that can be used to 
compare rows.
    -        // TODO: We may want to have a newOrdering that takes 
BoundReferences.
    -        // So, we can take advantave of code gen.
    -        private val partitionOrdering: Ordering[InternalRow] =
    -          RowOrdering.forSchema(windowSpec.partitionSpec.map(_.dataType))
    -
    -        // This is used to project expressions for the partition 
specification.
    -        protected val partitionGenerator =
    -          newMutableProjection(windowSpec.partitionSpec, child.output)()
    -
    -        // This is ued to project expressions for the order specification.
    -        protected val rowOrderGenerator =
    -          newMutableProjection(windowSpec.orderSpec.map(_.child), 
child.output)()
    -
    -        // The position of next output row in the inputRowBuffer.
    -        var rowPosition: Int = 0
    -        // The number of buffered rows in the inputRowBuffer (the size of 
the current partition).
    -        var partitionSize: Int = 0
    -        // The buffer used to buffer rows in a partition.
    -        var inputRowBuffer: CompactBuffer[InternalRow] = _
    -        // The partition key of the current partition.
    -        var currentPartitionKey: InternalRow = _
    -        // The partition key of next partition.
    -        var nextPartitionKey: InternalRow = _
    -        // The first row of next partition.
    -        var firstRowInNextPartition: InternalRow = _
    -        // Indicates if this partition is the last one in the iter.
    -        var lastPartition: Boolean = false
    -
    -        def createBoundaryEvaluator(): () => Unit = {
    -          def findPhysicalBoundary(
    -              boundary: FrameBoundary): () => Int = boundary match {
    -            case UnboundedPreceding => () => 0
    -            case UnboundedFollowing => () => partitionSize - 1
    -            case CurrentRow => () => rowPosition
    -            case ValuePreceding(value) =>
    -              () =>
    -                val newPosition = rowPosition - value
    -                if (newPosition > 0) newPosition else 0
    -            case ValueFollowing(value) =>
    -              () =>
    -                val newPosition = rowPosition + value
    -                if (newPosition < partitionSize) newPosition else 
partitionSize - 1
    -          }
    -
    -          def findLogicalBoundary(
    -              boundary: FrameBoundary,
    -              searchDirection: Int,
    -              evaluator: Expression,
    -              joinedRow: JoinedRow): () => Int = boundary match {
    -            case UnboundedPreceding => () => 0
    -            case UnboundedFollowing => () => partitionSize - 1
    -            case other =>
    -              () => {
    -                // CurrentRow, ValuePreceding, or ValueFollowing.
    -                var newPosition = rowPosition + searchDirection
    -                var stopSearch = false
    -                // rowOrderGenerator is a mutable projection.
    -                // We need to make a copy of the returned by 
rowOrderGenerator since we will
    -                // compare searched row with this currentOrderByValue.
    -                val currentOrderByValue = 
rowOrderGenerator(inputRowBuffer(rowPosition)).copy()
    -                while (newPosition >= 0 && newPosition < partitionSize && 
!stopSearch) {
    -                  val r = rowOrderGenerator(inputRowBuffer(newPosition))
    -                  stopSearch =
    -                    !(evaluator.eval(joinedRow(currentOrderByValue, 
r)).asInstanceOf[Boolean])
    -                  if (!stopSearch) {
    -                    newPosition += searchDirection
    -                  }
    -                }
    -                newPosition -= searchDirection
    -
    -                if (newPosition < 0) {
    -                  0
    -                } else if (newPosition >= partitionSize) {
    -                  partitionSize - 1
    -                } else {
    -                  newPosition
    -                }
    -              }
    -          }
    -
    -          windowFrame.frameType match {
    -            case RowFrame =>
    -              val findStart = findPhysicalBoundary(windowFrame.frameStart)
    -              val findEnd = findPhysicalBoundary(windowFrame.frameEnd)
    -              () => {
    -                frameStart = findStart()
    -                frameEnd = findEnd()
    -              }
    -            case RangeFrame =>
    -              val joinedRowForBoundaryEvaluation: JoinedRow = new 
JoinedRow()
    -              val orderByExpr = windowSpec.orderSpec.head
    -              val currentRowExpr =
    -                BoundReference(0, orderByExpr.dataType, 
orderByExpr.nullable)
    -              val examedRowExpr =
    -                BoundReference(1, orderByExpr.dataType, 
orderByExpr.nullable)
    -              val differenceExpr = Abs(Subtract(currentRowExpr, 
examedRowExpr))
    -
    -              val frameStartEvaluator = windowFrame.frameStart match {
    -                case CurrentRow => EqualTo(currentRowExpr, examedRowExpr)
    -                case ValuePreceding(value) =>
    -                  LessThanOrEqual(differenceExpr, Cast(Literal(value), 
orderByExpr.dataType))
    -                case ValueFollowing(value) =>
    -                  GreaterThanOrEqual(differenceExpr, Cast(Literal(value), 
orderByExpr.dataType))
    -                case o => Literal(true) // This is just a dummy 
expression, we will not use it.
    -              }
    -
    -              val frameEndEvaluator = windowFrame.frameEnd match {
    -                case CurrentRow => EqualTo(currentRowExpr, examedRowExpr)
    -                case ValuePreceding(value) =>
    -                  GreaterThanOrEqual(differenceExpr, Cast(Literal(value), 
orderByExpr.dataType))
    -                case ValueFollowing(value) =>
    -                  LessThanOrEqual(differenceExpr, Cast(Literal(value), 
orderByExpr.dataType))
    -                case o => Literal(true) // This is just a dummy 
expression, we will not use it.
    -              }
    -
    -              val findStart =
    -                findLogicalBoundary(
    -                  boundary = windowFrame.frameStart,
    -                  searchDirection = -1,
    -                  evaluator = frameStartEvaluator,
    -                  joinedRow = joinedRowForBoundaryEvaluation)
    -              val findEnd =
    -                findLogicalBoundary(
    -                  boundary = windowFrame.frameEnd,
    -                  searchDirection = 1,
    -                  evaluator = frameEndEvaluator,
    -                  joinedRow = joinedRowForBoundaryEvaluation)
    -              () => {
    -                frameStart = findStart()
    -                frameEnd = findEnd()
    -              }
    +        // Get all relevant projections.
    +        val result = projection()
    +        val grouping = newProjection(windowSpec.partitionSpec, 
child.output)
    +
    +        // Manage the stream and the grouping.
    +        var nextRow: InternalRow = EmptyRow
    +        var nextGroup: InternalRow = EmptyRow
    +        var nextRowAvailable: Boolean = false
    +        private[this] def fetchNextRow() {
    +          nextRowAvailable = stream.hasNext
    +          if (nextRowAvailable) {
    +            nextRow = stream.next()
    +            nextGroup = grouping(nextRow)
    +          } else {
    +            nextRow = EmptyRow
    +            nextGroup = EmptyRow
               }
             }
    +        fetchNextRow()
    +
    +        // Manage the current partition.
    +        var rows: CompactBuffer[InternalRow] = _
    +        val frames: Array[WindowFunctionFrame] = factories.map(_.copy)
    +        val frameCount = frames.length
    +        private[this] def fetchNextPartition() {
    +          // Collect all the rows in the current partition.
    +          val currentGroup = nextGroup
    +          rows = new CompactBuffer
    +          while (nextRowAvailable && nextGroup == currentGroup) {
    +            rows += nextRow.copy()
    +            fetchNextRow()
    +          }
     
    -        val boundaryEvaluator = createBoundaryEvaluator()
    -        // Indicates if we the specified window frame requires us to 
maintain a sliding frame
    -        // (e.g. RANGES BETWEEN 1 PRECEDING AND CURRENT ROW) or the window 
frame
    -        // is the entire partition (e.g. ROWS BETWEEN UNBOUNDED PRECEDING 
AND UNBOUNDED FOLLOWING).
    -        val requireUpdateFrame: Boolean = {
    -          def requireUpdateBoundary(boundary: FrameBoundary): Boolean = 
boundary match {
    -            case UnboundedPreceding => false
    -            case UnboundedFollowing => false
    -            case _ => true
    +          // Setup the frames.
    +          var i = 0
    +          while (i < frameCount) {
    +            frames(i).prepare(rows)
    +            i += 1
               }
     
    -          requireUpdateBoundary(windowFrame.frameStart) ||
    -            requireUpdateBoundary(windowFrame.frameEnd)
    +          // Setup iteration
    +          rowIndex = 0
    +          rowsSize = rows.size
             }
    -        // The start position of the current frame in the partition.
    -        var frameStart: Int = 0
    -        // The end position of the current frame in the partition.
    -        var frameEnd: Int = -1
    -        // Window functions.
    -        val functions: Array[WindowFunction] = windowFunctions()
    -        // Buffers used to store input parameters for window functions. 
Because we may need to
    -        // maintain a sliding frame, we use this buffer to avoid evaluate 
the parameters from
    -        // the same row multiple times.
    -        val windowFunctionParameterBuffers: Array[util.LinkedList[AnyRef]] 
=
    -          functions.map(_ => new util.LinkedList[AnyRef]())
    -
    -        // The projection used to generate the final result rows of this 
operator.
    -        private[this] val resultProjection =
    -          newMutableProjection(
    -            projectList ++ windowExpressionResult,
    -            projectList ++ computedSchema)()
    -
    -        // The row used to hold results of window functions.
    -        private[this] val windowExpressionResultRow =
    -          new GenericMutableRow(computedSchema.length)
    -
    -        private[this] val joinedRow = new JoinedRow6
    -
    -        // Initialize this iterator.
    -        initialize()
    -
    -        private def initialize(): Unit = {
    -          if (iter.hasNext) {
    -            val currentRow = iter.next().copy()
    -            // partitionGenerator is a mutable projection. Since we need 
to track nextPartitionKey,
    -            // we are making a copy of the returned partitionKey at here.
    -            nextPartitionKey = partitionGenerator(currentRow).copy()
    -            firstRowInNextPartition = currentRow
    +
    +        // Iteration
    +        var rowIndex = 0
    +        var rowsSize = 0
    +        def hasNext: Boolean = {
    --- End diff --
    
    Nothing unexpected will happen. When the current partition has been 
completely processed, the hasNext call will load the next partition as a side 
effect. Calls after that will not have side effects; they will only check the 
index and its bound.
    
    We can move partition loading to the next() method, though. This is not a 
problem.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to