Github user attilapiros commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20045#discussion_r158300159
  
    --- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
 ---
    @@ -32,6 +32,209 @@ import org.apache.spark.unsafe.types.CalendarInterval
     class DataFrameWindowFunctionsSuite extends QueryTest with 
SharedSQLContext {
       import testImplicits._
     
    +  test("Window partitionBy cardinality, no order by") {
    +    val df = Seq(("a", 1), ("a", 2), ("b", 4), ("b", 4)).toDF("key", 
"value")
    +
    +    checkAnswer(
    +      df.select(
    +        sum("value").over(),
    +        sum("value").over(Window.partitionBy("key")),
    +        sum("value").over(Window.partitionBy("key", "value")),
    +        sum("value").over(Window.partitionBy("value", "key"))),
    +      Row(11, 3, 1, 1) :: Row(11, 3, 2, 2) :: Row(11, 8, 8, 8) :: Row(11, 
8, 8, 8) :: Nil)
    +  }
    +
    +  test("Null value in partition key") {
    +    val df = Seq(("a", 1), ("a", 2), (null, 4), (null, 8)).toDF("key", 
"value")
    +
    +    checkAnswer(
    +      df.select(
    +        'value,
    +        sum("value").over(Window.partitionBy("key"))),
    +      Row(1, 3) :: Row(2, 3) :: Row(4, 12) :: Row(8, 12) :: Nil)
    +  }
    +
    +  test("Same partitionBy multiple times") {
    +    val df = Seq(("a", 1), ("a", 2), ("b", 4), ("b", 8)).toDF("key", 
"value")
    +
    +    checkAnswer(
    +      df.select(
    +        sum("value").over(Window.partitionBy("key", "key"))),
    +      Row(3) :: Row(3) :: Row(12) :: Row(12) :: Nil)
    +  }
    +
    +  test("Multiple orderBy clauses") {
    +    val df = Seq(("a", "x", 1), ("a", "y", 2), ("b", "y", 3), ("b", "x", 
4)).toDF("k1", "k2", "v")
    +
    +    checkAnswer(
    +      df.select(
    +        'v,
    +        lead("v", 1).over(Window.orderBy("k1", "k2")),
    +        lead("v", 1).over(Window.orderBy("k2", "k1"))),
    +      Row(1, 2, 4) :: Row(4, 3, 2) :: Row(2, 4, 3) :: Row(3, null, null) 
:: Nil)
    +  }
    +
    +  test("Multiple orderBy clauses with desc") {
    +    val df = Seq(("a", "x", 1), ("a", "y", 2), ("b", "y", 3), ("b", "x", 
4)).toDF("k1", "k2", "v")
    +
    +    checkAnswer(
    +      df.select(
    +        'v,
    +        lead("v", 1).over(Window.orderBy($"k1".desc, $"k2")),
    +        lead("v", 1).over(Window.orderBy($"k1", $"k2".desc)),
    +        lead("v", 1).over(Window.orderBy($"k1".desc, $"k2"))),
    +      Row(1, 2, 3, 2) :: Row(2, null, 1, null) :: Row(3, 1, 4, 1) :: 
Row(4, 3, null, 3) :: Nil)
    +  }
    +
    +  test("Null values sorted to first by asc, last by desc ordering by 
default") {
    +    val df = Seq((null, 1), ("a", 2), ("b", 3)).toDF("key", "value")
    +
    +    checkAnswer(
    +      df.select(
    +        'value,
    +        lead("value", 1).over(Window.orderBy($"key")),
    +        lead("value", 1).over(Window.orderBy($"key".desc))),
    +      Row(1, 2, null) :: Row(2, 3, 1) :: Row(3, null, 2) :: Nil)
    +  }
    +
    +  test("Ordering of null values can be explicitly controlled") {
    +    val df = Seq((null, 1), ("a", 2), ("b", 3)).toDF("key", "value")
    +
    +    checkAnswer(
    +      df.select(
    +        'value,
    +        lead("value", 1).over(Window.orderBy($"key".asc_nulls_first)),
    +        lead("value", 1).over(Window.orderBy($"key".asc_nulls_last)),
    +        lead("value", 1).over(Window.orderBy($"key".desc_nulls_first)),
    +        lead("value", 1).over(Window.orderBy($"key".desc_nulls_last))),
    +      Row(1, 2, null, 3, null) :: Row(2, 3, 3, null, 1) :: Row(3, null, 1, 
2, 2) :: Nil)
    +  }
    +
    +
    +  test("Order by without frame defaults to range between 
unbounded_preceding - current_row") {
    --- End diff --
    
    I would propose using an easier-to-follow aggregate function to demonstrate this behaviour: collect_list.
    I would also add one more projection where "rangeBetween(Window.unboundedPreceding, Window.currentRow)" is given explicitly. Moreover, the desc direction can be tested too.
    
    Like:
    ```scala
    test("Order by without frame defaults to range between unbounded_preceding 
- current_row") {
        val df = Seq(
          ("a", "p1", "1"),
          ("b", "p1", "2"),
          ("c", "p1", "2")).toDF("key", "partition", "value")
        checkAnswer(
          df.select(
            $"key",
            
collect_list("value").over(Window.partitionBy($"partition").orderBy($"value")),
            
collect_list("value").over(Window.partitionBy($"partition").orderBy($"value")
              .rangeBetween(Window.unboundedPreceding, Window.currentRow)),
            
collect_list("value").over(Window.partitionBy($"partition").orderBy($"value".desc)),
            
collect_list("value").over(Window.partitionBy($"partition").orderBy($"value".desc)
              .rangeBetween(Window.unboundedPreceding, Window.currentRow))),
          Seq(
            Row("a", Array("1"), Array("1"), Array("2", "2", "1"), Array("2", 
"2", "1")),
            Row("b", Array("1", "2", "2"), Array("1", "2", "2"), Array("2", 
"2"), Array("2", "2")),
            Row("c", Array("1", "2", "2"), Array("1", "2", "2"), Array("2", 
"2"), Array("2", "2"))))
      }
    ```


---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to