Repository: spark
Updated Branches:
  refs/heads/master 6edfff055 -> 031910b0e


[SPARK-21608][SPARK-9221][SQL] Window rangeBetween() API should allow literal 
boundary

## What changes were proposed in this pull request?

Window rangeBetween() API should allow literal boundary, that means, the window 
range frame can calculate frame of double/date/timestamp.

Example of the use case can be:
```
SELECT
        val_timestamp,
        cate,
        avg(val_timestamp) OVER(PARTITION BY cate ORDER BY val_timestamp RANGE 
BETWEEN CURRENT ROW AND interval 23 days 4 hours FOLLOWING)
FROM testData
```

This PR refactors the Window `rangeBetween` and `rowsBetween` API, while the 
legacy user code should still be valid.

## How was this patch tested?

Add new test cases both in `DataFrameWindowFunctionsSuite` and in `window.sql`.

Author: Xingbo Jiang <xingbo.ji...@databricks.com>

Closes #18814 from jiangxb1987/literal-boundary.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/031910b0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/031910b0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/031910b0

Branch: refs/heads/master
Commit: 031910b0ec24526d044fd31c05430dcda42b5be3
Parents: 6edfff0
Author: Xingbo Jiang <xingbo.ji...@databricks.com>
Authored: Wed Aug 9 13:23:49 2017 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Wed Aug 9 13:23:49 2017 +0800

----------------------------------------------------------------------
 .../sql/catalyst/analysis/TypeCoercion.scala    |   9 +-
 .../expressions/windowExpressions.scala         |   8 +-
 .../spark/sql/execution/window/WindowExec.scala |   9 +-
 .../apache/spark/sql/expressions/Window.scala   |  63 ++++++++-
 .../spark/sql/expressions/WindowSpec.scala      |  69 +++++++--
 .../scala/org/apache/spark/sql/functions.scala  |  27 ++++
 .../test/resources/sql-tests/inputs/window.sql  |  22 ++-
 .../resources/sql-tests/results/window.sql.out  | 141 +++++++++++++------
 .../sql/DataFrameWindowFunctionsSuite.scala     |  53 ++++++-
 9 files changed, 335 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/031910b0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
index 25af014..06d8350 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
@@ -821,9 +821,12 @@ object TypeCoercion {
     }
 
     private def createBoundaryCast(boundary: Expression, dt: DataType): 
