Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18540#discussion_r130215469
  
    --- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
 ---
    @@ -106,161 +101,171 @@ case class WindowSpecReference(name: String) 
extends WindowSpec
     /**
      * The trait used to represent the type of a Window Frame.
      */
    -sealed trait FrameType
    +sealed trait FrameType {
    +  def inputType: AbstractDataType
    +  def sql: String
    +}
     
     /**
    - * RowFrame treats rows in a partition individually. When a 
[[ValuePreceding]]
    - * or a [[ValueFollowing]] is used as its [[FrameBoundary]], the value is 
considered
    - * as a physical offset.
    + * RowFrame treats rows in a partition individually. Values used in a row 
frame are considered
    + * to be physical offsets.
      * For example, `ROW BETWEEN 1 PRECEDING AND 1 FOLLOWING` represents a 
3-row frame,
      * from the row that precedes the current row to the row that follows the 
current row.
      */
    -case object RowFrame extends FrameType
    +case object RowFrame extends FrameType {
    +  override def inputType: AbstractDataType = IntegerType
    +  override def sql: String = "ROWS"
    +}
     
     /**
    - * RangeFrame treats rows in a partition as groups of peers.
    - * All rows having the same `ORDER BY` ordering are considered as peers.
    - * When a [[ValuePreceding]] or a [[ValueFollowing]] is used as its 
[[FrameBoundary]],
    - * the value is considered as a logical offset.
    + * RangeFrame treats rows in a partition as groups of peers. All rows 
having the same `ORDER BY`
    + * ordering are considered as peers. Values used in a range frame are 
considered to be logical
    + * offsets.
      * For example, assuming the value of the current row's `ORDER BY` 
expression `expr` is `v`,
      * `RANGE BETWEEN 1 PRECEDING AND 1 FOLLOWING` represents a frame 
containing rows whose values
      * `expr` are in the range of [v-1, v+1].
      *
      * If `ORDER BY` clause is not defined, all rows in the partition are 
considered as peers
      * of the current row.
      */
    -case object RangeFrame extends FrameType
    -
    -/**
    - * The trait used to represent the type of a Window Frame Boundary.
    - */
    -sealed trait FrameBoundary {
    -  def notFollows(other: FrameBoundary): Boolean
    +case object RangeFrame extends FrameType {
    +  override def inputType: AbstractDataType = 
TypeCollection.NumericAndInterval
    +  override def sql: String = "RANGE"
     }
     
     /**
    - * Extractor for making working with frame boundaries easier.
    + * The trait used to represent special boundaries used in a window frame.
      */
    -object FrameBoundary {
    -  def apply(boundary: FrameBoundary): Option[Int] = unapply(boundary)
    -  def unapply(boundary: FrameBoundary): Option[Int] = boundary match {
    -    case CurrentRow => Some(0)
    -    case ValuePreceding(offset) => Some(-offset)
    -    case ValueFollowing(offset) => Some(offset)
    -    case _ => None
    -  }
    +sealed trait SpecialFrameBoundary extends Expression with Unevaluable {
    +  override def children: Seq[Expression] = Nil
    +  override def dataType: DataType = NullType
    +  override def foldable: Boolean = false
    +  override def nullable: Boolean = false
     }
     
    -/** UNBOUNDED PRECEDING boundary. */
    -case object UnboundedPreceding extends FrameBoundary {
    -  def notFollows(other: FrameBoundary): Boolean = other match {
    -    case UnboundedPreceding => true
    -    case vp: ValuePreceding => true
    -    case CurrentRow => true
    -    case vf: ValueFollowing => true
    -    case UnboundedFollowing => true
    -  }
    -
    -  override def toString: String = "UNBOUNDED PRECEDING"
    +/** UNBOUNDED boundary. */
    +case object UnboundedPreceding extends SpecialFrameBoundary {
    +  override def sql: String = "UNBOUNDED PRECEDING"
     }
     
    -/** <value> PRECEDING boundary. */
    -case class ValuePreceding(value: Int) extends FrameBoundary {
    -  def notFollows(other: FrameBoundary): Boolean = other match {
    -    case UnboundedPreceding => false
    -    case ValuePreceding(anotherValue) => value >= anotherValue
    -    case CurrentRow => true
    -    case vf: ValueFollowing => true
    -    case UnboundedFollowing => true
    -  }
    -
    -  override def toString: String = s"$value PRECEDING"
    +case object UnboundedFollowing extends SpecialFrameBoundary {
    +  override def sql: String = "UNBOUNDED FOLLOWING"
     }
     
     /** CURRENT ROW boundary. */
    -case object CurrentRow extends FrameBoundary {
    -  def notFollows(other: FrameBoundary): Boolean = other match {
    -    case UnboundedPreceding => false
    -    case vp: ValuePreceding => false
    -    case CurrentRow => true
    -    case vf: ValueFollowing => true
    -    case UnboundedFollowing => true
    -  }
    -
    -  override def toString: String = "CURRENT ROW"
    -}
    -
    -/** <value> FOLLOWING boundary. */
    -case class ValueFollowing(value: Int) extends FrameBoundary {
    -  def notFollows(other: FrameBoundary): Boolean = other match {
    -    case UnboundedPreceding => false
    -    case vp: ValuePreceding => false
    -    case CurrentRow => false
    -    case ValueFollowing(anotherValue) => value <= anotherValue
    -    case UnboundedFollowing => true
    -  }
    -
    -  override def toString: String = s"$value FOLLOWING"
    -}
    -
    -/** UNBOUNDED FOLLOWING boundary. */
    -case object UnboundedFollowing extends FrameBoundary {
    -  def notFollows(other: FrameBoundary): Boolean = other match {
    -    case UnboundedPreceding => false
    -    case vp: ValuePreceding => false
    -    case CurrentRow => false
    -    case vf: ValueFollowing => false
    -    case UnboundedFollowing => true
    -  }
    -
    -  override def toString: String = "UNBOUNDED FOLLOWING"
    +case object CurrentRow extends SpecialFrameBoundary {
    +  override def sql: String = "CURRENT ROW"
     }
     
     /**
      * Represents a window frame.
      */
    -sealed trait WindowFrame
    +sealed trait WindowFrame extends Expression with Unevaluable {
    +  override def children: Seq[Expression] = Nil
    +  override def dataType: DataType = throw new 
UnsupportedOperationException("dataType")
    +  override def foldable: Boolean = false
    +  override def nullable: Boolean = false
    +}
     
     /** Used as a placeholder when a frame specification is not defined. */
     case object UnspecifiedFrame extends WindowFrame
     
    -/** A specified Window Frame. */
    +/**
    + * A specified Window Frame. The val lower/uppper can be either a foldable 
[[Expression]] or a
    + * [[SpecialFrameBoundary]].
    + */
     case class SpecifiedWindowFrame(
         frameType: FrameType,
    -    frameStart: FrameBoundary,
    -    frameEnd: FrameBoundary) extends WindowFrame {
    -
    -  /** If this WindowFrame is valid or not. */
    -  def validate: Option[String] = (frameType, frameStart, frameEnd) match {
    -    case (_, UnboundedFollowing, _) =>
    -      Some(s"$UnboundedFollowing is not allowed as the start of a Window 
Frame.")
    -    case (_, _, UnboundedPreceding) =>
    -      Some(s"$UnboundedPreceding is not allowed as the end of a Window 
Frame.")
    -    // case (RowFrame, start, end) => ??? RowFrame specific rule
    -    // case (RangeFrame, start, end) => ??? RangeFrame specific rule
    -    case (_, start, end) =>
    -      if (start.notFollows(end)) {
    -        None
    -      } else {
    -        val reason =
    -          s"The end of this Window Frame $end is smaller than the start of 
" +
    -          s"this Window Frame $start."
    -        Some(reason)
    -      }
    +    lower: Expression,
    +    upper: Expression)
    +  extends WindowFrame {
    +
    +  override def children: Seq[Expression] = lower :: upper :: Nil
    +
    +  lazy val valueBoundary: Seq[Expression] =
    +    children.filterNot(_.isInstanceOf[SpecialFrameBoundary])
    +
    +  override def checkInputDataTypes(): TypeCheckResult = {
    +    // Check lower value.
    +    val lowerCheck = checkBoundary(lower, "lower")
    +    if (lowerCheck.isFailure) {
    +      return lowerCheck
    +    }
    +
    +    // Check upper value.
    +    val upperCheck = checkBoundary(upper, "upper")
    +    if (upperCheck.isFailure) {
    +      return upperCheck
    +    }
    +
    +    // Check combination (of expressions).
    +    (lower, upper) match {
    +      case (l: Expression, u: Expression) if !isValidFrameBoundary(l, u) =>
    +        TypeCheckFailure(s"Window frame upper bound '$upper' does not 
followes the lower bound " +
    +          s"'$lower'.")
    +      case (l: SpecialFrameBoundary, _) => TypeCheckSuccess
    +      case (_, u: SpecialFrameBoundary) => TypeCheckSuccess
    +      case (l: Expression, u: Expression) if l.dataType != u.dataType =>
    +        TypeCheckFailure(
    +          s"Window frame bounds '$lower' and '$upper' do no not have the 
same data type: " +
    +            s"'${l.dataType.catalogString}' <> 
'${u.dataType.catalogString}'")
    +      case (l: Expression, u: Expression) if isGreaterThan(l, u) =>
    +        TypeCheckFailure(
    +          "The lower bound of a window frame must less than or equal to 
the upper bound")
    --- End diff --
    
    Another question. Do we have test cases for the above three negative cases?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to