Hi Raman,

I think you would need a sliding count window of size 2 with slide 1.
This is basically a GlobalWindow with a special trigger.

However, you would need to modify the custom trigger to be able to
- identify a terminal event (if there is such a thing), or
- close the window after a certain period of inactivity, in order to clean up the state.
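To make the basic idea concrete (without the custom cleanup trigger yet), here is a rough, untested sketch. StateChangeEvent and StateDuration are placeholder POJOs with sourceId / state / timestamp fields (timestamp assumed to be epoch millis); adjust them to your actual types:

import java.util.Iterator;

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;

DataStream<StateChangeEvent> events = ...; // your input stream

events
    // key by source so each source gets its own count window
    .keyBy("sourceId")
    // sliding count window: size 2, slide 1 -> fires for every event,
    // keeping the current event and its predecessor
    .countWindow(2, 1)
    .apply(new WindowFunction<StateChangeEvent, StateDuration, Tuple, GlobalWindow>() {
        @Override
        public void apply(Tuple key, GlobalWindow window,
                          Iterable<StateChangeEvent> input,
                          Collector<StateDuration> out) {
            Iterator<StateChangeEvent> it = input.iterator();
            StateChangeEvent previous = it.next();
            // the first firing per key contains only one event, so nothing to emit yet
            if (it.hasNext()) {
                StateChangeEvent current = it.next();
                out.collect(new StateDuration(
                    previous.getSourceId(),
                    previous.getState(),
                    current.getTimestamp() - previous.getTimestamp()));
            }
        }
    });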
Best,
Fabian

2017-01-19 1:43 GMT+01:00 Raman Gupta <rocketra...@gmail.com>:

> Thank you for your reply.
>
> If I were to use a keyed stream with a count-based window of 2, would
> Flink keep the last state persistently until the next state is
> received? Would this be another way of having Flink keep this
> information persistently without having to implement it manually?
>
> Thanks,
> Raman
>
> On 18/01/17 11:22 AM, Fabian Hueske wrote:
> > Hi Raman,
> >
> > I would approach this issue as follows.
> >
> > You key the input stream on the sourceId and apply a stateful
> > FlatMapFunction.
> > The FlatMapFunction has key-partitioned state and stores for each key
> > (sourceId) the latest event as state.
> > When a new event arrives, you can compute the time spent in the last
> > state by looking up the previous event from the state and comparing it
> > to the latest received event.
> > Then you put the new event in the state.
> >
> > This solution works well if you have a finite number of sources or if
> > you have a terminal event that signals that no more events will
> > arrive for a key.
> > Otherwise, the number of events stored in the state will grow
> > indefinitely and eventually become a problem.
> >
> > If the number of sources increases, you need to evict data at some
> > point in time. A ProcessFunction can help here, because you can
> > register a timer which you can use to evict old state.
> >
> > Hope this helps,
> > Fabian
> >
> > 2017-01-18 15:39 GMT+01:00 Raman Gupta <rocketra...@gmail.com>:
> >
> > I am investigating Flink. I am considering a relatively simple use
> > case -- I want to ingest streams of events that are essentially
> > timestamped state changes. These events may look something like:
> >
> > {
> >   sourceId: 111,
> >   state: OPEN,
> >   timestamp: <date/time>
> > }
> >
> > I want to apply various processing to these state change events, the
> > output of which can be used for analytics. For example:
> >
> > 1. average time spent in state, by state
> > 2. sources with the longest (or shortest) time spent in the OPEN state
> >
> > The time spent in each state may be days or even weeks.
> >
> > All the examples I have seen of similar logic involve windows on the
> > order of 15 minutes. Since the time spent in each state may far exceed
> > these window sizes, I'm wondering what the best approach would be.
> >
> > One thought from reading the docs is to use `every` to operate on the
> > entire stream. But it seems like this will take longer and longer to
> > run as the event stream grows, so this is not an ideal solution. Or
> > does Flink apply some clever optimizations to avoid the potential
> > performance issue?
> >
> > Another thought was to split the event stream into multiple streams
> > by source, each of which would have a small (and limited) amount of
> > data. This would make processing each stream simpler, but since there
> > can be thousands of sources, it would result in a lot of streams to
> > handle and persist (probably in Kafka). This does not seem ideal
> > either.
> >
> > It seems like this should be simple, but I'm struggling with
> > understanding how to solve it elegantly.
> >
> > Regards,
> > Raman
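PS: If you instead go with the keyed-state approach from my earlier mail (quoted above), a ProcessFunction lets you combine the per-key "last event" state with a cleanup timer in a single operator. Again only a rough, untested sketch: the 30-day inactivity timeout is a placeholder, and StateChangeEvent / StateDuration are the same placeholder POJOs as above (with an epoch-millis timestamp):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class StateDurationFunction
        extends ProcessFunction<StateChangeEvent, StateDuration> {

    // clean up keys that have been silent for 30 days (placeholder value)
    private static final long INACTIVITY_TIMEOUT = 30L * 24 * 60 * 60 * 1000;

    // last event seen for the current key
    private transient ValueState<StateChangeEvent> lastEvent;
    // timestamp of the most recently registered cleanup timer for the current key
    private transient ValueState<Long> cleanupTimer;

    @Override
    public void open(Configuration parameters) {
        lastEvent = getRuntimeContext().getState(
            new ValueStateDescriptor<>("lastEvent", StateChangeEvent.class));
        cleanupTimer = getRuntimeContext().getState(
            new ValueStateDescriptor<>("cleanupTimer", Long.class));
    }

    @Override
    public void processElement(StateChangeEvent event, Context ctx,
                               Collector<StateDuration> out) throws Exception {
        StateChangeEvent previous = lastEvent.value();
        if (previous != null) {
            // time spent in the previous state = gap between consecutive events
            out.collect(new StateDuration(
                previous.getSourceId(),
                previous.getState(),
                event.getTimestamp() - previous.getTimestamp()));
        }
        lastEvent.update(event);

        // (re-)register a cleanup timer and remember its timestamp so that
        // earlier timers can be ignored when they fire
        long cleanupAt = ctx.timerService().currentProcessingTime() + INACTIVITY_TIMEOUT;
        cleanupTimer.update(cleanupAt);
        ctx.timerService().registerProcessingTimeTimer(cleanupAt);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx,
                        Collector<StateDuration> out) throws Exception {
        // only clear the state if this is the latest timer, i.e. no new
        // event arrived for this key during the inactivity timeout
        Long latestTimer = cleanupTimer.value();
        if (latestTimer != null && timestamp == latestTimer) {
            lastEvent.clear();
            cleanupTimer.clear();
        }
    }
}

You would apply it with something like events.keyBy("sourceId").process(new StateDurationFunction()).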