Github user brkyvz commented on a diff in the pull request:
https://github.com/apache/spark/pull/19495#discussion_r144989072
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala ---
@@ -270,6 +270,60 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf
}
}
+ test("GroupState - getCurrentWatermarkMs") {
+ def assertWrongTimeoutError(test: => Unit): Unit = {
+ val e = intercept[UnsupportedOperationException] { test }
+ assert(e.getMessage.contains(
+ "Cannot get event time watermark timestamp without enabling event
time timeout"))
+ }
+
+ def streamingState(timeoutConf: GroupStateTimeout, watermark: Long):
GroupState[Int] = {
+ GroupStateImpl.createForStreaming(None, 1000, watermark,
timeoutConf, hasTimedOut = false)
+ }
+
+ def batchState(timeoutConf: GroupStateTimeout): GroupState[Any] = {
+ GroupStateImpl.createForBatch(timeoutConf)
+ }
+
+ // Tests for getCurrentWatermarkMs in streaming queries
+ assertWrongTimeoutError { streamingState(NoTimeout,
1000).getCurrentWatermarkMs() }
+ assertWrongTimeoutError { streamingState(ProcessingTimeTimeout,
1000).getCurrentWatermarkMs() }
+ assert(streamingState(EventTimeTimeout, 1000).getCurrentWatermarkMs()
=== 1000)
+ assert(streamingState(EventTimeTimeout, 2000).getCurrentWatermarkMs()
=== 2000)
+
+ // Tests for getCurrentWatermarkMs in batch queries
+ assertWrongTimeoutError {
batchState(NoTimeout).getCurrentWatermarkMs() }
+ assertWrongTimeoutError {
batchState(ProcessingTimeTimeout).getCurrentWatermarkMs() }
+ assert(batchState(EventTimeTimeout).getCurrentWatermarkMs() === -1)
+ }
+
+ test("GroupState - getCurrentProcessingTimeMs") {
+ def assertWrongTimeoutError(test: => Unit): Unit = {
+ val e = intercept[UnsupportedOperationException] { test }
+ assert(e.getMessage.contains(
+ "Cannot get processing time timestamp without enabling processing
time timeout"))
+ }
+
+ def streamingState(timeoutConf: GroupStateTimeout, procTime: Long):
GroupState[Int] = {
+ GroupStateImpl.createForStreaming(None, procTime, -1, timeoutConf,
hasTimedOut = false)
+ }
+
+ def batchState(timeoutConf: GroupStateTimeout): GroupState[Any] = {
+ GroupStateImpl.createForBatch(timeoutConf)
+ }
+
+ // Tests for getCurrentWatermarkMs in streaming queries
+ assertWrongTimeoutError { streamingState(NoTimeout,
1000).getCurrentProcessingTimeMs() }
+ assertWrongTimeoutError { streamingState(EventTimeTimeout,
1000).getCurrentProcessingTimeMs() }
+ assert(streamingState(ProcessingTimeTimeout,
1000).getCurrentProcessingTimeMs() === 1000)
+ assert(streamingState(ProcessingTimeTimeout,
2000).getCurrentProcessingTimeMs() === 2000)
+
+ // Tests for getCurrentWatermarkMs in batch queries
+ assertWrongTimeoutError {
batchState(NoTimeout).getCurrentProcessingTimeMs() }
--- End diff --
The comment above says otherwise; that's why I was confused.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]