Github user amitsela commented on a diff in the pull request:
https://github.com/apache/spark/pull/17179#discussion_r105395680
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/streaming/KeyedState.scala ---
@@ -61,25 +65,49 @@ import
org.apache.spark.sql.catalyst.plans.logical.LogicalKeyedState
* - After that, if `update(newState)` is called, then `exists()` will
again return `true`,
* `get()` and `getOption()`will return the updated value.
*
+ * Important points to note about using `KeyedStateTimeout`.
+ * - The timeout type is a global param across all the keys (set as
`timeout` param in
+ * `[map|flatMap]GroupsWithState`, but the exact timeout duration is
configurable per key
+ * (by calling `setTimeout...()` in `KeyedState`).
+ * - When the timeout occurs for a key, the function is called with no
values, and
+ * `KeyedState.isTimingOut()` set to true.
+ * - The timeout is reset for key every time the function is called on
the key, that is,
+ * when the key has new data, or the key has timed out. So the user has
to set the timeout
+ * duration every time the function is called, otherwise there will not
be any timeout set.
+ * - Guarantees provided on processing-time-based timeout of key, when
timeout duration is D ms:
+ * - Timeout will never be called before real clock time has advanced
by D ms
+ * - Timeout will be called eventually when there is a trigger with any
data in it
--- End diff --
A `ProcessingTime` timeout that behaves this way could be confusing: a user
would expect it to fire at the timeout (plus system latency). That added
latency, whether it is one batch long or 10x the batch length (as with
checkpointing in Spark 1.x, for example), is still a preset value that could
also be made configurable, giving users control over the trade-off between
firing accuracy and performance.
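To make the latency concern concrete, here is a minimal sketch in plain Scala
(not Spark code). It assumes a simplified model in which timeouts are only
checked at trigger times; `firingTime` and the trigger schedule are
hypothetical names and values chosen for illustration.

```scala
// Hypothetical model, NOT Spark's implementation: a timeout is only
// noticed at a trigger, so it fires at the first trigger at or after
// the deadline.
object ProcessingTimeTimeoutSketch {
  /** Returns the first trigger time at or after (setAtMs + durationMs), if any. */
  def firingTime(setAtMs: Long, durationMs: Long, triggersMs: Seq[Long]): Option[Long] = {
    val deadlineMs = setAtMs + durationMs
    triggersMs.sorted.find(_ >= deadlineMs)
  }

  def main(args: Array[String]): Unit = {
    // Assumed schedule: one trigger every 10 seconds.
    val triggers = Seq(0L, 10000L, 20000L, 30000L)
    // Timeout of 2 s set at t = 1 s, so the deadline is t = 3 s.
    val fired = firingTime(setAtMs = 1000L, durationMs = 2000L, triggersMs = triggers)
    // Extra latency beyond the deadline, in milliseconds.
    println(fired.map(_ - 3000L)) // prints Some(7000)
  }
}
```

Under these assumptions the user asked for 2 s but waits 9 s in total, and
the extra delay depends entirely on the trigger schedule.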
With an `EventTime` timeout in the future, I assume the "clock" would be
watermark-based instead of wall-time, and I see two use cases where this would
matter:
1. Testing: being able to move the clock forward to end-of-time to force
firing everything that is still waiting for windows to close.
2. A pipeline with a filter before the stateful operator, such that there is
data and the watermark advances, but some of the events are dropped and never
reach the stateful operator, so it holds off firing until the "proper" data
(data that passes the filter) comes along. This again could cause an unknown
delay in firing results.
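The second case can be sketched in plain Scala (again, not Spark code). The
model is an assumption made for illustration: the stateful operator checks
timeouts only in batches where at least one row actually reaches it. `Batch`,
`firstFiring`, and the event labels are hypothetical.

```scala
// Hypothetical model, NOT Spark's implementation.
object FilteredTimeoutSketch {
  /** One micro-batch: its trigger time, the watermark at that point, and its events. */
  final case class Batch(triggerMs: Long, watermarkMs: Long, events: Seq[String])

  /**
   * Assumed rule: a timeout fires in the first batch where the watermark has
   * passed the deadline AND at least one event survives the upstream filter.
   */
  def firstFiring(deadlineMs: Long, batches: Seq[Batch], keep: String => Boolean): Option[Long] =
    batches
      .find(b => b.watermarkMs >= deadlineMs && b.events.exists(keep))
      .map(_.triggerMs)

  def main(args: Array[String]): Unit = {
    val batches = Seq(
      Batch(triggerMs = 1000L, watermarkMs = 500L, events = Seq("noise")),
      Batch(triggerMs = 2000L, watermarkMs = 1500L, events = Seq("noise")),
      Batch(triggerMs = 3000L, watermarkMs = 2500L, events = Seq("noise", "signal"))
    )
    // No filter: fires as soon as the watermark passes the deadline.
    println(firstFiring(1000L, batches, _ => true))       // prints Some(2000)
    // Filter keeps only "signal": firing waits for data that passes the filter.
    println(firstFiring(1000L, batches, _ == "signal"))   // prints Some(3000)
  }
}
```

In this model the watermark passes the deadline in the second batch, yet the
filtered pipeline does not fire until the third, and the gap is unbounded if
no matching data ever arrives.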