Hi,
yeah, in that case per-key watermarks would be useful for you. It won't be
possible to add such a feature, though, due to the (possibly) dynamic
nature of the key space and how watermark tracking works.

You should be able to implement it with relatively low overhead using a
RichFlatMapFunction and keyed state. This is the relevant section of the
doc:
https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/state.html#using-the-keyvalue-state-interface
.
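
To sketch what that could look like for your meter scenario (just a rough
outline, not a drop-in implementation: it assumes tuples of
(meterId, timestamp, consumption), a fixed tumbling window size, and the
ValueStateDescriptor constructor of a recent Flink version; the class and
field names are made up):

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Per-key "windows" in a keyed flatMap: since each meter sends in order,
// the latest timestamp seen for a key serves as that key's watermark.
public class PerKeyWindowAverage
    extends RichFlatMapFunction<Tuple3<String, Long, Double>, Tuple3<String, Long, Double>> {

  private static final long WINDOW_SIZE = 60_000L; // example: 1-minute tumbling windows

  // (windowStart, sum, count) of the currently open window for this key
  private transient ValueState<Tuple3<Long, Double, Long>> windowState;

  @Override
  public void open(Configuration parameters) {
    windowState = getRuntimeContext().getState(
        new ValueStateDescriptor<>(
            "per-key-window",
            TypeInformation.of(new TypeHint<Tuple3<Long, Double, Long>>() {})));
  }

  @Override
  public void flatMap(Tuple3<String, Long, Double> value,
                      Collector<Tuple3<String, Long, Double>> out) throws Exception {
    long windowStart = value.f1 - (value.f1 % WINDOW_SIZE);

    Tuple3<Long, Double, Long> current = windowState.value();
    if (current == null) {
      current = Tuple3.of(windowStart, 0.0, 0L);
    }

    if (windowStart > current.f0 && current.f2 > 0) {
      // This key's own event time has passed the open window: emit its average.
      out.collect(Tuple3.of(value.f0, current.f0, current.f1 / current.f2));
      current = Tuple3.of(windowStart, 0.0, 0L);
    }

    current.f1 += value.f2;
    current.f2 += 1;
    windowState.update(current);
  }
}

You would apply it after keyBy(), e.g.
measurements.keyBy(0).flatMap(new PerKeyWindowAverage()). Note that the last
open window of a key is only emitted once a newer element for that key
arrives; if you need to flush idle keys you would have to add a timer or
processing-time check yourself.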

We are also in the process of improving our windowing system, especially
when it comes to late data, cleanup and trigger semantics. You can have a
look here if you're interested:
https://docs.google.com/document/d/1Xp-YBf87vLTduYSivgqWVEMjYUmkA-hyb4muX3KRl08/edit?usp=sharing
.

Best,
Aljoscha

On Tue, 31 May 2016 at 14:36 <leon_mcl...@tutanota.com> wrote:

> Hi Aljoscha,
>
> thanks for the speedy reply.
>
> I am processing measurements delivered by smart meters. I use windows to
> gather measurements and calculate values such as average consumption. The
> key is simply the meter ID.
>
> The challenge is that meters may undergo network partitioning, under which
> they fall back to local buffering. The data is then transmitted once
> connectivity has been re-established. I am using event time to obtain
> accurate calculations.
>
> If a specific meter goes offline, and the watermark progresses to the next
> window for an operator instance, then all late data will be discarded once
> that meter is online again, until it has caught up to the event time. This
> is because I am using a custom EventTimeTrigger implementation that
> discards late elements. The reason is that Flink would otherwise
> immediately evaluate the window upon receiving a late element,
> which is a problem since my calculations (e.g. the average consumption)
> depend on multiple elements. I cannot calculate averages with that single
> late element.
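>
> For reference, my trigger is essentially the stock EventTimeTrigger with the
> late-element case changed to drop the element instead of firing (a rough
> sketch against the current Trigger API; method signatures may differ
> slightly between Flink versions):
>
> import org.apache.flink.streaming.api.windowing.triggers.Trigger;
> import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
>
> public class DiscardingEventTimeTrigger extends Trigger<Object, TimeWindow> {
>
>   @Override
>   public TriggerResult onElement(Object element, long timestamp,
>                                  TimeWindow window, TriggerContext ctx) throws Exception {
>     if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
>       // Late element: the window already fired and was purged, so this pane
>       // only contains the late element. Purge it without firing.
>       return TriggerResult.PURGE;
>     }
>     ctx.registerEventTimeTimer(window.maxTimestamp());
>     return TriggerResult.CONTINUE;
>   }
>
>   @Override
>   public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
>     // Fire exactly once when the watermark passes the window end, then purge.
>     return time == window.maxTimestamp()
>         ? TriggerResult.FIRE_AND_PURGE
>         : TriggerResult.CONTINUE;
>   }
>
>   @Override
>   public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
>     return TriggerResult.CONTINUE;
>   }
>
>   @Override
>   public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
>     ctx.deleteEventTimeTimer(window.maxTimestamp());
>   }
> }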
>
> Each individual meter guarantees in-order transmission of measurements. If
> watermarks progressed per key, then I would never have late elements
> because of that guarantee. I would be able to accurately calculate
> averages, with the trade-off that my results would arrive sporadically from
> the same operator instance.
>
> I suppose I could bypass the use of windows by implementing a stateful map
> function that mimics windows to a certain degree. I implemented something
> similar in Storm, but the amount of application logic required is
> substantial.
>
> I completely understand why Flink evaluates a window on a late element:
> since event time has already progressed, there is no other way to know when
> to evaluate the window.
>
> Perhaps there is a way to gather/redirect late elements?
>
> Regards
> Leon
>
> 31. May 2016 13:37 by aljos...@apache.org:
>
>
> Hi,
> I'm afraid this is impossible with the current design of Flink. Might I
> ask what you want to achieve with this? Maybe we can come up with a
> solution.
>
> -Aljoscha
>
> On Tue, 31 May 2016 at 13:24 <leon_mcl...@tutanota.com> wrote:
>
>> My use case primarily concerns applying transformations per key, with the
>> keys remaining fixed throughout the topology. I am using event time for my
>> windows.
>>
>> The problem I am currently facing is that watermarks in windows propagate
>> per operator instance, meaning the operator event time increases for all
>> keys that the operator is in charge of. I wish for watermarks to progress
>> per key, not per operator instance.
>>
>> Is this easily possible? I was unable to find an appropriate solution
>> based on existing code recipes.
>>
>> Greetings
>> Leon
>>
>
