Github user amitsela commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17179#discussion_r105395680
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/KeyedState.scala ---
    @@ -61,25 +65,49 @@ import 
org.apache.spark.sql.catalyst.plans.logical.LogicalKeyedState
      *  - After that, if `update(newState)` is called, then `exists()` will again return `true`,
      *    and `get()` and `getOption()` will return the updated value.
      *
    + * Important points to note about using `KeyedStateTimeout`:
    + *  - The timeout type is a global param across all the keys (set as the `timeout` param in
    + *    `[map|flatMap]GroupsWithState`), but the exact timeout duration is configurable per key
    + *    (by calling `setTimeout...()` in `KeyedState`).
    + *  - When the timeout occurs for a key, the function is called with no values, and with
    + *    `KeyedState.isTimingOut()` set to true.
    + *  - The timeout is reset for a key every time the function is called on that key, that is,
    + *    when the key has new data or the key has timed out. So the user has to set the timeout
    + *    duration every time the function is called; otherwise there will not be any timeout set.
    + *  - Guarantees provided on the processing-time-based timeout of a key, when the timeout duration is D ms:
    + *    - The timeout will never fire before the real clock time has advanced by D ms.
    + *    - The timeout will fire eventually, when there is a trigger with any data in it.
    --- End diff --
    --- End diff --
    
    Using a `ProcessingTime` timeout could be confusing this way - a user would
expect it to fire on timeout (+ system latency). That system latency, whether
it's batch-long or 10x batch-long (like with checkpointing in Spark 1.x, for
example), is still something pre-set that could also be made configurable,
giving users control over the trade-off between accuracy of firings vs.
performance.
    Using an `EventTime` timeout in the future, I assume the "clock" would be
watermark-based instead of wall-clock time, and I see two use cases where this
would matter:
    1. Testing - being able to move the clock forward to end-of-time to force
firing everything that still awaits the closing of windows.
    2. A pipeline where there is a filter before the stateful op, such that
there is data and the watermark advances, but some of the events are dropped
and don't reach the stateful operator, so it holds off firing until "proper"
data (that passes the filter) comes along - this again could cause an unknown
delay in firing results.
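    For concreteness, here is a minimal sketch of how the per-key timeout
described in the doc above might look from the user side. The method names
(`setTimeoutDuration`, `isTimingOut()`) and the exact `mapGroupsWithState`
signature are assumptions based on the quoted doc, not the final API; the
`Event`/`SessionInfo` types and the "heartbeat" filter are made up for
illustration.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.streaming.KeyedState

// Hypothetical event/state types, purely for illustration.
case class Event(user: String, action: String)
case class SessionInfo(numEvents: Int)

// Update function in the shape used by [map|flatMap]GroupsWithState.
// `isTimingOut()` and `setTimeoutDuration(...)` follow the quoted doc; the
// exact method names/signatures are assumptions about the API under review.
def updateSession(
    user: String,
    events: Iterator[Event],
    state: KeyedState[SessionInfo]): SessionInfo = {
  if (state.isTimingOut()) {
    // Timeout fired: the function is invoked with no values for this key.
    val finished = state.getOption.getOrElse(SessionInfo(0))
    state.remove()
    finished
  } else {
    val updated =
      SessionInfo(state.getOption.map(_.numEvents).getOrElse(0) + events.size)
    state.update(updated)
    // Per the doc above, the timeout has to be re-set on every invocation,
    // otherwise no timeout remains armed for this key.
    state.setTimeoutDuration("10 minutes")
    updated
  }
}

val spark = SparkSession.builder().appName("keyed-state-timeout-sketch").getOrCreate()
import spark.implicits._

// Some streaming Dataset[Event]; source details elided.
val events: Dataset[Event] = ???

// Use case 2 from above: a filter before the stateful operator can drop all
// of a key's events in a trigger while the watermark still advances.
val sessions = events
  .filter(_.action != "heartbeat")
  .groupByKey(_.user)
  .mapGroupsWithState(updateSession _)   // plus the `timeout` param added by
                                         // this PR; exact signature may differ
```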

