Github user felixcheung commented on a diff in the pull request:
https://github.com/apache/spark/pull/19495#discussion_r144736325
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala
---
@@ -119,32 +115,39 @@ private[sql] class GroupStateImpl[S] private(
timeoutTimestamp = timestampMs
}
- @throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
- @throws[IllegalStateException]("when state is either not initialized, or
already removed")
- @throws[UnsupportedOperationException](
- "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in
a streaming query")
override def setTimeoutTimestamp(timestampMs: Long, additionalDuration:
String): Unit = {
checkTimeoutTimestampAllowed()
setTimeoutTimestamp(parseDuration(additionalDuration) + timestampMs)
}
- @throws[IllegalStateException]("when state is either not initialized, or
already removed")
- @throws[UnsupportedOperationException](
- "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in
a streaming query")
override def setTimeoutTimestamp(timestamp: Date): Unit = {
checkTimeoutTimestampAllowed()
setTimeoutTimestamp(timestamp.getTime)
}
- @throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
- @throws[IllegalStateException]("when state is either not initialized, or
already removed")
- @throws[UnsupportedOperationException](
- "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in
a streaming query")
override def setTimeoutTimestamp(timestamp: Date, additionalDuration:
String): Unit = {
checkTimeoutTimestampAllowed()
setTimeoutTimestamp(timestamp.getTime +
parseDuration(additionalDuration))
}
+ override def getCurrentWatermarkMs(): Long = {
+ if (timeoutConf != EventTimeTimeout) {
+ throw new UnsupportedOperationException(
+ "Cannot get event time watermark timestamp without enabling event time timeout in " +
+ "[map/flatMap]GroupsWithState")
+ }
+ eventTimeWatermarkMs
+ }
+
+ override def getCurrentProcessingTimeMs(): Long = {
+ if (timeoutConf != ProcessingTimeTimeout) {
+ throw new UnsupportedOperationException(
+ "Cannot get processing time timestamp without enabling processing time timeout in " +
+ "map/flatMap]GroupsWithState")
--- End diff --
Typo in the error message: the opening bracket is missing. `map/flatMap]` -> `[map/flatMap]`
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]