Github user hvanhovell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9819#discussion_r45270377
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala ---
    @@ -328,3 +280,208 @@ object FrameBoundaryExtractor {
         case _ => None
       }
     }
    +
    +/**
    +  * A window function is a function that can only be evaluated in the context of a window operator.
    +  */
    +trait WindowFunction extends Expression {
    +  /** Frame in which the window operator must be executed. */
    +  def frame: WindowFrame = UnspecifiedFrame
    +}
    +
    +abstract class OffsetWindowFunction
    +  extends Expression with WindowFunction with Unevaluable with 
ImplicitCastInputTypes {
    +  self: Product =>
    +  val input: Expression
    +  val default: Expression
    +  val offset: Expression
    +  val offsetSign: Int
    +  def offsetValue: Int = offset.eval().asInstanceOf[Int]
    +
    +  override def children: Seq[Expression] = Seq(input, offset, default)
    +
    +  override def foldable: Boolean = input.foldable && (default == null || 
default.foldable)
    +
    +  override def nullable: Boolean = input.nullable && (default == null || 
default.nullable)
    +
    +  override lazy val frame = {
    +    val boundary = ValueFollowing(offsetSign * offsetValue)
    +    SpecifiedWindowFrame(RowFrame, boundary, boundary)
    +  }
    +
    +  override def dataType: DataType = input.dataType
    +
    +  override def inputTypes: Seq[AbstractDataType] =
    +    Seq(AnyDataType, IntegerType, TypeCollection(input.dataType, NullType))
    +
    +  override def toString: String = s"$prettyName($input, $offset, $default)"
    +}
    +
    +case class Lead(input: Expression, offset: Expression, default: Expression)
    +  extends OffsetWindowFunction {
    +
    +  def this(input: Expression, offset: Expression) =
    +    this(input, offset, Literal(null))
    +
    +  def this(input: Expression) =
    +    this(input, Literal(1), Literal(null))
    +
    +  def this() = this(Literal(null), Literal(1), Literal(null))
    +
    +  val offsetSign = 1
    +}
    +
    +case class Lag(input: Expression, offset: Expression, default: Expression)
    +  extends OffsetWindowFunction {
    +
    +  def this(input: Expression, offset: Expression) =
    +    this(input, offset, Literal(null))
    +
    +  def this(input: Expression) =
    +    this(input, Literal(1), Literal(null))
    +
    +  def this() = this(Literal(null), Literal(1), Literal(null))
    +
    +  val offsetSign = -1
    +}
    +
    +abstract class AggregateWindowFunction extends DeclarativeAggregate with 
WindowFunction {
    +  self: Product =>
    +  override val frame = SpecifiedWindowFrame(RowFrame, UnboundedPreceding, 
CurrentRow)
    +  override def dataType: DataType = IntegerType
    +  override def nullable: Boolean = false
    +  override val mergeExpressions = Nil // TODO how to deal with this?
    +}
    +
    +abstract class RowNumberLike extends AggregateWindowFunction {
    +  override def children: Seq[Expression] = Nil
    +  override def inputTypes: Seq[AbstractDataType] = Nil
    +  protected val zero = Literal(0)
    +  protected val one = Literal(1)
    +  protected val rowNumber = AttributeReference("rowNumber", IntegerType, 
false)()
    +  override val aggBufferAttributes: Seq[AttributeReference] = rowNumber :: 
Nil
    +  override val initialValues: Seq[Expression] = zero :: Nil
    +  override val updateExpressions: Seq[Expression] = Add(rowNumber, one) :: 
Nil
    +}
    +
    +/**
    +  * A [[SizeBasedWindowFunction]] needs the size of the current window for its calculation.
    +  */
    +trait SizeBasedWindowFunction extends AggregateWindowFunction {
    +  /** Set the window size expression. */
    +  def withSize(n: Expression): SizeBasedWindowFunction
    +}
    +
    +case class RowNumber() extends RowNumberLike {
    +  override val evaluateExpression = Cast(rowNumber, IntegerType)
    +}
    +
    +case class CumeDist(n: Expression)
    +    extends RowNumberLike with SizeBasedWindowFunction {
    +  def this() = this(Literal(0))
    +  override def dataType: DataType = DoubleType
    +  override def withSize(n: Expression): CumeDist = CumeDist(n)
    +  override val frame = SpecifiedWindowFrame(RangeFrame, 
UnboundedPreceding, CurrentRow)
    +  override val evaluateExpression = Divide(Cast(rowNumber, DoubleType), 
Cast(n, DoubleType))
    +}
    +
    +// TODO check if the updates are correct! This used to be the case!
    --- End diff --
    
    This should work. I'll remove the comment.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working,
please contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to