GitHub user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9819#discussion_r47037900
  
    --- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
 ---
    @@ -328,3 +281,222 @@ object FrameBoundaryExtractor {
         case _ => None
       }
     }
    +
    +/**
    + * A window function is a function that can only be evaluated in the 
context of a window operator.
    + */
    +trait WindowFunction extends Expression {
    +  /** Frame in which the window operator must be executed. */
    +  def frame: WindowFrame = UnspecifiedFrame
    +}
    +
    +abstract class OffsetWindowFunction
    +  extends Expression with WindowFunction with Unevaluable with 
ImplicitCastInputTypes {
    +  val input: Expression
    +  val default: Expression
    +  val offset: Expression
    +  val offsetSign: Int
    +
    +  override def children: Seq[Expression] = Seq(input, offset, default)
    +
    +  override def foldable: Boolean = input.foldable && (default == null || 
default.foldable)
    +
    +  override def nullable: Boolean = input.nullable && (default == null || 
default.nullable)
    +
    +  override lazy val frame = {
    +    // This will be triggered by the Analyzer.
    +    val offsetValue = offset.eval() match {
    +      case o: Int => o
    +      case x => throw new AnalysisException(
    +        s"Offset expression must be a foldable integer expression: $x")
    +    }
    +    val boundary = ValueFollowing(offsetSign * offsetValue)
    +    SpecifiedWindowFrame(RowFrame, boundary, boundary)
    +  }
    +
    +  override def dataType: DataType = input.dataType
    +
    +  override def inputTypes: Seq[AbstractDataType] =
    +    Seq(AnyDataType, IntegerType, TypeCollection(input.dataType, NullType))
    +
    +  override def toString: String = s"$prettyName($input, $offset, $default)"
    +}
    +
    +case class Lead(input: Expression, offset: Expression, default: Expression)
    +  extends OffsetWindowFunction {
    +
    +  def this(input: Expression, offset: Expression) =
    +    this(input, offset, Literal(null))
    +
    +  def this(input: Expression) =
    +    this(input, Literal(1), Literal(null))
    +
    +  def this() = this(Literal(null), Literal(1), Literal(null))
    +
    +  val offsetSign = 1
    +}
    +
    +case class Lag(input: Expression, offset: Expression, default: Expression)
    +  extends OffsetWindowFunction {
    +
    +  def this(input: Expression, offset: Expression) =
    +    this(input, offset, Literal(null))
    +
    +  def this(input: Expression) =
    +    this(input, Literal(1), Literal(null))
    +
    +  def this() = this(Literal(null), Literal(1), Literal(null))
    +
    +  val offsetSign = -1
    +}
    +
    +abstract class AggregateWindowFunction extends DeclarativeAggregate with 
WindowFunction {
    +  self: Product =>
    +  override val frame = SpecifiedWindowFrame(RowFrame, UnboundedPreceding, 
CurrentRow)
    +  override def dataType: DataType = IntegerType
    +  override def nullable: Boolean = false
    +  override val mergeExpressions = Nil // TODO how to deal with this?
    +}
    +
    +abstract class RowNumberLike extends AggregateWindowFunction {
    +  override def children: Seq[Expression] = Nil
    +  override def inputTypes: Seq[AbstractDataType] = Nil
    +  protected val zero = Literal(0)
    +  protected val one = Literal(1)
    +  protected val rowNumber = AttributeReference("rowNumber", IntegerType, 
false)()
    +  override val aggBufferAttributes: Seq[AttributeReference] = rowNumber :: 
Nil
    +  override val initialValues: Seq[Expression] = zero :: Nil
    +  override val updateExpressions: Seq[Expression] = Add(rowNumber, one) :: 
Nil
    +}
    +
    +/**
    + * A [[SizeBasedWindowFunction]] needs the size of the current window for 
its calculation.
    + */
    +trait SizeBasedWindowFunction extends AggregateWindowFunction {
    +  /** The partition size attribute, this is here to prevent us from 
creating an attribute on the
    +    * executor side. */
    +  val size = AttributeReference("size", IntegerType, false)()
    +
    +  /** Set the window size expression. */
    +  def withSize(n: Expression): SizeBasedWindowFunction
    +}
    +
    +case class RowNumber() extends RowNumberLike {
    +  override val evaluateExpression = Cast(rowNumber, IntegerType)
    +}
    +
    +case class CumeDist(n: Expression)
    +    extends RowNumberLike with SizeBasedWindowFunction {
    +  def this() = this(Literal(0))
    +  override def dataType: DataType = DoubleType
    +  override def withSize(n: Expression): CumeDist = CumeDist(n)
    +  override val frame = SpecifiedWindowFrame(RangeFrame, 
UnboundedPreceding, CurrentRow)
    +  override val evaluateExpression = Divide(Cast(rowNumber, DoubleType), 
Cast(n, DoubleType))
    +}
    +
    +case class NTile(buckets: Expression, n: Expression)
    +    extends RowNumberLike with SizeBasedWindowFunction {
    +  def this() = this(Literal(1), Literal(0))
    +  def this(buckets: Expression) = this(buckets, Literal(0))
    +
    +  // Validate buckets.
    +  buckets.eval() match {
    +    case b: Int if b > 0 => // Ok
    +    case x => throw new AnalysisException(
    +      "Buckets expression must be a foldable positive integer expression: 
$x")
    +  }
    +
    +  override def withSize(n: Expression): NTile = NTile(buckets, n)
    +  private val bucket = AttributeReference("bucket", IntegerType, false)()
    +  private val bucketThreshold = AttributeReference("bucketThreshold", 
IntegerType, false)()
    +  private val bucketSize = AttributeReference("bucketSize", IntegerType, 
false)()
    +  private val bucketsWithPadding = 
AttributeReference("bucketsWithPadding", IntegerType, false)()
    --- End diff --
    
    Both `bucketSize` and `bucketsWithPadding` are foldable, so do we still need
    to keep them in the aggregation buffer?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to