Thank you Fabian, the blog articles were very useful. I will continue
experimenting.

On Thu, Jan 19, 2017 at 3:36 PM, Fabian Hueske <fhue...@gmail.com> wrote:
> Hi Raman,
>
> Checkpoints are used to recover from task or process failures and are usually
> taken automatically at periodic intervals when checkpointing is configured.
> Checkpoints are usually removed when a more recent checkpoint is completed
> (the exact policy can be configured).
>
> Savepoints are used to restart a job that was previously shut down, to
> migrate a job to another cluster (e.g., when upgrading Flink), to update the
> job itself, etc. They are therefore meant more for planned maintenance.
> Nonetheless, they can also be used for more coarse-grained fault tolerance,
> and it is common practice to trigger a savepoint periodically.
>
> These blog posts might be helpful to understand the potential of savepoints
> [1] [2].
>
> Best, Fabian
>
> [1] http://data-artisans.com/turning-back-time-savepoints/
> [2] http://data-artisans.com/savepoints-part-2-updating-applications/
>
> 2017-01-19 19:02 GMT+01:00 Raman Gupta <rocketra...@gmail.com>:
>>
>> I was able to get it working well with the original approach you
>> described. Thanks! Note that the documentation on how to do this with the
>> Java API is... sparse, to say the least. I was able to look at the
>> implementation of the Scala flatMapWithState function as a starting point.
>>
>> Now I'm trying to understand all the operational concerns related to the
>> stored state. My checkpoints are in RocksDB, configured via the job
>> definition.
>>
>> It seems that the checkpointed state of the streaming job is lost when I
>> stop and restart Flink normally, or when Flink terminates abnormally and is
>> restarted. I was able to take an explicit savepoint and then restart the job
>> with it.
>>
>> Is the correct approach as of now to take savepoints periodically via
>> cron, and use those to re-run jobs in case of a Flink failure or restart?
>>
>> Regards,
>> Raman
>>
>> On 19/01/17 05:43 AM, Fabian Hueske wrote:
>>
>> Hi Raman,
>>
>> I think you would need a sliding count window of size 2 with slide 1.
>> This is basically a GlobalWindow with a special trigger.
>>
>> However, you would need to modify the custom trigger to be able to
>> - identify a terminal event (if there is such a thing) or to
>> - close the window after a certain period of inactivity to clean up the
>> state.
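
The sliding count window of size 2 with slide 1 mentioned above can be
illustrated with a minimal plain-Python sketch. It does not use the Flink
API; the function name sliding_pairs and the (key, value) event tuples are
hypothetical. It also emits only once two events are buffered for a key,
which is a simplification and may differ from when Flink actually fires
count windows.

```python
from collections import defaultdict, deque
from typing import Deque, Dict, Iterable, Iterator, Tuple


def sliding_pairs(
    events: Iterable[Tuple[int, str]]
) -> Iterator[Tuple[int, Tuple[str, str]]]:
    """Yield (key, (previous_value, current_value)) for consecutive events per key."""
    # One bounded buffer per key: size 2, and it slides by 1 on every append.
    windows: Dict[int, Deque[str]] = defaultdict(lambda: deque(maxlen=2))
    for key, value in events:
        window = windows[key]
        window.append(value)  # the oldest element is dropped automatically
        if len(window) == 2:
            yield key, (window[0], window[1])


pairs = list(sliding_pairs([(1, "OPEN"), (2, "OPEN"), (1, "HOLD"), (1, "CLOSED")]))
```

Note that the buffer for key 2 never goes away in this sketch, which is the
cleanup problem the terminal-event or inactivity trigger is meant to solve.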
>>
>> Best, Fabian
>>
>> 2017-01-19 1:43 GMT+01:00 Raman Gupta <rocketra...@gmail.com>:
>>>
>>> Thank you for your reply.
>>>
>>> If I were to use a keyed stream with a count-based window of 2, would
>>> Flink keep the last state persistently until the next state is
>>> received? Would this be another way of having Flink keep this
>>> information persistently without having to implement it manually?
>>>
>>> Thanks,
>>> Raman
>>>
>>> On 18/01/17 11:22 AM, Fabian Hueske wrote:
>>> > Hi Raman,
>>> >
>>> > I would approach this issue as follows.
>>> >
>>> > You key the input stream on the sourceId and apply a stateful
>>> > FlatMapFunction.
>>> > The FlatMapFunction has key-partitioned state and stores, for each key
>>> > (sourceId), the latest event as state.
>>> > When a new event arrives, you can compute the time spent in the last
>>> > state by comparing the event stored in the state with the newly received
>>> > event.
>>> > Then you put the new event in the state.
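
The keyed-state logic described above can be sketched in plain Python. This
is not the Flink API: the dictionary stands in for Flink's key-partitioned
state, and the Event class, the time_in_state function, and the use of
seconds as the timestamp unit are assumptions made for illustration.

```python
from dataclasses import dataclass
from typing import Dict, Iterable, Iterator, Tuple


@dataclass(frozen=True)
class Event:
    source_id: int
    state: str
    timestamp: float  # seconds since some epoch (assumed unit)


def time_in_state(events: Iterable[Event]) -> Iterator[Tuple[int, str, float]]:
    """Yield (source_id, previous_state, seconds_spent) for each state change."""
    last_event: Dict[int, Event] = {}  # stands in for per-key state
    for event in events:
        previous = last_event.get(event.source_id)
        if previous is not None:
            # Time spent in the previous state is the gap between the two events.
            yield (
                event.source_id,
                previous.state,
                event.timestamp - previous.timestamp,
            )
        # Replace the stored event with the new one.
        last_event[event.source_id] = event


events = [
    Event(111, "OPEN", 0.0),
    Event(222, "OPEN", 10.0),
    Event(111, "CLOSED", 3600.0),
]
results = list(time_in_state(events))
```

Only one event is kept per key, so the state size depends on the number of
distinct sources rather than on the length of the stream.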
>>> >
>>> > This solution works well if you have a finite number of sources or if
>>> > you have a terminal event that signals that no more events will
>>> > arrive for a key.
>>> > Otherwise, the number of events stored in the state will grow
>>> > without bound and eventually become a problem.
>>> >
>>> > If the number of sources keeps increasing, you need to evict data at some
>>> > point in time. A ProcessFunction can help here, because you can
>>> > register a timer which you can use to evict old state.
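
The idea of evicting old state with a timer can be sketched in plain Python
as follows. This is not Flink's ProcessFunction: the class name
ExpiringLastEventState, the ttl_seconds parameter, and the scan over all keys
are assumptions made for illustration. In Flink each key would register its
own timer instead of scanning.

```python
from typing import Dict, List, Tuple


class ExpiringLastEventState:
    """Keeps the latest event per key and drops keys that have been idle too long."""

    def __init__(self, ttl_seconds: float) -> None:
        self.ttl = ttl_seconds
        # key -> (state, timestamp of the latest event)
        self.last_event: Dict[int, Tuple[str, float]] = {}

    def on_event(self, source_id: int, state: str, timestamp: float) -> List[int]:
        """Store the event, then evict expired keys. Returns the evicted keys."""
        self.last_event[source_id] = (state, timestamp)
        return self.fire_timers(timestamp)

    def fire_timers(self, now: float) -> List[int]:
        """Remove every key whose latest event is at least ttl seconds old."""
        expired = [
            key for key, (_, ts) in self.last_event.items() if now - ts >= self.ttl
        ]
        for key in expired:
            del self.last_event[key]
        return expired


store = ExpiringLastEventState(ttl_seconds=100.0)
store.on_event(1, "OPEN", 0.0)
store.on_event(2, "OPEN", 50.0)
evicted = store.on_event(2, "CLOSED", 120.0)  # key 1 has been idle for 120 seconds
```

Choosing the timeout is a trade-off: since time spent in a state may be days
or weeks, a short timeout would discard state that is still needed.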
>>> >
>>> > Hope this helps,
>>> > Fabian
>>> >
>>> > 2017-01-18 15:39 GMT+01:00 Raman Gupta <rocketra...@gmail.com>:
>>> >
>>> >     I am investigating Flink. I am considering a relatively simple use
>>> >     case -- I want to ingest streams of events that are essentially
>>> >     timestamped state changes. These events may look something like:
>>> >
>>> >     {
>>> >       sourceId: 111,
>>> >       state: OPEN,
>>> >       timestamp: <date/time>
>>> >     }
>>> >
>>> >     I want to apply various processing to these state change events,
>>> >     the output of which can be used for analytics. For example:
>>> >
>>> >     1. average time spent in state, by state
>>> >     2. sources with longest (or shortest) time spent in OPEN state
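
The two example analytics above can be sketched in plain Python, assuming the
time spent in each state has already been computed as (source_id, state,
seconds) tuples. The function names and the sample data are hypothetical, and
this is a batch computation rather than a streaming one.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Optional, Tuple

Duration = Tuple[int, str, float]  # (source_id, state, seconds spent in state)


def average_time_by_state(durations: Iterable[Duration]) -> Dict[str, float]:
    """Average seconds spent in each state, across all sources."""
    totals: Dict[str, List[float]] = defaultdict(lambda: [0.0, 0])
    for _, state, seconds in durations:
        totals[state][0] += seconds  # running sum
        totals[state][1] += 1        # running count
    return {state: total / count for state, (total, count) in totals.items()}


def longest_in_state(durations: Iterable[Duration], state: str) -> Optional[int]:
    """Source with the single longest stay in the given state, or None."""
    matching = [(seconds, source_id) for source_id, s, seconds in durations if s == state]
    return max(matching)[1] if matching else None


durations = [(111, "OPEN", 3600.0), (222, "OPEN", 7200.0), (111, "CLOSED", 600.0)]
averages = average_time_by_state(durations)
slowest_open = longest_in_state(durations, "OPEN")
```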
>>> >
>>> >     The time spent in each state may be days or even weeks.
>>> >
>>> >     All the examples I have seen of similar logic involve windows on
>>> >     the order of 15 minutes. Since time spent in each state may far
>>> >     exceed these window sizes, I'm wondering what the best approach
>>> >     will be.
>>> >
>>> >     One thought from reading the docs is to use `every` to operate on
>>> >     the entire stream. But it seems like this will take longer and
>>> >     longer to run as the event stream grows, so this is not an ideal
>>> >     solution. Or does Flink apply some clever optimizations to avoid
>>> >     the potential performance issue?
>>> >
>>> >     Another thought was to split the event stream into multiple streams
>>> >     by source, each of which will have a small (and limited) amount of
>>> >     data. This will make processing each stream simpler, but since
>>> >     there can be thousands of sources, it will result in a lot of
>>> >     streams to handle and persist (probably in Kafka). This does not
>>> >     seem ideal either.
>>> >
>>> >     It seems like this should be simple, but I'm struggling with
>>> >     understanding how to solve it elegantly.
>>> >
>>> >     Regards,
>>> >     Raman
>>> >
>>> >
>>
>>
>>
>
