Repository: spark
Updated Branches:
  refs/heads/master f9a56a153 -> 6f20a92ca


[SPARK-17845] [SQL] More self-evident window function frame boundary API

## What changes were proposed in this pull request?
This patch improves the window function frame boundary API to make it more 
obvious to read and to use. The two high level changes are:

1. Create Window.currentRow, Window.unboundedPreceding, 
Window.unboundedFollowing to indicate the special values in frame boundaries. 
These methods map to the special integral values so we are not breaking 
backward compatibility here. This change makes the frame boundaries more 
self-evident (instead of Long.MinValue, it becomes Window.unboundedPreceding).

2. In Python, for any value less than or equal to JVM's Long.MinValue, treat it 
as Window.unboundedPreceding. For any value larger than or equal to JVM's 
Long.MaxValue, treat it as Window.unboundedFollowing. Before this change, if 
the user specifies any value that is less than Long.MinValue but not 
-sys.maxsize (e.g. -sys.maxsize + 1), the number we pass over to the JVM would 
overflow, resulting in a frame that does not make sense.

Code example required to specify a frame before this patch:
```
Window.rowsBetween(-Long.MinValue, 0)
```

While the above code should still work, the new way is more obvious to read:
```
Window.rowsBetween(Window.unboundedPreceding, Window.currentRow)
```

## How was this patch tested?
- Updated DataFrameWindowSuite (for Scala/Java)
- Updated test_window_functions_cumulative_sum (for Python)
- Renamed DataFrameWindowSuite DataFrameWindowFunctionsSuite to better reflect 
its purpose

Author: Reynold Xin <r...@databricks.com>

Closes #15438 from rxin/SPARK-17845.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6f20a92c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6f20a92c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6f20a92c

Branch: refs/heads/master
Commit: 6f20a92ca30f9c367009c4556939ea4de4284cb9
Parents: f9a56a1
Author: Reynold Xin <r...@databricks.com>
Authored: Wed Oct 12 16:45:10 2016 -0700
Committer: Davies Liu <davies....@gmail.com>
Committed: Wed Oct 12 16:45:10 2016 -0700

----------------------------------------------------------------------
 python/pyspark/sql/tests.py                     |  25 +-
 python/pyspark/sql/window.py                    |  89 ++--
 .../apache/spark/sql/expressions/Window.scala   |  62 ++-
 .../spark/sql/expressions/WindowSpec.scala      |  24 +-
 .../sql/DataFrameWindowFunctionsSuite.scala     | 426 +++++++++++++++++++
 .../apache/spark/sql/DataFrameWindowSuite.scala | 423 ------------------
 6 files changed, 579 insertions(+), 470 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6f20a92c/python/pyspark/sql/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 61674a8..51d5e7a 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -1876,12 +1876,35 @@ class HiveContextSQLTests(ReusedPySparkTestCase):
     def test_window_functions_cumulative_sum(self):
         df = self.spark.createDataFrame([("one", 1), ("two", 2)], ["key", 
"value"])
         from pyspark.sql import functions as F
-        sel = df.select(df.key, 
F.sum(df.value).over(Window.rowsBetween(-sys.maxsize, 0)))
+
+        # Test cumulative sum
+        sel = df.select(
+            df.key,
+            F.sum(df.value).over(Window.rowsBetween(Window.unboundedPreceding, 
0)))
+        rs = sorted(sel.collect())
+        expected = [("one", 1), ("two", 3)]
+        for r, ex in zip(rs, expected):
+            self.assertEqual(tuple(r), ex[:len(r)])
+
+        # Test boundary values less than JVM's Long.MinValue and make sure we 
don't overflow
+        sel = df.select(
+            df.key,
+            F.sum(df.value).over(Window.rowsBetween(Window.unboundedPreceding 
- 1, 0)))
         rs = sorted(sel.collect())
         expected = [("one", 1), ("two", 3)]
         for r, ex in zip(rs, expected):
             self.assertEqual(tuple(r), ex[:len(r)])
 
+        # Test boundary values greater than JVM's Long.MaxValue and make sure 
we don't overflow
+        frame_end = Window.unboundedFollowing + 1
+        sel = df.select(
+            df.key,
+            F.sum(df.value).over(Window.rowsBetween(Window.currentRow, 
frame_end)))
+        rs = sorted(sel.collect())
+        expected = [("one", 3), ("two", 2)]
+        for r, ex in zip(rs, expected):
+            self.assertEqual(tuple(r), ex[:len(r)])
+
     def test_collect_functions(self):
         df = self.spark.createDataFrame([(1, "1"), (2, "2"), (1, "2"), (1, 
"2")], ["key", "value"])
         from pyspark.sql import functions

http://git-wip-us.apache.org/repos/asf/spark/blob/6f20a92c/python/pyspark/sql/window.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/window.py b/python/pyspark/sql/window.py
index 87e9a98..c345e62 100644
--- a/python/pyspark/sql/window.py
+++ b/python/pyspark/sql/window.py
@@ -36,8 +36,8 @@ class Window(object):
 
     For example:
 
-    >>> # PARTITION BY country ORDER BY date ROWS BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW
-    >>> window = 
Window.partitionBy("country").orderBy("date").rowsBetween(-sys.maxsize, 0)
+    >>> # ORDER BY date ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
+    >>> window = Window.orderBy("date").rowsBetween(Window.unboundedPreceding, 
Window.currentRow)
 
     >>> # PARTITION BY country ORDER BY date RANGE BETWEEN 3 PRECEDING AND 3 
FOLLOWING
     >>> window = 
Window.orderBy("date").partitionBy("country").rangeBetween(-3, 3)
@@ -46,6 +46,16 @@ class Window(object):
 
     .. versionadded:: 1.4
     """
+
+    _JAVA_MIN_LONG = -(1 << 63)  # -9223372036854775808
+    _JAVA_MAX_LONG = (1 << 63) - 1  # 9223372036854775807
+
+    unboundedPreceding = _JAVA_MIN_LONG
+
+    unboundedFollowing = _JAVA_MAX_LONG
+
+    currentRow = 0
+
     @staticmethod
     @since(1.4)
     def partitionBy(*cols):
@@ -77,15 +87,21 @@ class Window(object):
         For example, "0" means "current row", while "-1" means the row before
         the current row, and "5" means the fifth row after the current row.
 
+        We recommend users use ``Window.unboundedPreceding``, 
``Window.unboundedFollowing``,
+        and ``Window.currentRow`` to specify special boundary values, rather 
than using integral
+        values directly.
+
         :param start: boundary start, inclusive.
-                      The frame is unbounded if this is ``-sys.maxsize`` (or 
lower).
+                      The frame is unbounded if this is 
``Window.unboundedPreceding``, or
+                      any value less than or equal to -9223372036854775808.
         :param end: boundary end, inclusive.
-                    The frame is unbounded if this is ``sys.maxsize`` (or 
higher).
+                    The frame is unbounded if this is 
``Window.unboundedFollowing``, or
+                    any value greater than or equal to 9223372036854775807.
         """
-        if start <= -sys.maxsize:
-            start = WindowSpec._JAVA_MIN_LONG
-        if end >= sys.maxsize:
-            end = WindowSpec._JAVA_MAX_LONG
+        if start <= Window._JAVA_MIN_LONG:
+            start = Window.unboundedPreceding
+        if end >= Window._JAVA_MAX_LONG:
+            end = Window.unboundedFollowing
         sc = SparkContext._active_spark_context
         jspec = 
