[GitHub] spark pull request #19495: [SPARK-22278][SS] Expose current event time water...

2017-10-17 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/19495


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19495: [SPARK-22278][SS] Expose current event time water...

2017-10-17 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19495#discussion_r145290193
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala ---
@@ -61,6 +61,10 @@ case class FlatMapGroupsWithStateExec(
 
   private val isTimeoutEnabled = timeoutConf != NoTimeout
   val stateManager = new FlatMapGroupsWithState_StateManager(stateEncoder, isTimeoutEnabled)
+  val watermarkPresent = child.output.exists {
--- End diff --

Correction: no, it is not. When a watermark is not defined in the query, the `eventTimeWatermark` value is `Some(0)`.
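A minimal plain-Scala sketch (no Spark dependency) of why an `Option` check is not enough here: if the operator receives `Some(0)` even when no watermark was set, `isDefined` is always true, whereas scanning the child's output columns for watermark metadata, as `child.output.exists { ... }` in the diff does, tells the two cases apart. `Attr`, `WatermarkCheck` and its methods are hypothetical, and the metadata key string is an assumption meant to mirror Spark's `EventTimeWatermark.delayKey`.

```scala
// Hypothetical stand-in for a Catalyst attribute: a column name plus its metadata.
final case class Attr(name: String, metadata: Map[String, Long] = Map.empty)

object WatermarkCheck {
  // Assumption: mirrors the key Spark stores on the event-time column
  // (EventTimeWatermark.delayKey); only the presence of the key matters here.
  val DelayKey: String = "spark.watermarkDelayMs"

  // Option-based check: always true if the operator is handed Some(0)
  // even when the query never defined a watermark.
  def presentFromOption(eventTimeWatermark: Option[Long]): Boolean =
    eventTimeWatermark.isDefined

  // Metadata-based check, in the spirit of `child.output.exists { ... }`:
  // a watermark exists only if some input column carries the delay metadata.
  def presentFromOutput(output: Seq[Attr]): Boolean =
    output.exists(_.metadata.contains(DelayKey))
}
```

With no watermark, `presentFromOption(Some(0L))` still reports `true`, while `presentFromOutput` over columns without the metadata reports `false`.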





[GitHub] spark pull request #19495: [SPARK-22278][SS] Expose current event time water...

2017-10-17 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19495#discussion_r145285750
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/GroupState.scala ---
@@ -205,92 +205,122 @@ trait GroupState[S] extends LogicalGroupState[S] {
   /** Get the state value as a scala Option. */
   def getOption: Option[S]
 
-  /**
-   * Update the value of the state. Note that `null` is not a valid value, and it throws
-   * IllegalArgumentException.
-   */
-  @throws[IllegalArgumentException]("when updating with null")
+  /** Update the value of the state. */
   def update(newState: S): Unit
 
   /** Remove this state. */
   def remove(): Unit
 
   /**
    * Whether the function has been called because the key has timed out.
-   * @note This can return true only when timeouts are enabled in `[map/flatmap]GroupsWithStates`.
+   * @note This can return true only when timeouts are enabled in `[map/flatmap]GroupsWithState`.
    */
   def hasTimedOut: Boolean
 
+
   /**
    * Set the timeout duration in ms for this key.
    *
-   * @note ProcessingTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
+   * @note [[GroupStateTimeout Processing time timeout]] must be enabled in
+   *   `[map/flatmap]GroupsWithState` for calling this method.
+   * @note This method has no effect when used in a batch query.
    */
   @throws[IllegalArgumentException]("if 'durationMs' is not positive")
-  @throws[IllegalStateException]("when state is either not initialized, or already removed")
   @throws[UnsupportedOperationException](
-    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
+    "if processing time timeout has not been enabled in [map|flatMap]GroupsWithState")
   def setTimeoutDuration(durationMs: Long): Unit
 
+
   /**
    * Set the timeout duration for this key as a string. For example, "1 hour", "2 days", etc.
    *
-   * @note ProcessingTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
+   * @note [[GroupStateTimeout Processing time timeout]] must be enabled in
+   *   `[map/flatmap]GroupsWithState` for calling this method.
+   * @note This method has no effect when used in a batch query.
    */
   @throws[IllegalArgumentException]("if 'duration' is not a valid duration")
-  @throws[IllegalStateException]("when state is either not initialized, or already removed")
   @throws[UnsupportedOperationException](
-    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
+    "if processing time timeout has not been enabled in [map|flatMap]GroupsWithState")
   def setTimeoutDuration(duration: String): Unit
 
-  @throws[IllegalArgumentException]("if 'timestampMs' is not positive")
-  @throws[IllegalStateException]("when state is either not initialized, or already removed")
-  @throws[UnsupportedOperationException](
-    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
+
   /**
    * Set the timeout timestamp for this key as milliseconds in epoch time.
    * This timestamp cannot be older than the current watermark.
    *
-   * @note EventTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
+   * @note [[GroupStateTimeout Event time timeout]] must be enabled in
+   *   `[map/flatmap]GroupsWithState` for calling this method.
+   * @note This method has no effect when used in a batch query.
    */
+  @throws[IllegalArgumentException](
+    "if 'timestampMs' is not positive or less than the current watermark in a streaming query")
+  @throws[UnsupportedOperationException](
+    "if processing time timeout has not been enabled in [map|flatMap]GroupsWithState")
   def setTimeoutTimestamp(timestampMs: Long): Unit
 
-  @throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
-  @throws[IllegalStateException]("when state is either not initialized, or already removed")
-  @throws[UnsupportedOperationException](
-    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
+
   /**
    * Set the timeout timestamp for this key as milliseconds in epoch time and an additional
    * duration as a string (e.g. "1 hour", "2 days", etc.).
    * The final timestamp (including the additional duration) cannot be older than the
    * current watermark.
    *
-   * @note EventTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
+   * @note [[GroupStateTimeout Event time timeout]] must be enabled in
+   *   `[map/flatmap]GroupsWithState` for calling this method.
+   * @note This method has no side ef
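
A hedged, self-contained sketch of the contract this Scaladoc describes: the duration setters require processing time timeout, the timestamp setters require event time timeout and a timestamp no older than the watermark, and neither records anything in a batch query. Everything here (`TimeoutContract`, `State`, `procTimeMs`, `watermarkMs`) is hypothetical and only models the documented behavior, not `GroupStateImpl` itself. Two further assumptions: the mode and positivity checks still apply in batch queries, and `setTimeoutTimestamp` is gated on event time timeout as its `@note` says (the `@throws` text quoted for that method mentions processing time timeout instead).

```scala
object TimeoutContract {
  sealed trait Conf
  case object NoTimeout extends Conf
  case object ProcessingTimeTimeout extends Conf
  case object EventTimeTimeout extends Conf

  // Hypothetical stand-in for a GroupState implementation; `procTimeMs` and
  // `watermarkMs` play the role of the batch processing time and current watermark.
  final class State(conf: Conf, isStreaming: Boolean, procTimeMs: Long, watermarkMs: Long) {
    private var timeoutTimestampMs: Option[Long] = None
    def timeoutTimestamp: Option[Long] = timeoutTimestampMs

    def setTimeoutDuration(durationMs: Long): Unit = {
      if (conf != ProcessingTimeTimeout) throw new UnsupportedOperationException(
        "processing time timeout has not been enabled in [map|flatMap]GroupsWithState")
      if (durationMs <= 0) throw new IllegalArgumentException("'durationMs' is not positive")
      // "No effect when used in a batch query": validate, but record nothing.
      if (isStreaming) timeoutTimestampMs = Some(procTimeMs + durationMs)
    }

    def setTimeoutTimestamp(timestampMs: Long): Unit = {
      if (conf != EventTimeTimeout) throw new UnsupportedOperationException(
        "event time timeout has not been enabled in [map|flatMap]GroupsWithState")
      if (timestampMs <= 0) throw new IllegalArgumentException("'timestampMs' is not positive")
      if (isStreaming) {
        // The watermark bound only matters in a streaming query.
        if (timestampMs < watermarkMs) throw new IllegalArgumentException(
          "'timestampMs' is less than the current watermark")
        timeoutTimestampMs = Some(timestampMs)
      }
    }
  }
}
```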

[GitHub] spark pull request #19495: [SPARK-22278][SS] Expose current event time water...

2017-10-17 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19495#discussion_r145283763
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala ---
@@ -119,32 +116,34 @@ private[sql] class GroupStateImpl[S] private(
     timeoutTimestamp = timestampMs
   }
 
-  @throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
-  @throws[IllegalStateException]("when state is either not initialized, or already removed")
-  @throws[UnsupportedOperationException](
-    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
   override def setTimeoutTimestamp(timestampMs: Long, additionalDuration: String): Unit = {
     checkTimeoutTimestampAllowed()
     setTimeoutTimestamp(parseDuration(additionalDuration) + timestampMs)
   }
 
-  @throws[IllegalStateException]("when state is either not initialized, or already removed")
-  @throws[UnsupportedOperationException](
-    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
   override def setTimeoutTimestamp(timestamp: Date): Unit = {
     checkTimeoutTimestampAllowed()
     setTimeoutTimestamp(timestamp.getTime)
   }
 
-  @throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
-  @throws[IllegalStateException]("when state is either not initialized, or already removed")
-  @throws[UnsupportedOperationException](
-    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
   override def setTimeoutTimestamp(timestamp: Date, additionalDuration: String): Unit = {
     checkTimeoutTimestampAllowed()
     setTimeoutTimestamp(timestamp.getTime + parseDuration(additionalDuration))
   }
 
+  override def getCurrentWatermarkMs(): Long = {
+    if (!watermarkPresent) {
+      throw new UnsupportedOperationException(
+        "Cannot get event time watermark timestamp without enabling setting watermark before " +
--- End diff --

Yes, agreed.
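In the `GroupStateImpl` hunk quoted in this message, every `setTimeoutTimestamp` overload calls `checkTimeoutTimestampAllowed()` and then reduces to the `Long` variant. The sketch below isolates that delegation in plain Scala. `TimestampOverloads`, `resolve` and the tiny `parseDuration` are hypothetical; Spark's own `parseDuration` accepts a richer interval syntax that is not reproduced here.

```scala
import java.util.Date
import java.util.concurrent.TimeUnit

object TimestampOverloads {
  // Hypothetical, deliberately tiny parser for "<count> <unit>" strings such as
  // "1 hour" or "2 days"; it is NOT Spark's parseDuration.
  def parseDuration(duration: String): Long = duration.trim.split("\\s+") match {
    case Array(count, unit) =>
      val n = count.toLong
      unit.toLowerCase.stripSuffix("s") match {
        case "second" => TimeUnit.SECONDS.toMillis(n)
        case "minute" => TimeUnit.MINUTES.toMillis(n)
        case "hour"   => TimeUnit.HOURS.toMillis(n)
        case "day"    => TimeUnit.DAYS.toMillis(n)
        case other    => throw new IllegalArgumentException(s"Unsupported unit: $other")
      }
    case _ => throw new IllegalArgumentException(s"Invalid duration: $duration")
  }

  // Base case: the epoch-millisecond timestamp that would actually be stored.
  def resolve(timestampMs: Long): Long = timestampMs

  // Each overload converts its inputs to epoch millis and delegates to the base case,
  // mirroring how the quoted setTimeoutTimestamp overloads delegate.
  def resolve(timestampMs: Long, additionalDuration: String): Long =
    resolve(parseDuration(additionalDuration) + timestampMs)

  def resolve(timestamp: Date): Long = resolve(timestamp.getTime)

  def resolve(timestamp: Date, additionalDuration: String): Long =
    resolve(timestamp.getTime + parseDuration(additionalDuration))
}
```

For example, `resolve(new java.util.Date(1000L), "2 days")` yields `1000 + 2 * 86400000 = 172801000` ms.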





[GitHub] spark pull request #19495: [SPARK-22278][SS] Expose current event time water...

2017-10-17 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19495#discussion_r145283802
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala ---
@@ -61,6 +61,10 @@ case class FlatMapGroupsWithStateExec(
 
   private val isTimeoutEnabled = timeoutConf != NoTimeout
   val stateManager = new FlatMapGroupsWithState_StateManager(stateEncoder, isTimeoutEnabled)
+  val watermarkPresent = child.output.exists {
--- End diff --

Yes, agreed.






[GitHub] spark pull request #19495: [SPARK-22278][SS] Expose current event time water...

2017-10-17 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/19495#discussion_r145283034
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala ---
@@ -1086,4 +1181,24 @@ object FlatMapGroupsWithStateSuite {
     override def metrics: StateStoreMetrics = new StateStoreMetrics(map.size, 0, Map.empty)
     override def hasCommitted: Boolean = true
   }
+
+  def assertCanGetProcessingTime(predicate: => Boolean): Unit = {
+    if (!predicate) throw new TestFailedException("Could not get processing time", 20)
--- End diff --

What is the `20`?
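For context: in ScalaTest, the `Int` argument of this `TestFailedException(message, failedCodeStackDepth)` constructor is a stack depth used to point the failure report at the offending line of test code, so `20` acts as a hand-tuned frame count. If the helper only needs to fail with a readable message, a version without that constant is enough. The plain-Scala sketch below (object name hypothetical) throws an `AssertionError`; inside a suite, ScalaTest's own `assert` or `fail` would be the more idiomatic choice.

```scala
object ProcessingTimeAssertions {
  // Hypothetical variant of the helper in the hunk: a plain AssertionError carries the
  // message without any hand-tuned stack-depth argument.
  def assertCanGetProcessingTime(predicate: => Boolean): Unit =
    if (!predicate) throw new AssertionError("Could not get processing time")
}
```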





[GitHub] spark pull request #19495: [SPARK-22278][SS] Expose current event time water...

2017-10-17 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/19495#discussion_r145282388
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala ---
@@ -61,6 +61,10 @@ case class FlatMapGroupsWithStateExec(
 
   private val isTimeoutEnabled = timeoutConf != NoTimeout
   val stateManager = new FlatMapGroupsWithState_StateManager(stateEncoder, isTimeoutEnabled)
+  val watermarkPresent = child.output.exists {
--- End diff --

This is cleaner, but doesn't `eventTimeWatermark.isDefined` imply that the watermark is present?





[GitHub] spark pull request #19495: [SPARK-22278][SS] Expose current event time water...

2017-10-17 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/19495#discussion_r145282515
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala ---
@@ -119,32 +116,34 @@ private[sql] class GroupStateImpl[S] private(
     timeoutTimestamp = timestampMs
   }
 
-  @throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
-  @throws[IllegalStateException]("when state is either not initialized, or already removed")
-  @throws[UnsupportedOperationException](
-    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
   override def setTimeoutTimestamp(timestampMs: Long, additionalDuration: String): Unit = {
     checkTimeoutTimestampAllowed()
     setTimeoutTimestamp(parseDuration(additionalDuration) + timestampMs)
   }
 
-  @throws[IllegalStateException]("when state is either not initialized, or already removed")
-  @throws[UnsupportedOperationException](
-    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
   override def setTimeoutTimestamp(timestamp: Date): Unit = {
     checkTimeoutTimestampAllowed()
     setTimeoutTimestamp(timestamp.getTime)
   }
 
-  @throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
-  @throws[IllegalStateException]("when state is either not initialized, or already removed")
-  @throws[UnsupportedOperationException](
-    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
   override def setTimeoutTimestamp(timestamp: Date, additionalDuration: String): Unit = {
     checkTimeoutTimestampAllowed()
     setTimeoutTimestamp(timestamp.getTime + parseDuration(additionalDuration))
   }
 
+  override def getCurrentWatermarkMs(): Long = {
+    if (!watermarkPresent) {
+      throw new UnsupportedOperationException(
+        "Cannot get event time watermark timestamp without enabling setting watermark before " +
--- End diff --

`without enabling setting watermark` sounds too convoluted. You probably meant `without setting watermark`?
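A minimal sketch of `getCurrentWatermarkMs` with the shorter wording suggested above. `WatermarkAccess` and `State` are hypothetical simplifications of `GroupStateImpl`. The quoted diff cuts the original message off after "before ", so the sketch ends the message at "without setting watermark" rather than guessing the rest.

```scala
object WatermarkAccess {
  // Hypothetical simplification of GroupStateImpl, keeping only what
  // getCurrentWatermarkMs needs.
  final class State(watermarkPresent: Boolean, eventTimeWatermarkMs: Long) {
    def getCurrentWatermarkMs(): Long = {
      if (!watermarkPresent) {
        // Shorter wording as suggested in review; the tail of the original
        // message is cut off in the quoted diff and is not reproduced.
        throw new UnsupportedOperationException(
          "Cannot get event time watermark timestamp without setting watermark")
      }
      eventTimeWatermarkMs
    }
  }
}
```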





[GitHub] spark pull request #19495: [SPARK-22278][SS] Expose current event time water...

2017-10-17 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/19495#discussion_r145282877
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/GroupState.scala ---

[GitHub] spark pull request #19495: [SPARK-22278][SS] Expose current event time water...

2017-10-17 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19495#discussion_r145202164
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/GroupState.scala ---

[GitHub] spark pull request #19495: [SPARK-22278][SS] Expose current event time water...

2017-10-17 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19495#discussion_r145202185
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/GroupState.scala ---
@@ -205,92 +205,127 @@ trait GroupState[S] extends LogicalGroupState[S] {
   /** Get the state value as a scala Option. */
   def getOption: Option[S]
 
-  /**
-   * Update the value of the state. Note that `null` is not a valid value, and it throws
-   * IllegalArgumentException.
-   */
-  @throws[IllegalArgumentException]("when updating with null")
+  /** Update the value of the state. */
   def update(newState: S): Unit
 
   /** Remove this state. */
   def remove(): Unit
 
   /**
    * Whether the function has been called because the key has timed out.
-   * @note This can return true only when timeouts are enabled in `[map/flatmap]GroupsWithStates`.
+   * @note This can return true only when timeouts are enabled in `[map/flatmap]GroupsWithState`.
    */
   def hasTimedOut: Boolean
 
+
--- End diff --

That's intentional; it makes the code visually easier to read.





[GitHub] spark pull request #19495: [SPARK-22278][SS] Expose current event time water...

2017-10-16 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19495#discussion_r144991277
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala ---
@@ -270,6 +270,60 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf
     }
   }
 
+  test("GroupState - getCurrentWatermarkMs") {
+    def assertWrongTimeoutError(test: => Unit): Unit = {
+      val e = intercept[UnsupportedOperationException] { test }
+      assert(e.getMessage.contains(
+        "Cannot get event time watermark timestamp without enabling event time timeout"))
+    }
+
+    def streamingState(timeoutConf: GroupStateTimeout, watermark: Long): GroupState[Int] = {
+      GroupStateImpl.createForStreaming(None, 1000, watermark, timeoutConf, hasTimedOut = false)
+    }
+
+    def batchState(timeoutConf: GroupStateTimeout): GroupState[Any] = {
+      GroupStateImpl.createForBatch(timeoutConf)
+    }
+
+    // Tests for getCurrentWatermarkMs in streaming queries
+    assertWrongTimeoutError { streamingState(NoTimeout, 1000).getCurrentWatermarkMs() }
+    assertWrongTimeoutError { streamingState(ProcessingTimeTimeout, 1000).getCurrentWatermarkMs() }
+    assert(streamingState(EventTimeTimeout, 1000).getCurrentWatermarkMs() === 1000)
+    assert(streamingState(EventTimeTimeout, 2000).getCurrentWatermarkMs() === 2000)
+
+    // Tests for getCurrentWatermarkMs in batch queries
+    assertWrongTimeoutError { batchState(NoTimeout).getCurrentWatermarkMs() }
+    assertWrongTimeoutError { batchState(ProcessingTimeTimeout).getCurrentWatermarkMs() }
+    assert(batchState(EventTimeTimeout).getCurrentWatermarkMs() === -1)
+  }
+
+  test("GroupState - getCurrentProcessingTimeMs") {
+    def assertWrongTimeoutError(test: => Unit): Unit = {
+      val e = intercept[UnsupportedOperationException] { test }
+      assert(e.getMessage.contains(
+        "Cannot get processing time timestamp without enabling processing time timeout"))
+    }
+
+    def streamingState(timeoutConf: GroupStateTimeout, procTime: Long): GroupState[Int] = {
+      GroupStateImpl.createForStreaming(None, procTime, -1, timeoutConf, hasTimedOut = false)
+    }
+
+    def batchState(timeoutConf: GroupStateTimeout): GroupState[Any] = {
+      GroupStateImpl.createForBatch(timeoutConf)
+    }
+
+    // Tests for getCurrentWatermarkMs in streaming queries
+    assertWrongTimeoutError { streamingState(NoTimeout, 1000).getCurrentProcessingTimeMs() }
+    assertWrongTimeoutError { streamingState(EventTimeTimeout, 1000).getCurrentProcessingTimeMs() }
+    assert(streamingState(ProcessingTimeTimeout, 1000).getCurrentProcessingTimeMs() === 1000)
+    assert(streamingState(ProcessingTimeTimeout, 2000).getCurrentProcessingTimeMs() === 2000)
+
+    // Tests for getCurrentWatermarkMs in batch queries
+    assertWrongTimeoutError { batchState(NoTimeout).getCurrentProcessingTimeMs() }
--- End diff --

Argh, my bad. I didn't realize your comment was about the code comment.
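The two tests quoted in this message encode a small gating matrix at this revision of the PR: each getter works only under its matching timeout configuration, and in a batch query with event time timeout the watermark getter returns `-1`. Below is a plain-Scala model of that matrix; all names are hypothetical and it is not `GroupStateImpl` itself. The batch `ProcessingTimeTimeout` case is cut off in the quote, so the sketch simply returns the supplied `procTimeMs` there as a placeholder.

```scala
object TimeGetters {
  sealed trait Conf
  case object NoTimeout extends Conf
  case object ProcessingTimeTimeout extends Conf
  case object EventTimeTimeout extends Conf

  // Hypothetical model of the gating encoded by the quoted tests.
  final class State(conf: Conf, isStreaming: Boolean, procTimeMs: Long, watermarkMs: Long) {
    def getCurrentWatermarkMs(): Long = {
      if (conf != EventTimeTimeout) throw new UnsupportedOperationException(
        "Cannot get event time watermark timestamp without enabling event time timeout")
      if (isStreaming) watermarkMs else -1L // the batch test expects -1
    }

    def getCurrentProcessingTimeMs(): Long = {
      if (conf != ProcessingTimeTimeout) throw new UnsupportedOperationException(
        "Cannot get processing time timestamp without enabling processing time timeout")
      procTimeMs // placeholder for the batch case, which the quoted hunk cuts off
    }
  }
}
```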





[GitHub] spark pull request #19495: [SPARK-22278][SS] Expose current event time water...

2017-10-16 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/19495#discussion_r144992379
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/GroupState.scala ---

[GitHub] spark pull request #19495: [SPARK-22278][SS] Expose current event time water...

2017-10-16 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/19495#discussion_r144992332
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/GroupState.scala ---
@@ -205,92 +205,127 @@ trait GroupState[S] extends LogicalGroupState[S] {
   /** Get the state value as a scala Option. */
   def getOption: Option[S]
 
-  /**
-   * Update the value of the state. Note that `null` is not a valid value, 
and it throws
-   * IllegalArgumentException.
-   */
-  @throws[IllegalArgumentException]("when updating with null")
+  /** Update the value of the state. */
   def update(newState: S): Unit
 
   /** Remove this state. */
   def remove(): Unit
 
   /**
    * Whether the function has been called because the key has timed out.
-   * @note This can return true only when timeouts are enabled in `[map/flatmap]GroupsWithStates`.
+   * @note This can return true only when timeouts are enabled in `[map/flatmap]GroupsWithState`.
    */
   def hasTimedOut: Boolean
 
+
--- End diff --

nit: extra line


---




[GitHub] spark pull request #19495: [SPARK-22278][SS] Expose current event time water...

2017-10-16 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/19495#discussion_r144992239
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala
 ---
@@ -119,32 +115,39 @@ private[sql] class GroupStateImpl[S] private(
     timeoutTimestamp = timestampMs
   }
 
-  @throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
-  @throws[IllegalStateException]("when state is either not initialized, or already removed")
-  @throws[UnsupportedOperationException](
-    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
   override def setTimeoutTimestamp(timestampMs: Long, additionalDuration: String): Unit = {
     checkTimeoutTimestampAllowed()
     setTimeoutTimestamp(parseDuration(additionalDuration) + timestampMs)
   }
 
-  @throws[IllegalStateException]("when state is either not initialized, or already removed")
-  @throws[UnsupportedOperationException](
-    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
   override def setTimeoutTimestamp(timestamp: Date): Unit = {
     checkTimeoutTimestampAllowed()
     setTimeoutTimestamp(timestamp.getTime)
   }
 
-  @throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
-  @throws[IllegalStateException]("when state is either not initialized, or already removed")
-  @throws[UnsupportedOperationException](
-    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
   override def setTimeoutTimestamp(timestamp: Date, additionalDuration: String): Unit = {
     checkTimeoutTimestampAllowed()
     setTimeoutTimestamp(timestamp.getTime + parseDuration(additionalDuration))
   }
 
+  override def getCurrentWatermarkMs(): Long = {
+    if (timeoutConf != EventTimeTimeout) {
+      throw new UnsupportedOperationException(
+        "Cannot get event time watermark timestamp without enabling event time timeout in " +
+          "[map|flatMap]GroupsWithState")
+    }
+    eventTimeWatermarkMs
+  }
+
+  override def getCurrentProcessingTimeMs(): Long = {
+    if (timeoutConf != ProcessingTimeTimeout) {
+      throw new UnsupportedOperationException(
+        "Cannot get processing time timestamp without enabling processing time timeout in " +
+          "map|flatMap]GroupsWithState")
--- End diff --

nit: `[`


---




[GitHub] spark pull request #19495: [SPARK-22278][SS] Expose current event time water...

2017-10-16 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/19495#discussion_r144992271
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala
 ---
@@ -187,7 +190,7 @@ private[sql] class GroupStateImpl[S] private(
     if (timeoutConf != EventTimeTimeout) {
       throw new UnsupportedOperationException(
         "Cannot set timeout timestamp without enabling event time timeout in " +
-          "map/flatMapGroupsWithState")
+          "map|flatMapGroupsWithState")
--- End diff --

`[map|flatMap]`


---




[GitHub] spark pull request #19495: [SPARK-22278][SS] Expose current event time water...

2017-10-16 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/19495#discussion_r144989072
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
 ---
@@ -270,6 +270,60 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf
     }
   }
 
+  test("GroupState - getCurrentWatermarkMs") {
+    def assertWrongTimeoutError(test: => Unit): Unit = {
+      val e = intercept[UnsupportedOperationException] { test }
+      assert(e.getMessage.contains(
+        "Cannot get event time watermark timestamp without enabling event time timeout"))
+    }
+
+    def streamingState(timeoutConf: GroupStateTimeout, watermark: Long): GroupState[Int] = {
+      GroupStateImpl.createForStreaming(None, 1000, watermark, timeoutConf, hasTimedOut = false)
+    }
+
+    def batchState(timeoutConf: GroupStateTimeout): GroupState[Any] = {
+      GroupStateImpl.createForBatch(timeoutConf)
+    }
+
+    // Tests for getCurrentWatermarkMs in streaming queries
+    assertWrongTimeoutError { streamingState(NoTimeout, 1000).getCurrentWatermarkMs() }
+    assertWrongTimeoutError { streamingState(ProcessingTimeTimeout, 1000).getCurrentWatermarkMs() }
+    assert(streamingState(EventTimeTimeout, 1000).getCurrentWatermarkMs() === 1000)
+    assert(streamingState(EventTimeTimeout, 2000).getCurrentWatermarkMs() === 2000)
+
+    // Tests for getCurrentWatermarkMs in batch queries
+    assertWrongTimeoutError { batchState(NoTimeout).getCurrentWatermarkMs() }
+    assertWrongTimeoutError { batchState(ProcessingTimeTimeout).getCurrentWatermarkMs() }
+    assert(batchState(EventTimeTimeout).getCurrentWatermarkMs() === -1)
+  }
+
+  test("GroupState - getCurrentProcessingTimeMs") {
+    def assertWrongTimeoutError(test: => Unit): Unit = {
+      val e = intercept[UnsupportedOperationException] { test }
+      assert(e.getMessage.contains(
+        "Cannot get processing time timestamp without enabling processing time timeout"))
+    }
+
+    def streamingState(timeoutConf: GroupStateTimeout, procTime: Long): GroupState[Int] = {
+      GroupStateImpl.createForStreaming(None, procTime, -1, timeoutConf, hasTimedOut = false)
+    }
+
+    def batchState(timeoutConf: GroupStateTimeout): GroupState[Any] = {
+      GroupStateImpl.createForBatch(timeoutConf)
+    }
+
+    // Tests for getCurrentWatermarkMs in streaming queries
+    assertWrongTimeoutError { streamingState(NoTimeout, 1000).getCurrentProcessingTimeMs() }
+    assertWrongTimeoutError { streamingState(EventTimeTimeout, 1000).getCurrentProcessingTimeMs() }
+    assert(streamingState(ProcessingTimeTimeout, 1000).getCurrentProcessingTimeMs() === 1000)
+    assert(streamingState(ProcessingTimeTimeout, 2000).getCurrentProcessingTimeMs() === 2000)
+
+    // Tests for getCurrentWatermarkMs in batch queries
+    assertWrongTimeoutError { batchState(NoTimeout).getCurrentProcessingTimeMs() }
--- End diff --

the comment above says otherwise. that's why I was confused


---




[GitHub] spark pull request #19495: [SPARK-22278][SS] Expose current event time water...

2017-10-16 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19495#discussion_r144972393
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
 ---
@@ -270,6 +270,60 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf
     }
   }
 
+  test("GroupState - getCurrentWatermarkMs") {
+    def assertWrongTimeoutError(test: => Unit): Unit = {
+      val e = intercept[UnsupportedOperationException] { test }
+      assert(e.getMessage.contains(
+        "Cannot get event time watermark timestamp without enabling event time timeout"))
+    }
+
+    def streamingState(timeoutConf: GroupStateTimeout, watermark: Long): GroupState[Int] = {
+      GroupStateImpl.createForStreaming(None, 1000, watermark, timeoutConf, hasTimedOut = false)
+    }
+
+    def batchState(timeoutConf: GroupStateTimeout): GroupState[Any] = {
+      GroupStateImpl.createForBatch(timeoutConf)
+    }
+
+    // Tests for getCurrentWatermarkMs in streaming queries
+    assertWrongTimeoutError { streamingState(NoTimeout, 1000).getCurrentWatermarkMs() }
+    assertWrongTimeoutError { streamingState(ProcessingTimeTimeout, 1000).getCurrentWatermarkMs() }
+    assert(streamingState(EventTimeTimeout, 1000).getCurrentWatermarkMs() === 1000)
+    assert(streamingState(EventTimeTimeout, 2000).getCurrentWatermarkMs() === 2000)
+
+    // Tests for getCurrentWatermarkMs in batch queries
+    assertWrongTimeoutError { batchState(NoTimeout).getCurrentWatermarkMs() }
+    assertWrongTimeoutError { batchState(ProcessingTimeTimeout).getCurrentWatermarkMs() }
+    assert(batchState(EventTimeTimeout).getCurrentWatermarkMs() === -1)
+  }
+
+  test("GroupState - getCurrentProcessingTimeMs") {
+    def assertWrongTimeoutError(test: => Unit): Unit = {
+      val e = intercept[UnsupportedOperationException] { test }
+      assert(e.getMessage.contains(
+        "Cannot get processing time timestamp without enabling processing time timeout"))
+    }
+
+    def streamingState(timeoutConf: GroupStateTimeout, procTime: Long): GroupState[Int] = {
+      GroupStateImpl.createForStreaming(None, procTime, -1, timeoutConf, hasTimedOut = false)
+    }
+
+    def batchState(timeoutConf: GroupStateTimeout): GroupState[Any] = {
+      GroupStateImpl.createForBatch(timeoutConf)
+    }
+
+    // Tests for getCurrentWatermarkMs in streaming queries
+    assertWrongTimeoutError { streamingState(NoTimeout, 1000).getCurrentProcessingTimeMs() }
+    assertWrongTimeoutError { streamingState(EventTimeTimeout, 1000).getCurrentProcessingTimeMs() }
+    assert(streamingState(ProcessingTimeTimeout, 1000).getCurrentProcessingTimeMs() === 1000)
+    assert(streamingState(ProcessingTimeTimeout, 2000).getCurrentProcessingTimeMs() === 2000)
+
+    // Tests for getCurrentWatermarkMs in batch queries
+    assertWrongTimeoutError { batchState(NoTimeout).getCurrentProcessingTimeMs() }
--- End diff --

this is testing `getCurrentProcessingTimeMs`, so yeah, that's by design
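To make the intended semantics concrete, here is a small self-contained model of the two getters. `ModelGroupState` and `GatingCheck` are hypothetical names, not Spark's `GroupStateImpl`, and the sketch runs without Spark. Each getter is gated on a different timeout conf, so `getCurrentProcessingTimeMs()` throwing under `NoTimeout` is expected, just as `getCurrentWatermarkMs()` throws under `ProcessingTimeTimeout`.

```scala
// Hypothetical, simplified model of the gating checks discussed in this PR.
// Not Spark's code: names mirror the diff, but nothing here depends on Spark.
sealed trait TimeoutConf
case object NoTimeout extends TimeoutConf
case object ProcessingTimeTimeout extends TimeoutConf
case object EventTimeTimeout extends TimeoutConf

final class ModelGroupState(timeoutConf: TimeoutConf, processingTimeMs: Long, watermarkMs: Long) {
  // Only available when event time timeout is enabled.
  def getCurrentWatermarkMs(): Long = {
    if (timeoutConf != EventTimeTimeout) {
      throw new UnsupportedOperationException(
        "Cannot get event time watermark timestamp without enabling event time timeout in " +
          "[map|flatMap]GroupsWithState")
    }
    watermarkMs
  }

  // Only available when processing time timeout is enabled.
  def getCurrentProcessingTimeMs(): Long = {
    if (timeoutConf != ProcessingTimeTimeout) {
      throw new UnsupportedOperationException(
        "Cannot get processing time timestamp without enabling processing time timeout in " +
          "[map|flatMap]GroupsWithState")
    }
    processingTimeMs
  }
}

object GatingCheck {
  // True if evaluating `body` throws UnsupportedOperationException.
  def throwsUnsupported(body: => Long): Boolean =
    try { body; false } catch { case _: UnsupportedOperationException => true }

  def main(args: Array[String]): Unit = {
    for (conf <- Seq(NoTimeout, ProcessingTimeTimeout, EventTimeTimeout)) {
      val state = new ModelGroupState(conf, processingTimeMs = 1000L, watermarkMs = 2000L)
      println(s"$conf: processing time throws = ${throwsUnsupported(state.getCurrentProcessingTimeMs())}, " +
        s"watermark throws = ${throwsUnsupported(state.getCurrentWatermarkMs())}")
    }
  }
}
```

Exactly one of the two getters succeeds for each timed-out conf, and neither does under `NoTimeout`.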


---




[GitHub] spark pull request #19495: [SPARK-22278][SS] Expose current event time water...

2017-10-16 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19495#discussion_r144972248
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/GroupState.scala ---
@@ -205,92 +205,127 @@ trait GroupState[S] extends LogicalGroupState[S] {
   /** Get the state value as a scala Option. */
   def getOption: Option[S]
 
-  /**
-   * Update the value of the state. Note that `null` is not a valid value, and it throws
-   * IllegalArgumentException.
-   */
-  @throws[IllegalArgumentException]("when updating with null")
+  /** Update the value of the state. */
   def update(newState: S): Unit
 
   /** Remove this state. */
   def remove(): Unit
 
   /**
    * Whether the function has been called because the key has timed out.
-   * @note This can return true only when timeouts are enabled in `[map/flatmap]GroupsWithStates`.
+   * @note This can return true only when timeouts are enabled in `[map/flatmap]GroupsWithState`.
    */
   def hasTimedOut: Boolean
 
+
   /**
    * Set the timeout duration in ms for this key.
    *
-   * @note ProcessingTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
+   * @note [[GroupStateTimeout Processing time timeout]] must be enabled in
+   *   `[map/flatmap]GroupsWithState` for calling this method.
+   * @note This method has no effect when used in a batch query.
    */
   @throws[IllegalArgumentException]("if 'durationMs' is not positive")
-  @throws[IllegalStateException]("when state is either not initialized, or already removed")
   @throws[UnsupportedOperationException](
-    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
+    "if processing time timeout has not been enabled in [map|flatMap]GroupsWithState")
   def setTimeoutDuration(durationMs: Long): Unit
 
+
   /**
    * Set the timeout duration for this key as a string. For example, "1 hour", "2 days", etc.
    *
-   * @note ProcessingTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
+   * @note [[GroupStateTimeout Processing time timeout]] must be enabled in
+   *   `[map/flatmap]GroupsWithState` for calling this method.
+   * @note This method has no effect when used in a batch query.
    */
   @throws[IllegalArgumentException]("if 'duration' is not a valid duration")
-  @throws[IllegalStateException]("when state is either not initialized, or already removed")
   @throws[UnsupportedOperationException](
-    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
+    "if processing time timeout has not been enabled in [map|flatMap]GroupsWithState")
   def setTimeoutDuration(duration: String): Unit
 
-  @throws[IllegalArgumentException]("if 'timestampMs' is not positive")
-  @throws[IllegalStateException]("when state is either not initialized, or already removed")
-  @throws[UnsupportedOperationException](
-    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
+
   /**
    * Set the timeout timestamp for this key as milliseconds in epoch time.
    * This timestamp cannot be older than the current watermark.
    *
-   * @note EventTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
+   * @note [[GroupStateTimeout Event time timeout]] must be enabled in
+   *   `[map/flatmap]GroupsWithState` for calling this method.
+   * @note This method has no effect when used in a batch query.
    */
+  @throws[IllegalArgumentException](
+    "if 'timestampMs' is not positive or less than the current watermark in a streaming query")
+  @throws[UnsupportedOperationException](
+    "if processing time timeout has not been enabled in [map|flatMap]GroupsWithState")
   def setTimeoutTimestamp(timestampMs: Long): Unit
 
-  @throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
-  @throws[IllegalStateException]("when state is either not initialized, or already removed")
-  @throws[UnsupportedOperationException](
-    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
+
   /**
    * Set the timeout timestamp for this key as milliseconds in epoch time and an additional
    * duration as a string (e.g. "1 hour", "2 days", etc.).
    * The final timestamp (including the additional duration) cannot be older than the
    * current watermark.
    *
-   * @note EventTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
+   * @note [[GroupStateTimeout Event time timeout]] must be enabled in
+   *   `[map/flatmap]GroupsWithState` for calling this method.
+   * @note This method has no side ef

[GitHub] spark pull request #19495: [SPARK-22278][SS] Expose current event time water...

2017-10-16 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19495#discussion_r144963002
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala
 ---
@@ -119,32 +115,39 @@ private[sql] class GroupStateImpl[S] private(
     timeoutTimestamp = timestampMs
   }
 
-  @throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
-  @throws[IllegalStateException]("when state is either not initialized, or already removed")
-  @throws[UnsupportedOperationException](
-    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
   override def setTimeoutTimestamp(timestampMs: Long, additionalDuration: String): Unit = {
     checkTimeoutTimestampAllowed()
     setTimeoutTimestamp(parseDuration(additionalDuration) + timestampMs)
   }
 
-  @throws[IllegalStateException]("when state is either not initialized, or already removed")
-  @throws[UnsupportedOperationException](
-    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
   override def setTimeoutTimestamp(timestamp: Date): Unit = {
     checkTimeoutTimestampAllowed()
     setTimeoutTimestamp(timestamp.getTime)
   }
 
-  @throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
-  @throws[IllegalStateException]("when state is either not initialized, or already removed")
-  @throws[UnsupportedOperationException](
-    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
   override def setTimeoutTimestamp(timestamp: Date, additionalDuration: String): Unit = {
     checkTimeoutTimestampAllowed()
     setTimeoutTimestamp(timestamp.getTime + parseDuration(additionalDuration))
   }
 
+  override def getCurrentWatermarkMs(): Long = {
+    if (timeoutConf != EventTimeTimeout) {
+      throw new UnsupportedOperationException(
+        "Cannot get event time watermark timestamp without enabling event time timeout in " +
+          "[map/flatMap]GroupsWithState")
--- End diff --

right.


---




[GitHub] spark pull request #19495: [SPARK-22278][SS] Expose current event time water...

2017-10-16 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/19495#discussion_r144936080
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala
 ---
@@ -119,32 +115,39 @@ private[sql] class GroupStateImpl[S] private(
     timeoutTimestamp = timestampMs
   }
 
-  @throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
-  @throws[IllegalStateException]("when state is either not initialized, or already removed")
-  @throws[UnsupportedOperationException](
-    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
   override def setTimeoutTimestamp(timestampMs: Long, additionalDuration: String): Unit = {
     checkTimeoutTimestampAllowed()
     setTimeoutTimestamp(parseDuration(additionalDuration) + timestampMs)
   }
 
-  @throws[IllegalStateException]("when state is either not initialized, or already removed")
-  @throws[UnsupportedOperationException](
-    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
   override def setTimeoutTimestamp(timestamp: Date): Unit = {
     checkTimeoutTimestampAllowed()
     setTimeoutTimestamp(timestamp.getTime)
   }
 
-  @throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
-  @throws[IllegalStateException]("when state is either not initialized, or already removed")
-  @throws[UnsupportedOperationException](
-    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
   override def setTimeoutTimestamp(timestamp: Date, additionalDuration: String): Unit = {
     checkTimeoutTimestampAllowed()
     setTimeoutTimestamp(timestamp.getTime + parseDuration(additionalDuration))
   }
 
+  override def getCurrentWatermarkMs(): Long = {
+    if (timeoutConf != EventTimeTimeout) {
+      throw new UnsupportedOperationException(
+        "Cannot get event time watermark timestamp without enabling event time timeout in " +
+          "[map/flatMap]GroupsWithState")
--- End diff --

uber nit: should we be consistent between `/` and `|`? In half the places we use `[map/flatMap]` and in the other half `[map|flatMap]`. I prefer `|`.
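One way to make this kind of drift impossible is to build every message from a single constant. A minimal sketch, assuming a hypothetical `GroupStateMessages` helper that is not part of the PR; it produces the same prefixes the suite's `contains` checks look for, so those assertions would keep passing.

```scala
// Hypothetical helper: the operator name lives in one place, so every error
// message spells it the same way ("[map|flatMap]GroupsWithState").
object GroupStateMessages {
  val OperatorName = "[map|flatMap]GroupsWithState"

  // "Cannot <action> without enabling <timeoutKind> timeout in <OperatorName>"
  def requiresTimeout(action: String, timeoutKind: String): String =
    s"Cannot $action without enabling $timeoutKind timeout in $OperatorName"
}

object MessagesDemo {
  def main(args: Array[String]): Unit = {
    println(GroupStateMessages.requiresTimeout("get event time watermark timestamp", "event time"))
    println(GroupStateMessages.requiresTimeout("get processing time timestamp", "processing time"))
    println(GroupStateMessages.requiresTimeout("set timeout timestamp", "event time"))
  }
}
```

With this shape, a typo such as a missing `[` can only happen once, in `OperatorName`, rather than independently in each message.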


---




[GitHub] spark pull request #19495: [SPARK-22278][SS] Expose current event time water...

2017-10-16 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/19495#discussion_r144936653
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
 ---
@@ -270,6 +270,60 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf
     }
   }
 
+  test("GroupState - getCurrentWatermarkMs") {
+    def assertWrongTimeoutError(test: => Unit): Unit = {
+      val e = intercept[UnsupportedOperationException] { test }
+      assert(e.getMessage.contains(
+        "Cannot get event time watermark timestamp without enabling event time timeout"))
+    }
+
+    def streamingState(timeoutConf: GroupStateTimeout, watermark: Long): GroupState[Int] = {
+      GroupStateImpl.createForStreaming(None, 1000, watermark, timeoutConf, hasTimedOut = false)
+    }
+
+    def batchState(timeoutConf: GroupStateTimeout): GroupState[Any] = {
+      GroupStateImpl.createForBatch(timeoutConf)
+    }
+
+    // Tests for getCurrentWatermarkMs in streaming queries
+    assertWrongTimeoutError { streamingState(NoTimeout, 1000).getCurrentWatermarkMs() }
+    assertWrongTimeoutError { streamingState(ProcessingTimeTimeout, 1000).getCurrentWatermarkMs() }
+    assert(streamingState(EventTimeTimeout, 1000).getCurrentWatermarkMs() === 1000)
+    assert(streamingState(EventTimeTimeout, 2000).getCurrentWatermarkMs() === 2000)
+
+    // Tests for getCurrentWatermarkMs in batch queries
+    assertWrongTimeoutError { batchState(NoTimeout).getCurrentWatermarkMs() }
+    assertWrongTimeoutError { batchState(ProcessingTimeTimeout).getCurrentWatermarkMs() }
+    assert(batchState(EventTimeTimeout).getCurrentWatermarkMs() === -1)
+  }
+
+  test("GroupState - getCurrentProcessingTimeMs") {
+    def assertWrongTimeoutError(test: => Unit): Unit = {
+      val e = intercept[UnsupportedOperationException] { test }
+      assert(e.getMessage.contains(
+        "Cannot get processing time timestamp without enabling processing time timeout"))
+    }
+
+    def streamingState(timeoutConf: GroupStateTimeout, procTime: Long): GroupState[Int] = {
+      GroupStateImpl.createForStreaming(None, procTime, -1, timeoutConf, hasTimedOut = false)
+    }
+
+    def batchState(timeoutConf: GroupStateTimeout): GroupState[Any] = {
+      GroupStateImpl.createForBatch(timeoutConf)
+    }
+
+    // Tests for getCurrentWatermarkMs in streaming queries
+    assertWrongTimeoutError { streamingState(NoTimeout, 1000).getCurrentProcessingTimeMs() }
+    assertWrongTimeoutError { streamingState(EventTimeTimeout, 1000).getCurrentProcessingTimeMs() }
+    assert(streamingState(ProcessingTimeTimeout, 1000).getCurrentProcessingTimeMs() === 1000)
+    assert(streamingState(ProcessingTimeTimeout, 2000).getCurrentProcessingTimeMs() === 2000)
+
+    // Tests for getCurrentWatermarkMs in batch queries
+    assertWrongTimeoutError { batchState(NoTimeout).getCurrentProcessingTimeMs() }
--- End diff --

not actually using `getCurrentWatermarkMs`


---




[GitHub] spark pull request #19495: [SPARK-22278][SS] Expose current event time water...

2017-10-16 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/19495#discussion_r144935264
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/GroupState.scala ---
@@ -205,92 +205,127 @@ trait GroupState[S] extends LogicalGroupState[S] {
   /** Get the state value as a scala Option. */
   def getOption: Option[S]
 
-  /**
-   * Update the value of the state. Note that `null` is not a valid value, and it throws
-   * IllegalArgumentException.
-   */
-  @throws[IllegalArgumentException]("when updating with null")
+  /** Update the value of the state. */
   def update(newState: S): Unit
 
   /** Remove this state. */
   def remove(): Unit
 
   /**
    * Whether the function has been called because the key has timed out.
-   * @note This can return true only when timeouts are enabled in `[map/flatmap]GroupsWithStates`.
+   * @note This can return true only when timeouts are enabled in `[map/flatmap]GroupsWithState`.
    */
   def hasTimedOut: Boolean
 
+
   /**
    * Set the timeout duration in ms for this key.
    *
-   * @note ProcessingTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
+   * @note [[GroupStateTimeout Processing time timeout]] must be enabled in
+   *   `[map/flatmap]GroupsWithState` for calling this method.
+   * @note This method has no effect when used in a batch query.
    */
   @throws[IllegalArgumentException]("if 'durationMs' is not positive")
-  @throws[IllegalStateException]("when state is either not initialized, or already removed")
   @throws[UnsupportedOperationException](
-    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
+    "if processing time timeout has not been enabled in [map|flatMap]GroupsWithState")
   def setTimeoutDuration(durationMs: Long): Unit
 
+
   /**
    * Set the timeout duration for this key as a string. For example, "1 hour", "2 days", etc.
    *
-   * @note ProcessingTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
+   * @note [[GroupStateTimeout Processing time timeout]] must be enabled in
+   *   `[map/flatmap]GroupsWithState` for calling this method.
+   * @note This method has no effect when used in a batch query.
    */
   @throws[IllegalArgumentException]("if 'duration' is not a valid duration")
-  @throws[IllegalStateException]("when state is either not initialized, or already removed")
   @throws[UnsupportedOperationException](
-    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
+    "if processing time timeout has not been enabled in [map|flatMap]GroupsWithState")
   def setTimeoutDuration(duration: String): Unit
 
-  @throws[IllegalArgumentException]("if 'timestampMs' is not positive")
-  @throws[IllegalStateException]("when state is either not initialized, or already removed")
-  @throws[UnsupportedOperationException](
-    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
+
   /**
    * Set the timeout timestamp for this key as milliseconds in epoch time.
    * This timestamp cannot be older than the current watermark.
    *
-   * @note EventTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
+   * @note [[GroupStateTimeout Event time timeout]] must be enabled in
+   *   `[map/flatmap]GroupsWithState` for calling this method.
+   * @note This method has no effect when used in a batch query.
    */
+  @throws[IllegalArgumentException](
+    "if 'timestampMs' is not positive or less than the current watermark in a streaming query")
+  @throws[UnsupportedOperationException](
+    "if processing time timeout has not been enabled in [map|flatMap]GroupsWithState")
   def setTimeoutTimestamp(timestampMs: Long): Unit
 
-  @throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
-  @throws[IllegalStateException]("when state is either not initialized, or already removed")
-  @throws[UnsupportedOperationException](
-    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
+
   /**
    * Set the timeout timestamp for this key as milliseconds in epoch time and an additional
    * duration as a string (e.g. "1 hour", "2 days", etc.).
    * The final timestamp (including the additional duration) cannot be older than the
    * current watermark.
    *
-   * @note EventTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
+   * @note [[GroupStateTimeout Event time timeout]] must be enabled in
+   *   `[map/flatmap]GroupsWithState` for calling this method.
+   * @note This method has no side 

[GitHub] spark pull request #19495: [SPARK-22278][SS] Expose current event time water...

2017-10-15 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/19495#discussion_r144736325
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala
 ---
@@ -119,32 +115,39 @@ private[sql] class GroupStateImpl[S] private(
     timeoutTimestamp = timestampMs
   }
 
-  @throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
-  @throws[IllegalStateException]("when state is either not initialized, or already removed")
-  @throws[UnsupportedOperationException](
-    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
   override def setTimeoutTimestamp(timestampMs: Long, additionalDuration: String): Unit = {
     checkTimeoutTimestampAllowed()
     setTimeoutTimestamp(parseDuration(additionalDuration) + timestampMs)
   }
 
-  @throws[IllegalStateException]("when state is either not initialized, or already removed")
-  @throws[UnsupportedOperationException](
-    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
   override def setTimeoutTimestamp(timestamp: Date): Unit = {
     checkTimeoutTimestampAllowed()
     setTimeoutTimestamp(timestamp.getTime)
   }
 
-  @throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
-  @throws[IllegalStateException]("when state is either not initialized, or already removed")
-  @throws[UnsupportedOperationException](
-    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
   override def setTimeoutTimestamp(timestamp: Date, additionalDuration: String): Unit = {
     checkTimeoutTimestampAllowed()
     setTimeoutTimestamp(timestamp.getTime + parseDuration(additionalDuration))
   }
 
+  override def getCurrentWatermarkMs(): Long = {
+    if (timeoutConf != EventTimeTimeout) {
+      throw new UnsupportedOperationException(
+        "Cannot get event time watermark timestamp without enabling event time timeout in " +
+          "[map/flatMap]GroupsWithState")
+    }
+    eventTimeWatermarkMs
+  }
+
+  override def getCurrentProcessingTimeMs(): Long = {
+    if (timeoutConf != ProcessingTimeTimeout) {
+      throw new UnsupportedOperationException(
+        "Cannot get processing time timestamp without enabling processing time timeout in " +
+          "map/flatMap]GroupsWithState")
--- End diff --

`map/flatMap]` -> `[map/flatMap]`
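For reference, the quoted overloads all delegate to the `Long` variant, which is where the watermark check happens. Below is a simplified, hypothetical model of that delegation in plain Scala. `TimeoutModel` is a stand-in, not Spark's `GroupStateImpl`, and its `parseDurationMs` is a toy parser (not Spark's `parseDuration`) that only understands `<n> <unit>` with second, minute, hour, or day units. The checks follow the documented contract: the final timestamp must be positive and not older than the current watermark.

```scala
import java.util.Date

// Hypothetical, simplified model of the setTimeoutTimestamp overloads.
final class TimeoutModel(watermarkMs: Long) {
  private var timeoutTimestampMs: Option[Long] = None

  def timeout: Option[Long] = timeoutTimestampMs

  // Toy duration parser (assumption): "<n> <unit>", e.g. "1 hour", "2 days".
  private def parseDurationMs(duration: String): Long =
    duration.trim.toLowerCase.split("\\s+") match {
      case Array(n, unit) =>
        val unitMs = unit.stripSuffix("s") match {
          case "second" => 1000L
          case "minute" => 60L * 1000L
          case "hour"   => 60L * 60L * 1000L
          case "day"    => 24L * 60L * 60L * 1000L
          case other    => throw new IllegalArgumentException(s"Unsupported unit: $other")
        }
        val count = n.toLong // NumberFormatException is an IllegalArgumentException
        if (count <= 0) throw new IllegalArgumentException(s"Duration must be positive: $duration")
        count * unitMs
      case _ => throw new IllegalArgumentException(s"Invalid duration: $duration")
    }

  // Base variant: every overload below ends up here, so validation lives in one place.
  def setTimeoutTimestamp(timestampMs: Long): Unit = {
    if (timestampMs <= 0) {
      throw new IllegalArgumentException("Timeout timestamp must be positive")
    }
    if (timestampMs < watermarkMs) {
      throw new IllegalArgumentException(
        s"Timeout timestamp ($timestampMs) cannot be older than the watermark ($watermarkMs)")
    }
    timeoutTimestampMs = Some(timestampMs)
  }

  def setTimeoutTimestamp(timestampMs: Long, additionalDuration: String): Unit =
    setTimeoutTimestamp(timestampMs + parseDurationMs(additionalDuration))

  def setTimeoutTimestamp(timestamp: Date): Unit =
    setTimeoutTimestamp(timestamp.getTime)

  def setTimeoutTimestamp(timestamp: Date, additionalDuration: String): Unit =
    setTimeoutTimestamp(timestamp.getTime + parseDurationMs(additionalDuration))
}
```

For example, with a watermark of 10000 ms, `setTimeoutTimestamp(5000L, "1 hour")` succeeds (3605000 ms), while `setTimeoutTimestamp(1000L, "2 seconds")` fails because the final 3000 ms is older than the watermark. Keeping the check only in the base variant means a failed call leaves any previously set timeout untouched.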


---




[GitHub] spark pull request #19495: [SPARK-22278][SS] Expose current event time water...

2017-10-13 Thread tdas
GitHub user tdas opened a pull request:

https://github.com/apache/spark/pull/19495

[SPARK-22278][SS] Expose current event time watermark and current processing time in GroupState

## What changes were proposed in this pull request?

Complex state-updating and/or timeout-handling logic in mapGroupsWithState functions may need to make decisions based on the current event-time watermark and/or processing time. Currently, you can use the SQL function `current_timestamp` to get the current processing time, but it has to be inserted into every row with a select and then passed through the encoder, which isn't efficient. Furthermore, there is no way to get the current watermark.

This PR exposes both of them through the GroupState API. It also cleans up some of the GroupState docs.
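As an illustration of the kind of decision this enables, here is a hypothetical helper a user function might use to pick an event-time timeout: a fixed gap after the latest event seen for the key, but never older than the current watermark, since `setTimeoutTimestamp` rejects timestamps older than the watermark. `StateView` and `TimeoutPolicy` are stand-ins (assumptions, not Spark API); `StateView` exposes only `getCurrentWatermarkMs()` so the sketch runs without Spark. Inside a real `flatMapGroupsWithState` function the watermark would come from the `GroupState` passed in, with event time timeout enabled.

```scala
// Hypothetical stand-in for the part of GroupState this sketch needs.
trait StateView {
  def getCurrentWatermarkMs(): Long
}

object TimeoutPolicy {
  // Next event-time timeout: `gapMs` after the latest event seen for this key,
  // clamped so it is never older than the current watermark.
  def nextTimeoutMs(state: StateView, latestEventTimeMs: Long, gapMs: Long): Long =
    math.max(latestEventTimeMs + gapMs, state.getCurrentWatermarkMs())

  def main(args: Array[String]): Unit = {
    val state = new StateView { def getCurrentWatermarkMs(): Long = 10000L }
    println(nextTimeoutMs(state, latestEventTimeMs = 4000L, gapMs = 3000L)) // 10000 (clamped to the watermark)
    println(nextTimeoutMs(state, latestEventTimeMs = 9000L, gapMs = 3000L)) // 12000
  }
}
```

Without access to the watermark, the user function could not perform this clamp and would risk an `IllegalArgumentException` from `setTimeoutTimestamp` for keys whose latest event is far behind.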

## How was this patch tested?

New unit tests

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tdas/spark SPARK-22278

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19495.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19495


commit c9a042e2f0228584f6a3f643cfac412c73ed98d7
Author: Tathagata Das 
Date:   2017-10-10T00:01:02Z

Expose event time watermark in the GroupState

commit 67114ab59f5a8d79fbe66b7deb93869f656346b9
Author: Tathagata Das 
Date:   2017-10-14T00:16:08Z

Exposed processing time




---
