Hi Aljoscha,

Just thinking on the EventTimeTrigger example you provided (and I'm going to apologise in advance for taking more of your time!): if I go down that route with a long allowedLateness, I think we'll run into issues with memory use, unless Flink is smart enough, configurable enough, or customisable enough to let me control *where* the ageing state is kept.
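(For readers of the archive: whether long lateness blows up memory depends on where Flink keeps window state. Later Flink releases made this pluggable via state backends; the sketch below is illustrative only. The RocksDB backend shown, which keeps state on local disk rather than the JVM heap, landed after this thread, and the checkpoint URI is a placeholder.)

```java
// Illustrative sketch only: configuring a state backend so that window and
// trigger state lives in RocksDB on local disk instead of the JVM heap.
// Class names are from later Flink releases (flink-statebackend-rocksdb);
// the checkpoint URI below is a made-up placeholder.
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StateBackendSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // With this backend, a long allowedLateness does not have to
        // accumulate ageing window state on the heap; it is spilled to
        // disk and checkpointed to the given URI.
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));
    }
}
```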
Thoughts?

Thanks!

Andy

On Fri, 15 Jan 2016 at 15:51 Andrew Coates <big.andy.coa...@gmail.com> wrote:

> Hi Aljoscha,
>
> Thanks for the info!
>
> Andy
>
> On Fri, 15 Jan 2016 at 10:12 Aljoscha Krettek <aljos...@apache.org> wrote:
>
>> Hi,
>> I imagine you are talking about CountTrigger, DeltaTrigger, and
>> Continuous*Trigger. For these we never purge. They are a leftover artifact
>> from an earlier approach to implementing windowing strategies that was
>> inspired by IBM InfoSphere Streams. Here, all triggers are essentially
>> accumulating and elements are evicted by an evictor. This is very flexible
>> but makes it hard to implement windowing code efficiently. If you are
>> interested, here is a Master's thesis that describes that earlier
>> implementation:
>> http://www.diva-portal.se/smash/get/diva2:861798/FULLTEXT01.pdf
>>
>> These triggers are problematic because they never purge window contents
>> if you don't have an evictor that does correct eviction. Also, they don't
>> allow incremental aggregation over elements as they arrive, since you don't
>> know what the contents of the window will be until the trigger fires and
>> the evictor evicts.
>>
>> So, as a short answer: the accumulating triggers never purge window state
>> on their own. I hope this helps somehow.
>>
>> Cheers,
>> Aljoscha
>>
>> > On 15 Jan 2016, at 09:55, Andrew Coates <big.andy.coa...@gmail.com> wrote:
>> >
>> > Thanks Aljoscha, that's very enlightening.
>> >
>> > Can you please also explain what the default behaviour is? I.e. if I
>> > use one of the accumulating inbuilt triggers, when does the state get
>> > purged? (With your info I can now probably work things out, but you may
>> > give more insight :)
>> >
>> > Also, are there plans to add explicit lateness control to Flink core?
>> > (I'm aware of the Dataflow integration work.)
>> >
>> > Thanks again,
>> >
>> > Andy
>> >
>> > On Wed, 13 Jan 2016, 16:36 Aljoscha Krettek <aljos...@apache.org> wrote:
>> > Hi,
>> > the window contents are stored in state managed by the window operator
>> > at all times, until they are purged by a Trigger returning PURGE from one
>> > of its on*() methods.
>> >
>> > Out of the box, Flink does not have something akin to the lateness and
>> > cleanup of Google Dataflow. You can, however, implement it yourself using
>> > a custom Trigger. This is an example that mimics Google Dataflow:
>> >
>> > public class EventTimeTrigger implements Trigger<Object, TimeWindow> {
>> >     private static final long serialVersionUID = 1L;
>> >
>> >     private final boolean accumulating;
>> >     private final long allowedLateness;
>> >
>> >     private EventTimeTrigger(boolean accumulating, long allowedLateness) {
>> >         this.accumulating = accumulating;
>> >         this.allowedLateness = allowedLateness;
>> >     }
>> >
>> >     @Override
>> >     public TriggerResult onElement(Object element, long timestamp,
>> >             TimeWindow window, TriggerContext ctx) throws Exception {
>> >         ctx.registerEventTimeTimer(window.maxTimestamp());
>> >         return TriggerResult.CONTINUE;
>> >     }
>> >
>> >     @Override
>> >     public TriggerResult onEventTime(long time, TimeWindow window,
>> >             TriggerContext ctx) {
>> >         if (time == window.maxTimestamp()) {
>> >             if (accumulating) {
>> >                 // register the cleanup timer if we are accumulating
>> >                 // (and allow lateness)
>> >                 if (allowedLateness > 0) {
>> >                     ctx.registerEventTimeTimer(window.maxTimestamp() + allowedLateness);
>> >                 }
>> >                 return TriggerResult.FIRE;
>> >             } else {
>> >                 return TriggerResult.FIRE_AND_PURGE;
>> >             }
>> >         } else if (time == window.maxTimestamp() + allowedLateness) {
>> >             return TriggerResult.PURGE;
>> >         }
>> >
>> >         return TriggerResult.CONTINUE;
>> >     }
>> >
>> >     @Override
>> >     public TriggerResult onProcessingTime(long time, TimeWindow window,
>> >             TriggerContext ctx) throws Exception {
>> >         return TriggerResult.CONTINUE;
>> >     }
>> >
>> >     @Override
>> >     public String toString() {
>> >         return "EventTimeTrigger()";
>> >     }
>> >
>> >     /**
>> >      * Creates an event-time trigger that fires once the watermark
>> >      * passes the end of the window.
>> >      *
>> >      * <p>
>> >      * Once the trigger fires, all elements are discarded. Elements that
>> >      * arrive late immediately trigger window evaluation with just this
>> >      * one element.
>> >      */
>> >     public static EventTimeTrigger discarding() {
>> >         return new EventTimeTrigger(false, 0L);
>> >     }
>> >
>> >     /**
>> >      * Creates an event-time trigger that fires once the watermark
>> >      * passes the end of the window.
>> >      *
>> >      * <p>
>> >      * This trigger will not immediately discard all elements once it
>> >      * fires. Only after the watermark passes the specified lateness are
>> >      * the window elements discarded, without emitting a new result. If a
>> >      * late element arrives within the specified lateness, the window is
>> >      * computed again and a new result is emitted.
>> >      */
>> >     public static EventTimeTrigger accumulating(AbstractTime allowedLateness) {
>> >         return new EventTimeTrigger(true, allowedLateness.toMilliseconds());
>> >     }
>> > }
>> >
>> > You can specify a lateness, and while that time is not yet reached the
>> > windows will remain, and late-arriving elements will trigger window
>> > emission with the complete window contents.
>> >
>> > Cheers,
>> > Aljoscha
>> >
>> > > On 13 Jan 2016, at 15:12, Andrew Coates <big.andy.coa...@gmail.com> wrote:
>> > >
>> > > Hi,
>> > >
>> > > I'm trying to understand how the lifecycle of messages / state is
>> > > managed by Flink, but I'm failing to find any documentation.
>> > >
>> > > Specifically, if I'm using a windowed stream and a type of trigger
>> > > that retains the elements of the window to allow for processing of
>> > > late data, e.g. ContinuousEventTimeTrigger, then where are the
>> > > contents of the windows, or their intermediate computation results,
>> > > stored, and when is the data removed?
>> > >
>> > > I'm thinking in terms of Google's Dataflow API: when setting up a
>> > > window, the withAllowedLateness option allows the caller to control
>> > > how long past the end of a window the data should be maintained. Does
>> > > Flink have anything similar?
>> > >
>> > > Thanks,
>> > >
>> > > Andy
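(To make the fire/purge timeline of Aljoscha's example concrete, here is a small plain-Java simulation of the accumulating trigger's event-time decision logic. It has no Flink dependencies; the branching mirrors the onEventTime() method quoted above, and the class, timestamps, and lateness values are made up for illustration.)

```java
public class TriggerTimeline {
    // Mirrors Flink's TriggerResult values used in the example above.
    public enum Result { CONTINUE, FIRE, FIRE_AND_PURGE, PURGE }

    // Mirrors the branching of onEventTime() in the quoted EventTimeTrigger:
    // fire at the window end; purge once the allowed lateness has passed.
    public static Result onEventTime(long time, long windowMaxTimestamp,
                                     boolean accumulating, long allowedLateness) {
        if (time == windowMaxTimestamp) {
            // Accumulating triggers keep the contents and just FIRE;
            // discarding triggers fire and purge in one step.
            return accumulating ? Result.FIRE : Result.FIRE_AND_PURGE;
        } else if (time == windowMaxTimestamp + allowedLateness) {
            // The cleanup timer: state is dropped without a new emission.
            return Result.PURGE;
        }
        return Result.CONTINUE;
    }

    public static void main(String[] args) {
        long windowEnd = 10_000L; // window.maxTimestamp()
        long lateness  = 5_000L;  // allowedLateness

        // Accumulating: FIRE at the window end, PURGE 5s of event time later.
        System.out.println(onEventTime(10_000L, windowEnd, true, lateness));  // FIRE
        System.out.println(onEventTime(12_000L, windowEnd, true, lateness));  // CONTINUE
        System.out.println(onEventTime(15_000L, windowEnd, true, lateness));  // PURGE

        // Discarding: everything happens at the window end.
        System.out.println(onEventTime(10_000L, windowEnd, false, 0L));       // FIRE_AND_PURGE
    }
}
```

Between FIRE and PURGE the window contents stay in operator state, which is exactly the memory cost Andy is asking about: a long allowedLateness widens that interval for every window.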