Expression = {
-      boundary match {
-        case e: SpecialFrameBoundary => e
-        case e: Expression if e.dataType != dt && Cast.canCast(e.dataType, dt) 
=> Cast(e, dt)
+      (boundary, dt) match {
+        case (e: SpecialFrameBoundary, _) => e
+        case (e, _: DateType) => e
+        case (e, _: TimestampType) => e
+        case (e: Expression, t) if e.dataType != t && Cast.canCast(e.dataType, 
t) =>
+          Cast(e, t)
         case _ => boundary
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/031910b0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
index a829dcc..e11e3a1 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
@@ -89,7 +89,11 @@ case class WindowSpecDefinition(
     elements.mkString("(", " ", ")")
   }
 
-  private def isValidFrameType(ft: DataType): Boolean = 
orderSpec.head.dataType == ft
+  private def isValidFrameType(ft: DataType): Boolean = 
(orderSpec.head.dataType, ft) match {
+    case (DateType, IntegerType) => true
+    case (TimestampType, CalendarIntervalType) => true
+    case (a, b) => a == b
+  }
 }
 
 /**
@@ -129,7 +133,7 @@ case object RowFrame extends FrameType {
  * of the current row.
  */
 case object RangeFrame extends FrameType {
-  override def inputType: AbstractDataType = NumericType
+  override def inputType: AbstractDataType = TypeCollection.NumericAndInterval
   override def sql: String = "RANGE"
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/031910b0/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
index 0766e37..f8bb667 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
@@ -25,7 +25,9 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.{ExternalAppendOnlyUnsafeRowArray, 
SparkPlan, UnaryExecNode}
+import org.apache.spark.sql.types.{CalendarIntervalType, DateType, 
IntegerType, TimestampType}
 
 /**
  * This class calculates and outputs (windowed) aggregates over the rows in a 
single (sorted)
@@ -139,7 +141,12 @@ case class WindowExec(
         }
 
         // Create the projection which returns the current 'value' modified by 
adding the offset.
-        val boundExpr = Add(expr, Cast(boundOffset, expr.dataType))
+        val boundExpr = (expr.dataType, boundOffset.dataType) match {
+          case (DateType, IntegerType) => DateAdd(expr, boundOffset)
+          case (TimestampType, CalendarIntervalType) =>
+            TimeAdd(expr, boundOffset, Some(conf.sessionLocalTimeZone))
+          case (a, b) if a== b => Add(expr, boundOffset)
+        }
         val bound = newMutableProjection(boundExpr :: Nil, child.output)
 
         // Construct the ordering. This is used to compare the result of 
current value projection

http://git-wip-us.apache.org/repos/asf/spark/blob/031910b0/sql/core/src/main/scala/org/apache/spark/sql/expressions/Window.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/expressions/Window.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/expressions/Window.scala
index cd79128..1caa243 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/expressions/Window.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/expressions/Window.scala
@@ -75,7 +75,7 @@ object Window {
   }
 
   /**
-   * Value representing the last row in the partition, equivalent to 
"UNBOUNDED PRECEDING" in SQL.
+   * Value representing the first row in the partition, equivalent to 
"UNBOUNDED PRECEDING" in SQL.
    * This can be used to specify the frame boundaries:
    *
    * {{{
@@ -167,17 +167,17 @@ object Window {
    * current row.
    *
    * We recommend users use `Window.unboundedPreceding`, 
`Window.unboundedFollowing`,
-   * and `Window.currentRow` to specify special boundary values, rather than 
using integral
-   * values directly.
+   * and `Window.currentRow` to specify special boundary values, rather than 
using long values
+   * directly.
    *
    * A range-based boundary is based on the actual value of the ORDER BY
    * expression(s). An offset is used to alter the value of the ORDER BY 
expression, for
    * instance if the current order by expression has a value of 10 and the 
lower bound offset
    * is -3, the resulting lower bound for the current row will be 10 - 3 = 7. 
This however puts a
    * number of constraints on the ORDER BY expressions: there can be only one 
expression and this
-   * expression must have a numerical data type. An exception can be made when 
the offset is 0,
-   * because no value modification is needed, in this case multiple and 
non-numeric ORDER BY
-   * expression are allowed.
+   * expression must have a numerical data type. An exception can be made when 
the offset is
+   * unbounded, because no value modification is needed, in this case multiple 
and non-numeric
+   * ORDER BY expression are allowed.
    *
    * {{{
    *   import org.apache.spark.sql.expressions.Window
@@ -210,6 +210,57 @@ object Window {
     spec.rangeBetween(start, end)
   }
 
+  /**
+   * Creates a [[WindowSpec]] with the frame boundaries defined,
+   * from `start` (inclusive) to `end` (inclusive).
+   *
+   * Both `start` and `end` are relative to the current row. For example, 
"lit(0)" means
+   * "current row", while "lit(-1)" means one off before the current row, and 
"lit(5)" means the
+   * five off after the current row.
+   *
+   * Users should use `unboundedPreceding()`, `unboundedFollowing()`, and 
`currentRow()` from
+   * [[org.apache.spark.sql.functions]] to specify special boundary values, 
literals are not
+   * transformed to 
[[org.apache.spark.sql.catalyst.expressions.SpecialFrameBoundary]]s.
+   *
+   * A range-based boundary is based on the actual value of the ORDER BY
+   * expression(s). An offset is used to alter the value of the ORDER BY 
expression, for
+   * instance if the current order by expression has a value of 10 and the 
lower bound offset
+   * is -3, the resulting lower bound for the current row will be 10 - 3 = 7. 
This however puts a
+   * number of constraints on the ORDER BY expressions: there can be only one 
expression and this
+   * expression must have a numerical/date/timestamp data type. An exception 
can be made when the
+   * offset is unbounded, because no value modification is needed, in this 
case multiple and
+   * non-numerical/date/timestamp data type ORDER BY expression are allowed.
+   *
+   * {{{
+   *   import org.apache.spark.sql.expressions.Window
+   *   val df = Seq((1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b"))
+   *     .toDF("id", "category")
+   *   val byCategoryOrderedById =
+   *     Window.partitionBy('category).orderBy('id).rangeBetween(currentRow(), 
lit(1))
+   *   df.withColumn("sum", sum('id) over byCategoryOrderedById).show()
+   *
+   *   +---+--------+---+
+   *   | id|category|sum|
+   *   +---+--------+---+
+   *   |  1|       b|  3|
+   *   |  2|       b|  5|
+   *   |  3|       b|  3|
+   *   |  1|       a|  4|
+   *   |  1|       a|  4|
+   *   |  2|       a|  2|
+   *   +---+--------+---+
+   * }}}
+   *
+   * @param start boundary start, inclusive. The frame is unbounded if the 
expression is
+   *              
[[org.apache.spark.sql.catalyst.expressions.UnboundedPreceding]].
+   * @param end boundary end, inclusive. The frame is unbounded if the 
expression is
+   *            
[[org.apache.spark.sql.catalyst.expressions.UnboundedFollowing]].
+   * @since 2.3.0
+   */
+  def rangeBetween(start: Column, end: Column): WindowSpec = {
+    spec.rangeBetween(start, end)
+  }
+
   private[sql] def spec: WindowSpec = {
     new WindowSpec(Seq.empty, Seq.empty, UnspecifiedFrame)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/031910b0/sql/core/src/main/scala/org/apache/spark/sql/expressions/WindowSpec.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/expressions/WindowSpec.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/expressions/WindowSpec.scala
index f8b404d..4c41aa3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/expressions/WindowSpec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/expressions/WindowSpec.scala
@@ -146,22 +146,22 @@ class WindowSpec private[sql](
   /**
    * Defines the frame boundaries, from `start` (inclusive) to `end` 
(inclusive).
    *
-   * Both `start` and `end` are relative from the current row. For example, 
"0" means "current row",
-   * while "-1" means one off before the current row, and "5" means the five 
off after the
-   * current row.
+   * Both `start` and `end` are relative from the current row. For example, 
"0" means
+   * "current row", while "-1" means one off before the current row, and "5" 
means the five off
+   * after the current row.
    *
    * We recommend users use `Window.unboundedPreceding`, 
`Window.unboundedFollowing`,
-   * and `Window.currentRow` to specify special boundary values, rather than 
using integral
-   * values directly.
+   * and `Window.currentRow` to specify special boundary values, rather than 
using long values
+   * directly.
    *
    * A range-based boundary is based on the actual value of the ORDER BY
    * expression(s). An offset is used to alter the value of the ORDER BY 
expression, for
    * instance if the current order by expression has a value of 10 and the 
lower bound offset
    * is -3, the resulting lower bound for the current row will be 10 - 3 = 7. 
This however puts a
    * number of constraints on the ORDER BY expressions: there can be only one 
expression and this
-   * expression must have a numerical data type. An exception can be made when 
the offset is 0,
-   * because no value modification is needed, in this case multiple and 
non-numeric ORDER BY
-   * expression are allowed.
+   * expression must have a numerical data type. An exception can be made when 
the offset is
+   * unbounded, because no value modification is needed, in this case multiple 
and non-numeric
+   * ORDER BY expression are allowed.
    *
    * {{{
    *   import org.apache.spark.sql.expressions.Window
@@ -210,6 +210,59 @@ class WindowSpec private[sql](
   }
 
   /**
+   * Defines the frame boundaries, from `start` (inclusive) to `end` 
(inclusive).
+   *
+   * Both `start` and `end` are relative to the current row. For example, 
"lit(0)" means
+   * "current row", while "lit(-1)" means one off before the current row, and 
"lit(5)" means the
+   * five off after the current row.
+   *
+   * Users should use `unboundedPreceding()`, `unboundedFollowing()`, and 
`currentRow()` from
+   * [[org.apache.spark.sql.functions]] to specify special boundary values, 
literals are not
+   * transformed to 
[[org.apache.spark.sql.catalyst.expressions.SpecialFrameBoundary]]s.
+   *
+   * A range-based boundary is based on the actual value of the ORDER BY
+   * expression(s). An offset is used to alter the value of the ORDER BY 
expression, for
+   * instance if the current order by expression has a value of 10 and the 
lower bound offset
+   * is -3, the resulting lower bound for the current row will be 10 - 3 = 7. 
This however puts a
+   * number of constraints on the ORDER BY expressions: there can be only one 
expression and this
+   * expression must have a numerical/date/timestamp data type. An exception 
can be made when the
+   * offset is unbounded, because no value modification is needed, in this 
case multiple and
+   * non-numerical/date/timestamp data type ORDER BY expression are allowed.
+   *
+   * {{{
+   *   import org.apache.spark.sql.expressions.Window
+   *   val df = Seq((1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b"))
+   *     .toDF("id", "category")
+   *   val byCategoryOrderedById =
+   *     Window.partitionBy('category).orderBy('id).rangeBetween(currentRow(), 
lit(1))
+   *   df.withColumn("sum", sum('id) over byCategoryOrderedById).show()
+   *
+   *   +---+--------+---+
+   *   | id|category|sum|
+   *   +---+--------+---+
+   *   |  1|       b|  3|
+   *   |  2|       b|  5|
+   *   |  3|       b|  3|
+   *   |  1|       a|  4|
+   *   |  1|       a|  4|
+   *   |  2|       a|  2|
+   *   +---+--------+---+
+   * }}}
+   *
+   * @param start boundary start, inclusive. The frame is unbounded if the 
expression is
+   *              
[[org.apache.spark.sql.catalyst.expressions.UnboundedPreceding]].
+   * @param end boundary end, inclusive. The frame is unbounded if the 
expression is
+   *            
[[org.apache.spark.sql.catalyst.expressions.UnboundedFollowing]].
+   * @since 2.3.0
+   */
+  def rangeBetween(start: Column, end: Column): WindowSpec = {
+    new WindowSpec(
+      partitionSpec,
+      orderSpec,
+      SpecifiedWindowFrame(RangeFrame, start.expr, end.expr))
+  }
+
+  /**
    * Converts this [[WindowSpec]] into a [[Column]] with an aggregate 
expression.
    */
   private[sql] def withAggregate(aggregate: Column): Column = {

http://git-wip-us.apache.org/repos/asf/spark/blob/031910b0/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 496619a..14ab8a2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.plans.logical.{HintInfo, ResolvedHint}
 import org.apache.spark.sql.execution.SparkSqlParser
 import org.apache.spark.sql.expressions.UserDefinedFunction
+import org.apache.spark.sql.expressions.Window
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
@@ -777,6 +778,32 @@ object functions {
   
//////////////////////////////////////////////////////////////////////////////////////////////
   // Window functions
   
//////////////////////////////////////////////////////////////////////////////////////////////
+  /**
+   * Window function: returns the special frame boundary that represents the 
first row in the
+   * window partition.
+   *
+   * @group window_funcs
+   * @since 2.3.0
+   */
+  def unboundedPreceding(): Column = Column(UnboundedPreceding)
+
+  /**
+   * Window function: returns the special frame boundary that represents the 
last row in the
+   * window partition.
+   *
+   * @group window_funcs
+   * @since 2.3.0
+   */
+  def unboundedFollowing(): Column = Column(UnboundedFollowing)
+
+  /**
+   * Window function: returns the special frame boundary that represents the 
current row in the
+   * window partition.
+   *
+   * @group window_funcs
+   * @since 2.3.0
+   */
+  def currentRow(): Column = Column(CurrentRow)
 
   /**
    * Window function: returns the cumulative distribution of values within a 
window partition,

http://git-wip-us.apache.org/repos/asf/spark/blob/031910b0/sql/core/src/test/resources/sql-tests/inputs/window.sql
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/sql-tests/inputs/window.sql 
b/sql/core/src/test/resources/sql-tests/inputs/window.sql
index 342e571..c4bea34 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/window.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/window.sql
@@ -1,8 +1,15 @@
 -- Test data.
 CREATE OR REPLACE TEMPORARY VIEW testData AS SELECT * FROM VALUES
-(null, 1L, "a"), (1, 1L, "a"), (1, 2L, "a"), (2, 2147483650L, "a"), (1, null, 
"b"), (2, 3L, "b"),
-(3, 2147483650L, "b"), (null, null, null), (3, 1L, null)
-AS testData(val, val_long, cate);
+(null, 1L, 1.0D, date("2017-08-01"), timestamp(1501545600), "a"),
+(1, 1L, 1.0D, date("2017-08-01"), timestamp(1501545600), "a"),
+(1, 2L, 2.5D, date("2017-08-02"), timestamp(1502000000), "a"),
+(2, 2147483650L, 100.001D, date("2020-12-31"), timestamp(1609372800), "a"),
+(1, null, 1.0D, date("2017-08-01"), timestamp(1501545600), "b"),
+(2, 3L, 3.3D, date("2017-08-03"), timestamp(1503000000), "b"),
+(3, 2147483650L, 100.001D, date("2020-12-31"), timestamp(1609372800), "b"),
+(null, null, null, null, null, null),
+(3, 1L, 1.0D, date("2017-08-01"), timestamp(1501545600), null)
+AS testData(val, val_long, val_double, val_date, val_timestamp, cate);
 
 -- RowsBetween
 SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY val ROWS CURRENT 
ROW) FROM testData
@@ -19,6 +26,13 @@ SELECT val, cate, sum(val) OVER(PARTITION BY cate ORDER BY 
val
 RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING) FROM testData ORDER BY cate, val;
 SELECT val_long, cate, sum(val_long) OVER(PARTITION BY cate ORDER BY val_long
 RANGE BETWEEN CURRENT ROW AND 2147483648 FOLLOWING) FROM testData ORDER BY 
cate, val_long;
+SELECT val_double, cate, sum(val_double) OVER(PARTITION BY cate ORDER BY 
val_double
+RANGE BETWEEN CURRENT ROW AND 2.5 FOLLOWING) FROM testData ORDER BY cate, 
val_double;
+SELECT val_date, cate, max(val_date) OVER(PARTITION BY cate ORDER BY val_date
+RANGE BETWEEN CURRENT ROW AND 2 FOLLOWING) FROM testData ORDER BY cate, 
val_date;
+SELECT val_timestamp, cate, avg(val_timestamp) OVER(PARTITION BY cate ORDER BY 
val_timestamp
+RANGE BETWEEN CURRENT ROW AND interval 23 days 4 hours FOLLOWING) FROM testData
+ORDER BY cate, val_timestamp;
 
 -- RangeBetween with reverse OrderBy
 SELECT val, cate, sum(val) OVER(PARTITION BY cate ORDER BY val DESC
@@ -31,7 +45,7 @@ SELECT val, cate, count(val) OVER(PARTITION BY cate
 RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING) FROM testData ORDER BY cate, val;
 SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY val, cate
 RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING) FROM testData ORDER BY cate, val;
-SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY current_date
+SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY current_timestamp
 RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING) FROM testData ORDER BY cate, val;
 SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY val
 RANGE BETWEEN 1 FOLLOWING AND 1 PRECEDING) FROM testData ORDER BY cate, val;

http://git-wip-us.apache.org/repos/asf/spark/blob/031910b0/sql/core/src/test/resources/sql-tests/results/window.sql.out
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/sql-tests/results/window.sql.out 
b/sql/core/src/test/resources/sql-tests/results/window.sql.out
index 9751106..73ad27e 100644
--- a/sql/core/src/test/resources/sql-tests/results/window.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/window.sql.out
@@ -1,12 +1,19 @@
 -- Automatically generated by SQLQueryTestSuite
--- Number of queries: 19
+-- Number of queries: 22
 
 
 -- !query 0
 CREATE OR REPLACE TEMPORARY VIEW testData AS SELECT * FROM VALUES
-(null, 1L, "a"), (1, 1L, "a"), (1, 2L, "a"), (2, 2147483650L, "a"), (1, null, 
"b"), (2, 3L, "b"),
-(3, 2147483650L, "b"), (null, null, null), (3, 1L, null)
-AS testData(val, val_long, cate)
+(null, 1L, 1.0D, date("2017-08-01"), timestamp(1501545600), "a"),
+(1, 1L, 1.0D, date("2017-08-01"), timestamp(1501545600), "a"),
+(1, 2L, 2.5D, date("2017-08-02"), timestamp(1502000000), "a"),
+(2, 2147483650L, 100.001D, date("2020-12-31"), timestamp(1609372800), "a"),
+(1, null, 1.0D, date("2017-08-01"), timestamp(1501545600), "b"),
+(2, 3L, 3.3D, date("2017-08-03"), timestamp(1503000000), "b"),
+(3, 2147483650L, 100.001D, date("2020-12-31"), timestamp(1609372800), "b"),
+(null, null, null, null, null, null),
+(3, 1L, 1.0D, date("2017-08-01"), timestamp(1501545600), null)
+AS testData(val, val_long, val_double, val_date, val_timestamp, cate)
 -- !query 0 schema
 struct<>
 -- !query 0 output
@@ -109,11 +116,63 @@ NULL      b       NULL
 
 
 -- !query 7
+SELECT val_double, cate, sum(val_double) OVER(PARTITION BY cate ORDER BY 
val_double
+RANGE BETWEEN CURRENT ROW AND 2.5 FOLLOWING) FROM testData ORDER BY cate, 
val_double
+-- !query 7 schema
+struct<val_double:double,cate:string,sum(val_double) OVER (PARTITION BY cate 
ORDER BY val_double ASC NULLS FIRST RANGE BETWEEN CURRENT ROW AND CAST(2.5 AS 
DOUBLE) FOLLOWING):double>
+-- !query 7 output
+NULL   NULL    NULL
+1.0    NULL    1.0
+1.0    a       4.5
+1.0    a       4.5
+2.5    a       2.5
+100.001        a       100.001
+1.0    b       4.3
+3.3    b       3.3
+100.001        b       100.001
+
+
+-- !query 8
+SELECT val_date, cate, max(val_date) OVER(PARTITION BY cate ORDER BY val_date
+RANGE BETWEEN CURRENT ROW AND 2 FOLLOWING) FROM testData ORDER BY cate, 
val_date
+-- !query 8 schema
+struct<val_date:date,cate:string,max(val_date) OVER (PARTITION BY cate ORDER 
BY val_date ASC NULLS FIRST RANGE BETWEEN CURRENT ROW AND 2 FOLLOWING):date>
+-- !query 8 output
+NULL   NULL    NULL
+2017-08-01     NULL    2017-08-01
+2017-08-01     a       2017-08-02
+2017-08-01     a       2017-08-02
+2017-08-02     a       2017-08-02
+2020-12-31     a       2020-12-31
+2017-08-01     b       2017-08-03
+2017-08-03     b       2017-08-03
+2020-12-31     b       2020-12-31
+
+
+-- !query 9
+SELECT val_timestamp, cate, avg(val_timestamp) OVER(PARTITION BY cate ORDER BY 
val_timestamp
+RANGE BETWEEN CURRENT ROW AND interval 23 days 4 hours FOLLOWING) FROM testData
+ORDER BY cate, val_timestamp
+-- !query 9 schema
+struct<val_timestamp:timestamp,cate:string,avg(CAST(val_timestamp AS DOUBLE)) 
OVER (PARTITION BY cate ORDER BY val_timestamp ASC NULLS FIRST RANGE BETWEEN 
CURRENT ROW AND interval 3 weeks 2 days 4 hours FOLLOWING):double>
+-- !query 9 output
+NULL   NULL    NULL
+2017-07-31 17:00:00    NULL    1.5015456E9
+2017-07-31 17:00:00    a       1.5016970666666667E9
+2017-07-31 17:00:00    a       1.5016970666666667E9
+2017-08-05 23:13:20    a       1.502E9
+2020-12-30 16:00:00    a       1.6093728E9
+2017-07-31 17:00:00    b       1.5022728E9
+2017-08-17 13:00:00    b       1.503E9
+2020-12-30 16:00:00    b       1.6093728E9
+
+
+-- !query 10
 SELECT val, cate, sum(val) OVER(PARTITION BY cate ORDER BY val DESC
 RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING) FROM testData ORDER BY cate, val
--- !query 7 schema
+-- !query 10 schema
 struct<val:int,cate:string,sum(val) OVER (PARTITION BY cate ORDER BY val DESC 
NULLS LAST RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING):bigint>
--- !query 7 output
+-- !query 10 output
 NULL   NULL    NULL
 3      NULL    3
 NULL   a       NULL
@@ -125,62 +184,62 @@ NULL      a       NULL
 3      b       5
 
 
--- !query 8
+-- !query 11
 SELECT val, cate, count(val) OVER(PARTITION BY cate
 ROWS BETWEEN UNBOUNDED FOLLOWING AND 1 FOLLOWING) FROM testData ORDER BY cate, 
val
--- !query 8 schema
+-- !query 11 schema
 struct<>
--- !query 8 output
+-- !query 11 output
 org.apache.spark.sql.AnalysisException
 cannot resolve 'ROWS BETWEEN UNBOUNDED FOLLOWING AND 1 FOLLOWING' due to data 
type mismatch: Window frame upper bound '1' does not followes the lower bound 
'unboundedfollowing$()'.; line 1 pos 33
 
 
--- !query 9
+-- !query 12
 SELECT val, cate, count(val) OVER(PARTITION BY cate
 RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING) FROM testData ORDER BY cate, val
--- !query 9 schema
+-- !query 12 schema
 struct<>
--- !query 9 output
+-- !query 12 output
 org.apache.spark.sql.AnalysisException
 cannot resolve '(PARTITION BY testdata.`cate` RANGE BETWEEN CURRENT ROW AND 1 
FOLLOWING)' due to data type mismatch: A range window frame cannot be used in 
an unordered window specification.; line 1 pos 33
 
 
--- !query 10
+-- !query 13
 SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY val, cate
 RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING) FROM testData ORDER BY cate, val
--- !query 10 schema
+-- !query 13 schema
 struct<>
--- !query 10 output
+-- !query 13 output
 org.apache.spark.sql.AnalysisException
 cannot resolve '(PARTITION BY testdata.`cate` ORDER BY testdata.`val` ASC 
NULLS FIRST, testdata.`cate` ASC NULLS FIRST RANGE BETWEEN CURRENT ROW AND 1 
FOLLOWING)' due to data type mismatch: A range window frame with value 
boundaries cannot be used in a window specification with multiple order by 
expressions: val#x ASC NULLS FIRST,cate#x ASC NULLS FIRST; line 1 pos 33
 
 
--- !query 11
-SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY current_date
+-- !query 14
+SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY current_timestamp
 RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING) FROM testData ORDER BY cate, val
--- !query 11 schema
+-- !query 14 schema
 struct<>
--- !query 11 output
+-- !query 14 output
 org.apache.spark.sql.AnalysisException
-cannot resolve '(PARTITION BY testdata.`cate` ORDER BY current_date() ASC 
NULLS FIRST RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING)' due to data type 
mismatch: The data type 'DateType' used in the order specification does not 
match the data type 'IntegerType' which is used in the range frame.; line 1 pos 
33
+cannot resolve '(PARTITION BY testdata.`cate` ORDER BY current_timestamp() ASC 
NULLS FIRST RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING)' due to data type 
mismatch: The data type 'TimestampType' used in the order specification does 
not match the data type 'IntegerType' which is used in the range frame.; line 1 
pos 33
 
 
--- !query 12
+-- !query 15
 SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY val
 RANGE BETWEEN 1 FOLLOWING AND 1 PRECEDING) FROM testData ORDER BY cate, val
--- !query 12 schema
+-- !query 15 schema
 struct<>
--- !query 12 output
+-- !query 15 output
 org.apache.spark.sql.AnalysisException
 cannot resolve 'RANGE BETWEEN 1 FOLLOWING AND 1 PRECEDING' due to data type 
mismatch: The lower bound of a window frame must be less than or equal to the 
upper bound; line 1 pos 33
 
 
--- !query 13
+-- !query 16
 SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY val
 RANGE BETWEEN CURRENT ROW AND current_date PRECEDING) FROM testData ORDER BY 
cate, val
--- !query 13 schema
+-- !query 16 schema
 struct<>
--- !query 13 output
+-- !query 16 output
 org.apache.spark.sql.catalyst.parser.ParseException
 
 Frame bound value must be a literal.(line 2, pos 30)
@@ -191,7 +250,7 @@ RANGE BETWEEN CURRENT ROW AND current_date PRECEDING) FROM 
testData ORDER BY cat
 ------------------------------^^^
 
 
--- !query 14
+-- !query 17
 SELECT val, cate,
 max(val) OVER w AS max,
 min(val) OVER w AS min,
@@ -218,9 +277,9 @@ approx_count_distinct(val) OVER w AS approx_count_distinct
 FROM testData
 WINDOW w AS (PARTITION BY cate ORDER BY val)
 ORDER BY cate, val
--- !query 14 schema
+-- !query 17 schema
 
struct<val:int,cate:string,max:int,min:int,min:int,count:bigint,sum:bigint,avg:double,stddev:double,first_value:int,first_value_ignore_null:int,first_value_contain_null:int,last_value:int,last_value_ignore_null:int,last_value_contain_null:int,rank:int,dense_rank:int,cume_dist:double,percent_rank:double,ntile:int,row_number:int,var_pop:double,var_samp:double,approx_count_distinct:bigint>
--- !query 14 output
+-- !query 17 output
 NULL   NULL    NULL    NULL    NULL    0       NULL    NULL    NULL    NULL    
NULL    NULL    NULL    NULL    NULL    1       1       0.5     0.0     1       
1       NULL    NULL    0
 3      NULL    3       3       3       1       3       3.0     NaN     NULL    
3       NULL    3       3       3       2       2       1.0     1.0     2       
2       0.0     NaN     1
 NULL   a       NULL    NULL    NULL    0       NULL    NULL    NULL    NULL    
NULL    NULL    NULL    NULL    NULL    1       1       0.25    0.0     1       
1       NULL    NULL    0
@@ -232,11 +291,11 @@ NULL      a       NULL    NULL    NULL    0       NULL    
NULL    NULL    NULL    NULL    NULL    NULL    NULL    NULL    1       1       
0.25    0.
 3      b       3       1       1       3       6       2.0     1.0     1       
1       1       3       3       3       3       3       1.0     1.0     2       
3       0.6666666666666666      1.0     3
 
 
--- !query 15
+-- !query 18
 SELECT val, cate, avg(null) OVER(PARTITION BY cate ORDER BY val) FROM testData 
ORDER BY cate, val
--- !query 15 schema
+-- !query 18 schema
 struct<val:int,cate:string,avg(CAST(NULL AS DOUBLE)) OVER (PARTITION BY cate 
ORDER BY val ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT 
ROW):double>
--- !query 15 output
+-- !query 18 output
 NULL   NULL    NULL
 3      NULL    NULL
 NULL   a       NULL
@@ -248,20 +307,20 @@ NULL      a       NULL
 3      b       NULL
 
 
--- !query 16
+-- !query 19
 SELECT val, cate, row_number() OVER(PARTITION BY cate) FROM testData ORDER BY 
cate, val
--- !query 16 schema
+-- !query 19 schema
 struct<>
--- !query 16 output
+-- !query 19 output
 org.apache.spark.sql.AnalysisException
 Window function row_number() requires window to be ordered, please add ORDER 
BY clause. For example SELECT row_number()(value_expr) OVER (PARTITION BY 
window_partition ORDER BY window_ordering) from table;
 
 
--- !query 17
+-- !query 20
 SELECT val, cate, sum(val) OVER(), avg(val) OVER() FROM testData ORDER BY 
cate, val
--- !query 17 schema
+-- !query 20 schema
 struct<val:int,cate:string,sum(CAST(val AS BIGINT)) OVER (ROWS BETWEEN 
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING):bigint,avg(CAST(val AS BIGINT)) 
OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING):double>
--- !query 17 output
+-- !query 20 output
 NULL   NULL    13      1.8571428571428572
 3      NULL    13      1.8571428571428572
 NULL   a       13      1.8571428571428572
@@ -273,7 +332,7 @@ NULL        a       13      1.8571428571428572
 3      b       13      1.8571428571428572
 
 
--- !query 18
+-- !query 21
 SELECT val, cate,
 first_value(false) OVER w AS first_value,
 first_value(true, true) OVER w AS first_value_ignore_null,
@@ -284,9 +343,9 @@ last_value(false, false) OVER w AS last_value_contain_null
 FROM testData
 WINDOW w AS ()
 ORDER BY cate, val
--- !query 18 schema
+-- !query 21 schema
 
struct<val:int,cate:string,first_value:boolean,first_value_ignore_null:boolean,first_value_contain_null:boolean,last_value:boolean,last_value_ignore_null:boolean,last_value_contain_null:boolean>
--- !query 18 output
+-- !query 21 output
 NULL   NULL    false   true    false   false   true    false
 3      NULL    false   true    false   false   true    false
 NULL   a       false   true    false   false   true    false

http://git-wip-us.apache.org/repos/asf/spark/blob/031910b0/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
index 9806e57..ea725af 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
@@ -17,11 +17,14 @@
 
 package org.apache.spark.sql
 
+import java.sql.{Date, Timestamp}
+
 import org.apache.spark.sql.expressions.{MutableAggregationBuffer, 
UserDefinedAggregateFunction, Window}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.CalendarInterval
 
 /**
  * Window function testing for DataFrame API.
@@ -172,7 +175,7 @@ class DataFrameWindowFunctionsSuite extends QueryTest with 
SharedSQLContext {
     assert(e.message.contains("Boundary end is not a valid integer: 
2147483648"))
   }
 
-  test("range between should accept integer/long values as boundary") {
+  test("range between should accept int/long values as boundary") {
     val df = Seq((1L, "1"), (1L, "1"), (2147483650L, "1"),
       (3L, "2"), (2L, "1"), (2147483650L, "2"))
       .toDF("key", "value")
@@ -191,6 +194,54 @@ class DataFrameWindowFunctionsSuite extends QueryTest with 
SharedSQLContext {
           
Window.partitionBy($"value").orderBy($"key").rangeBetween(-2147483649L, 0))),
       Seq(Row(1, 2), Row(1, 2), Row(2, 3), Row(2147483650L, 2), 
Row(2147483650L, 4), Row(3, 1))
     )
+
+    def dt(date: String): Date = Date.valueOf(date)
+
+    val df2 = Seq((dt("2017-08-01"), "1"), (dt("2017-08-01"), "1"), 
(dt("2020-12-31"), "1"),
+      (dt("2017-08-03"), "2"), (dt("2017-08-02"), "1"), (dt("2020-12-31"), 
"2"))
+      .toDF("key", "value")
+    checkAnswer(
+      df2.select(
+        $"key",
+        count("key").over(
+          Window.partitionBy($"value").orderBy($"key").rangeBetween(lit(0), 
lit(2)))),
+      Seq(Row(dt("2017-08-01"), 3), Row(dt("2017-08-01"), 3), 
Row(dt("2020-12-31"), 1),
+        Row(dt("2017-08-03"), 1), Row(dt("2017-08-02"), 1), 
Row(dt("2020-12-31"), 1))
+    )
+  }
+
+  test("range between should accept double values as boundary") {
+    val df = Seq((1.0D, "1"), (1.0D, "1"), (100.001D, "1"),
+      (3.3D, "2"), (2.02D, "1"), (100.001D, "2"))
+      .toDF("key", "value")
+    df.createOrReplaceTempView("window_table")
+    checkAnswer(
+      df.select(
+        $"key",
+        count("key").over(
+          Window.partitionBy($"value").orderBy($"key")
+            .rangeBetween(currentRow, lit(2.5D)))),
+      Seq(Row(1.0, 3), Row(1.0, 3), Row(100.001, 1), Row(3.3, 1), Row(2.02, 
1), Row(100.001, 1))
+    )
+  }
+
+  test("range between should accept interval values as boundary") {
+    def ts(timestamp: Long): Timestamp = new Timestamp(timestamp * 1000)
+
+    val df = Seq((ts(1501545600), "1"), (ts(1501545600), "1"), 
(ts(1609372800), "1"),
+      (ts(1503000000), "2"), (ts(1502000000), "1"), (ts(1609372800), "2"))
+      .toDF("key", "value")
+    df.createOrReplaceTempView("window_table")
+    checkAnswer(
+      df.select(
+        $"key",
+        count("key").over(
+          Window.partitionBy($"value").orderBy($"key")
+            .rangeBetween(currentRow,
+              lit(CalendarInterval.fromString("interval 23 days 4 hours"))))),
+      Seq(Row(ts(1501545600), 3), Row(ts(1501545600), 3), Row(ts(1609372800), 
1),
+        Row(ts(1503000000), 1), Row(ts(1502000000), 1), Row(ts(1609372800), 1))
+    )
   }
 
   test("aggregation and rows between with unbounded") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to