Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19495#discussion_r144989072
  
    --- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
 ---
    @@ -270,6 +270,60 @@ class FlatMapGroupsWithStateSuite extends 
StateStoreMetricsTest with BeforeAndAf
         }
       }
     
    +  test("GroupState - getCurrentWatermarkMs") {
    +    def assertWrongTimeoutError(test: => Unit): Unit = {
    +      val e = intercept[UnsupportedOperationException] { test }
    +      assert(e.getMessage.contains(
    +        "Cannot get event time watermark timestamp without enabling event 
time timeout"))
    +    }
    +
    +    def streamingState(timeoutConf: GroupStateTimeout, watermark: Long): 
GroupState[Int] = {
    +      GroupStateImpl.createForStreaming(None, 1000, watermark, 
timeoutConf, hasTimedOut = false)
    +    }
    +
    +    def batchState(timeoutConf: GroupStateTimeout): GroupState[Any] = {
    +      GroupStateImpl.createForBatch(timeoutConf)
    +    }
    +
    +    // Tests for getCurrentWatermarkMs in streaming queries
    +    assertWrongTimeoutError { streamingState(NoTimeout, 
1000).getCurrentWatermarkMs() }
    +    assertWrongTimeoutError { streamingState(ProcessingTimeTimeout, 
1000).getCurrentWatermarkMs() }
    +    assert(streamingState(EventTimeTimeout, 1000).getCurrentWatermarkMs() 
=== 1000)
    +    assert(streamingState(EventTimeTimeout, 2000).getCurrentWatermarkMs() 
=== 2000)
    +
    +    // Tests for getCurrentWatermarkMs in batch queries
    +    assertWrongTimeoutError { 
batchState(NoTimeout).getCurrentWatermarkMs() }
    +    assertWrongTimeoutError { 
batchState(ProcessingTimeTimeout).getCurrentWatermarkMs() }
    +    assert(batchState(EventTimeTimeout).getCurrentWatermarkMs() === -1)
    +  }
    +
    +  test("GroupState - getCurrentProcessingTimeMs") {
    +    def assertWrongTimeoutError(test: => Unit): Unit = {
    +      val e = intercept[UnsupportedOperationException] { test }
    +      assert(e.getMessage.contains(
    +        "Cannot get processing time timestamp without enabling processing 
time timeout"))
    +    }
    +
    +    def streamingState(timeoutConf: GroupStateTimeout, procTime: Long): 
GroupState[Int] = {
    +      GroupStateImpl.createForStreaming(None, procTime, -1, timeoutConf, 
hasTimedOut = false)
    +    }
    +
    +    def batchState(timeoutConf: GroupStateTimeout): GroupState[Any] = {
    +      GroupStateImpl.createForBatch(timeoutConf)
    +    }
    +
    +    // Tests for getCurrentWatermarkMs in streaming queries
    +    assertWrongTimeoutError { streamingState(NoTimeout, 
1000).getCurrentProcessingTimeMs() }
    +    assertWrongTimeoutError { streamingState(EventTimeTimeout, 
1000).getCurrentProcessingTimeMs() }
    +    assert(streamingState(ProcessingTimeTimeout, 
1000).getCurrentProcessingTimeMs() === 1000)
    +    assert(streamingState(ProcessingTimeTimeout, 
2000).getCurrentProcessingTimeMs() === 2000)
    +
    +    // Tests for getCurrentWatermarkMs in batch queries
    +    assertWrongTimeoutError { 
batchState(NoTimeout).getCurrentProcessingTimeMs() }
    --- End diff --
    
    The comment above says otherwise; that's why I was confused.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to