Hi Aljoscha,

Just thinking about the EventTimeTrigger example you provided (and apologies
in advance for taking more of your time!), I suspect that if I go down that
route with any long allowedLateness, we'll run into issues with memory use,
unless Flink is smart enough, configurable enough, or customisable enough to
control *where* the ageing state is kept.
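To make the worry concrete, here's a back-of-envelope sketch (plain Java; the event rate, element size and lateness are made-up numbers for illustration, not measurements from my job):

```java
public class WindowStateEstimate {

    // Rough estimate of window state kept alive when each window's elements
    // are retained until maxTimestamp + allowedLateness (tumbling windows).
    static long retainedBytes(long eventsPerSecond,
                              long windowSeconds,
                              long allowedLatenessSeconds,
                              long bytesPerElement) {
        // At any moment, roughly (windowSeconds + allowedLatenessSeconds)
        // worth of events are still held in window state.
        return eventsPerSecond * (windowSeconds + allowedLatenessSeconds)
                * bytesPerElement;
    }

    public static void main(String[] args) {
        // 10k events/s, 1-minute windows, 1 day of lateness, 100 B/element
        long bytes = retainedBytes(10_000, 60, 86_400, 100);
        System.out.println(bytes / (1024L * 1024 * 1024) + " GiB retained");
    }
}
```

Even at those modest hypothetical rates that's roughly 80 GiB of live window state per day of lateness, hence my question about being able to control where the ageing state lives.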

Thoughts?

Thanks!

Andy

On Fri, 15 Jan 2016 at 15:51 Andrew Coates <big.andy.coa...@gmail.com>
wrote:

> Hi Aljoscha,
>
> Thanks for the info!
>
> Andy
>
> On Fri, 15 Jan 2016 at 10:12 Aljoscha Krettek <aljos...@apache.org> wrote:
>
>> Hi,
>> I imagine you are talking about CountTrigger, DeltaTrigger, and
>> Continuous*Trigger. For these we never purge. They are a leftover artifact
>> from an earlier approach to implementing windowing strategies that was
>> inspired by IBM InfoSphere Streams. Here, all triggers are essentially
>> accumulating and elements are evicted by an evictor. This is very flexible
>> but makes it hard to implement windowing code efficiently. If you are
>> interested, here is a master's thesis that describes that earlier
>> implementation:
>> http://www.diva-portal.se/smash/get/diva2:861798/FULLTEXT01.pdf
>>
>> These triggers are problematic because they never purge window contents
>> if you don’t have an evictor that does correct eviction. Also, they don’t
>> allow incremental aggregation over elements as they arrive since you don’t
>> know what the contents of the window will be until the trigger fires and
>> the evictor evicts.
>>
>> So, as a short answer: the accumulating triggers never purge window state
>> on their own. I hope this helps somehow.
>>
>> Cheers,
>> Aljoscha
>> > On 15 Jan 2016, at 09:55, Andrew Coates <big.andy.coa...@gmail.com>
>> wrote:
>> >
>> > Thanks Aljoscha, that's very enlightening.
>> >
>> > Can you please also explain what the default behaviour is? I.e. if I
>> use one of the accumulating inbuilt triggers, when does the state get
>> purged? (With your info I can now probably work things out, but you may
>> give more insight!)
>> >
>> > Also, are there plans to add explicit lateness control to Flink core?
>> (I'm aware of the Dataflow integration work.)
>> >
>> > Thanks again,
>> >
>> > Andy
>> >
>> >
>> > On Wed, 13 Jan 2016, 16:36 Aljoscha Krettek <aljos...@apache.org>
>> wrote:
>> > Hi,
>> > the window contents are stored in state managed by the window operator
>> at all times until they are purged by a Trigger returning PURGE from one of
>> its on*() methods.
>> >
>> > Out of the box, Flink does not have something akin to the lateness and
>> cleanup of Google Dataflow. You can, however implement it yourself using a
>> custom Trigger. This is an example that mimics Google Dataflow:
>> >
>> > public class EventTimeTrigger implements Trigger<Object, TimeWindow> {
>> >    private static final long serialVersionUID = 1L;
>> >
>> >    private final boolean accumulating;
>> >    private final long allowedLateness;
>> >
>> >    private EventTimeTrigger(boolean accumulating, long allowedLateness) {
>> >       this.accumulating = accumulating;
>> >       this.allowedLateness = allowedLateness;
>> >    }
>> >
>> >    @Override
>> >    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
>> >       ctx.registerEventTimeTimer(window.maxTimestamp());
>> >       return TriggerResult.CONTINUE;
>> >    }
>> >
>> >    @Override
>> >    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
>> >       if (time == window.maxTimestamp()) {
>> >          if (accumulating) {
>> >             // register the cleanup timer if we are accumulating (and allow lateness)
>> >             if (allowedLateness > 0) {
>> >                ctx.registerEventTimeTimer(window.maxTimestamp() + allowedLateness);
>> >             }
>> >             return TriggerResult.FIRE;
>> >          } else {
>> >             return TriggerResult.FIRE_AND_PURGE;
>> >          }
>> >       } else if (time == window.maxTimestamp() + allowedLateness) {
>> >          return TriggerResult.PURGE;
>> >       }
>> >
>> >       return TriggerResult.CONTINUE;
>> >    }
>> >
>> >    @Override
>> >    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
>> >       return TriggerResult.CONTINUE;
>> >    }
>> >
>> >    @Override
>> >    public String toString() {
>> >       return "EventTimeTrigger()";
>> >    }
>> >
>> >    /**
>> >     * Creates an event-time trigger that fires once the watermark passes the end of the window.
>> >     *
>> >     * <p>
>> >     * Once the trigger fires all elements are discarded. Elements that arrive late immediately
>> >     * trigger window evaluation with just this one element.
>> >     */
>> >    public static EventTimeTrigger discarding() {
>> >       return new EventTimeTrigger(false, 0L);
>> >    }
>> >
>> >    /**
>> >     * Creates an event-time trigger that fires once the watermark passes the end of the window.
>> >     *
>> >     * <p>
>> >     * This trigger will not immediately discard all elements once it fires. Only after the
>> >     * watermark passes the specified lateness are the window elements discarded, without
>> >     * emitting a new result. If a late element arrives within the specified lateness
>> >     * the window is computed again and a new result is emitted.
>> >     */
>> >    public static EventTimeTrigger accumulating(AbstractTime allowedLateness) {
>> >       return new EventTimeTrigger(true, allowedLateness.toMilliseconds());
>> >    }
>> > }
>> >
>> > You can specify a lateness, and until that time is reached the windows
>> will remain; late-arriving elements will trigger window emission with the
>> complete window contents.
>> >
>> > Cheers,
>> > Aljoscha
>> > > On 13 Jan 2016, at 15:12, Andrew Coates <big.andy.coa...@gmail.com>
>> wrote:
>> > >
>> > > Hi,
>> > >
>> > > I'm trying to understand how the lifecycle of messages / state is
>> managed by Flink, but I'm failing to find any documentation.
>> > >
>> > > Specifically, if I'm using a windowed stream and a type of trigger that
>> retains the elements of the window to allow for processing of late data, e.g.
>> ContinuousEventTimeTrigger, then where are the contents of the windows, or
>> their intermediate computation results, stored, and when is the data
>> removed?
>> > >
>> > > I'm thinking in terms of Google's Dataflow API, where, when setting up
>> a window, the withAllowedLateness option allows the caller to control how
>> long past the end of a window the data should be maintained.  Does Flink
>> have anything similar?
>> > >
>> > > Thanks,
>> > >
>> > > Andy
>> >
>>
>>
