Hi Raman,

I think you would need a sliding count window of size 2 with slide 1.
This is basically a GlobalWindow with a count evictor and a special trigger.

However, you would need a custom trigger that can either
- identify a terminal event (if there is such a thing) or
- close the window after a certain period of inactivity to clean up the
state (see the sketch below).
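
Something along these lines might work (an untested sketch against the
1.x DataStream API; the class name and the timeout are placeholders):

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.streaming.api.windowing.triggers.Trigger;
    import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
    import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

    // Fires for every element (slide of 1) and purges all window state
    // after a period of inactivity.
    public class InactivityCountTrigger extends Trigger<Object, GlobalWindow> {

        private final long timeoutMs;
        private final ValueStateDescriptor<Long> timerDesc =
            new ValueStateDescriptor<>("inactivity-timer", Long.class);

        public InactivityCountTrigger(long timeoutMs) {
            this.timeoutMs = timeoutMs;
        }

        @Override
        public TriggerResult onElement(Object element, long ts,
                GlobalWindow window, TriggerContext ctx) throws Exception {
            // reset the inactivity timer for this key
            ValueState<Long> timer = ctx.getPartitionedState(timerDesc);
            if (timer.value() != null) {
                ctx.deleteProcessingTimeTimer(timer.value());
            }
            long nextTimer = ctx.getCurrentProcessingTime() + timeoutMs;
            ctx.registerProcessingTimeTimer(nextTimer);
            timer.update(nextTimer);
            // if you can identify a terminal event, return FIRE_AND_PURGE
            // for it here instead
            return TriggerResult.FIRE;
        }

        @Override
        public TriggerResult onProcessingTime(long time, GlobalWindow window,
                TriggerContext ctx) throws Exception {
            return TriggerResult.PURGE; // inactivity: drop the window state
        }

        @Override
        public TriggerResult onEventTime(long time, GlobalWindow window,
                TriggerContext ctx) throws Exception {
            return TriggerResult.CONTINUE;
        }

        @Override
        public void clear(GlobalWindow window, TriggerContext ctx)
                throws Exception {
            ValueState<Long> timer = ctx.getPartitionedState(timerDesc);
            if (timer.value() != null) {
                ctx.deleteProcessingTimeTimer(timer.value());
            }
            timer.clear();
        }
    }

Combined with a count evictor this gives you the sliding count window
(ComputeStateDuration is a placeholder for your window function):

    stream
      .keyBy("sourceId")
      .window(GlobalWindows.create())
      .evictor(CountEvictor.of(2))                    // keep the last two events
      .trigger(new InactivityCountTrigger(86400000L)) // e.g., one day
      .apply(new ComputeStateDuration());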

Best, Fabian

2017-01-19 1:43 GMT+01:00 Raman Gupta <rocketra...@gmail.com>:

> Thank you for your reply.
>
> If I were to use a keyed stream with a count-based window of 2, would
> Flink keep the last state persistently until the next state is
> received? Would this be another way of having Flink keep this
> information persistently without having to implement it manually?
>
> Thanks,
> Raman
>
> On 18/01/17 11:22 AM, Fabian Hueske wrote:
> > Hi Raman,
> >
> > I would approach this issue as follows.
> >
> > You key the input stream on the sourceId and apply a stateful
> > FlatMapFunction.
> > The FlatMapFunction has key-partitioned state and stores for each key
> > (sourceId) the latest event.
> > When a new event arrives, you compute the time spent in the last state
> > by comparing the timestamps of the stored event and the newly received
> > event.
> > Then you put the new event into the state.
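> >
> > A minimal sketch of that function (the Event POJO, the names, and the
> > output type Tuple3<sourceId, state, millis> are placeholders for your
> > actual types; each class would go in its own file):
> >
> >     import org.apache.flink.api.common.functions.RichFlatMapFunction;
> >     import org.apache.flink.api.common.state.ValueState;
> >     import org.apache.flink.api.common.state.ValueStateDescriptor;
> >     import org.apache.flink.api.java.tuple.Tuple3;
> >     import org.apache.flink.configuration.Configuration;
> >     import org.apache.flink.util.Collector;
> >
> >     // assumed POJO matching the JSON events below
> >     public class Event {
> >         public String sourceId;
> >         public String state;     // e.g., "OPEN"
> >         public long timestamp;   // epoch millis
> >     }
> >
> >     public class StateDurationFn
> >             extends RichFlatMapFunction<Event, Tuple3<String, String, Long>> {
> >
> >         private transient ValueState<Event> lastEvent;
> >
> >         @Override
> >         public void open(Configuration parameters) {
> >             lastEvent = getRuntimeContext().getState(
> >                 new ValueStateDescriptor<>("last-event", Event.class));
> >         }
> >
> >         @Override
> >         public void flatMap(Event current,
> >                 Collector<Tuple3<String, String, Long>> out) throws Exception {
> >             Event previous = lastEvent.value();
> >             if (previous != null) {
> >                 // time spent in the previous state
> >                 out.collect(Tuple3.of(current.sourceId, previous.state,
> >                     current.timestamp - previous.timestamp));
> >             }
> >             lastEvent.update(current);
> >         }
> >     }
> >
> >     // applied on a keyed stream:
> >     // events.keyBy("sourceId").flatMap(new StateDurationFn());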
> >
> > This solution works well if you have a finite number of sources or if
> > you have a terminal event that signals that no more events will
> > arrive for a key.
> > Otherwise, the state will grow without bound and eventually become a
> > problem.
> >
> > If the number of sources keeps growing, you need to evict state at
> > some point in time. A ProcessFunction can help here, because you can
> > register a timer and use it to evict old state (see the sketch below).
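> >
> > For example (again just a sketch; it assumes the Event type from
> > above and a made-up 90-day TTL):
> >
> >     import org.apache.flink.api.common.state.ValueState;
> >     import org.apache.flink.api.common.state.ValueStateDescriptor;
> >     import org.apache.flink.api.java.tuple.Tuple3;
> >     import org.apache.flink.configuration.Configuration;
> >     import org.apache.flink.streaming.api.functions.ProcessFunction;
> >     import org.apache.flink.util.Collector;
> >
> >     public class LastEventWithCleanupFn
> >             extends ProcessFunction<Event, Tuple3<String, String, Long>> {
> >
> >         private static final long TTL_MS = 90L * 24 * 60 * 60 * 1000;
> >
> >         private transient ValueState<Event> lastEvent;
> >         private transient ValueState<Long> lastSeen; // last activity (proc. time)
> >
> >         @Override
> >         public void open(Configuration parameters) {
> >             lastEvent = getRuntimeContext().getState(
> >                 new ValueStateDescriptor<>("last-event", Event.class));
> >             lastSeen = getRuntimeContext().getState(
> >                 new ValueStateDescriptor<>("last-seen", Long.class));
> >         }
> >
> >         @Override
> >         public void processElement(Event current, Context ctx,
> >                 Collector<Tuple3<String, String, Long>> out) throws Exception {
> >             Event previous = lastEvent.value();
> >             if (previous != null) {
> >                 out.collect(Tuple3.of(current.sourceId, previous.state,
> >                     current.timestamp - previous.timestamp));
> >             }
> >             long now = ctx.timerService().currentProcessingTime();
> >             lastEvent.update(current);
> >             lastSeen.update(now);
> >             // schedule a cleanup check; it is a no-op if the key stays active
> >             ctx.timerService().registerProcessingTimeTimer(now + TTL_MS);
> >         }
> >
> >         @Override
> >         public void onTimer(long timestamp, OnTimerContext ctx,
> >                 Collector<Tuple3<String, String, Long>> out) throws Exception {
> >             Long seen = lastSeen.value();
> >             if (seen != null && timestamp >= seen + TTL_MS) {
> >                 lastEvent.clear(); // evict state of inactive keys
> >                 lastSeen.clear();
> >             }
> >         }
> >     }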
> >
> > Hope this helps,
> > Fabian
> >
> > 2017-01-18 15:39 GMT+01:00 Raman Gupta <rocketra...@gmail.com>:
> >
> >     I am investigating Flink. I am considering a relatively simple use
> >     case -- I want to ingest streams of events that are essentially
> >     timestamped state changes. These events may look something like:
> >
> >     {
> >       sourceId: 111,
> >       state: OPEN,
> >       timestamp: <date/time>
> >     }
> >
> >     I want to apply various processing to these state change events, the
> >     output of which can be used for analytics. For example:
> >
> >     1. average time spent in state, by state
> >     2. sources with longest (or shortest) time spent in OPEN state
> >
> >     The time spent in each state may be days or even weeks.
> >
> >     All the examples I have seen of similar logic involve windows on the
> >     order of 15 minutes. Since time spent in each state may far exceed
> >     these window sizes, I'm wondering what the best approach will be.
> >
> >     One thought from reading the docs is to use `every` to operate on the
> >     entire stream. But it seems like this will take longer and longer to
> >     run as the event stream grows, so this is not an ideal solution. Or
> >     does Flink apply some clever optimizations to avoid the potential
> >     performance issue?
> >
> >     Another thought was to split the event stream into multiple streams
> >     by source, each of which will have a small (and limited) amount of
> >     data. This will make processing each stream simpler, but since there
> >     can be thousands of sources, it will result in a lot of streams to
> >     handle and persist (probably in Kafka). This does not seem ideal
> >     either.
> >
> >     It seems like this should be simple, but I'm struggling with
> >     understanding how to solve it elegantly.
> >
> >     Regards,
> >     Raman
> >
> >
>
