Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17179#discussion_r105989084
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/KeyedState.scala ---
    @@ -61,25 +65,50 @@ import 
org.apache.spark.sql.catalyst.plans.logical.LogicalKeyedState
      *  - After that, if `update(newState)` is called, then `exists()` will 
again return `true`,
      *    `get()` and `getOption()`will return the updated value.
      *
    + * Important points to note about using `KeyedStateTimeout`.
    + *  - The timeout type is a global param across all the keys (set as 
`timeout` param in
    + *    `[map|flatMap]GroupsWithState`, but the exact timeout duration is 
configurable per key
    + *    (by calling `setTimeout...()` in `KeyedState`).
    + *  - When the timeout occurs for a key, the function is called with no 
values, and
    + *    `KeyedState.isTimingOut()` set to true.
    + *  - The timeout is reset for key every time the function is called on 
the key, that is,
    + *    when the key has new data, or the key has timed out. So the user has 
to set the timeout
    + *    duration every time the function is called, otherwise there will not 
be any timeout set.
    + *  - Guarantees provided on processing-time-based timeout of key, when 
timeout duration is D ms:
    + *    - Timeout will never be called before real clock time has advanced 
by D ms
    + *    - Timeout will be called eventually when there is a trigger in the 
query
    + *      (i.e. after D ms). So there is a no strict upper bound on when the 
timeout would occur.
    + *      For example, the trigger interval of the query will affect when 
the timeout is actually hit.
    + *      If there is no data in the stream (for any key) for a while, then 
their will not be
    + *      any trigger and timeout will not be hit until there is data.
    --- End diff --
    
    Shouldnt be tough. We will have to occasionally and periodically run a 
trigger with an empty batch DF. I think that should be a separate PR as that 
touch the StreamExecution, and we need figure out what the policy and the APIs 
to specify the policy should be. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to