Github user yhuai commented on a diff in the pull request:
https://github.com/apache/spark/pull/5604#discussion_r157933488
--- Diff:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
---
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.sql.catalyst.analysis.UnresolvedException
+import org.apache.spark.sql.catalyst.errors.TreeNodeException
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.types.{NumericType, DataType}
+
/**
 * The window specification of a window function, as written in an OVER clause or a
 * WINDOW clause. Implemented by [[WindowSpecDefinition]] (an inline definition) and
 * [[WindowSpecReference]] (a reference to a definition by name).
 */
sealed trait WindowSpec
+
/**
 * The specification for a window function.
 *
 * @param partitionSpec      the expressions that define how input rows are partitioned.
 * @param orderSpec          the ordering of rows within a partition.
 * @param frameSpecification the window frame within a partition. It must be a
 *                           [[SpecifiedWindowFrame]] for this expression to be resolved.
 */
case class WindowSpecDefinition(
    partitionSpec: Seq[Expression],
    orderSpec: Seq[SortOrder],
    frameSpecification: WindowFrame) extends Expression with WindowSpec {

  /**
   * Checks whether this window specification is valid.
   *
   * @return `None` if it is valid, otherwise `Some` message describing the first problem found:
   *         an [[UnspecifiedFrame]] that analysis should have replaced, a problem reported by
   *         [[SpecifiedWindowFrame.validate]], or an ORDER BY clause that a RANGE frame with a
   *         value-based boundary cannot use.
   */
  def validate: Option[String] = frameSpecification match {
    case UnspecifiedFrame =>
      Some("Found a UnspecifiedFrame. It should be converted to a SpecifiedWindowFrame " +
        "during analysis. Please file a bug report.")
    case frame: SpecifiedWindowFrame =>
      // `orElse` is by-name, so the ORDER BY check only runs when the frame itself is valid.
      frame.validate.orElse(checkValueBasedBoundaryForRangeFrame(frame))
  }

  /**
   * In a RANGE frame, the value of a `<value> PRECEDING` / `<value> FOLLOWING` boundary is a
   * logical offset on the ORDER BY expression. Such a frame is therefore allowed at most one
   * ORDER BY expression, and that expression must have a numeric type. ROW frames, and RANGE
   * frames whose boundaries are all UNBOUNDED or CURRENT ROW, are not restricted here.
   */
  private def checkValueBasedBoundaryForRangeFrame(
      frame: SpecifiedWindowFrame): Option[String] = {
    def isValueBased(boundary: FrameBoundary): Boolean = boundary match {
      case _: ValuePreceding | _: ValueFollowing => true
      case UnboundedPreceding | CurrentRow | UnboundedFollowing => false
    }

    val hasValueBasedRangeBoundary = frame.frameType == RangeFrame &&
      (isValueBased(frame.frameStart) || isValueBased(frame.frameEnd))

    if (!hasValueBasedRangeBoundary) {
      None
    } else if (orderSpec.length > 1) {
      Some("This Range Window Frame only accepts at most one ORDER BY expression.")
    } else if (orderSpec.nonEmpty && !orderSpec.head.dataType.isInstanceOf[NumericType]) {
      Some("The data type of the expression in the ORDER BY clause should be a numeric type.")
    } else {
      // NOTE(review): a value-based RANGE boundary with no ORDER BY expression at all is
      // accepted here. Confirm that this is intended rather than an error.
      None
    }
  }

  type EvaluatedType = Any

  override def children: Seq[Expression] = partitionSpec ++ orderSpec

  // Resolved only once the children are resolved and the frame is concrete; an
  // UnspecifiedFrame keeps this expression unresolved.
  override lazy val resolved: Boolean =
    childrenResolved && frameSpecification.isInstanceOf[SpecifiedWindowFrame]

  override def toString: String = simpleString

  // Evaluation and data type are not supported on a window specification; both throw.
  override def eval(input: Row): EvaluatedType = throw new UnsupportedOperationException
  override def nullable: Boolean = true
  override def foldable: Boolean = false
  override def dataType: DataType = throw new UnsupportedOperationException
}
+
/**
 * A window specification given only by its name. It stands for the
 * [[WindowSpecDefinition]] that was defined under `name`.
 *
 * @param name the name under which the referenced window specification was defined.
 */
case class WindowSpecReference(name: String)
  extends WindowSpec
+
/**
 * The type of a window frame. It determines how the value of a [[ValuePreceding]] or
 * [[ValueFollowing]] boundary is interpreted.
 */
sealed trait FrameType

/**
 * A frame type that treats each row of a partition individually. When a [[ValuePreceding]]
 * or a [[ValueFollowing]] is used as its [[FrameBoundary]], the value is a physical offset,
 * counted in rows.
 *
 * For example, `ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING` represents a 3-row frame, from the
 * row that precedes the current row to the row that follows it.
 */
case object RowFrame extends FrameType

/**
 * A frame type that treats the rows of a partition as groups of peers. All rows with the same
 * `ORDER BY` ordering are peers of each other. When a [[ValuePreceding]] or a
 * [[ValueFollowing]] is used as its [[FrameBoundary]], the value is a logical offset.
 *
 * For example, if the current row's `ORDER BY` expression `expr` evaluates to `v`, then
 * `RANGE BETWEEN 1 PRECEDING AND 1 FOLLOWING` represents a frame containing the rows whose
 * values of `expr` are in the range [v-1, v+1].
 *
 * If the `ORDER BY` clause is not defined, all rows in the partition are considered peers
 * of the current row.
 */
