Github user davies commented on a diff in the pull request:
https://github.com/apache/spark/pull/9819#discussion_r47031556
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala ---
@@ -156,36 +165,90 @@ case class Window(
* @param frame boundaries.
* @param functions to process in the frame.
* @param ordinal at which the processor starts writing to the output.
+ * @param target to which the processor will write.
* @return a frame processor.
*/
private[this] def createFrameProcessor(
- frame: WindowFrame,
- functions: Array[WindowFunction],
- ordinal: Int): WindowFunctionFrame = frame match {
- // Growing Frame.
- case SpecifiedWindowFrame(frameType, UnboundedPreceding,
FrameBoundaryExtractor(high)) =>
- val uBoundOrdering = createBoundOrdering(frameType, high)
- new UnboundedPrecedingWindowFunctionFrame(ordinal, functions,
uBoundOrdering)
-
- // Shrinking Frame.
- case SpecifiedWindowFrame(frameType, FrameBoundaryExtractor(low),
UnboundedFollowing) =>
- val lBoundOrdering = createBoundOrdering(frameType, low)
- new UnboundedFollowingWindowFunctionFrame(ordinal, functions,
lBoundOrdering)
-
- // Moving Frame.
- case SpecifiedWindowFrame(frameType,
- FrameBoundaryExtractor(low), FrameBoundaryExtractor(high)) =>
- val lBoundOrdering = createBoundOrdering(frameType, low)
- val uBoundOrdering = createBoundOrdering(frameType, high)
- new SlidingWindowFunctionFrame(ordinal, functions, lBoundOrdering,
uBoundOrdering)
-
- // Entire Partition Frame.
- case SpecifiedWindowFrame(_, UnboundedPreceding, UnboundedFollowing) =>
- new UnboundedWindowFunctionFrame(ordinal, functions)
-
- // Error
- case fr =>
- sys.error(s"Unsupported Frame $fr for functions: $functions")
+ frame: (Char, WindowFrame),
+ functions: Array[Expression],
+ ordinal: Int,
+ target: MutableRow): WindowFunctionFrame = {
+
+ // Construct an aggregate processor if we have to.
+ def processor = {
+ val prepared = functions.map(_.asInstanceOf[AggregateFunction])
+ AggregateProcessor(prepared, ordinal, child.output,
newMutableProjection)
+ }
+
+ // Create the frame processor.
+ frame match {
+ // Offset Frame
+ case ('O', SpecifiedWindowFrame(RowFrame,
+ FrameBoundaryExtractor(l),
+ FrameBoundaryExtractor(h)))
+ if l == h =>
+ new OffsetWindowFunctionFrame(target, ordinal, functions,
child.output,
+ newMutableProjection, l)
+
+ // Growing Frame.
+ case ('A', SpecifiedWindowFrame(frameType,
+ UnboundedPreceding,
+ FrameBoundaryExtractor(high))) =>
+ val uBoundOrdering = createBoundOrdering(frameType, high)
+ new UnboundedPrecedingWindowFunctionFrame(target, processor,
uBoundOrdering)
+
+ // Shrinking Frame.
+ case ('A', SpecifiedWindowFrame(frameType,
+ FrameBoundaryExtractor(low),
+ UnboundedFollowing)) =>
+ val lBoundOrdering = createBoundOrdering(frameType, low)
+ new UnboundedFollowingWindowFunctionFrame(target, processor,
lBoundOrdering)
+
+ // Moving Frame.
+ case ('A', SpecifiedWindowFrame(frameType,
+ FrameBoundaryExtractor(l),
+ FrameBoundaryExtractor(h))) =>
+ val lBoundOrdering = createBoundOrdering(frameType, l)
+ val uBoundOrdering = createBoundOrdering(frameType, h)
+ new SlidingWindowFunctionFrame(target, processor, lBoundOrdering,
uBoundOrdering)
+
+ // Entire Partition Frame.
+ case ('A', SpecifiedWindowFrame(_,
+ UnboundedPreceding,
+ UnboundedFollowing)) =>
+ new UnboundedWindowFunctionFrame(target, processor)
+ }
+ }
+
+ /** Map of window expressions and functions by their key. */
+ private[this] lazy val windowFunctionMap: FunctionMap = {
+ val functions: FunctionMap = mutable.Map.empty
+
+ // Add a function and its function to the map for a given frame.
+ def collect(tpe: Char, frame: WindowFrame, e: Expression, f:
Expression): Unit = {
+ val (es, fs) = functions.getOrElseUpdate(
+ (tpe, frame), (ArrayBuffer.empty[Expression],
ArrayBuffer.empty[Expression]))
+ es.append(e)
+ fs.append(f)
+ }
+
+ // Collect all valid window functions.
+ windowExpression.foreach { x =>
+ x.foreach {
+ case e @ WindowExpression(function, spec) =>
+ val frame = spec.frameSpecification
+ function match {
+ case AggregateExpression(f, _, _) => collect('A', frame, e, f)
--- End diff --
Would this be easier to understand if we used full names instead of `A`/`O`?
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]