beliefer commented on a change in pull request #29604:
URL: https://github.com/apache/spark/pull/29604#discussion_r489187186



##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
##########
@@ -549,6 +549,96 @@ case class CumeDist() extends RowNumberLike with 
SizeBasedWindowFunction {
   override def prettyName: String = "cume_dist"
 }
 
+/**
+ * The NthValue function returns the value of `input` at the `offset`th row 
from beginning of the
+ * window frame. Offset starts at 1. When the value of `input` is null at the 
`offset`th row or
+ * there is no such an `offset`th row, null is returned.
+ */
+@ExpressionDescription(
+  usage = """
+    _FUNC_(input[, offset]) - Returns the value of `input` at the row that is 
the `offset`th row
+      from beginning of the window frame. Offsets start at 1. If the value of 
`input` at the
+      `offset`th row is null, null is returned. If there is no such an offset 
row (e.g., when the
+      offset is 10, size of the window frame less than 10), null is returned.
+  """,
+  arguments = """
+    Arguments:
+      * input - the target column or expression that the function operates on.
+      * offset - an int expression which determines the row number relative to 
the first row in
+          the window for which to return the expression. The offset can be a 
constant or an
+          expression and must be a positive integer that is greater than 0.
+      * ignoreNulls - an optional specification that indicates the NthValue 
should skip null
+          values in the determination of which row to use.
+  """,
+  since = "3.1.0",
+  group = "window_funcs")
+case class NthValue(input: Expression, offsetExpr: Expression, ignoreNulls: 
Boolean)
+    extends AggregateWindowFunction with ImplicitCastInputTypes {
+
+  def this(child: Expression, offset: Expression) = this(child, offset, false)
+
+  override def children: Seq[Expression] = input :: Nil
+
+  override def frame: WindowFrame = UnspecifiedFrame
+
+  override def dataType: DataType = input.dataType
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(AnyDataType, 
IntegerType, BooleanType)
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+    val check = super.checkInputDataTypes()
+    if (check.isFailure) {
+      check
+    } else if (!offsetExpr.foldable) {
+      TypeCheckFailure(s"Offset expression '$offsetExpr' must be a literal.")
+    } else {
+      offsetExpr.dataType match {
+        case IntegerType | ShortType | ByteType =>
+          offsetExpr.eval().asInstanceOf[Int] match {
+            case i: Int if i <= 0 => TypeCheckFailure(
+              s"The 'offset' argument of nth_value must be greater than zero 
but it is $i.")
+            case _ => TypeCheckSuccess
+          }
+        case _ => TypeCheckFailure(
+          s"The 'offset' parameter must be a int literal but it is 
${offsetExpr.dataType}.")
+      }
+    }
+  }
+
+  private lazy val offset = offsetExpr.eval().asInstanceOf[Int].toLong
+  private lazy val result = AttributeReference("result", input.dataType)()
+  private lazy val count = AttributeReference("count", LongType)()
+  private lazy val valueSet = AttributeReference("valueSet", BooleanType)()
+  override lazy val aggBufferAttributes: Seq[AttributeReference] =
+    result :: count :: valueSet :: Nil
+
+  override lazy val initialValues: Seq[Literal] = Seq(
+    /* result = */ Literal.create(null, input.dataType),
+    /* count = */ Literal(1L),
+    /* valueSet = */ Literal.create(false, BooleanType)
+  )
+
+  override lazy val updateExpressions: Seq[Expression] = {
+    if (ignoreNulls) {
+      Seq(
+        /* result = */ If(valueSet || input.isNull || count < offset, result, 
input),
+        /* count = */ If(input.isNull, count, count + 1L),
+        /* valueSet = */ valueSet || (input.isNotNull && count >= offset)
+      )
+    } else {
+      Seq(
+        /* result = */ If(valueSet || count < offset, result, input),
+        /* count = */ count + 1L,
+        /* valueSet = */ valueSet || count >= offset

Review comment:
       We use
   ```
    /* result = */ If(count === offset, input, result),
   /* count = */ count + 1L
   ``` now.

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
##########
@@ -549,6 +549,96 @@ case class CumeDist() extends RowNumberLike with 
SizeBasedWindowFunction {
   override def prettyName: String = "cume_dist"
 }
 
+/**
+ * The NthValue function returns the value of `input` at the `offset`th row 
from beginning of the
+ * window frame. Offset starts at 1. When the value of `input` is null at the 
`offset`th row or
+ * there is no such an `offset`th row, null is returned.
+ */
+@ExpressionDescription(
+  usage = """
+    _FUNC_(input[, offset]) - Returns the value of `input` at the row that is 
the `offset`th row
+      from beginning of the window frame. Offsets start at 1. If the value of 
`input` at the
+      `offset`th row is null, null is returned. If there is no such an offset 
row (e.g., when the
+      offset is 10, size of the window frame less than 10), null is returned.
+  """,
+  arguments = """
+    Arguments:
+      * input - the target column or expression that the function operates on.
+      * offset - an int expression which determines the row number relative to 
the first row in
+          the window for which to return the expression. The offset can be a 
constant or an
+          expression and must be a positive integer that is greater than 0.
+      * ignoreNulls - an optional specification that indicates the NthValue 
should skip null
+          values in the determination of which row to use.
+  """,
+  since = "3.1.0",
+  group = "window_funcs")
+case class NthValue(input: Expression, offsetExpr: Expression, ignoreNulls: 
Boolean)
+    extends AggregateWindowFunction with ImplicitCastInputTypes {
+
+  def this(child: Expression, offset: Expression) = this(child, offset, false)
+
+  override def children: Seq[Expression] = input :: Nil
+
+  override def frame: WindowFrame = UnspecifiedFrame
+
+  override def dataType: DataType = input.dataType
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(AnyDataType, 
IntegerType, BooleanType)
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+    val check = super.checkInputDataTypes()
+    if (check.isFailure) {
+      check
+    } else if (!offsetExpr.foldable) {
+      TypeCheckFailure(s"Offset expression '$offsetExpr' must be a literal.")
+    } else {
+      offsetExpr.dataType match {
+        case IntegerType | ShortType | ByteType =>
+          offsetExpr.eval().asInstanceOf[Int] match {
+            case i: Int if i <= 0 => TypeCheckFailure(
+              s"The 'offset' argument of nth_value must be greater than zero 
but it is $i.")
+            case _ => TypeCheckSuccess
+          }
+        case _ => TypeCheckFailure(
+          s"The 'offset' parameter must be a int literal but it is 
${offsetExpr.dataType}.")
+      }
+    }
+  }
+
+  private lazy val offset = offsetExpr.eval().asInstanceOf[Int].toLong
+  private lazy val result = AttributeReference("result", input.dataType)()
+  private lazy val count = AttributeReference("count", LongType)()
+  private lazy val valueSet = AttributeReference("valueSet", BooleanType)()
+  override lazy val aggBufferAttributes: Seq[AttributeReference] =
+    result :: count :: valueSet :: Nil
+
+  override lazy val initialValues: Seq[Literal] = Seq(
+    /* result = */ Literal.create(null, input.dataType),
+    /* count = */ Literal(1L),
+    /* valueSet = */ Literal.create(false, BooleanType)
+  )
+
+  override lazy val updateExpressions: Seq[Expression] = {
+    if (ignoreNulls) {
+      Seq(
+        /* result = */ If(valueSet || input.isNull || count < offset, result, 
input),
+        /* count = */ If(input.isNull, count, count + 1L),
+        /* valueSet = */ valueSet || (input.isNotNull && count >= offset)
+      )
+    } else {
+      Seq(
+        /* result = */ If(valueSet || count < offset, result, input),
+        /* count = */ count + 1L,
+        /* valueSet = */ valueSet || count >= offset

Review comment:
       We use
   ```
    /* result = */ If(count === offset, input, result),
   /* count = */ count + 1L
   ``` 
   now.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to