cloud-fan commented on a change in pull request #29604:
URL: https://github.com/apache/spark/pull/29604#discussion_r488719491



##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
##########
@@ -549,6 +549,96 @@ case class CumeDist() extends RowNumberLike with 
SizeBasedWindowFunction {
   override def prettyName: String = "cume_dist"
 }
 
+/**
+ * The NthValue function returns the value of `input` at the `offset`th row 
from beginning of the

Review comment:
       nit: we can remove it as it just repeats the usage doc.

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
##########
@@ -549,6 +549,96 @@ case class CumeDist() extends RowNumberLike with 
SizeBasedWindowFunction {
   override def prettyName: String = "cume_dist"
 }
 
+/**
+ * The NthValue function returns the value of `input` at the `offset`th row 
from beginning of the
+ * window frame. Offset starts at 1. When the value of `input` is null at the 
`offset`th row or
+ * there is no such an `offset`th row, null is returned.
+ */
+@ExpressionDescription(
+  usage = """
+    _FUNC_(input[, offset]) - Returns the value of `input` at the row that is 
the `offset`th row
+      from beginning of the window frame. Offsets start at 1. If the value of 
`input` at the
+      `offset`th row is null, null is returned. If there is no such an offset 
row (e.g., when the
+      offset is 10, size of the window frame less than 10), null is returned.
+  """,
+  arguments = """
+    Arguments:
+      * input - the target column or expression that the function operates on.
+      * offset - an int expression which determines the row number relative to 
the first row in

Review comment:
       `offset - a positive int literal to indicate the offset in the window 
frame. It starts with 1.`

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
##########
@@ -549,6 +549,96 @@ case class CumeDist() extends RowNumberLike with 
SizeBasedWindowFunction {
   override def prettyName: String = "cume_dist"
 }
 
+/**
+ * The NthValue function returns the value of `input` at the `offset`th row 
from beginning of the
+ * window frame. Offset starts at 1. When the value of `input` is null at the 
`offset`th row or
+ * there is no such an `offset`th row, null is returned.
+ */
+@ExpressionDescription(
+  usage = """
+    _FUNC_(input[, offset]) - Returns the value of `input` at the row that is 
the `offset`th row
+      from beginning of the window frame. Offsets start at 1. If the value of 
`input` at the
+      `offset`th row is null, null is returned. If there is no such an offset 
row (e.g., when the
+      offset is 10, size of the window frame less than 10), null is returned.
+  """,
+  arguments = """
+    Arguments:
+      * input - the target column or expression that the function operates on.
+      * offset - an int expression which determines the row number relative to 
the first row in
+          the window for which to return the expression. The offset can be a 
constant or an
+          expression and must be a positive integer that is greater than 0.
+      * ignoreNulls - an optional specification that indicates the NthValue 
should skip null
+          values in the determination of which row to use.
+  """,
+  since = "3.1.0",
+  group = "window_funcs")
+case class NthValue(input: Expression, offsetExpr: Expression, ignoreNulls: 
Boolean)

Review comment:
       BTW, which window frames does `OffsetWindowFunction` support?

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
##########
@@ -549,6 +549,96 @@ case class CumeDist() extends RowNumberLike with 
SizeBasedWindowFunction {
   override def prettyName: String = "cume_dist"
 }
 
+/**
+ * The NthValue function returns the value of `input` at the `offset`th row 
from beginning of the
+ * window frame. Offset starts at 1. When the value of `input` is null at the 
`offset`th row or
+ * there is no such an `offset`th row, null is returned.
+ */
+@ExpressionDescription(
+  usage = """
+    _FUNC_(input[, offset]) - Returns the value of `input` at the row that is 
the `offset`th row
+      from beginning of the window frame. Offsets start at 1. If the value of 
`input` at the
+      `offset`th row is null, null is returned. If there is no such an offset 
row (e.g., when the
+      offset is 10, size of the window frame less than 10), null is returned.
+  """,
+  arguments = """
+    Arguments:
+      * input - the target column or expression that the function operates on.
+      * offset - an int expression which determines the row number relative to 
the first row in
+          the window for which to return the expression. The offset can be a 
constant or an
+          expression and must be a positive integer that is greater than 0.
+      * ignoreNulls - an optional specification that indicates the NthValue 
should skip null
+          values in the determination of which row to use.
+  """,
+  since = "3.1.0",
+  group = "window_funcs")
+case class NthValue(input: Expression, offsetExpr: Expression, ignoreNulls: 
Boolean)
+    extends AggregateWindowFunction with ImplicitCastInputTypes {
+
+  def this(child: Expression, offset: Expression) = this(child, offset, false)
+
+  override def children: Seq[Expression] = input :: Nil
+
+  override def frame: WindowFrame = UnspecifiedFrame
+
+  override def dataType: DataType = input.dataType
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(AnyDataType, 
IntegerType, BooleanType)
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+    val check = super.checkInputDataTypes()
+    if (check.isFailure) {
+      check
+    } else if (!offsetExpr.foldable) {
+      TypeCheckFailure(s"Offset expression '$offsetExpr' must be a literal.")
+    } else {
+      offsetExpr.dataType match {
+        case IntegerType | ShortType | ByteType =>
+          offsetExpr.eval().asInstanceOf[Int] match {
+            case i: Int if i <= 0 => TypeCheckFailure(
+              s"The 'offset' argument of nth_value must be greater than zero 
but it is $i.")
+            case _ => TypeCheckSuccess
+          }
+        case _ => TypeCheckFailure(
+          s"The 'offset' parameter must be a int literal but it is 
${offsetExpr.dataType}.")
+      }
+    }
+  }
+
+  private lazy val offset = offsetExpr.eval().asInstanceOf[Int].toLong
+  private lazy val result = AttributeReference("result", input.dataType)()
+  private lazy val count = AttributeReference("count", LongType)()
+  private lazy val valueSet = AttributeReference("valueSet", BooleanType)()
+  override lazy val aggBufferAttributes: Seq[AttributeReference] =
+    result :: count :: valueSet :: Nil
+
+  override lazy val initialValues: Seq[Literal] = Seq(
+    /* result = */ Literal.create(null, input.dataType),
+    /* count = */ Literal(1L),
+    /* valueSet = */ Literal.create(false, BooleanType)
+  )
+
+  override lazy val updateExpressions: Seq[Expression] = {
+    if (ignoreNulls) {
+      Seq(
+        /* result = */ If(valueSet || input.isNull || count < offset, result, 
input),
+        /* count = */ If(input.isNull, count, count + 1L),
+        /* valueSet = */ valueSet || (input.isNotNull && count >= offset)
+      )
+    } else {
+      Seq(
+        /* result = */ If(valueSet || count < offset, result, input),
+        /* count = */ count + 1L,
+        /* valueSet = */ valueSet || count >= offset

Review comment:
       and `count` should start from 0, so that we update `result` at least 
once.

##########
File path: sql/core/src/test/resources/sql-tests/inputs/window.sql
##########
@@ -124,4 +144,26 @@ WINDOW w AS (PARTITION BY cate ORDER BY val);
 -- with filter predicate
 SELECT val, cate,
 count(val) FILTER (WHERE val > 1) OVER(PARTITION BY cate)
-FROM testData ORDER BY cate, val;
\ No newline at end of file
+FROM testData ORDER BY cate, val;
+
+-- nth_value() over ()
+SELECT
+    employee_name,
+    salary,
+    nth_value(employee_name, 2) OVER (ORDER BY salary DESC) 
second_highest_salary
+FROM
+    basic_pays
+ORDER BY salary DESC;
+
+SELECT
+       employee_name,
+       department,
+       salary,
+       NTH_VALUE(employee_name, 2) OVER  (
+               PARTITION BY department
+               ORDER BY salary DESC
+               RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING

Review comment:
       can we test more frame boundaries?

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
##########
@@ -549,6 +549,96 @@ case class CumeDist() extends RowNumberLike with 
SizeBasedWindowFunction {
   override def prettyName: String = "cume_dist"
 }
 
+/**
+ * The NthValue function returns the value of `input` at the `offset`th row 
from beginning of the
+ * window frame. Offset starts at 1. When the value of `input` is null at the 
`offset`th row or
+ * there is no such an `offset`th row, null is returned.
+ */
+@ExpressionDescription(
+  usage = """
+    _FUNC_(input[, offset]) - Returns the value of `input` at the row that is 
the `offset`th row
+      from beginning of the window frame. Offsets start at 1. If the value of 
`input` at the
+      `offset`th row is null, null is returned. If there is no such an offset 
row (e.g., when the
+      offset is 10, size of the window frame less than 10), null is returned.
+  """,
+  arguments = """
+    Arguments:
+      * input - the target column or expression that the function operates on.
+      * offset - an int expression which determines the row number relative to 
the first row in
+          the window for which to return the expression. The offset can be a 
constant or an
+          expression and must be a positive integer that is greater than 0.
+      * ignoreNulls - an optional specification that indicates the NthValue 
should skip null

Review comment:
       This doesn't match the usage doc: ```If the value of `input` at the 
`offset`th row is null, null is returned```

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
##########
@@ -549,6 +549,96 @@ case class CumeDist() extends RowNumberLike with 
SizeBasedWindowFunction {
   override def prettyName: String = "cume_dist"
 }
 
+/**
+ * The NthValue function returns the value of `input` at the `offset`th row 
from beginning of the
+ * window frame. Offset starts at 1. When the value of `input` is null at the 
`offset`th row or
+ * there is no such an `offset`th row, null is returned.
+ */
+@ExpressionDescription(
+  usage = """
+    _FUNC_(input[, offset]) - Returns the value of `input` at the row that is 
the `offset`th row
+      from beginning of the window frame. Offsets start at 1. If the value of 
`input` at the
+      `offset`th row is null, null is returned. If there is no such an offset 
row (e.g., when the
+      offset is 10, size of the window frame less than 10), null is returned.
+  """,
+  arguments = """
+    Arguments:
+      * input - the target column or expression that the function operates on.
+      * offset - an int expression which determines the row number relative to 
the first row in
+          the window for which to return the expression. The offset can be a 
constant or an
+          expression and must be a positive integer that is greater than 0.
+      * ignoreNulls - an optional specification that indicates the NthValue 
should skip null
+          values in the determination of which row to use.
+  """,
+  since = "3.1.0",
+  group = "window_funcs")
+case class NthValue(input: Expression, offsetExpr: Expression, ignoreNulls: 
Boolean)
+    extends AggregateWindowFunction with ImplicitCastInputTypes {
+
+  def this(child: Expression, offset: Expression) = this(child, offset, false)
+
+  override def children: Seq[Expression] = input :: Nil
+
+  override def frame: WindowFrame = UnspecifiedFrame
+
+  override def dataType: DataType = input.dataType
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(AnyDataType, 
IntegerType, BooleanType)
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+    val check = super.checkInputDataTypes()
+    if (check.isFailure) {
+      check
+    } else if (!offsetExpr.foldable) {
+      TypeCheckFailure(s"Offset expression '$offsetExpr' must be a literal.")
+    } else {
+      offsetExpr.dataType match {
+        case IntegerType | ShortType | ByteType =>
+          offsetExpr.eval().asInstanceOf[Int] match {
+            case i: Int if i <= 0 => TypeCheckFailure(
+              s"The 'offset' argument of nth_value must be greater than zero 
but it is $i.")
+            case _ => TypeCheckSuccess
+          }
+        case _ => TypeCheckFailure(
+          s"The 'offset' parameter must be a int literal but it is 
${offsetExpr.dataType}.")
+      }
+    }
+  }
+
+  private lazy val offset = offsetExpr.eval().asInstanceOf[Int].toLong
+  private lazy val result = AttributeReference("result", input.dataType)()
+  private lazy val count = AttributeReference("count", LongType)()
+  private lazy val valueSet = AttributeReference("valueSet", BooleanType)()
+  override lazy val aggBufferAttributes: Seq[AttributeReference] =
+    result :: count :: valueSet :: Nil
+
+  override lazy val initialValues: Seq[Literal] = Seq(
+    /* result = */ Literal.create(null, input.dataType),
+    /* count = */ Literal(1L),
+    /* valueSet = */ Literal.create(false, BooleanType)

Review comment:
       I don't think it's worth an extra boolean slot just to save the 
calculation of `count >= offset`.

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
##########
@@ -549,6 +549,96 @@ case class CumeDist() extends RowNumberLike with 
SizeBasedWindowFunction {
   override def prettyName: String = "cume_dist"
 }
 
+/**
+ * The NthValue function returns the value of `input` at the `offset`th row 
from beginning of the
+ * window frame. Offset starts at 1. When the value of `input` is null at the 
`offset`th row or
+ * there is no such an `offset`th row, null is returned.
+ */
+@ExpressionDescription(
+  usage = """
+    _FUNC_(input[, offset]) - Returns the value of `input` at the row that is 
the `offset`th row
+      from beginning of the window frame. Offsets start at 1. If the value of 
`input` at the
+      `offset`th row is null, null is returned. If there is no such an offset 
row (e.g., when the
+      offset is 10, size of the window frame less than 10), null is returned.
+  """,
+  arguments = """
+    Arguments:
+      * input - the target column or expression that the function operates on.
+      * offset - an int expression which determines the row number relative to 
the first row in
+          the window for which to return the expression. The offset can be a 
constant or an
+          expression and must be a positive integer that is greater than 0.
+      * ignoreNulls - an optional specification that indicates the NthValue 
should skip null
+          values in the determination of which row to use.
+  """,
+  since = "3.1.0",
+  group = "window_funcs")
+case class NthValue(input: Expression, offsetExpr: Expression, ignoreNulls: 
Boolean)

Review comment:
       can we add a TODO to optimize it using `OffsetWindowFunction`?

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
##########
@@ -549,6 +549,96 @@ case class CumeDist() extends RowNumberLike with 
SizeBasedWindowFunction {
   override def prettyName: String = "cume_dist"
 }
 
+/**
+ * The NthValue function returns the value of `input` at the `offset`th row 
from beginning of the
+ * window frame. Offset starts at 1. When the value of `input` is null at the 
`offset`th row or
+ * there is no such an `offset`th row, null is returned.
+ */
+@ExpressionDescription(
+  usage = """
+    _FUNC_(input[, offset]) - Returns the value of `input` at the row that is 
the `offset`th row
+      from beginning of the window frame. Offsets start at 1. If the value of 
`input` at the
+      `offset`th row is null, null is returned. If there is no such an offset 
row (e.g., when the
+      offset is 10, size of the window frame less than 10), null is returned.
+  """,
+  arguments = """
+    Arguments:
+      * input - the target column or expression that the function operates on.
+      * offset - an int expression which determines the row number relative to 
the first row in
+          the window for which to return the expression. The offset can be a 
constant or an
+          expression and must be a positive integer that is greater than 0.
+      * ignoreNulls - an optional specification that indicates the NthValue 
should skip null
+          values in the determination of which row to use.
+  """,
+  since = "3.1.0",
+  group = "window_funcs")
+case class NthValue(input: Expression, offsetExpr: Expression, ignoreNulls: 
Boolean)
+    extends AggregateWindowFunction with ImplicitCastInputTypes {
+
+  def this(child: Expression, offset: Expression) = this(child, offset, false)
+
+  override def children: Seq[Expression] = input :: Nil
+
+  override def frame: WindowFrame = UnspecifiedFrame
+
+  override def dataType: DataType = input.dataType
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(AnyDataType, 
IntegerType, BooleanType)
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+    val check = super.checkInputDataTypes()
+    if (check.isFailure) {
+      check
+    } else if (!offsetExpr.foldable) {
+      TypeCheckFailure(s"Offset expression '$offsetExpr' must be a literal.")
+    } else {
+      offsetExpr.dataType match {
+        case IntegerType | ShortType | ByteType =>
+          offsetExpr.eval().asInstanceOf[Int] match {
+            case i: Int if i <= 0 => TypeCheckFailure(
+              s"The 'offset' argument of nth_value must be greater than zero 
but it is $i.")
+            case _ => TypeCheckSuccess
+          }
+        case _ => TypeCheckFailure(
+          s"The 'offset' parameter must be a int literal but it is 
${offsetExpr.dataType}.")
+      }
+    }
+  }
+
+  private lazy val offset = offsetExpr.eval().asInstanceOf[Int].toLong
+  private lazy val result = AttributeReference("result", input.dataType)()
+  private lazy val count = AttributeReference("count", LongType)()
+  private lazy val valueSet = AttributeReference("valueSet", BooleanType)()
+  override lazy val aggBufferAttributes: Seq[AttributeReference] =
+    result :: count :: valueSet :: Nil
+
+  override lazy val initialValues: Seq[Literal] = Seq(
+    /* result = */ Literal.create(null, input.dataType),
+    /* count = */ Literal(1L),
+    /* valueSet = */ Literal.create(false, BooleanType)
+  )
+
+  override lazy val updateExpressions: Seq[Expression] = {
+    if (ignoreNulls) {
+      Seq(
+        /* result = */ If(valueSet || input.isNull || count < offset, result, 
input),
+        /* count = */ If(input.isNull, count, count + 1L),
+        /* valueSet = */ valueSet || (input.isNotNull && count >= offset)
+      )
+    } else {
+      Seq(
+        /* result = */ If(valueSet || count < offset, result, input),
+        /* count = */ count + 1L,
+        /* valueSet = */ valueSet || count >= offset

Review comment:
       I'd expect something like
   ```
   Seq(
     /* result = */ If(count < offset, input, result),
     /* count = */ count + 1L
   )
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to