Big thanks for replying, Aljoscha. I spent quite some time thinking about
how to solve this problem and came to some conclusions. It would be great
if you could verify whether my logic is correct.

I concluded that if I partition the data in Kafka the same way I partition
my windows with keyBy, that is by the tenant/user combination (I would
still use a hash of it in the Kafka producer), and if I switch from
processing time to event time, then during a replay I can be 100% sure
that the first element will always be first and that the watermark
triggering the window will arrive at the same moment. This would give me
idempotent writes of the batched object to HBase.
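
To make it concrete, something along these lines is what I have in mind
(just a sketch; Event, EventSchema, the field getters and the 10s
out-of-orderness bound are placeholders, not our real code):

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    DataStream<Event> events = env
        .addSource(new FlinkKafkaConsumer010<>("events", new EventSchema(), kafkaProps))
        // event time + watermarks, so a replay triggers the window at the same point
        .assignTimestampsAndWatermarks(
            new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(10)) {
                @Override
                public long extractTimestamp(Event event) {
                    return event.getEventTime();
                }
            });

    // same tenant/user key that the Kafka producer hashes on for partitioning
    events.keyBy(event -> event.getTenant() + "|" + event.getUser());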

For late events (by configuring allowed lateness on the window itself) I
would configure the trigger to fire & purge, so that it doesn't keep the
data it has already fired. This way, if a late event arrives, I can fire it
with a different timestamp and treat it in HBase as a completely separate
increment instead of overriding my previous data.
The reason I want to purge on firing is that I would need allowed lateness
of at least 2 months on the window, and holding all data for 2 months after
firing would be too costly. I picture it roughly like the sketch below.
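
Again only a sketch: StatsIncrement, StatsFold and HBaseSink stand for my
fold state, fold function and HBase sink, and the 5s window is just an
example.

    events
        .keyBy(event -> event.getTenant() + "|" + event.getUser())
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        .allowedLateness(Time.days(60))                         // ~2 months of lateness
        .trigger(PurgingTrigger.of(EventTimeTrigger.create()))  // fire & purge on each firing
        .fold(new StatsIncrement(), new StatsFold())
        .addSink(new HBaseSink());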

One additional question: is there any cost to setting allowed lateness very
high (like 2 months) if the trigger is configured to fire & purge? Is there
any additional state or metadata that Flink needs to maintain which would
take a lot of memory from the cluster? Would I have to consider RocksDB for
state here, or could the FS state backend still work?
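
For reference, I mean the choice between something like this (paths are
just placeholders):

    // keep window state on the TaskManager heap, checkpoint to a file system
    env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));
    // or keep it in RocksDB if it no longer fits comfortably on the heap
    env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));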

On Fri, Apr 28, 2017 at 5:54 PM Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi,
> Yes, your analysis is correct: Flink will not retry for individual
> elements but will restore from the latest consistent checkpoint in case of
> failure. This also means that you can get different window results based on
> which element arrives first, i.e. you have a different timestamp on your
> output in that case.
>
> One simple mitigation for the timestamp problem is to use the largest
> timestamp of elements within a window instead of the first timestamp. This
> will be stable across restores even if the order of arrival of elements
> changes. You can still run into problems with late data and window
> triggering, though, if you cannot guarantee that your watermark is 100%
> correct. I.e. it might be that, upon restore, an element with an even
> larger timestamp arrives late that was not considered during the first
> processing run that failed.
>
> Best,
> Aljoscha
> > On 25. Apr 2017, at 19:54, Kamil Dziublinski <kamil.dziublin...@gmail.com> wrote:
> >
> > Hi guys,
> >
> > I have a Flink streaming job that reads from Kafka, creates some
> > statistics increments and stores them in HBase (using normal puts).
> > I'm using a fold function here with a window of a few seconds.
> >
> > My tests showed me that restoring state with window functions does not
> > work exactly the way I expected.
> > I thought that if my window function emits an aggregated object to a
> > sink, and that object fails in the sink, this write to HBase would be
> > replayed. So even if it actually got written to HBase but Flink thought
> > it didn't (for instance due to a network problem), I could be sure of
> > idempotent writes. I wanted to enforce that by using the timestamp of
> > the first event used in that window for the aggregation.
> >
> > Now correct me if I'm wrong, but it seems that in case of a failure
> > (even if it is in the sink) the whole flow is replayed from the last
> > checkpoint, which means that my window function might emit the
> > aggregated object in a different form, for instance containing not only
> > the tuples that failed but also other ones. That would break my
> > idempotency here and I might end up with higher counters than I should
> > have.
> >
> > Do you have any suggestions on how to solve or work around such a
> > problem in Flink?
> >
> > Thanks,
> > Kamil.
> >
> >
>
>