case object RangeFrame extends FrameType
+
/**
 * The trait used to represent a Window Frame Boundary.
 *
 * Boundaries are ordered by their position relative to the current row:
 * UNBOUNDED PRECEDING, then `<value> PRECEDING` (larger values first), then CURRENT ROW, then
 * `<value> FOLLOWING` (smaller values first), then UNBOUNDED FOLLOWING.
 * `0 PRECEDING` and `0 FOLLOWING` are at the same position as CURRENT ROW.
 */
sealed trait FrameBoundary {

  /**
   * Returns true if this boundary does not come after `other`. That is, it is at the same
   * position as `other` or precedes it.
   */
  def notFollows(other: FrameBoundary): Boolean = position(this) <= position(other)

  /**
   * Signed offset of `boundary` from the current row: negative for PRECEDING, positive for
   * FOLLOWING. Offsets are widened to Long so that negating any Int cannot overflow, and the
   * unbounded ends map to the Long extremes, which no widened Int offset can reach.
   * A negative `<value>` is therefore treated as an offset in the opposite direction.
   * NOTE(review): confirm whether negative offsets can reach this code at all.
   */
  private def position(boundary: FrameBoundary): Long = boundary match {
    case UnboundedPreceding => Long.MinValue
    case ValuePreceding(value) => -value.toLong
    case CurrentRow => 0L
    case ValueFollowing(value) => value.toLong
    case UnboundedFollowing => Long.MaxValue
  }
}

/** UNBOUNDED PRECEDING boundary. */
case object UnboundedPreceding extends FrameBoundary {
  override def toString: String = "UNBOUNDED PRECEDING"
}

/** <value> PRECEDING boundary: `value` positions before the current row. */
case class ValuePreceding(value: Int) extends FrameBoundary {
  override def toString: String = s"$value PRECEDING"
}

/** CURRENT ROW boundary. */
case object CurrentRow extends FrameBoundary {
  override def toString: String = "CURRENT ROW"
}

/** <value> FOLLOWING boundary: `value` positions after the current row. */
case class ValueFollowing(value: Int) extends FrameBoundary {
  override def toString: String = s"$value FOLLOWING"
}

/** UNBOUNDED FOLLOWING boundary. */
case object UnboundedFollowing extends FrameBoundary {
  override def toString: String = "UNBOUNDED FOLLOWING"
}
+
/**
 * The trait used to represent a Window Frame: which rows of a partition, relative to the
 * current row, belong to the frame.
 */
sealed trait WindowFrame

/**
 * Placeholder used when no frame specification is given. It is expected to be converted to a
 * [[SpecifiedWindowFrame]] during analysis.
 */
case object UnspecifiedFrame extends WindowFrame
+
/**
 * A Window Frame whose type and boundaries are given explicitly.
 *
 * @param frameType  whether boundary values are physical row offsets ([[RowFrame]]) or
 *                   logical offsets on the ORDER BY expression ([[RangeFrame]]).
 * @param frameStart the boundary at which the frame starts.
 * @param frameEnd   the boundary at which the frame ends.
 */
case class SpecifiedWindowFrame(
    frameType: FrameType,
    frameStart: FrameBoundary,
    frameEnd: FrameBoundary) extends WindowFrame {

  /**
   * Checks the boundaries of this frame.
   *
   * @return `None` if the frame is valid. Otherwise, `Some` message for the first failed check:
   *         UNBOUNDED FOLLOWING used as the start, UNBOUNDED PRECEDING used as the end, or an
   *         end that comes before the start.
   */
  def validate: Option[String] = (frameStart, frameEnd) match {
    case (UnboundedFollowing, _) =>
      Some(s"$UnboundedFollowing is not allowed as the start of a Window Frame.")
    case (_, UnboundedPreceding) =>
      Some(s"$UnboundedPreceding is not allowed as the end of a Window Frame.")
    // TODO: rules specific to RowFrame or to RangeFrame would be checked here.
    case (start, end) if start.notFollows(end) =>
      None
    case (start, end) =>
      Some(s"The end of this Window Frame $end is smaller than the start of " +
        s"this Window Frame $start.")
  }

  /** Renders the frame in SQL syntax, e.g. `ROWS BETWEEN 1 PRECEDING AND CURRENT ROW`. */
  override def toString: String = {
    val keyword = frameType match {
      case RowFrame => "ROWS"
      case RangeFrame => "RANGE"
    }
    s"$keyword BETWEEN $frameStart AND $frameEnd"
  }
}
+
+object SpecifiedWindowFrame {
+ /**
+ *
+ * @param hasOrderSpecification If the window spec has order by
expressions.
+ * @param acceptWindowFrame If the window function accepts
user-specified frame.
+ * @return
+ */
+ def defaultWindowFrame(
+ hasOrderSpecification: Boolean,
+ acceptWindowFrame: Boolean): SpecifiedWindowFrame = {
+ if (hasOrderSpecification && acceptWindowFrame) {
--- End diff --
There are window functions that do not support setting a window frame (e.g.
rank). So, for them, `acceptWindowFrame` is false and the whole partition is
the frame.
For functions that do support setting a window frame, the default window
frame is `RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW`. Please note that
here, all rows considered peers of the current row are included in the frame.
The `ORDER BY` clause is used to determine whether two rows can be considered
peer rows. For example, `ORDER BY c` means that if two rows have the same
value in column `c`, they are peer rows. So, without an `ORDER BY` clause, all
rows are considered peers of the current row, which means that the frame is
effectively the entire partition.
Related references:
- https://docs.microsoft.com/en-us/sql/t-sql/queries/select-over-clause-transact-sql
- https://www.postgresql.org/docs/9.3/static/sql-expressions.html#SYNTAX-WINDOW-FUNCTIONS
(search for `default` to find the relevant parts)
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]