sc._jvm.org.apache.spark.sql.expressions.Window.rowsBetween(start, end)
         return WindowSpec(jspec)
@@ -101,15 +117,21 @@ class Window(object):
         "0" means "current row", while "-1" means one off before the current 
row,
         and "5" means the five off after the current row.
 
+        We recommend users use ``Window.unboundedPreceding``, 
``Window.unboundedFollowing``,
+        and ``Window.currentRow`` to specify special boundary values, rather 
than using integral
+        values directly.
+
         :param start: boundary start, inclusive.
-                      The frame is unbounded if this is ``-sys.maxsize`` (or 
lower).
+                      The frame is unbounded if this is 
``Window.unboundedPreceding``, or
+                      any value less than or equal to -9223372036854775808.
         :param end: boundary end, inclusive.
-                    The frame is unbounded if this is ``sys.maxsize`` (or 
higher).
+                    The frame is unbounded if this is 
``Window.unboundedFollowing``, or
+                    any value greater than or equal to 9223372036854775807.
         """
-        if start <= -sys.maxsize:
-            start = WindowSpec._JAVA_MIN_LONG
-        if end >= sys.maxsize:
-            end = WindowSpec._JAVA_MAX_LONG
+        if start <= Window._JAVA_MIN_LONG:
+            start = Window.unboundedPreceding
+        if end >= Window._JAVA_MAX_LONG:
+            end = Window.unboundedFollowing
         sc = SparkContext._active_spark_context
         jspec = 
sc._jvm.org.apache.spark.sql.expressions.Window.rangeBetween(start, end)
         return WindowSpec(jspec)
@@ -127,9 +149,6 @@ class WindowSpec(object):
     .. versionadded:: 1.4
     """
 
-    _JAVA_MAX_LONG = (1 << 63) - 1
-    _JAVA_MIN_LONG = - (1 << 63)
-
     def __init__(self, jspec):
         self._jspec = jspec
 
@@ -160,15 +179,21 @@ class WindowSpec(object):
         For example, "0" means "current row", while "-1" means the row before
         the current row, and "5" means the fifth row after the current row.
 
+        We recommend users use ``Window.unboundedPreceding``, 
``Window.unboundedFollowing``,
+        and ``Window.currentRow`` to specify special boundary values, rather 
than using integral
+        values directly.
+
         :param start: boundary start, inclusive.
-                      The frame is unbounded if this is ``-sys.maxsize`` (or 
lower).
+                      The frame is unbounded if this is 
``Window.unboundedPreceding``, or
+                      any value less than or equal to -9223372036854775808.
         :param end: boundary end, inclusive.
-                    The frame is unbounded if this is ``sys.maxsize`` (or 
higher).
+                    The frame is unbounded if this is 
``Window.unboundedFollowing``, or
+                    any value greater than or equal to 9223372036854775807.
         """
-        if start <= -sys.maxsize:
-            start = self._JAVA_MIN_LONG
-        if end >= sys.maxsize:
-            end = self._JAVA_MAX_LONG
+        if start <= Window._JAVA_MIN_LONG:
+            start = Window.unboundedPreceding
+        if end >= Window._JAVA_MAX_LONG:
+            end = Window.unboundedFollowing
         return WindowSpec(self._jspec.rowsBetween(start, end))
 
     @since(1.4)
@@ -180,15 +205,21 @@ class WindowSpec(object):
         "0" means "current row", while "-1" means one off before the current 
row,
         and "5" means the five off after the current row.
 
+        We recommend users use ``Window.unboundedPreceding``, 
``Window.unboundedFollowing``,
+        and ``Window.currentRow`` to specify special boundary values, rather 
than using integral
+        values directly.
+
         :param start: boundary start, inclusive.
-                      The frame is unbounded if this is ``-sys.maxsize`` (or 
lower).
+                      The frame is unbounded if this is 
``Window.unboundedPreceding``, or
+                      any value less than or equal to -9223372036854775808.
         :param end: boundary end, inclusive.
-                    The frame is unbounded if this is ``sys.maxsize`` (or 
higher).
+                    The frame is unbounded if this is 
``Window.unboundedFollowing``, or
+                    any value greater than or equal to 9223372036854775807.
         """
