Github user marmbrus commented on a diff in the pull request:
https://github.com/apache/spark/pull/17179#discussion_r105991219
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/streaming/KeyedState.scala ---
@@ -61,25 +65,50 @@ import
org.apache.spark.sql.catalyst.plans.logical.LogicalKeyedState
* - After that, if `update(newState)` is called, then `exists()` will
again return `true`,
* `get()` and `getOption()`will return the updated value.
*
+ * Important points to note about using `KeyedStateTimeout`.
+ * - The timeout type is a global param across all the keys (set as
`timeout` param in
+ * `[map|flatMap]GroupsWithState`, but the exact timeout duration is
configurable per key
+ * (by calling `setTimeout...()` in `KeyedState`).
+ * - When the timeout occurs for a key, the function is called with no
values, and
+ * `KeyedState.isTimingOut()` set to true.
+ * - The timeout is reset for key every time the function is called on
the key, that is,
+ * when the key has new data, or the key has timed out. So the user has
to set the timeout
+ * duration every time the function is called, otherwise there will not
be any timeout set.
+ * - Guarantees provided on processing-time-based timeout of key, when
timeout duration is D ms:
+ * - Timeout will never be called before real clock time has advanced
by D ms
+ * - Timeout will be called eventually when there is a trigger in the
query
+ * (i.e. after D ms). So there is a no strict upper bound on when the
timeout would occur.
+ * For example, the trigger interval of the query will affect when
the timeout is actually hit.
+ * If there is no data in the stream (for any key) for a while, then
their will not be
+ * any trigger and timeout will not be hit until there is data.
+ *
* Scala example of using KeyedState in `mapGroupsWithState`:
* {{{
* // A mapping function that maintains an integer state for string keys
and returns a string.
* def mappingFunction(key: String, value: Iterator[Int], state:
KeyedState[Int]): String = {
- * // Check if state exists
- * if (state.exists) {
- * val existingState = state.get // Get the existing state
- * val shouldRemove = ... // Decide whether to remove the state
+ *
+ * if (state.isTimingOut) { // If called when timing out,
remove the state
+ * state.remove()
+ *
+ * } else if (state.exists) { // If state exists, use it
for processing
+ * val existingState = state.get // Get the existing state
+ * val shouldRemove = ... // Decide whether to remove
the state
* if (shouldRemove) {
- * state.remove() // Remove the state
+ * state.remove() // Remove the state
+ *
* } else {
* val newState = ...
- * state.update(newState) // Set the new state
+ * state.update(newState) // Set the new state
* }
+ *
* } else {
* val initialState = ...
- * state.update(initialState) // Set the initial state
+ * state.update(initialState) // Set the initial state
* }
- * ... // return something
+ * state.setTimeoutDuration("1 hour") // Set the timeout
--- End diff --
Hmm, I wonder if we should throw an exception here. I was pretty confused
what this function would do just looking at it.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]