Github user hvanhovell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/7057#discussion_r34844383
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala ---
    @@ -38,443 +84,661 @@ case class Window(
         child: SparkPlan)
       extends UnaryNode {
     
    -  override def output: Seq[Attribute] =
    -    (projectList ++ windowExpression).map(_.toAttribute)
    +  override def output: Seq[Attribute] = projectList ++ 
windowExpression.map(_.toAttribute)
     
    -  override def requiredChildDistribution: Seq[Distribution] =
    +  override def requiredChildDistribution: Seq[Distribution] = {
         if (windowSpec.partitionSpec.isEmpty) {
    -      // This operator will be very expensive.
    +      // Only show warning when the number of bytes is larger than 100 MB?
    +      logWarning("No Partition Defined for Window operation! Moving all 
data to a single "
    +        + "partition, this can cause serious performance degradation.")
           AllTuples :: Nil
    -    } else {
    -      ClusteredDistribution(windowSpec.partitionSpec) :: Nil
    -    }
    -
    -  // Since window functions are adding columns to the input rows, the 
child's outputPartitioning
    -  // is preserved.
    -  override def outputPartitioning: Partitioning = child.outputPartitioning
    -
    -  override def requiredChildOrdering: Seq[Seq[SortOrder]] = {
    -    // The required child ordering has two parts.
    -    // The first part is the expressions in the partition specification.
    -    // We add these expressions to the required ordering to make sure 
input rows are grouped
    -    // based on the partition specification. So, we only need to process a 
single partition
    -    // at a time.
    -    // The second part is the expressions specified in the ORDER BY cluase.
    -    // Basically, we first use sort to group rows based on partition 
specifications and then sort
    -    // Rows in a group based on the order specification.
    -    (windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ 
windowSpec.orderSpec) :: Nil
    +    } else ClusteredDistribution(windowSpec.partitionSpec) :: Nil
       }
     
    -  // Since window functions basically add columns to input rows, this 
operator
    -  // will not change the ordering of input rows.
    +  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
    +    Seq(windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ 
windowSpec.orderSpec)
    +
       override def outputOrdering: Seq[SortOrder] = child.outputOrdering
     
    -  case class ComputedWindow(
    -    unbound: WindowExpression,
    -    windowFunction: WindowFunction,
    -    resultAttribute: AttributeReference)
    -
    -  // A list of window functions that need to be computed for each group.
    -  private[this] val computedWindowExpressions = windowExpression.flatMap { 
window =>
    -    window.collect {
    -      case w: WindowExpression =>
    -        ComputedWindow(
    -          w,
    -          BindReferences.bindReference(w.windowFunction, child.output),
    -          AttributeReference(s"windowResult:$w", w.dataType, w.nullable)())
    +  /**
    +   * Create a bound ordering object for a given frame type and offset. A 
bound ordering object is
    +   * used to determine which input row lies within the frame boundaries of 
an output row.
    +   *
    +   * This method uses Code Generation. It can only be used on the executor 
side.
    +   *
    +   * @param frameType to evaluate. This can either be Row or Range based.
    +   * @param offset with respect to the row.
    +   * @return a bound ordering object.
    +   */
    +  private[this] def createBoundOrdering(frameType: FrameType, offset: 
Int): BoundOrdering = {
    +    frameType match {
    +      case RangeFrame =>
    +        val (exprs, current, bound) = if (offset == 0) {
    +          // Use the entire order expression when the offset is 0.
    +          val exprs = windowSpec.orderSpec.map(_.child)
    +          val projection = newMutableProjection(exprs, child.output)
    +          (windowSpec.orderSpec, projection(), projection())
    +        }
    +        else if (windowSpec.orderSpec.size == 1) {
    +          // Use only the first order expression when the offset is 
non-zero.
    +          val sortExpr = windowSpec.orderSpec.head
    +          val expr = sortExpr.child
    +          // Create the projection which returns the current 'value'.
    +          val current = newMutableProjection(expr :: Nil, child.output)()
    +          // Create the projection which returns the current 'value' 
modified by adding the offset.
    +          val boundExpr = Add(expr, Cast(Literal.create(offset, 
IntegerType), expr.dataType))
    +          val bound = newMutableProjection(boundExpr :: Nil, 
child.output)()
    +          (sortExpr :: Nil, current, bound)
    +        }
    +        else {
    +          sys.error("Non-Zero range offsets are not supported for windows 
" +
    +            "with multiple order expressions.")
    +        }
    +        // Construct the ordering. This is used to compare the result of 
current value projection
    +        // to the result of bound value projection. This is done manually 
because we want to use
    +        // Code Generation (if it is enabled).
    +        val (sortExprs, schema) = exprs.map { case e =>
    +          val ref = AttributeReference("ordExpr", e.dataType, e.nullable)()
    +          (SortOrder(ref, e.direction), ref)
    +        }.unzip
    +        val ordering = newOrdering(sortExprs, schema)
    +        RangeBoundOrdering(ordering, current, bound)
    +      case RowFrame => RowBoundOrdering(offset)
         }
    -  }.toArray
    +  }
     
    -  private[this] val windowFrame =
    -    windowSpec.frameSpecification.asInstanceOf[SpecifiedWindowFrame]
    +  /**
    +   * Create a frame processor.
    +   *
    +   * This method uses Code Generation. It can only be used on the executor 
side.
    +   *
    +   * @param frame boundaries.
    +   * @param functions to process in the frame.
    +   * @param ordinal at which the processor starts writing to the output.
    +   * @return a frame processor.
    +   */
    +  private[this] def createFrameProcessor(
    +      frame: WindowFrame,
    +      functions: Array[WindowFunction],
    +      ordinal: Int): WindowFunctionFrame = frame match {
    +    // Growing Frame.
    +    case SpecifiedWindowFrame(frameType, UnboundedPreceding, 
FrameBoundaryExtractor(high)) =>
    +      val uBoundOrdering = createBoundOrdering(frameType, high)
    +      new UnboundedPrecedingWindowFunctionFrame(ordinal, functions, 
uBoundOrdering)
    +
    +    // Shrinking Frame.
    +    case SpecifiedWindowFrame(frameType, FrameBoundaryExtractor(low), 
UnboundedFollowing) =>
    +      val lBoundOrdering = createBoundOrdering(frameType, low)
    +      new UnboundedFollowingWindowFunctionFrame(ordinal, functions, 
lBoundOrdering)
    +
    +    // Moving Frame: reverse processed range frame - bounds need to be 
flipped.
    +    case SpecifiedWindowFrame(RangeFrame,
    +        FrameBoundaryExtractor(low), FrameBoundaryExtractor(high))
    +        if (low != 0 || high != 0) && windowSpec.orderSpec.head.direction 
== Descending =>
    +      val lBoundOrdering = createBoundOrdering(RangeFrame, high)
    +      val uBoundOrdering = createBoundOrdering(RangeFrame, low)
    +      new SlidingWindowFunctionFrame(ordinal, functions, lBoundOrdering, 
uBoundOrdering)
    +
    +    // Moving Frame.
    +    case SpecifiedWindowFrame(frameType,
    +        FrameBoundaryExtractor(low), FrameBoundaryExtractor(high)) =>
    +      val lBoundOrdering = createBoundOrdering(frameType, low)
    +      val uBoundOrdering = createBoundOrdering(frameType, high)
    +      new SlidingWindowFunctionFrame(ordinal, functions, lBoundOrdering, 
uBoundOrdering)
    +
    +    // Entire Partition Frame.
    +    case SpecifiedWindowFrame(_, UnboundedPreceding, UnboundedFollowing) =>
    +      new UnboundedWindowFunctionFrame(ordinal, functions)
    +
    +    // Error
    +    case fr =>
    +      sys.error(s"Unsupported Frame $fr for functions: $functions")
    +  }
     
    -  // Create window functions.
    -  private[this] def windowFunctions(): Array[WindowFunction] = {
    -    val functions = new 
Array[WindowFunction](computedWindowExpressions.length)
    -    var i = 0
    -    while (i < computedWindowExpressions.length) {
    -      functions(i) = 
computedWindowExpressions(i).windowFunction.newInstance()
    -      functions(i).init()
    -      i += 1
    +  /**
    +   * Create the resulting projection.
    +   *
    +   * This method uses Code Generation. It can only be used on the executor 
side.
    +   *
    +   * @param expressions unbound ordered function expressions.
    +   * @return the final resulting projection.
    +   */
    +  private[this] def createResultProjection(
    +      expressions: Seq[Expression]): MutableProjection = {
    +    val unboundToAttr = expressions.map {
    +      e => (e, AttributeReference("windowResult", e.dataType, 
e.nullable)())
         }
    -    functions
    +    val unboundToAttrMap = unboundToAttr.toMap
    +    val patchedWindowExpression = 
windowExpression.map(_.transform(unboundToAttrMap))
    +    newMutableProjection(
    +      projectList ++ patchedWindowExpression,
    +      child.output ++ unboundToAttr.map(_._2))()
       }
     
    -  // The schema of the result of all window function evaluations
    -  private[this] val computedSchema = 
computedWindowExpressions.map(_.resultAttribute)
    -
    -  private[this] val computedResultMap =
    -    computedWindowExpressions.map { w => w.unbound -> w.resultAttribute 
}.toMap
    +  protected override def doExecute(): RDD[InternalRow] = {
    +    // Prepare processing.
    +    // Group the window expression by their processing frame.
    +    val windowExprs = windowExpression.flatMap {
    +      _.collect {
    +        case e: WindowExpression => e
    +      }
    +    }
     
    -  private[this] val windowExpressionResult = windowExpression.map { window 
=>
    -    window.transform {
    -      case w: WindowExpression if computedResultMap.contains(w) => 
computedResultMap(w)
    +    // Create Frame processor factories and order the unbound window 
expressions by the frame they
    +    // are processed in; this is the order in which their results will be 
written to window
    +    // function result buffer.
    +    val framedWindowExprs = 
windowExprs.groupBy(_.windowSpec.frameSpecification)
    +    val factories = Array.ofDim[() => 
WindowFunctionFrame](framedWindowExprs.size)
    +    val unboundExpressions = mutable.Buffer.empty[Expression]
    +    framedWindowExprs.zipWithIndex.foreach {
    +      case ((frame, unboundFrameExpressions), index) =>
    +        // Track the ordinal.
    +        val ordinal = unboundExpressions.size
    +
    +        // Track the unbound expressions
    +        unboundExpressions ++= unboundFrameExpressions
    +
    +        // Bind the expressions.
    +        val functions = unboundFrameExpressions.map { e =>
    +          BindReferences.bindReference(e.windowFunction, child.output)
    +        }.toArray
    +
    +        // Create the frame processor factory.
    +        factories(index) = () => createFrameProcessor(frame, functions, 
ordinal)
         }
    -  }
     
    -  protected override def doExecute(): RDD[InternalRow] = {
    -    child.execute().mapPartitions { iter =>
    +    // Start processing.
    +    child.execute().mapPartitions { stream =>
           new Iterator[InternalRow] {
     
    -        // Although input rows are grouped based on 
windowSpec.partitionSpec, we need to
    -        // know when we have a new partition.
    -        // This is to manually construct an ordering that can be used to 
compare rows.
    -        // TODO: We may want to have a newOrdering that takes 
BoundReferences.
    -        // So, we can take advantave of code gen.
    -        private val partitionOrdering: Ordering[InternalRow] =
    -          RowOrdering.forSchema(windowSpec.partitionSpec.map(_.dataType))
    -
    -        // This is used to project expressions for the partition 
specification.
    -        protected val partitionGenerator =
    -          newMutableProjection(windowSpec.partitionSpec, child.output)()
    -
    -        // This is ued to project expressions for the order specification.
    -        protected val rowOrderGenerator =
    -          newMutableProjection(windowSpec.orderSpec.map(_.child), 
child.output)()
    -
    -        // The position of next output row in the inputRowBuffer.
    -        var rowPosition: Int = 0
    -        // The number of buffered rows in the inputRowBuffer (the size of 
the current partition).
    -        var partitionSize: Int = 0
    -        // The buffer used to buffer rows in a partition.
    -        var inputRowBuffer: CompactBuffer[InternalRow] = _
    -        // The partition key of the current partition.
    -        var currentPartitionKey: InternalRow = _
    -        // The partition key of next partition.
    -        var nextPartitionKey: InternalRow = _
    -        // The first row of next partition.
    -        var firstRowInNextPartition: InternalRow = _
    -        // Indicates if this partition is the last one in the iter.
    -        var lastPartition: Boolean = false
    -
    -        def createBoundaryEvaluator(): () => Unit = {
    -          def findPhysicalBoundary(
    -              boundary: FrameBoundary): () => Int = boundary match {
    -            case UnboundedPreceding => () => 0
    -            case UnboundedFollowing => () => partitionSize - 1
    -            case CurrentRow => () => rowPosition
    -            case ValuePreceding(value) =>
    -              () =>
    -                val newPosition = rowPosition - value
    -                if (newPosition > 0) newPosition else 0
    -            case ValueFollowing(value) =>
    -              () =>
    -                val newPosition = rowPosition + value
    -                if (newPosition < partitionSize) newPosition else 
partitionSize - 1
    +        // Get all relevant projections.
    +        val result = createResultProjection(unboundExpressions)
    +        val grouping = newProjection(windowSpec.partitionSpec, 
child.output)
    +
    +        // Manage the stream and the grouping.
    +        var nextRow: InternalRow = EmptyRow
    +        var nextGroup: InternalRow = EmptyRow
    +        var nextRowAvailable: Boolean = false
    +        private[this] def fetchNextRow() {
    +          nextRowAvailable = stream.hasNext
    +          if (nextRowAvailable) {
    +            nextRow = stream.next()
    +            nextGroup = grouping(nextRow)
    +          } else {
    +            nextRow = EmptyRow
    +            nextGroup = EmptyRow
               }
    -
    -          def findLogicalBoundary(
    -              boundary: FrameBoundary,
    -              searchDirection: Int,
    -              evaluator: Expression,
    -              joinedRow: JoinedRow): () => Int = boundary match {
    -            case UnboundedPreceding => () => 0
    -            case UnboundedFollowing => () => partitionSize - 1
    -            case other =>
    -              () => {
    -                // CurrentRow, ValuePreceding, or ValueFollowing.
    -                var newPosition = rowPosition + searchDirection
    -                var stopSearch = false
    -                // rowOrderGenerator is a mutable projection.
    -                // We need to make a copy of the returned by 
rowOrderGenerator since we will
    -                // compare searched row with this currentOrderByValue.
    -                val currentOrderByValue = 
rowOrderGenerator(inputRowBuffer(rowPosition)).copy()
    -                while (newPosition >= 0 && newPosition < partitionSize && 
!stopSearch) {
    -                  val r = rowOrderGenerator(inputRowBuffer(newPosition))
    -                  stopSearch =
    -                    !(evaluator.eval(joinedRow(currentOrderByValue, 
r)).asInstanceOf[Boolean])
    -                  if (!stopSearch) {
    -                    newPosition += searchDirection
    -                  }
    -                }
    -                newPosition -= searchDirection
    -
    -                if (newPosition < 0) {
    -                  0
    -                } else if (newPosition >= partitionSize) {
    -                  partitionSize - 1
    -                } else {
    -                  newPosition
    -                }
    -              }
    +        }
    +        fetchNextRow()
    +
    +        // Manage the current partition.
    +        var rows: CompactBuffer[InternalRow] = _
    +        val frames: Array[WindowFunctionFrame] = factories.map(_())
    +        val numFrames = frames.length
    +        private[this] def fetchNextPartition() {
    +          // Collect all the rows in the current partition.
    +          val currentGroup = nextGroup
    +          rows = new CompactBuffer
    +          while (nextRowAvailable && nextGroup == currentGroup) {
    --- End diff --
    
    @yhuai In the current Window operator an ordering was used to separate 
groups. I currently use equality ```nextGroup == currentGroup```. What was the 
rationale for using an ordering instead of equality?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to