I have to extend my answer:

The allowedLateness behavior that I described applies only if the window
trigger returns FIRE when the window is evaluated (this is the default
behavior of most triggers).

If the trigger returns FIRE_AND_PURGE, the state of the window is purged
when the window function is evaluated, and late events are processed alone,
i.e., in my example <12:09, G> would be processed without [A, B, C, D].
Once the allowed lateness has passed, all window state is purged regardless
of the trigger.
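
The cases from the example can be replayed with a small plain-Java
simulation. This is only a sketch under simplifying assumptions, not Flink
code: the names LatenessSketch and run are made up for illustration, time is
tracked in whole minutes, there is a single window [12:00, 12:10), a
watermark is emitted only where the stream says "WM", and the window fires
as soon as the watermark reaches the window end.

```java
import java.util.ArrayList;
import java.util.List;

class LatenessSketch {
    static final int WINDOW_START = 12 * 60;        // 12:00, in minutes since midnight
    static final int WINDOW_END = WINDOW_START + 10; // 12:10 (exclusive)

    // The stream from the example. "WM" marks a point where a watermark is emitted.
    static final String[] STREAM = {
        "12:01 A", "12:04 B", "WM", "12:02 C", "12:08 D",
        "12:14 E", "WM", "12:16 F", "WM", "12:09 G"
    };

    static int minutes(String hhmm) {
        String[] p = hhmm.split(":");
        return Integer.parseInt(p[0]) * 60 + Integer.parseInt(p[1]);
    }

    /**
     * Replays STREAM and returns every result the window emits.
     * boundMin     - out-of-orderness bound of the watermark assigner
     * latenessMin  - allowed lateness of the window
     * purgeOnFire  - true models FIRE_AND_PURGE, false models FIRE
     */
    static List<String> run(int boundMin, int latenessMin, boolean purgeOnFire) {
        List<String> emitted = new ArrayList<>();
        List<String> state = new ArrayList<>();
        int maxTs = 0;                       // highest event timestamp seen so far
        int watermark = Integer.MIN_VALUE;   // current logical time
        boolean fired = false;

        for (String item : STREAM) {
            if (item.equals("WM")) {
                // Watermark = latest observed timestamp minus the bound.
                watermark = Math.max(watermark, maxTs - boundMin);
                if (!fired && watermark >= WINDOW_END) {
                    emitted.add(state.toString());   // first evaluation
                    fired = true;
                    if (purgeOnFire) state.clear();
                }
                // After end + allowed lateness, state is dropped regardless of trigger.
                if (watermark >= WINDOW_END + latenessMin) state.clear();
                continue;
            }
            String[] parts = item.split(" ");
            int ts = minutes(parts[0]);
            maxTs = Math.max(maxTs, ts);
            if (ts < WINDOW_START || ts >= WINDOW_END) continue;  // other window
            if (watermark >= WINDOW_END + latenessMin) continue;  // too late: ignored
            state.add(parts[1]);
            if (fired) {
                // Late, but within the allowed lateness: evaluate again.
                emitted.add(state.toString());
                if (purgeOnFire) state.clear();
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println("lateness 0, FIRE:           " + run(2, 0, false));
        System.out.println("lateness 3, FIRE:           " + run(2, 3, false));
        System.out.println("lateness 5, FIRE:           " + run(2, 5, false));
        System.out.println("lateness 5, FIRE_AND_PURGE: " + run(2, 5, true));
    }
}
```

With a 2-minute bound, only the 5-minute lateness runs emit a second
result: [A, B, C, D, G] with FIRE and just [G] with FIRE_AND_PURGE.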

Best, Fabian

2016-10-17 16:24 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:

> Hi Yassine,
> the difference is the following:
> 1) The BoundedOutOfOrdernessTimestampExtractor is a built-in timestamp
> extractor and watermark assigner.
> A timestamp extractor tells Flink when an event happened, i.e., it
> extracts a timestamp from the event. A watermark assigner tells Flink what
> the current logical time is.
> The BoundedOutOfOrdernessTimestampExtractor works as follows: When Flink
> asks what the current time is, it returns the latest observed timestamp
> minus a configurable bound. This bound is the safety margin for late data.
> A record whose timestamp is lower than the last watermark is considered
> to be late.
> 2) The allowedLateness parameter of time windows tells Flink how long to
> keep state around after the window was evaluated.
> If data arrives after the evaluation and before the allowedLateness has
> passed, the window function is applied again and an update is sent out.
> Let's look at an example.
> Assume you have a BoundedOutOfOrdernessTimestampExtractor (BOOTE) with a
> 2-minute bound and a 10-minute tumbling window that starts at 12:00 and
> ends at 12:10, and the following data arrives in this order (WM denotes a
> watermark):
> 12:01, A
> 12:04, B
> WM, 12:02 // 12:04 - 2 minutes
> 12:02, C
> 12:08, D
> 12:14, E
> WM, 12:12
> 12:16, F
> WM, 12:14 // 12:16 - 2 minutes
> 12:09, G
> == no allowed lateness
> The window operator advances its logical time to 12:12 when it receives
> <WM, 12:12>, evaluates the window, which contains [A, B, C, D] at this
> time, and then purges its state. <12:09, G> arrives later and is ignored.
> == allowed lateness of 3 minutes
> The window operator evaluates the window when <WM, 12:12> is received, but
> its state is not purged yet. The state is purged when <WM, 12:14> is
> received, because 12:14 is past 12:13 (window end time 12:10 + 3 minutes
> allowed lateness). <12:09, G> arrives after the purge and is again ignored.
> == allowed lateness of 5 minutes
> The window operator evaluates the window when <WM, 12:12> is received, but
> its state is not purged yet. When <12:09, G> is received, the window is
> again evaluated but this time with [A, B, C, D, G] and an update is sent
> out. The state is purged when a watermark of >=12:15 is received.
> So, watermarks tell Flink what time it is, and allowed lateness tells
> the system when state should be discarded and all later-arriving data
> ignored.
> These two settings are related but not exactly the same thing. For
> instance, you can counter late data by increasing either the bound or the
> lateness parameter.
> Increasing the watermark bound will yield higher latencies as windows are
> evaluated later.
> Configuring allowedLateness will allow for earlier results, but you have
> to cope with the updates downstream.
> Please let me know if you have questions.
> Best, Fabian
> 2016-10-17 11:52 GMT+02:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:
>> Hi,
>> I'm a bit confused about how Flink deals with late elements after the
>> introduction of allowedLateness to windows. What is the difference between
>> using a BoundedOutOfOrdernessTimestampExtractor(Time.seconds(X)) and
>> allowedLateness(Time.seconds(X))? What if one is used and the other is
>> not? And what if a different lateness is used in each one? Could you please
>> clarify it on the basis of a simple example? Thank you.
>> Best,
>> Yassine
