A couple of other details worth mentioning:

1. Your choice of connector matters a lot. Some connectors provide limited ordering guarantees, especially across keys, which leads to a highly disordered stream of events (with respect to event time) from the perspective of the watermark generator. Some connectors provide mitigations; for example, the Kafka connector supports a per-partition watermark generator.

2. When using `assignTimestampsAndWatermarks`, Flink creates a separate instance of your watermark generator for each parallel instance of the operator to which it is assigned.

3. Watermark generators may contain private fields, but their state isn't checkpointed (to my knowledge).

Hope this helps,
Eron

On Tue, Dec 12, 2017 at 7:53 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:

> To add to Fabian's comment,
>
> What can be done (and that may not be the norm) is to keep a 95-99%
> quantile (using an approximate histogram so that the execution is not
> heavy) of the difference between server (or ingestion) time and event
> time, and use it as the max out-of-orderness. We keep the last n of these
> quantile values, each representing x elements, and choose the least. What
> we figure is that giving a max upper bound based on the real distribution
> is more palatable than some arbitrary value. The late data becomes
> inconsequential, as the lateness has been incorporated in the delayed
> watermark generation.
>
> Vishal.
>
> On Tue, Dec 12, 2017 at 7:46 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> As I said before, you can solve that with a custom WatermarkAssigner.
>> Collect a histogram, take the median out of X samples, ignore outliers,
>> etc.
>>
>> 2017-12-12 13:37 GMT+01:00 Jinhua Luo <luajit...@gmail.com>:
>>
>>> Suppose we have a normally ordered stream. If an abnormal event A
>>> appears and thus advances the watermark, making all subsequent normal
>>> events (earlier than A) late, I think it's a mistake.
>>> The ways you listed cannot help with this mistake. The normal events
>>> cannot be dropped, the lateness may be hard to determine (it depends on
>>> the timestamp of the abnormal event), and re-triggering the window
>>> downstream brings in side effects.
>>> If the abnormal event appears in the middle of the stream, then maybe we
>>> could filter it out by checking the delta with the last element,
>>> but what if the abnormal event is the first event emitted by the
>>> source?
>>>
>>> 2017-12-12 19:25 GMT+08:00 Fabian Hueske <fhue...@gmail.com>:
>>> > Early events are usually not an issue because they can be kept in
>>> > state until they are ready to be processed.
>>> > Also, depending on the watermark assigner, they often push the
>>> > watermark ahead such that they are not early but all other events
>>> > are late.
>>> >
>>> > Handling of late events depends on your use case, and there are the
>>> > three options that I already listed:
>>> >
>>> > 1) dropping
>>> > 2) keeping state of "completed" computations for some time (allowed
>>> > lateness). If a late event arrives, you can update the result and
>>> > emit an update. In this case your downstream operators or systems
>>> > have to be able to deal with updates.
>>> > 3) sending the late events to a different channel via side outputs
>>> > and handling them later.
>>> >
>>> > 2017-12-12 12:14 GMT+01:00 Jinhua Luo <luajit...@gmail.com>:
>>> >>
>>> >> Yes, I know Flink is flexible.
>>> >>
>>> >> But I am thinking that when the event sequence is a mess (e.g.,
>>> >> branches of time-series events interleaved, with each branch
>>> >> covering a completely different time period), it's hard to fit it
>>> >> into the streaming API, because no matter how you generate the
>>> >> watermark, it cannot go backward or branch.
>>> >>
>>> >> Is there any best practice for handling late and/or early events?
>>> >>
>>> >> 2017-12-12 18:24 GMT+08:00 Fabian Hueske <fhue...@gmail.com>:
>>> >> > Hi,
>>> >> >
>>> >> > this depends on how you generate watermarks [1].
>>> >> > You could generate watermarks with a four-hour delay and be fine
>>> >> > (at the cost of a four-hour latency), or have some checks so that
>>> >> > you don't advance a watermark by more than x minutes at a time.
>>> >> > These considerations are quite use-case specific, so it's hard to
>>> >> > give advice that applies to all cases.
>>> >> >
>>> >> > There are also different strategies for handling late data in
>>> >> > windows.
>>> >> > You can drop it (default behavior), update previously emitted
>>> >> > results (allowed lateness) [2], or emit it to a side output [3].
>>> >> >
>>> >> > Flink is quite flexible when dealing with watermarks and late data.
>>> >> >
>>> >> > Best, Fabian
>>> >> >
>>> >> > [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/event_timestamps_watermarks.html
>>> >> > [2] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#allowed-lateness
>>> >> > [3] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#getting-late-data-as-a-side-output
>>> >> >
>>> >> > 2017-12-12 10:16 GMT+01:00 Jinhua Luo <luajit...@gmail.com>:
>>> >> >>
>>> >> >> Hi All,
>>> >> >>
>>> >> >> The watermark is monotonically increasing in a stream, correct?
>>> >> >>
>>> >> >> Given an extremely out-of-order stream, e.g.
>>> >> >> e4(12:04:33) --> e3(15:00:22) --> e2(12:04:21) --> e1(12:03:01)
>>> >> >>
>>> >> >> Here e1 appears first, so the watermark starts from 12:03:01, and
>>> >> >> e3 is an early event; it would be placed in another window and
>>> >> >> fired individually, correct? If so, the result is not bad.
>>> >> >>
>>> >> >> The worse case is:
>>> >> >>
>>> >> >> e4(12:04:33) --> e3(12:03:01) --> e2(12:04:21) --> e1(15:00:22)
>>> >> >>
>>> >> >> Then e2, e3, e4 would be considered late events and get discarded?
>>> >> >> And the watermark is set to a wrong value permanently?
>>> >> >>
>>> >> >> So the stream must not be that out of order, otherwise Flink could
>>> >> >> not handle it well?
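
The two sequences from the original question can be replayed outside Flink to see why a single far-future event is harmful and how the "median out of X samples" idea softens it. What follows is a minimal plain-Python sketch, not Flink API code: the class names, the `late_events` helper, the one-minute delay, and the sample size of 3 are all assumptions made for illustration, and "late" is simplified to "timestamp behind the watermark on arrival" (in Flink, window lateness also depends on the window end and the allowed lateness).

```python
from datetime import datetime, timedelta
from statistics import median_low


def ts(hms):
    """Parse an HH:MM:SS time on a fixed, arbitrary date."""
    return datetime.strptime("2017-12-12 " + hms, "%Y-%m-%d %H:%M:%S")


class MaxTimestampWatermark:
    """Watermark = highest event time seen so far minus a fixed delay."""

    def __init__(self, delay):
        self.delay = delay
        self.watermark = None

    def observe(self, event_time):
        candidate = event_time - self.delay
        if self.watermark is None or candidate > self.watermark:
            self.watermark = candidate  # monotonic: never moves backward
        return self.watermark


class MedianWatermark:
    """Watermark = median of the last `samples` event times minus a delay."""

    def __init__(self, delay, samples):
        self.delay = delay
        self.samples = samples
        self.recent = []
        self.watermark = None

    def observe(self, event_time):
        self.recent = (self.recent + [event_time])[-self.samples:]
        if len(self.recent) == self.samples:  # wait for a full sample window
            candidate = median_low(self.recent) - self.delay
            if self.watermark is None or candidate > self.watermark:
                self.watermark = candidate  # still monotonic
        return self.watermark


def late_events(generator, arrivals):
    """Names of events whose timestamp is behind the watermark on arrival."""
    late = []
    for name, event_time in arrivals:
        if generator.watermark is not None and event_time < generator.watermark:
            late.append(name)
        generator.observe(event_time)
    return late


delay = timedelta(minutes=1)

# Arrival order of the first sequence: the outlier (15:00:22) arrives third.
outlier_in_middle = [("e1", ts("12:03:01")), ("e2", ts("12:04:21")),
                     ("e3", ts("15:00:22")), ("e4", ts("12:04:33"))]
# Arrival order of the "worse case": the outlier arrives first.
outlier_first = [("e1", ts("15:00:22")), ("e2", ts("12:04:21")),
                 ("e3", ts("12:03:01")), ("e4", ts("12:04:33"))]

print(late_events(MaxTimestampWatermark(delay), outlier_in_middle))  # ['e4']
print(late_events(MaxTimestampWatermark(delay), outlier_first))      # ['e2', 'e3', 'e4']
print(late_events(MedianWatermark(delay, samples=3), outlier_in_middle))  # []
print(late_events(MedianWatermark(delay, samples=3), outlier_first))      # []
```

The median variant pays for its robustness: no watermark is produced until the sample window is full, and the watermark trails the bulk of the stream rather than its newest event. The outlier itself is simply treated as an early event and held until the watermark catches up.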