Github user jiangxb1987 commented on a diff in the pull request:
https://github.com/apache/spark/pull/20019#discussion_r158200860
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFramesSuite.scala ---
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.sql.{Date, Timestamp}
+
+import org.apache.spark.sql.expressions.Window
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.unsafe.types.CalendarInterval
+
+/**
+ * Window frame testing for DataFrame API.
+ */
+class DataFrameWindowFramesSuite extends QueryTest with SharedSQLContext {
+ import testImplicits._
+
+ test("lead/lag with empty data frame") {
+ val df = Seq.empty[(Int, String)].toDF("key", "value")
+ val window = Window.partitionBy($"key").orderBy($"value")
+
+ checkAnswer(
+ df.select(
+ lead("value", 1).over(window),
+ lag("value", 1).over(window)),
+ Nil)
+ }
+
+ test("lead/lag with positive offset") {
+ val df = Seq((1, "1"), (2, "2"), (1, "3"), (2, "4")).toDF("key", "value")
+ val window = Window.partitionBy($"key").orderBy($"value")
+
+ checkAnswer(
+ df.select(
+ $"key",
+ lead("value", 1).over(window),
+ lag("value", 1).over(window)),
+ Row(1, "3", null) :: Row(1, null, "1") :: Row(2, "4", null) :: Row(2, null, "2") :: Nil)
+ }
+
+ test("reverse lead/lag with positive offset") {
+ val df = Seq((1, "1"), (2, "2"), (1, "3"), (2, "4")).toDF("key", "value")
+ val window = Window.partitionBy($"key").orderBy($"value".desc)
+
+ checkAnswer(
+ df.select(
+ $"key",
+ lead("value", 1).over(window),
+ lag("value", 1).over(window)),
+ Row(1, "1", null) :: Row(1, null, "3") :: Row(2, "2", null) :: Row(2, null, "4") :: Nil)
+ }
+
+ test("lead/lag with negative offset") {
+ val df = Seq((1, "1"), (2, "2"), (1, "3"), (2, "4")).toDF("key", "value")
+ val window = Window.partitionBy($"key").orderBy($"value")
+
+ checkAnswer(
+ df.select(
+ $"key",
+ lead("value", -1).over(window),
+ lag("value", -1).over(window)),
+ Row(1, null, "3") :: Row(1, "1", null) :: Row(2, null, "4") :: Row(2, "2", null) :: Nil)
+ }
+
+ test("reverse lead/lag with negative offset") {
+ val df = Seq((1, "1"), (2, "2"), (1, "3"), (2, "4")).toDF("key", "value")
+ val window = Window.partitionBy($"key").orderBy($"value".desc)
+
+ checkAnswer(
+ df.select(
+ $"key",
+ lead("value", -1).over(window),
+ lag("value", -1).over(window)),
+ Row(1, null, "1") :: Row(1, "3", null) :: Row(2, null, "2") :: Row(2, "4", null) :: Nil)
+ }
+
+ test("lead/lag with default value") {
+ val default = "n/a"
+ val df = Seq((1, "1"), (2, "2"), (1, "3"), (2, "4"), (2, "5")).toDF("key", "value")
+ val window = Window.partitionBy($"key").orderBy($"value")
+
+ checkAnswer(
+ df.select(
+ $"key",
+ lead("value", 2, default).over(window),
+ lag("value", 2, default).over(window),
+ lead("value", -2, default).over(window),
+ lag("value", -2, default).over(window)),
+ Row(1, default, default, default, default) :: Row(1, default, default, default, default) ::
+ Row(2, "5", default, default, "5") :: Row(2, default, "2", "2", default) ::
+ Row(2, default, default, default, default) :: Nil)
+ }
+
+ test("rows/range between with empty data frame") {
+ val df = Seq.empty[(String, Int)].toDF("key", "value")
+ val window = Window.partitionBy($"key").orderBy($"value")
+
+ checkAnswer(
+ df.select(
+ 'key,
+ first("value").over(
+ window.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)),
+ first("value").over(
+ window.rangeBetween(Window.unboundedPreceding, Window.unboundedFollowing))),
+ Nil)
+ }
+
+ test("rows between should accept int/long values as boundary") {
+ val df = Seq((1L, "1"), (1L, "1"), (2147483650L, "1"), (3L, "2"), (2L, "1"), (2147483650L, "2"))
+ .toDF("key", "value")
+
+ checkAnswer(
+ df.select(
+ $"key",
+ count("key").over(
+ Window.partitionBy($"value").orderBy($"key").rowsBetween(0, 2147483647))),
+ Seq(Row(1, 3), Row(1, 4), Row(2, 2), Row(3, 2), Row(2147483650L, 1), Row(2147483650L, 1))
+ )
+
+ val e = intercept[AnalysisException](
+ df.select(
+ $"key",
+ count("key").over(
+ Window.partitionBy($"value").orderBy($"key").rowsBetween(0, 2147483648L))))
+ assert(e.message.contains("Boundary end is not a valid integer: 2147483648"))
+ }
+
+ test("range between should accept at most one ORDER BY expression when unbounded") {
--- End diff --
"unbounded" -> "bounded"
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]