Hi,

yeah, in that case per-key watermarks would be useful for you. It won't be possible to add such a feature, though, due to the (possibly) dynamic nature of the key space and how watermark tracking works.

You should be able to implement it with relatively low overhead using a RichFlatMapFunction and keyed state. This is the relevant section of the doc:
https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/state.html#using-the-keyvalue-state-interface

We are also in the process of improving our windowing system, especially when it comes to late data, cleanup and trigger semantics. You can have a look here if you're interested:
https://docs.google.com/document/d/1Xp-YBf87vLTduYSivgqWVEMjYUmkA-hyb4muX3KRl08/edit?usp=sharing

Best,
Aljoscha

On Tue, 31 May 2016 at 14:36 <leon_mcl...@tutanota.com> wrote:

> Hi Aljoscha,
>
> thanks for the speedy reply.
>
> I am processing measurements delivered by smart meters. I use windows to
> gather measurements and calculate values such as average consumption. The
> key is simply the meter ID.
>
> The challenge is that meters may undergo network partitioning, under which
> they fall back to local buffering. The data is then transmitted once
> connectivity has been re-established. I am using event time to obtain
> accurate calculations.
>
> If a specific meter goes offline, and the watermark progresses to the next
> window for an operator instance, then all late data will be discarded once
> that meter is online again, until it has caught up to the event time. This
> is because I am using a custom EventTimeTrigger implementation that
> discards late elements. The reason is that Flink would
> otherwise immediately evaluate the window upon receiving a late element,
> which is a problem since my calculations (e.g. the average consumption)
> depend on multiple elements. I cannot calculate averages with that single
> late element.
>
> Each individual meter guarantees in-order transmission of measurements. If
> watermarks progressed per key, then I would never have late elements
> because of that guarantee. I would be able to accurately calculate
> averages, with the trade-off that my results would arrive sporadically from
> the same operator instance.
>
> I suppose I could bypass the use of windows by implementing a stateful map
> function that mimics windows to a certain degree. I implemented something
> similar in Storm, but the amount of application logic required is
> substantial.
>
> I completely understand why Flink evaluates a window on a late element,
> since there is no other way to know when to evaluate the window as event
> time has already progressed.
>
> Perhaps there is a way to gather/redirect late elements?
>
> Regards
> Leon
>
> 31. May 2016 13:37 by aljos...@apache.org:
>
> Hi,
> I'm afraid this is impossible with the current design of Flink. Might I
> ask what you want to achieve with this? Maybe we can come up with a
> solution.
>
> -Aljoscha
>
> On Tue, 31 May 2016 at 13:24 <leon_mcl...@tutanota.com> wrote:
>
>> My use case primarily concerns applying transformations per key, with the
>> keys remaining fixed throughout the topology. I am using event time for my
>> windows.
>>
>> The problem I am currently facing is that watermarks in windows propagate
>> per operator instance, meaning the operator event time increases for all
>> keys that the operator is in charge of. I wish for watermarks to progress
>> per key, not per operator instance.
>>
>> Is this easily possible? I was unable to find an appropriate solution
>> based on existing code recipes.
>>
>> Greetings
>> Leon
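
For reference, here is a minimal sketch of the keyed-state idea suggested at the top of this reply. It is plain Java with no Flink dependency, so it only illustrates the logic: the names PerKeyWindowAverager, process and Result are hypothetical, the HashMap stands in for what would be Flink keyed state, and process stands in for the flatMap method of a RichFlatMapFunction. It assumes tumbling windows and relies on the guarantee from the thread that each meter transmits its measurements in order.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java sketch: each key advances its own event time, independent of other keys. */
final class PerKeyWindowAverager {

    /** Per-key state; in a Flink job this would be held in keyed state (assumption). */
    private static final class WindowState {
        long windowStart;
        double sum;
        long count;
    }

    /** Average of one closed window for one key. */
    static final class Result {
        final String key;
        final long windowStart;
        final double average;

        Result(String key, long windowStart, double average) {
            this.key = key;
            this.windowStart = windowStart;
            this.average = average;
        }
    }

    private final long windowSizeMillis;
    private final Map<String, WindowState> stateByKey = new HashMap<>();

    PerKeyWindowAverager(long windowSizeMillis) {
        this.windowSizeMillis = windowSizeMillis;
    }

    /**
     * Handles one measurement. Returns the result of the window that this
     * element closed for its key, or an empty list if no window was closed.
     */
    List<Result> process(String meterId, long timestamp, double value) {
        List<Result> out = new ArrayList<>();
        // Start of the tumbling window that contains this timestamp.
        long start = timestamp - Math.floorMod(timestamp, windowSizeMillis);

        WindowState state = stateByKey.get(meterId);
        if (state == null) {
            // First element seen for this key: open its first window.
            state = new WindowState();
            state.windowStart = start;
            stateByKey.put(meterId, state);
        } else if (start > state.windowStart) {
            // This key's own event time passed its open window: emit and reset.
            out.add(new Result(meterId, state.windowStart, state.sum / state.count));
            state.windowStart = start;
            state.sum = 0.0;
            state.count = 0;
        } else if (start < state.windowStart) {
            // Out of order for this key; should not occur given in-order meters, so drop.
            return out;
        }

        state.sum += value;
        state.count++;
        return out;
    }
}
```

Note that a key's open window is only emitted once a later element for that same key arrives, so results show up sporadically per key. That is the trade-off mentioned in the thread, and a real job would probably also need some timeout to flush windows of meters that never come back.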