GitHub user amitsela commented on a diff in the pull request:
https://github.com/apache/spark/pull/17179#discussion_r105227799
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/KeyedState.scala ---
@@ -61,25 +65,49 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalKeyedState
* - After that, if `update(newState)` is called, then `exists()` will again return `true`,
* `get()` and `getOption()` will return the updated value.
*
+ * Important points to note about using `KeyedStateTimeout`.
+ * - The timeout type is a global param across all the keys (set as `timeout` param in
+ * `[map|flatMap]GroupsWithState`), but the exact timeout duration is configurable per key
+ * (by calling `setTimeout...()` in `KeyedState`).
+ * - When the timeout occurs for a key, the function is called with no values, and
+ * `KeyedState.isTimingOut()` set to true.
+ * - The timeout is reset for key every time the function is called on the key, that is,
--- End diff --
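To make the timeout rules in the diff concrete, here is a toy, single-process model. It is a sketch under stated assumptions, not Spark's implementation or API: `ToyState`, `armTimeout`, `ToyGroupsWithTimeout`, `onData`, `onTick` and `deadline` are hypothetical names, `armTimeout` merely stands in for the elided `setTimeout...()` call, and time is passed in explicitly as milliseconds.

```scala
import scala.collection.mutable

// Hypothetical toy model of the timeout rules quoted in the diff.
// These are NOT Spark classes; names and signatures are illustrative only.
final class ToyState(val isTimingOut: Boolean) {
  var requestedTimeoutMs: Option[Long] = None
  // Stand-in for the elided per-key `setTimeout...()` call in `KeyedState`.
  def armTimeout(durationMs: Long): Unit = requestedTimeoutMs = Some(durationMs)
}

// One timeout *type* for the whole operator (an absolute deadline here),
// while each key chooses its own duration.
final class ToyGroupsWithTimeout(func: (String, Seq[Int], ToyState) => Unit) {
  private val deadlines = mutable.Map.empty[String, Long] // key -> expiry time in ms

  private def invoke(key: String, values: Seq[Int], timingOut: Boolean, nowMs: Long): Unit = {
    val state = new ToyState(timingOut) // fresh per call, so nothing carries over
    func(key, values, state)
    // Reset on every call: only a duration armed during *this* call survives.
    state.requestedTimeoutMs match {
      case Some(d) => deadlines(key) = nowMs + d
      case None    => deadlines -= key
    }
  }

  // New input for a key: called with its values and isTimingOut = false.
  def onData(key: String, values: Seq[Int], nowMs: Long): Unit =
    invoke(key, values, timingOut = false, nowMs = nowMs)

  // Expired keys: called with no values and isTimingOut = true.
  def onTick(nowMs: Long): Unit =
    deadlines.filter { case (_, t) => t <= nowMs }.keys.toList
      .foreach(k => invoke(k, Seq.empty, timingOut = true, nowMs = nowMs))

  def deadline(key: String): Option[Long] = deadlines.get(key)
}
```

For example, if the function arms a 100 ms timeout on one call and does nothing on the next call for the same key, the key ends up with no deadline at all, which is the behavior questioned in the comment.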
If a timeout was previously set and an update (a new input value for the key) arrives, would the user still have to re-set the timeout? Why not leave it as it was?
I guess that even with this implementation, this could still be sugar-coated by doing something like:
```scala
if (!userSetTimeout) // hypothetical flag: no timeout was set during this call
  timeout = prevTimeout
```
no?
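The suggested fallback can be sketched as a pure function that computes a key's next deadline. Assumptions, none of which come from the PR: deadlines are absolute times in ms, "previous timeout" means the previous deadline rather than the previous duration, and a call that is itself the timeout does not carry the expired deadline forward (otherwise the key would time out again immediately). `TimeoutCarryOver`, `nextDeadline` and `nextDeadlineWithReset` are hypothetical names.

```scala
// Hypothetical sketch of "if (user not set timeout) timeout = prevTimeout".
// Deadlines are absolute times in ms; all names here are illustrative only.
object TimeoutCarryOver {
  // Behavior described in the diff: only a timeout set during this call survives.
  def nextDeadlineWithReset(newDurationMs: Option[Long], nowMs: Long): Option[Long] =
    newDurationMs.map(nowMs + _)

  // Suggested alternative: fall back to the previous deadline when nothing new was set.
  // Assumption: a timing-out call drops the expired deadline instead of keeping it.
  def nextDeadline(prevDeadlineMs: Option[Long],
                   newDurationMs: Option[Long],
                   nowMs: Long,
                   isTimingOut: Boolean): Option[Long] =
    newDurationMs.map(nowMs + _).orElse(if (isTimingOut) None else prevDeadlineMs)
}
```

With this fallback, a key that receives new data without the function touching the timeout keeps its earlier deadline instead of losing it. Re-arming the previous *duration* from the current time would be an equally plausible reading of `prevTimeout`; this sketch picks the deadline only for illustration.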