GitHub user attilapiros commented on a diff in the pull request:
https://github.com/apache/spark/pull/20045#discussion_r158300159
--- Diff:
sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
---
@@ -32,6 +32,209 @@ import org.apache.spark.unsafe.types.CalendarInterval
class DataFrameWindowFunctionsSuite extends QueryTest with
SharedSQLContext {
import testImplicits._
+  test("Window partitionBy cardinality, no order by") {
+    val df = Seq(("a", 1), ("a", 2), ("b", 4), ("b", 4)).toDF("key", "value")
+
+    // Sum "value" over the whole frame, over one partition key, and over both keys
+    // listed in either order; the two multi-key windows are expected to agree.
+    val sums = Seq(
+      sum("value").over(),
+      sum("value").over(Window.partitionBy("key")),
+      sum("value").over(Window.partitionBy("key", "value")),
+      sum("value").over(Window.partitionBy("value", "key")))
+
+    checkAnswer(
+      df.select(sums: _*),
+      Seq(Row(11, 3, 1, 1), Row(11, 3, 2, 2), Row(11, 8, 8, 8), Row(11, 8, 8, 8)))
+  }
+
+  test("Null value in partition key") {
+    val df = Seq(("a", 1), ("a", 2), (null, 4), (null, 8)).toDF("key", "value")
+    val sumPerKey = sum("value").over(Window.partitionBy("key"))
+
+    // Rows whose key is null are grouped into one partition of their own: 4 + 8 = 12.
+    checkAnswer(
+      df.select($"value", sumPerKey),
+      Seq(Row(1, 3), Row(2, 3), Row(4, 12), Row(8, 12)))
+  }
+
+  test("Same partitionBy multiple times") {
+    val df = Seq(("a", 1), ("a", 2), ("b", 4), ("b", 8)).toDF("key", "value")
+    // Listing the same partition column twice should act like listing it once.
+    val window = Window.partitionBy("key", "key")
+
+    checkAnswer(
+      df.select(sum("value").over(window)),
+      Seq(Row(3), Row(3), Row(12), Row(12)))
+  }
+
+  test("Multiple orderBy clauses") {
+    val df = Seq(("a", "x", 1), ("a", "y", 2), ("b", "y", 3), ("b", "x", 4))
+      .toDF("k1", "k2", "v")
+    val byK1ThenK2 = Window.orderBy("k1", "k2")
+    val byK2ThenK1 = Window.orderBy("k2", "k1")
+
+    // Column position in orderBy sets sort priority:
+    //   k1, k2: 1, 2, 4, 3
+    //   k2, k1: 1, 4, 2, 3
+    checkAnswer(
+      df.select($"v", lead("v", 1).over(byK1ThenK2), lead("v", 1).over(byK2ThenK1)),
+      Seq(Row(1, 2, 4), Row(4, 3, 2), Row(2, 4, 3), Row(3, null, null)))
+  }
+
+  test("Multiple orderBy clauses with desc") {
+    val df = Seq(("a", "x", 1), ("a", "y", 2), ("b", "y", 3), ("b", "x", 4))
+      .toDF("k1", "k2", "v")
+
+    // Every projection uses a distinct direction combination (the third one used to
+    // duplicate the first). Resulting order of "v":
+    //   k1 desc, k2 asc : 4, 3, 1, 2
+    //   k1 asc,  k2 desc: 2, 1, 3, 4
+    //   k1 desc, k2 desc: 3, 4, 2, 1
+    checkAnswer(
+      df.select(
+        'v,
+        lead("v", 1).over(Window.orderBy($"k1".desc, $"k2")),
+        lead("v", 1).over(Window.orderBy($"k1", $"k2".desc)),
+        lead("v", 1).over(Window.orderBy($"k1".desc, $"k2".desc))),
+      Row(1, 2, 3, null) :: Row(2, null, 1, 1) ::
+        Row(3, 1, 4, 4) :: Row(4, 3, null, 2) :: Nil)
+  }
+
+  test("Null values sorted to first by asc, last by desc ordering by default") {
+    val df = Seq((null, 1), ("a", 2), ("b", 3)).toDF("key", "value")
+
+    // Without an explicit null ordering the expected row order is:
+    //   asc : null(1), a(2), b(3)
+    //   desc: b(3), a(2), null(1)
+    checkAnswer(
+      df.select(
+        'value,
+        lead("value", 1).over(Window.orderBy($"key")),
+        lead("value", 1).over(Window.orderBy($"key".desc))),
+      Row(1, 2, null) :: Row(2, 3, 1) :: Row(3, null, 2) :: Nil)
+  }
+
+  test("Ordering of null values can be explicitly controlled") {
+    val df = Seq((null, 1), ("a", 2), ("b", 3)).toDF("key", "value")
+
+    // Expected row order for each explicit null ordering:
+    //   asc_nulls_first : null(1), a(2), b(3)
+    //   asc_nulls_last  : a(2), b(3), null(1)
+    //   desc_nulls_first: null(1), b(3), a(2)
+    //   desc_nulls_last : b(3), a(2), null(1)
+    val orderings = Seq(
+      $"key".asc_nulls_first,
+      $"key".asc_nulls_last,
+      $"key".desc_nulls_first,
+      $"key".desc_nulls_last)
+    val nextValues = orderings.map(o => lead("value", 1).over(Window.orderBy(o)))
+
+    checkAnswer(
+      df.select(($"value" +: nextValues): _*),
+      Seq(Row(1, 2, null, 3, null), Row(2, 3, 3, null, 1), Row(3, null, 1, 2, 2)))
+  }
+
+
+ test("Order by without frame defaults to range between
unbounded_preceding - current_row") {
--- End diff --
I would propose using a simpler aggregate function, `collect_list`, which makes this
behaviour easier to follow, and adding one more projection where
`rangeBetween(Window.unboundedPreceding, Window.currentRow)` is given explicitly, so
the default frame can be compared against it directly. The descending direction can
be tested as well.
For example:
``` scala
test("Order by without frame defaults to range between unbounded_preceding - current_row") {
  val df = Seq(
    ("a", "p1", "1"),
    ("b", "p1", "2"),
    ("c", "p1", "2")).toDF("key", "partition", "value")
  val byValue = Window.partitionBy($"partition").orderBy($"value")
  val byValueDesc = Window.partitionBy($"partition").orderBy($"value".desc)

  // An ordered window with no explicit frame is expected to match the explicit
  // RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW frame, in both directions.
  // In a RANGE frame, rows with an equal ordering value are peers, so "b" and "c"
  // (both "2") each see the other's value.
  // Expected values are Seqs because collect_list yields a Seq-typed column.
  checkAnswer(
    df.select(
      $"key",
      collect_list("value").over(byValue),
      collect_list("value").over(
        byValue.rangeBetween(Window.unboundedPreceding, Window.currentRow)),
      collect_list("value").over(byValueDesc),
      collect_list("value").over(
        byValueDesc.rangeBetween(Window.unboundedPreceding, Window.currentRow))),
    Seq(
      Row("a", Seq("1"), Seq("1"), Seq("2", "2", "1"), Seq("2", "2", "1")),
      Row("b", Seq("1", "2", "2"), Seq("1", "2", "2"), Seq("2", "2"), Seq("2", "2")),
      Row("c", Seq("1", "2", "2"), Seq("1", "2", "2"), Seq("2", "2"), Seq("2", "2"))))
}
```
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]