-        if start <= -sys.maxsize:
-            start = self._JAVA_MIN_LONG
-        if end >= sys.maxsize:
-            end = self._JAVA_MAX_LONG
+        if start <= Window._JAVA_MIN_LONG:
+            start = Window.unboundedPreceding
+        if end >= Window._JAVA_MAX_LONG:
+            end = Window.unboundedFollowing
         return WindowSpec(self._jspec.rangeBetween(start, end))
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/6f20a92c/sql/core/src/main/scala/org/apache/spark/sql/expressions/Window.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/expressions/Window.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/expressions/Window.scala
index e8a0c5f..3c1f6e8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/expressions/Window.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/expressions/Window.scala
@@ -27,7 +27,8 @@ import org.apache.spark.sql.catalyst.expressions._
  *
  * {{{
  *   // PARTITION BY country ORDER BY date ROWS BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW
- *   Window.partitionBy("country").orderBy("date").rowsBetween(Long.MinValue, 
0)
+ *   Window.partitionBy("country").orderBy("date")
+ *     .rowsBetween(Window.unboundedPreceding, Window.currentRow)
  *
  *   // PARTITION BY country ORDER BY date ROWS BETWEEN 3 PRECEDING AND 3 
FOLLOWING
  *   Window.partitionBy("country").orderBy("date").rowsBetween(-3, 3)
@@ -75,6 +76,41 @@ object Window {
   }
 
   /**
+   * Value representing the last row in the partition, equivalent to 
"UNBOUNDED PRECEDING" in SQL.
+   * This can be used to specify the frame boundaries:
+   *
+   * {{{
+   *   Window.rowsBetween(Window.unboundedPreceding, Window.currentRow)
+   * }}}
+   *
+   * @since 2.1.0
+   */
+  def unboundedPreceding: Long = Long.MinValue
+
+  /**
+   * Value representing the last row in the partition, equivalent to 
"UNBOUNDED FOLLOWING" in SQL.
+   * This can be used to specify the frame boundaries:
+   *
+   * {{{
+   *   Window.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
+   * }}}
+   *
+   * @since 2.1.0
+   */
+  def unboundedFollowing: Long = Long.MaxValue
+
+  /**
+   * Value representing the current row. This can be used to specify the frame 
boundaries:
+   *
+   * {{{
+   *   Window.rowsBetween(Window.unboundedPreceding, Window.currentRow)
+   * }}}
+   *
+   * @since 2.1.0
+   */
+  def currentRow: Long = 0
+
+  /**
    * Creates a [[WindowSpec]] with the frame boundaries defined,
    * from `start` (inclusive) to `end` (inclusive).
    *
@@ -82,10 +118,14 @@ object Window {
    * "current row", while "-1" means the row before the current row, and "5" 
means the fifth row
    * after the current row.
    *
-   * @param start boundary start, inclusive.
-   *              The frame is unbounded if this is the minimum long value.
-   * @param end boundary end, inclusive.
-   *            The frame is unbounded if this is the maximum long value.
+   * We recommend users use [[Window.unboundedPreceding]], 
[[Window.unboundedFollowing]],
+   * and [[Window.currentRow]] to specify special boundary values, rather than 
using integral
+   * values directly.
+   *
+   * @param start boundary start, inclusive. The frame is unbounded if this is
+   *              the minimum long value ([[Window.unboundedPreceding]]).
+   * @param end boundary end, inclusive. The frame is unbounded if this is the
+   *            maximum long value  ([[Window.unboundedFollowing]]).
    * @since 2.1.0
    */
   // Note: when updating the doc for this method, also update 
WindowSpec.rowsBetween.
@@ -101,10 +141,14 @@ object Window {
    * while "-1" means one off before the current row, and "5" means the five 
off after the
    * current row.
    *
-   * @param start boundary start, inclusive.
-   *              The frame is unbounded if this is the minimum long value.
-   * @param end boundary end, inclusive.
-   *            The frame is unbounded if this is the maximum long value.
+   * We recommend users use [[Window.unboundedPreceding]], 
[[Window.unboundedFollowing]],
+   * and [[Window.currentRow]] to specify special boundary values, rather than 
using integral
+   * values directly.
+   *
+   * @param start boundary start, inclusive. The frame is unbounded if this is
+   *              the minimum long value ([[Window.unboundedPreceding]]).
+   * @param end boundary end, inclusive. The frame is unbounded if this is the
+   *            maximum long value  ([[Window.unboundedFollowing]]).
    * @since 2.1.0
    */
   // Note: when updating the doc for this method, also update 
WindowSpec.rangeBetween.

http://git-wip-us.apache.org/repos/asf/spark/blob/6f20a92c/sql/core/src/main/scala/org/apache/spark/sql/expressions/WindowSpec.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/expressions/WindowSpec.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/expressions/WindowSpec.scala
index 82bc8f1..8ebed39 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/expressions/WindowSpec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/expressions/WindowSpec.scala
@@ -86,10 +86,14 @@ class WindowSpec private[sql](
    * "current row", while "-1" means the row before the current row, and "5" 
means the fifth row
    * after the current row.
    *
-   * @param start boundary start, inclusive.
-   *              The frame is unbounded if this is the minimum long value.
-   * @param end boundary end, inclusive.
-   *            The frame is unbounded if this is the maximum long value.
+   * We recommend users use [[Window.unboundedPreceding]], 
[[Window.unboundedFollowing]],
+   * and [[Window.currentRow]] to specify special boundary values, rather than 
using integral
+   * values directly.
+   *
+   * @param start boundary start, inclusive. The frame is unbounded if this is
+   *              the minimum long value ([[Window.unboundedPreceding]]).
+   * @param end boundary end, inclusive. The frame is unbounded if this is the
+   *            maximum long value  ([[Window.unboundedFollowing]]).
    * @since 1.4.0
    */
   // Note: when updating the doc for this method, also update 
Window.rowsBetween.
@@ -104,10 +108,14 @@ class WindowSpec private[sql](
    * while "-1" means one off before the current row, and "5" means the five 
off after the
    * current row.
    *
-   * @param start boundary start, inclusive.
-   *              The frame is unbounded if this is the minimum long value.
-   * @param end boundary end, inclusive.
-   *            The frame is unbounded if this is the maximum long value.
+   * We recommend users use [[Window.unboundedPreceding]], 
[[Window.unboundedFollowing]],
+   * and [[Window.currentRow]] to specify special boundary values, rather than 
using integral
+   * values directly.
+   *
+   * @param start boundary start, inclusive. The frame is unbounded if this is
+   *              the minimum long value ([[Window.unboundedPreceding]]).
+   * @param end boundary end, inclusive. The frame is unbounded if this is the
+   *            maximum long value  ([[Window.unboundedFollowing]]).
    * @since 1.4.0
    */
   // Note: when updating the doc for this method, also update 
Window.rangeBetween.

http://git-wip-us.apache.org/repos/asf/spark/blob/6f20a92c/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
new file mode 100644
index 0000000..1255c49
--- /dev/null
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
@@ -0,0 +1,426 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.expressions.{MutableAggregationBuffer, 
UserDefinedAggregateFunction, Window}
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.{DataType, LongType, StructType}
+
+/**
+ * Window function testing for DataFrame API.
+ */
+class DataFrameWindowFunctionsSuite extends QueryTest with SharedSQLContext {
+  import testImplicits._
+
+  test("reuse window partitionBy") {
+    val df = Seq((1, "1"), (2, "2"), (1, "1"), (2, "2")).toDF("key", "value")
+    val w = Window.partitionBy("key").orderBy("value")
+
+    checkAnswer(
+      df.select(
+        lead("key", 1).over(w),
+        lead("value", 1).over(w)),
+      Row(1, "1") :: Row(2, "2") :: Row(null, null) :: Row(null, null) :: Nil)
+  }
+
+  test("reuse window orderBy") {
+    val df = Seq((1, "1"), (2, "2"), (1, "1"), (2, "2")).toDF("key", "value")
+    val w = Window.orderBy("value").partitionBy("key")
+
+    checkAnswer(
+      df.select(
+        lead("key", 1).over(w),
+        lead("value", 1).over(w)),
+      Row(1, "1") :: Row(2, "2") :: Row(null, null) :: Row(null, null) :: Nil)
+  }
+
+  test("Window.rowsBetween") {
+    val df = Seq(("one", 1), ("two", 2)).toDF("key", "value")
+    // Running (cumulative) sum
+    checkAnswer(
+      df.select('key, sum("value").over(
+        Window.rowsBetween(Window.unboundedPreceding, Window.currentRow))),
+      Row("one", 1) :: Row("two", 3) :: Nil
+    )
+  }
+
+  test("lead") {
+    val df = Seq((1, "1"), (2, "2"), (1, "1"), (2, "2")).toDF("key", "value")
+    df.createOrReplaceTempView("window_table")
+
+    checkAnswer(
+      df.select(
+        lead("value", 1).over(Window.partitionBy($"key").orderBy($"value"))),
+      Row("1") :: Row(null) :: Row("2") :: Row(null) :: Nil)
+  }
+
+  test("lag") {
+    val df = Seq((1, "1"), (2, "2"), (1, "1"), (2, "2")).toDF("key", "value")
+    df.createOrReplaceTempView("window_table")
+
+    checkAnswer(
+      df.select(
+        lag("value", 1).over(Window.partitionBy($"key").orderBy($"value"))),
+      Row(null) :: Row("1") :: Row(null) :: Row("2") :: Nil)
+  }
+
+  test("lead with default value") {
+    val df = Seq((1, "1"), (1, "1"), (2, "2"), (1, "1"),
+                 (2, "2"), (1, "1"), (2, "2")).toDF("key", "value")
+    df.createOrReplaceTempView("window_table")
+    checkAnswer(
+      df.select(
+        lead("value", 2, 
"n/a").over(Window.partitionBy("key").orderBy("value"))),
+      Seq(Row("1"), Row("1"), Row("n/a"), Row("n/a"), Row("2"), Row("n/a"), 
Row("n/a")))
+  }
+
+  test("lag with default value") {
+    val df = Seq((1, "1"), (1, "1"), (2, "2"), (1, "1"),
+                 (2, "2"), (1, "1"), (2, "2")).toDF("key", "value")
+    df.createOrReplaceTempView("window_table")
+    checkAnswer(
+      df.select(
+        lag("value", 2, 
"n/a").over(Window.partitionBy($"key").orderBy($"value"))),
+      Seq(Row("n/a"), Row("n/a"), Row("1"), Row("1"), Row("n/a"), Row("n/a"), 
Row("2")))
+  }
+
+  test("rank functions in unspecific window") {
+    val df = Seq((1, "1"), (2, "2"), (1, "2"), (2, "2")).toDF("key", "value")
+    df.createOrReplaceTempView("window_table")
+    checkAnswer(
+      df.select(
+        $"key",
+        max("key").over(Window.partitionBy("value").orderBy("key")),
+        min("key").over(Window.partitionBy("value").orderBy("key")),
+        mean("key").over(Window.partitionBy("value").orderBy("key")),
+        count("key").over(Window.partitionBy("value").orderBy("key")),
+        sum("key").over(Window.partitionBy("value").orderBy("key")),
+        ntile(2).over(Window.partitionBy("value").orderBy("key")),
+        row_number().over(Window.partitionBy("value").orderBy("key")),
+        dense_rank().over(Window.partitionBy("value").orderBy("key")),
+        rank().over(Window.partitionBy("value").orderBy("key")),
+        cume_dist().over(Window.partitionBy("value").orderBy("key")),
+        percent_rank().over(Window.partitionBy("value").orderBy("key"))),
+      Row(1, 1, 1, 1.0d, 1, 1, 1, 1, 1, 1, 1.0d, 0.0d) ::
+      Row(1, 1, 1, 1.0d, 1, 1, 1, 1, 1, 1, 1.0d / 3.0d, 0.0d) ::
+      Row(2, 2, 1, 5.0d / 3.0d, 3, 5, 1, 2, 2, 2, 1.0d, 0.5d) ::
+      Row(2, 2, 1, 5.0d / 3.0d, 3, 5, 2, 3, 2, 2, 1.0d, 0.5d) :: Nil)
+  }
+
+  test("window function should fail if order by clause is not specified") {
+    val df = Seq((1, "1"), (2, "2"), (1, "2"), (2, "2")).toDF("key", "value")
+    val e = intercept[AnalysisException](
+      // Here we missed .orderBy("key")!
+      df.select(row_number().over(Window.partitionBy("value"))).collect())
+    assert(e.message.contains("requires window to be ordered"))
+  }
+
+  test("aggregation and rows between") {
+    val df = Seq((1, "1"), (2, "1"), (2, "2"), (1, "1"), (2, "2")).toDF("key", 
"value")
+    df.createOrReplaceTempView("window_table")
+    checkAnswer(
+      df.select(
+        
avg("key").over(Window.partitionBy($"value").orderBy($"key").rowsBetween(-1, 
2))),
+      Seq(Row(4.0d / 3.0d), Row(4.0d / 3.0d), Row(3.0d / 2.0d), Row(2.0d), 
Row(2.0d)))
+  }
+
+  test("aggregation and range between") {
+    val df = Seq((1, "1"), (1, "1"), (3, "1"), (2, "2"), (2, "1"), (2, 
"2")).toDF("key", "value")
+    df.createOrReplaceTempView("window_table")
+    checkAnswer(
+      df.select(
+        
avg("key").over(Window.partitionBy($"value").orderBy($"key").rangeBetween(-1, 
1))),
+      Seq(Row(4.0d / 3.0d), Row(4.0d / 3.0d), Row(7.0d / 4.0d), Row(5.0d / 
2.0d),
+        Row(2.0d), Row(2.0d)))
+  }
+
+  test("aggregation and rows between with unbounded") {
+    val df = Seq((1, "1"), (2, "2"), (2, "3"), (1, "3"), (3, "2"), (4, 
"3")).toDF("key", "value")
+    df.createOrReplaceTempView("window_table")
+    checkAnswer(
+      df.select(
+        $"key",
+        last("key").over(
+          Window.partitionBy($"value").orderBy($"key")
+            .rowsBetween(Window.currentRow, Window.unboundedFollowing)),
+        last("key").over(
+          Window.partitionBy($"value").orderBy($"key")
+            .rowsBetween(Window.unboundedPreceding, Window.currentRow)),
+        
last("key").over(Window.partitionBy($"value").orderBy($"key").rowsBetween(-1, 
1))),
+      Seq(Row(1, 1, 1, 1), Row(2, 3, 2, 3), Row(3, 3, 3, 3), Row(1, 4, 1, 2), 
Row(2, 4, 2, 4),
+        Row(4, 4, 4, 4)))
+  }
+
+  test("aggregation and range between with unbounded") {
+    val df = Seq((5, "1"), (5, "2"), (4, "2"), (6, "2"), (3, "1"), (2, 
"2")).toDF("key", "value")
+    df.createOrReplaceTempView("window_table")
+    checkAnswer(
+      df.select(
+        $"key",
+        last("value").over(
+          Window.partitionBy($"value").orderBy($"key").rangeBetween(-2, -1))
+          .equalTo("2")
+          .as("last_v"),
+        
avg("key").over(Window.partitionBy("value").orderBy("key").rangeBetween(Long.MinValue,
 1))
+          .as("avg_key1"),
+        
avg("key").over(Window.partitionBy("value").orderBy("key").rangeBetween(0, 
Long.MaxValue))
+          .as("avg_key2"),
+        
avg("key").over(Window.partitionBy("value").orderBy("key").rangeBetween(-1, 0))
+          .as("avg_key3")
+      ),
+      Seq(Row(3, null, 3.0d, 4.0d, 3.0d),
+        Row(5, false, 4.0d, 5.0d, 5.0d),
+        Row(2, null, 2.0d, 17.0d / 4.0d, 2.0d),
+        Row(4, true, 11.0d / 3.0d, 5.0d, 4.0d),
+        Row(5, true, 17.0d / 4.0d, 11.0d / 2.0d, 4.5d),
+        Row(6, true, 17.0d / 4.0d, 6.0d, 11.0d / 2.0d)))
+  }
+
+  test("reverse sliding range frame") {
+    val df = Seq(
+      (1, "Thin", "Cell Phone", 6000),
+      (2, "Normal", "Tablet", 1500),
+      (3, "Mini", "Tablet", 5500),
+      (4, "Ultra thin", "Cell Phone", 5500),
+      (5, "Very thin", "Cell Phone", 6000),
+      (6, "Big", "Tablet", 2500),
+      (7, "Bendable", "Cell Phone", 3000),
+      (8, "Foldable", "Cell Phone", 3000),
+      (9, "Pro", "Tablet", 4500),
+      (10, "Pro2", "Tablet", 6500)).
+      toDF("id", "product", "category", "revenue")
+    val window = Window.
+      partitionBy($"category").
+      orderBy($"revenue".desc).
+      rangeBetween(-2000L, 1000L)
+    checkAnswer(
+      df.select(
+        $"id",
+        avg($"revenue").over(window).cast("int")),
+      Row(1, 5833) :: Row(2, 2000) :: Row(3, 5500) ::
+        Row(4, 5833) :: Row(5, 5833) :: Row(6, 2833) ::
+        Row(7, 3000) :: Row(8, 3000) :: Row(9, 5500) ::
+        Row(10, 6000) :: Nil)
+  }
+
+  // This is here to illustrate the fact that reverse order also reverses 
offsets.
+  test("reverse unbounded range frame") {
+    val df = Seq(1, 2, 4, 3, 2, 1).
+      map(Tuple1.apply).
+      toDF("value")
+    val window = Window.orderBy($"value".desc)
+    checkAnswer(
+      df.select(
+        $"value",
+        sum($"value").over(window.rangeBetween(Long.MinValue, 1)),
+        sum($"value").over(window.rangeBetween(1, Long.MaxValue))),
+      Row(1, 13, null) :: Row(2, 13, 2) :: Row(4, 7, 9) ::
+        Row(3, 11, 6) :: Row(2, 13, 2) :: Row(1, 13, null) :: Nil)
+  }
+
+  test("statistical functions") {
+    val df = Seq(("a", 1), ("a", 1), ("a", 2), ("a", 2), ("b", 4), ("b", 3), 
("b", 2)).
+      toDF("key", "value")
+    val window = Window.partitionBy($"key")
+    checkAnswer(
+      df.select(
+        $"key",
+        var_pop($"value").over(window),
+        var_samp($"value").over(window),
+        approx_count_distinct($"value").over(window)),
+      Seq.fill(4)(Row("a", 1.0d / 4.0d, 1.0d / 3.0d, 2))
+      ++ Seq.fill(3)(Row("b", 2.0d / 3.0d, 1.0d, 3)))
+  }
+
+  test("window function with aggregates") {
+    val df = Seq(("a", 1), ("a", 1), ("a", 2), ("a", 2), ("b", 4), ("b", 3), 
("b", 2)).
+      toDF("key", "value")
+    val window = Window.orderBy()
+    checkAnswer(
+      df.groupBy($"key")
+        .agg(
+          sum($"value"),
+          sum(sum($"value")).over(window) - sum($"value")),
+      Seq(Row("a", 6, 9), Row("b", 9, 6)))
+  }
+
+  test("SPARK-16195 empty over spec") {
+    val df = Seq(("a", 1), ("a", 1), ("a", 2), ("b", 2)).
+      toDF("key", "value")
+    df.createOrReplaceTempView("window_table")
+    checkAnswer(
+      df.select($"key", $"value", sum($"value").over(), avg($"value").over()),
+      Seq(Row("a", 1, 6, 1.5), Row("a", 1, 6, 1.5), Row("a", 2, 6, 1.5), 
Row("b", 2, 6, 1.5)))
+    checkAnswer(
+      sql("select key, value, sum(value) over(), avg(value) over() from 
window_table"),
+      Seq(Row("a", 1, 6, 1.5), Row("a", 1, 6, 1.5), Row("a", 2, 6, 1.5), 
Row("b", 2, 6, 1.5)))
+  }
+
+  test("window function with udaf") {
+    val udaf = new UserDefinedAggregateFunction {
+      def inputSchema: StructType = new StructType()
+        .add("a", LongType)
+        .add("b", LongType)
+
+      def bufferSchema: StructType = new StructType()
+        .add("product", LongType)
+
+      def dataType: DataType = LongType
+
+      def deterministic: Boolean = true
+
+      def initialize(buffer: MutableAggregationBuffer): Unit = {
+        buffer(0) = 0L
+      }
+
+      def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
+        if (!(input.isNullAt(0) || input.isNullAt(1))) {
+          buffer(0) = buffer.getLong(0) + input.getLong(0) * input.getLong(1)
+        }
+      }
+
+      def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
+        buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
+      }
+
+      def evaluate(buffer: Row): Any =
+        buffer.getLong(0)
+    }
+    val df = Seq(
+      ("a", 1, 1),
+      ("a", 1, 5),
+      ("a", 2, 10),
+      ("a", 2, -1),
+      ("b", 4, 7),
+      ("b", 3, 8),
+      ("b", 2, 4))
+      .toDF("key", "a", "b")
+    val window = 
Window.partitionBy($"key").orderBy($"a").rangeBetween(Long.MinValue, 0L)
+    checkAnswer(
+      df.select(
+        $"key",
+        $"a",
+        $"b",
+        udaf($"a", $"b").over(window)),
+      Seq(
+        Row("a", 1, 1, 6),
+        Row("a", 1, 5, 6),
+        Row("a", 2, 10, 24),
+        Row("a", 2, -1, 24),
+        Row("b", 4, 7, 60),
+        Row("b", 3, 8, 32),
+        Row("b", 2, 4, 8)))
+  }
+
+  test("null inputs") {
+    val df = Seq(("a", 1), ("a", 1), ("a", 2), ("a", 2), ("b", 4), ("b", 3), 
("b", 2))
+      .toDF("key", "value")
+    val window = Window.orderBy()
+    checkAnswer(
+      df.select(
+        $"key",
+        $"value",
+        avg(lit(null)).over(window),
+        sum(lit(null)).over(window)),
+      Seq(
+        Row("a", 1, null, null),
+        Row("a", 1, null, null),
+        Row("a", 2, null, null),
+        Row("a", 2, null, null),
+        Row("b", 4, null, null),
+        Row("b", 3, null, null),
+        Row("b", 2, null, null)))
+  }
+
+  test("last/first with ignoreNulls") {
+    val nullStr: String = null
+    val df = Seq(
+      ("a", 0, nullStr),
+      ("a", 1, "x"),
+      ("a", 2, "y"),
+      ("a", 3, "z"),
+      ("a", 4, nullStr),
+      ("b", 1, nullStr),
+      ("b", 2, nullStr)).
+      toDF("key", "order", "value")
+    val window = Window.partitionBy($"key").orderBy($"order")
+    checkAnswer(
+      df.select(
+        $"key",
+        $"order",
+        first($"value").over(window),
+        first($"value", ignoreNulls = false).over(window),
+        first($"value", ignoreNulls = true).over(window),
+        last($"value").over(window),
+        last($"value", ignoreNulls = false).over(window),
+        last($"value", ignoreNulls = true).over(window)),
+      Seq(
+        Row("a", 0, null, null, null, null, null, null),
+        Row("a", 1, null, null, "x", "x", "x", "x"),
+        Row("a", 2, null, null, "x", "y", "y", "y"),
+        Row("a", 3, null, null, "x", "z", "z", "z"),
+        Row("a", 4, null, null, "x", null, null, "z"),
+        Row("b", 1, null, null, null, null, null, null),
+        Row("b", 2, null, null, null, null, null, null)))
+  }
+
+  test("SPARK-12989 ExtractWindowExpressions treats alias as regular 
attribute") {
+    val src = Seq((0, 3, 5)).toDF("a", "b", "c")
+      .withColumn("Data", struct("a", "b"))
+      .drop("a")
+      .drop("b")
+    val winSpec = Window.partitionBy("Data.a", "Data.b").orderBy($"c".desc)
+    val df = src.select($"*", max("c").over(winSpec) as "max")
+    checkAnswer(df, Row(5, Row(0, 3), 5))
+  }
+
+  test("aggregation and rows between with unbounded + predicate pushdown") {
+    val df = Seq((1, "1"), (2, "2"), (2, "3"), (1, "3"), (3, "2"), (4, 
"3")).toDF("key", "value")
+    df.createOrReplaceTempView("window_table")
+    val selectList = Seq($"key", $"value",
+      last("key").over(
+        Window.partitionBy($"value").orderBy($"key").rowsBetween(0, 
Long.MaxValue)),
+      last("key").over(
+        
Window.partitionBy($"value").orderBy($"key").rowsBetween(Long.MinValue, 0)),
+      
last("key").over(Window.partitionBy($"value").orderBy($"key").rowsBetween(-1, 
1)))
+
+    checkAnswer(
+      df.select(selectList: _*).where($"value" < "3"),
+      Seq(Row(1, "1", 1, 1, 1), Row(2, "2", 3, 2, 3), Row(3, "2", 3, 3, 3)))
+  }
+
+  test("aggregation and range between with unbounded + predicate pushdown") {
+    val df = Seq((5, "1"), (5, "2"), (4, "2"), (6, "2"), (3, "1"), (2, 
"2")).toDF("key", "value")
+    df.createOrReplaceTempView("window_table")
+    val selectList = Seq($"key", $"value",
+      last("value").over(
+        Window.partitionBy($"value").orderBy($"key").rangeBetween(-2, 
-1)).equalTo("2")
+        .as("last_v"),
+      
avg("key").over(Window.partitionBy("value").orderBy("key").rangeBetween(Long.MinValue,
 1))
+        .as("avg_key1"),
+      
avg("key").over(Window.partitionBy("value").orderBy("key").rangeBetween(0, 
Long.MaxValue))
+        .as("avg_key2"),
+      
avg("key").over(Window.partitionBy("value").orderBy("key").rangeBetween(-1, 1))
+        .as("avg_key3"))
+
+    checkAnswer(
+      df.select(selectList: _*).where($"value" < 2),
+      Seq(Row(3, "1", null, 3.0, 4.0, 3.0), Row(5, "1", false, 4.0, 5.0, 5.0)))
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/6f20a92c/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowSuite.scala
deleted file mode 100644
index 5bc386f..0000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowSuite.scala
+++ /dev/null
@@ -1,423 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import org.apache.spark.sql.expressions.{MutableAggregationBuffer, 
UserDefinedAggregateFunction, Window}
-import org.apache.spark.sql.functions._
-import org.apache.spark.sql.test.SharedSQLContext
-import org.apache.spark.sql.types.{DataType, LongType, StructType}
-
-/**
- * Window function testing for DataFrame API.
- */
-class DataFrameWindowSuite extends QueryTest with SharedSQLContext {
-  import testImplicits._
-
-  test("reuse window partitionBy") {
-    val df = Seq((1, "1"), (2, "2"), (1, "1"), (2, "2")).toDF("key", "value")
-    val w = Window.partitionBy("key").orderBy("value")
-
-    checkAnswer(
-      df.select(
-        lead("key", 1).over(w),
-        lead("value", 1).over(w)),
-      Row(1, "1") :: Row(2, "2") :: Row(null, null) :: Row(null, null) :: Nil)
-  }
-
-  test("reuse window orderBy") {
-    val df = Seq((1, "1"), (2, "2"), (1, "1"), (2, "2")).toDF("key", "value")
-    val w = Window.orderBy("value").partitionBy("key")
-
-    checkAnswer(
-      df.select(
-        lead("key", 1).over(w),
-        lead("value", 1).over(w)),
-      Row(1, "1") :: Row(2, "2") :: Row(null, null) :: Row(null, null) :: Nil)
-  }
-
-  test("Window.rowsBetween") {
-    val df = Seq(("one", 1), ("two", 2)).toDF("key", "value")
-    // Running (cumulative) sum
-    checkAnswer(
-      df.select('key, sum("value").over(Window.rowsBetween(Long.MinValue, 0))),
-      Row("one", 1) :: Row("two", 3) :: Nil
-    )
-  }
-
-  test("lead") {
-    val df = Seq((1, "1"), (2, "2"), (1, "1"), (2, "2")).toDF("key", "value")
-    df.createOrReplaceTempView("window_table")
-
-    checkAnswer(
-      df.select(
-        lead("value", 1).over(Window.partitionBy($"key").orderBy($"value"))),
-      Row("1") :: Row(null) :: Row("2") :: Row(null) :: Nil)
-  }
-
-  test("lag") {
-    val df = Seq((1, "1"), (2, "2"), (1, "1"), (2, "2")).toDF("key", "value")
-    df.createOrReplaceTempView("window_table")
-
-    checkAnswer(
-      df.select(
-        lag("value", 1).over(Window.partitionBy($"key").orderBy($"value"))),
-      Row(null) :: Row("1") :: Row(null) :: Row("2") :: Nil)
-  }
-
-  test("lead with default value") {
-    val df = Seq((1, "1"), (1, "1"), (2, "2"), (1, "1"),
-                 (2, "2"), (1, "1"), (2, "2")).toDF("key", "value")
-    df.createOrReplaceTempView("window_table")
-    checkAnswer(
-      df.select(
-        lead("value", 2, 
"n/a").over(Window.partitionBy("key").orderBy("value"))),
-      Seq(Row("1"), Row("1"), Row("n/a"), Row("n/a"), Row("2"), Row("n/a"), 
Row("n/a")))
-  }
-
-  test("lag with default value") {
-    val df = Seq((1, "1"), (1, "1"), (2, "2"), (1, "1"),
-                 (2, "2"), (1, "1"), (2, "2")).toDF("key", "value")
-    df.createOrReplaceTempView("window_table")
-    checkAnswer(
-      df.select(
-        lag("value", 2, 
"n/a").over(Window.partitionBy($"key").orderBy($"value"))),
-      Seq(Row("n/a"), Row("n/a"), Row("1"), Row("1"), Row("n/a"), Row("n/a"), 
Row("2")))
-  }
-
-  test("rank functions in unspecific window") {
-    val df = Seq((1, "1"), (2, "2"), (1, "2"), (2, "2")).toDF("key", "value")
-    df.createOrReplaceTempView("window_table")
-    checkAnswer(
-      df.select(
-        $"key",
-        max("key").over(Window.partitionBy("value").orderBy("key")),
-        min("key").over(Window.partitionBy("value").orderBy("key")),
-        mean("key").over(Window.partitionBy("value").orderBy("key")),
-        count("key").over(Window.partitionBy("value").orderBy("key")),
-        sum("key").over(Window.partitionBy("value").orderBy("key")),
-        ntile(2).over(Window.partitionBy("value").orderBy("key")),
-        row_number().over(Window.partitionBy("value").orderBy("key")),
-        dense_rank().over(Window.partitionBy("value").orderBy("key")),
-        rank().over(Window.partitionBy("value").orderBy("key")),
-        cume_dist().over(Window.partitionBy("value").orderBy("key")),
-        percent_rank().over(Window.partitionBy("value").orderBy("key"))),
-      Row(1, 1, 1, 1.0d, 1, 1, 1, 1, 1, 1, 1.0d, 0.0d) ::
-      Row(1, 1, 1, 1.0d, 1, 1, 1, 1, 1, 1, 1.0d / 3.0d, 0.0d) ::
-      Row(2, 2, 1, 5.0d / 3.0d, 3, 5, 1, 2, 2, 2, 1.0d, 0.5d) ::
-      Row(2, 2, 1, 5.0d / 3.0d, 3, 5, 2, 3, 2, 2, 1.0d, 0.5d) :: Nil)
-  }
-
-  test("window function should fail if order by clause is not specified") {
-    val df = Seq((1, "1"), (2, "2"), (1, "2"), (2, "2")).toDF("key", "value")
-    val e = intercept[AnalysisException](
-      // Here we missed .orderBy("key")!
-      df.select(row_number().over(Window.partitionBy("value"))).collect())
-    assert(e.message.contains("requires window to be ordered"))
-  }
-
-  test("aggregation and rows between") {
-    val df = Seq((1, "1"), (2, "1"), (2, "2"), (1, "1"), (2, "2")).toDF("key", 
"value")
-    df.createOrReplaceTempView("window_table")
-    checkAnswer(
-      df.select(
-        
avg("key").over(Window.partitionBy($"value").orderBy($"key").rowsBetween(-1, 
2))),
-      Seq(Row(4.0d / 3.0d), Row(4.0d / 3.0d), Row(3.0d / 2.0d), Row(2.0d), 
Row(2.0d)))
-  }
-
-  test("aggregation and range between") {
-    val df = Seq((1, "1"), (1, "1"), (3, "1"), (2, "2"), (2, "1"), (2, 
"2")).toDF("key", "value")
-    df.createOrReplaceTempView("window_table")
-    checkAnswer(
-      df.select(
-        
avg("key").over(Window.partitionBy($"value").orderBy($"key").rangeBetween(-1, 
1))),
-      Seq(Row(4.0d / 3.0d), Row(4.0d / 3.0d), Row(7.0d / 4.0d), Row(5.0d / 
2.0d),
-        Row(2.0d), Row(2.0d)))
-  }
-
-  test("aggregation and rows between with unbounded") {
-    val df = Seq((1, "1"), (2, "2"), (2, "3"), (1, "3"), (3, "2"), (4, 
"3")).toDF("key", "value")
-    df.createOrReplaceTempView("window_table")
-    checkAnswer(
-      df.select(
-        $"key",
-        last("key").over(
-          Window.partitionBy($"value").orderBy($"key").rowsBetween(0, 
Long.MaxValue)),
-        last("key").over(
-          
Window.partitionBy($"value").orderBy($"key").rowsBetween(Long.MinValue, 0)),
-        
last("key").over(Window.partitionBy($"value").orderBy($"key").rowsBetween(-1, 
1))),
-      Seq(Row(1, 1, 1, 1), Row(2, 3, 2, 3), Row(3, 3, 3, 3), Row(1, 4, 1, 2), 
Row(2, 4, 2, 4),
-        Row(4, 4, 4, 4)))
-  }
-
-  test("aggregation and range between with unbounded") {
-    val df = Seq((5, "1"), (5, "2"), (4, "2"), (6, "2"), (3, "1"), (2, 
"2")).toDF("key", "value")
-    df.createOrReplaceTempView("window_table")
-    checkAnswer(
-      df.select(
-        $"key",
-        last("value").over(
-          Window.partitionBy($"value").orderBy($"key").rangeBetween(-2, -1))
-          .equalTo("2")
-          .as("last_v"),
-        
avg("key").over(Window.partitionBy("value").orderBy("key").rangeBetween(Long.MinValue,
 1))
-          .as("avg_key1"),
-        
avg("key").over(Window.partitionBy("value").orderBy("key").rangeBetween(0, 
Long.MaxValue))
-          .as("avg_key2"),
-        
avg("key").over(Window.partitionBy("value").orderBy("key").rangeBetween(-1, 0))
-          .as("avg_key3")
-      ),
-      Seq(Row(3, null, 3.0d, 4.0d, 3.0d),
-        Row(5, false, 4.0d, 5.0d, 5.0d),
-        Row(2, null, 2.0d, 17.0d / 4.0d, 2.0d),
-        Row(4, true, 11.0d / 3.0d, 5.0d, 4.0d),
-        Row(5, true, 17.0d / 4.0d, 11.0d / 2.0d, 4.5d),
-        Row(6, true, 17.0d / 4.0d, 6.0d, 11.0d / 2.0d)))
-  }
-
-  test("reverse sliding range frame") {
-    val df = Seq(
-      (1, "Thin", "Cell Phone", 6000),
-      (2, "Normal", "Tablet", 1500),
-      (3, "Mini", "Tablet", 5500),
-      (4, "Ultra thin", "Cell Phone", 5500),
-      (5, "Very thin", "Cell Phone", 6000),
-      (6, "Big", "Tablet", 2500),
-      (7, "Bendable", "Cell Phone", 3000),
-      (8, "Foldable", "Cell Phone", 3000),
-      (9, "Pro", "Tablet", 4500),
-      (10, "Pro2", "Tablet", 6500)).
-      toDF("id", "product", "category", "revenue")
-    val window = Window.
-      partitionBy($"category").
-      orderBy($"revenue".desc).
-      rangeBetween(-2000L, 1000L)
-    checkAnswer(
-      df.select(
-        $"id",
-        avg($"revenue").over(window).cast("int")),
-      Row(1, 5833) :: Row(2, 2000) :: Row(3, 5500) ::
-        Row(4, 5833) :: Row(5, 5833) :: Row(6, 2833) ::
-        Row(7, 3000) :: Row(8, 3000) :: Row(9, 5500) ::
-        Row(10, 6000) :: Nil)
-  }
-
-  // This is here to illustrate the fact that reverse order also reverses 
offsets.
-  test("reverse unbounded range frame") {
-    val df = Seq(1, 2, 4, 3, 2, 1).
-      map(Tuple1.apply).
-      toDF("value")
-    val window = Window.orderBy($"value".desc)
-    checkAnswer(
-      df.select(
-        $"value",
-        sum($"value").over(window.rangeBetween(Long.MinValue, 1)),
-        sum($"value").over(window.rangeBetween(1, Long.MaxValue))),
-      Row(1, 13, null) :: Row(2, 13, 2) :: Row(4, 7, 9) ::
-        Row(3, 11, 6) :: Row(2, 13, 2) :: Row(1, 13, null) :: Nil)
-  }
-
-  test("statistical functions") {
-    val df = Seq(("a", 1), ("a", 1), ("a", 2), ("a", 2), ("b", 4), ("b", 3), 
("b", 2)).
-      toDF("key", "value")
-    val window = Window.partitionBy($"key")
-    checkAnswer(
-      df.select(
-        $"key",
-        var_pop($"value").over(window),
-        var_samp($"value").over(window),
-        approx_count_distinct($"value").over(window)),
-      Seq.fill(4)(Row("a", 1.0d / 4.0d, 1.0d / 3.0d, 2))
-      ++ Seq.fill(3)(Row("b", 2.0d / 3.0d, 1.0d, 3)))
-  }
-
-  test("window function with aggregates") {
-    val df = Seq(("a", 1), ("a", 1), ("a", 2), ("a", 2), ("b", 4), ("b", 3), 
("b", 2)).
-      toDF("key", "value")
-    val window = Window.orderBy()
-    checkAnswer(
-      df.groupBy($"key")
-        .agg(
-          sum($"value"),
-          sum(sum($"value")).over(window) - sum($"value")),
-      Seq(Row("a", 6, 9), Row("b", 9, 6)))
-  }
-
-  test("SPARK-16195 empty over spec") {
-    val df = Seq(("a", 1), ("a", 1), ("a", 2), ("b", 2)).
-      toDF("key", "value")
-    df.createOrReplaceTempView("window_table")
-    checkAnswer(
-      df.select($"key", $"value", sum($"value").over(), avg($"value").over()),
-      Seq(Row("a", 1, 6, 1.5), Row("a", 1, 6, 1.5), Row("a", 2, 6, 1.5), 
Row("b", 2, 6, 1.5)))
-    checkAnswer(
-      sql("select key, value, sum(value) over(), avg(value) over() from 
window_table"),
-      Seq(Row("a", 1, 6, 1.5), Row("a", 1, 6, 1.5), Row("a", 2, 6, 1.5), 
Row("b", 2, 6, 1.5)))
-  }
-
-  test("window function with udaf") {
-    val udaf = new UserDefinedAggregateFunction {
-      def inputSchema: StructType = new StructType()
-        .add("a", LongType)
-        .add("b", LongType)
-
-      def bufferSchema: StructType = new StructType()
-        .add("product", LongType)
-
-      def dataType: DataType = LongType
-
-      def deterministic: Boolean = true
-
-      def initialize(buffer: MutableAggregationBuffer): Unit = {
-        buffer(0) = 0L
-      }
-
-      def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
-        if (!(input.isNullAt(0) || input.isNullAt(1))) {
-          buffer(0) = buffer.getLong(0) + input.getLong(0) * input.getLong(1)
-        }
-      }
-
-      def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
-        buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
-      }
-
-      def evaluate(buffer: Row): Any =
-        buffer.getLong(0)
-    }
-    val df = Seq(
-      ("a", 1, 1),
-      ("a", 1, 5),
-      ("a", 2, 10),
-      ("a", 2, -1),
-      ("b", 4, 7),
-      ("b", 3, 8),
-      ("b", 2, 4))
-      .toDF("key", "a", "b")
-    val window = 
Window.partitionBy($"key").orderBy($"a").rangeBetween(Long.MinValue, 0L)
-    checkAnswer(
-      df.select(
-        $"key",
-        $"a",
-        $"b",
-        udaf($"a", $"b").over(window)),
-      Seq(
-        Row("a", 1, 1, 6),
-        Row("a", 1, 5, 6),
-        Row("a", 2, 10, 24),
-        Row("a", 2, -1, 24),
-        Row("b", 4, 7, 60),
-        Row("b", 3, 8, 32),
-        Row("b", 2, 4, 8)))
-  }
-
-  test("null inputs") {
-    val df = Seq(("a", 1), ("a", 1), ("a", 2), ("a", 2), ("b", 4), ("b", 3), 
("b", 2))
-      .toDF("key", "value")
-    val window = Window.orderBy()
-    checkAnswer(
-      df.select(
-        $"key",
-        $"value",
-        avg(lit(null)).over(window),
-        sum(lit(null)).over(window)),
-      Seq(
-        Row("a", 1, null, null),
-        Row("a", 1, null, null),
-        Row("a", 2, null, null),
-        Row("a", 2, null, null),
-        Row("b", 4, null, null),
-        Row("b", 3, null, null),
-        Row("b", 2, null, null)))
-  }
-
-  test("last/first with ignoreNulls") {
-    val nullStr: String = null
-    val df = Seq(
-      ("a", 0, nullStr),
-      ("a", 1, "x"),
-      ("a", 2, "y"),
-      ("a", 3, "z"),
-      ("a", 4, nullStr),
-      ("b", 1, nullStr),
-      ("b", 2, nullStr)).
-      toDF("key", "order", "value")
-    val window = Window.partitionBy($"key").orderBy($"order")
-    checkAnswer(
-      df.select(
-        $"key",
-        $"order",
-        first($"value").over(window),
-        first($"value", ignoreNulls = false).over(window),
-        first($"value", ignoreNulls = true).over(window),
-        last($"value").over(window),
-        last($"value", ignoreNulls = false).over(window),
-        last($"value", ignoreNulls = true).over(window)),
-      Seq(
-        Row("a", 0, null, null, null, null, null, null),
-        Row("a", 1, null, null, "x", "x", "x", "x"),
-        Row("a", 2, null, null, "x", "y", "y", "y"),
-        Row("a", 3, null, null, "x", "z", "z", "z"),
-        Row("a", 4, null, null, "x", null, null, "z"),
-        Row("b", 1, null, null, null, null, null, null),
-        Row("b", 2, null, null, null, null, null, null)))
-  }
-
-  test("SPARK-12989 ExtractWindowExpressions treats alias as regular 
attribute") {
-    val src = Seq((0, 3, 5)).toDF("a", "b", "c")
-      .withColumn("Data", struct("a", "b"))
-      .drop("a")
-      .drop("b")
-    val winSpec = Window.partitionBy("Data.a", "Data.b").orderBy($"c".desc)
-    val df = src.select($"*", max("c").over(winSpec) as "max")
-    checkAnswer(df, Row(5, Row(0, 3), 5))
-  }
-
-  test("aggregation and rows between with unbounded + predicate pushdown") {
-    val df = Seq((1, "1"), (2, "2"), (2, "3"), (1, "3"), (3, "2"), (4, 
"3")).toDF("key", "value")
-    df.createOrReplaceTempView("window_table")
-    val selectList = Seq($"key", $"value",
-      last("key").over(
-        Window.partitionBy($"value").orderBy($"key").rowsBetween(0, 
Long.MaxValue)),
-      last("key").over(
-        
Window.partitionBy($"value").orderBy($"key").rowsBetween(Long.MinValue, 0)),
-      
last("key").over(Window.partitionBy($"value").orderBy($"key").rowsBetween(-1, 
1)))
-
-    checkAnswer(
-      df.select(selectList: _*).where($"value" < "3"),
-      Seq(Row(1, "1", 1, 1, 1), Row(2, "2", 3, 2, 3), Row(3, "2", 3, 3, 3)))
-  }
-
-  test("aggregation and range between with unbounded + predicate pushdown") {
-    val df = Seq((5, "1"), (5, "2"), (4, "2"), (6, "2"), (3, "1"), (2, 
"2")).toDF("key", "value")
-    df.createOrReplaceTempView("window_table")
-    val selectList = Seq($"key", $"value",
-      last("value").over(
-        Window.partitionBy($"value").orderBy($"key").rangeBetween(-2, 
-1)).equalTo("2")
-        .as("last_v"),
-      
avg("key").over(Window.partitionBy("value").orderBy("key").rangeBetween(Long.MinValue,
 1))
-        .as("avg_key1"),
-      
avg("key").over(Window.partitionBy("value").orderBy("key").rangeBetween(0, 
Long.MaxValue))
-        .as("avg_key2"),
-      
avg("key").over(Window.partitionBy("value").orderBy("key").rangeBetween(-1, 1))
-        .as("avg_key3"))
-
-    checkAnswer(
-      df.select(selectList: _*).where($"value" < 2),
-      Seq(Row(3, "1", null, 3.0, 4.0, 3.0), Row(5, "1", false, 4.0, 5.0, 5.0)))
-  }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to