Thank you, Fabian. The blog articles were very useful. I will continue experimenting.
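For reference, the checkpoint setup I mention below is along these lines (a minimal sketch only; the interval, checkpoint path, and job name are illustrative placeholders, not my actual configuration):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointedJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Take a checkpoint every 60 seconds (interval is a placeholder).
        env.enableCheckpointing(60 * 1000);

        // Keep state in RocksDB and write checkpoints to a durable location (path is a placeholder).
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));

        // A trivial pipeline just so the job runs; the real sources and operators go here.
        env.fromElements(1, 2, 3).print();

        env.execute("checkpointed-job");
    }
}

With this in place checkpoints are taken automatically at the configured interval, while savepoints are still triggered explicitly.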
On Thu, Jan 19, 2017 at 3:36 PM, Fabian Hueske <fhue...@gmail.com> wrote:
> Hi Raman,
>
> Checkpoints are used to recover from task or process failures and are usually taken automatically at periodic intervals if configured correctly. Checkpoints are usually removed when a more recent checkpoint is completed (the exact policy can be configured).
>
> Savepoints are used to restart a job that was previously shut down, to migrate a job to another cluster (e.g., when upgrading Flink), to update the job itself, etc. So they are more for planned maintenance. Nonetheless, they can also be used for more coarse-grained fault tolerance, and it is common practice to periodically trigger a savepoint.
>
> These blog posts might be helpful to understand the potential of savepoints [1] [2].
>
> Best, Fabian
>
> [1] http://data-artisans.com/turning-back-time-savepoints/
> [2] http://data-artisans.com/savepoints-part-2-updating-applications/
>
> 2017-01-19 19:02 GMT+01:00 Raman Gupta <rocketra...@gmail.com>:
>>
>> I was able to get it working well with the original approach you described. Thanks! Note that the documentation on how to do this with the Java API is... sparse, to say the least. I was able to look at the implementation of the Scala flatMapWithState function as a starting point.
>>
>> Now I'm trying to understand all the operational concerns related to the stored state. My checkpoints are in RocksDB, configured via the job definition.
>>
>> It seems that the checkpointed state of the streaming job is lost when I stop and restart Flink normally, or when Flink terminates abnormally and is restarted. I was able to take an explicit savepoint and then restart the job with it.
>>
>> Is the correct approach as of now to take savepoints periodically via cron, and use those to re-run jobs in case of Flink failure or restart?
>>
>> Regards,
>> Raman
>>
>> On 19/01/17 05:43 AM, Fabian Hueske wrote:
>>
>> Hi Raman,
>>
>> I think you would need a sliding count window of size 2 with slide 1. This is basically a GlobalWindow with a special trigger.
>>
>> However, you would need to modify the custom trigger to be able to
>> - identify a terminal event (if there is such a thing), or to
>> - close the window after a certain period of inactivity, to clean up the state.
>>
>> Best, Fabian
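A rough sketch of the sliding count window described just above, using the Java API. The Event POJO, the sample elements, and the output shape are illustrative, and this uses the built-in countWindow(2, 1) rather than the customized trigger for terminal events or inactivity that Fabian mentions:

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;

public class StateDurationWithCountWindow {

    // Illustrative event type, shaped like the JSON example later in the thread.
    public static class Event {
        public long sourceId;
        public String state;
        public long timestamp;

        public Event() {}

        public Event(long sourceId, String state, long timestamp) {
            this.sourceId = sourceId;
            this.state = state;
            this.timestamp = timestamp;
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder input; in practice this would come from Kafka or another source.
        DataStream<Event> events = env.fromElements(
                new Event(111, "OPEN", 1000L),
                new Event(111, "CLOSED", 5000L));

        events
            .keyBy("sourceId")
            // Sliding count window: every event is paired with its predecessor for the same key.
            .countWindow(2, 1)
            .apply(new WindowFunction<Event, Tuple3<Long, String, Long>, Tuple, GlobalWindow>() {
                @Override
                public void apply(Tuple key, GlobalWindow window, Iterable<Event> input,
                                  Collector<Tuple3<Long, String, Long>> out) {
                    Event previous = null;
                    Event current = null;
                    for (Event e : input) {
                        previous = current;
                        current = e;
                    }
                    if (previous != null) {
                        // Emit (sourceId, state that was left, time spent in that state).
                        out.collect(Tuple3.of(previous.sourceId, previous.state,
                                current.timestamp - previous.timestamp));
                    }
                }
            })
            .print();

        env.execute("state-duration-count-window");
    }
}

Note that each key keeps its latest events in window state indefinitely, so the caveat about cleaning up state for inactive sources still applies.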
>> 2017-01-19 1:43 GMT+01:00 Raman Gupta <rocketra...@gmail.com>:
>>>
>>> Thank you for your reply.
>>>
>>> If I were to use a keyed stream with a count-based window of 2, would Flink keep the last state persistently until the next state is received? Would this be another way of having Flink keep this information persistently without having to implement it manually?
>>>
>>> Thanks,
>>> Raman
>>>
>>> On 18/01/17 11:22 AM, Fabian Hueske wrote:
>>> > Hi Raman,
>>> >
>>> > I would approach this issue as follows.
>>> >
>>> > You key the input stream on the sourceId and apply a stateful FlatMapFunction. The FlatMapFunction has a key-partitioned state and stores for each key (sourceId) the latest event as state. When a new event arrives, you can compute the time spent in the last state by looking up the event from the state and the latest received event. Then you put the new event in the state.
>>> >
>>> > This solution works well if you have a finite number of sources or if you have a terminal event that signals that no more events will arrive for a key. Otherwise, the number of events stored in the state will grow indefinitely and eventually become a problem.
>>> >
>>> > If the number of sources increases, you need to evict data at some point in time. A ProcessFunction can help here, because you can register a timer which you can use to evict old state.
>>> >
>>> > Hope this helps,
>>> > Fabian
>>> >
>>> > 2017-01-18 15:39 GMT+01:00 Raman Gupta <rocketra...@gmail.com>:
>>> >
>>> >     I am investigating Flink. I am considering a relatively simple use case -- I want to ingest streams of events that are essentially timestamped state changes. These events may look something like:
>>> >
>>> >     {
>>> >       sourceId: 111,
>>> >       state: OPEN,
>>> >       timestamp: <date/time>
>>> >     }
>>> >
>>> >     I want to apply various processing to these state change events, the output of which can be used for analytics. For example:
>>> >
>>> >     1. average time spent in state, by state
>>> >     2. sources with the longest (or shortest) time spent in the OPEN state
>>> >
>>> >     The time spent in each state may be days or even weeks.
>>> >
>>> >     All the examples I have seen of similar logic involve windows on the order of 15 minutes. Since the time spent in each state may far exceed these window sizes, I'm wondering what the best approach would be.
>>> >
>>> >     One thought from reading the docs is to use `every` to operate on the entire stream. But it seems like this will take longer and longer to run as the event stream grows, so this is not an ideal solution. Or does Flink apply some clever optimizations to avoid the potential performance issue?
>>> >
>>> >     Another thought was to split the event stream into multiple streams by source, each of which would have a small (and limited) amount of data. This would make processing each stream simpler, but since there can be thousands of sources, it would result in a lot of streams to handle and persist (probably in Kafka). This does not seem ideal either.
>>> >
>>> >     It seems like this should be simple, but I'm struggling with understanding how to solve it elegantly.
>>> >
>>> >     Regards,
>>> >     Raman
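A rough Java sketch of the keyed, stateful FlatMapFunction approach Fabian describes earlier in the thread, which is the approach that ended up working. The class names, the Event POJO, and the output shape are illustrative, not taken from the actual job:

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class StateDurations {

    // Illustrative event type, same shape as the JSON example above.
    public static class Event {
        public long sourceId;
        public String state;
        public long timestamp;
        public Event() {}
    }

    // Keeps the latest event per sourceId and, when the next event for that key
    // arrives, emits how long the previous state was held.
    public static class StateDurationFunction
            extends RichFlatMapFunction<Event, Tuple3<Long, String, Long>> {

        private transient ValueState<Event> lastEvent;

        @Override
        public void open(Configuration parameters) {
            // Key-partitioned state: one "latest event" per sourceId.
            lastEvent = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("lastEvent", Event.class, null));
        }

        @Override
        public void flatMap(Event current, Collector<Tuple3<Long, String, Long>> out) throws Exception {
            Event previous = lastEvent.value();
            if (previous != null) {
                // Emit (sourceId, state that was left, time spent in that state).
                out.collect(Tuple3.of(previous.sourceId, previous.state,
                        current.timestamp - previous.timestamp));
            }
            // The new event becomes the "previous" event for the next arrival on this key.
            lastEvent.update(current);
        }
    }

    // Usage inside a job: events.keyBy("sourceId").flatMap(new StateDurationFunction())
}

If state for inactive sources needs to be evicted, the same per-key logic can be moved into a ProcessFunction, as Fabian suggests, so that a registered timer can clear the stored event for keys that have gone quiet.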