A while back I wrote this slightly more elaborate extractor that advances 
the watermark on its own once the stream has been idle for a while: 
https://github.com/aljoscha/flink/blob/6e4419e550caa0e5b162bc0d2ccc43f6b0b3860f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/timestamps/ProcessingTimeTrailingBoundedOutOfOrdernessTimestampExtractor.java
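
The idea behind such an extractor can be sketched in plain Java without Flink on the classpath. This is a minimal sketch, not the linked class: the class name IdleAwareWatermarkSketch, its methods, the idleTimeoutMs parameter, and the injected clock are all assumptions made for illustration. It trails the largest timestamp seen by a fixed bound, and once no event has arrived for longer than the idle timeout, it lets the watermark advance with the wall clock.

```java
import java.util.function.LongSupplier;

// Hypothetical, Flink-free model of a bounded-out-of-orderness watermark
// that keeps advancing while the stream is idle. Not the linked class.
final class IdleAwareWatermarkSketch {
    private final long maxOutOfOrdernessMs; // how far the watermark trails the max timestamp
    private final long idleTimeoutMs;       // assumed parameter: idle time before the clock takes over
    private final LongSupplier clock;       // wall clock in ms, injectable so the logic is testable

    private boolean seenEvent = false;
    private long maxTimestampSeen;
    private long lastEventWallClock;
    private long lastEmittedWatermark = Long.MIN_VALUE;

    IdleAwareWatermarkSketch(long maxOutOfOrdernessMs, long idleTimeoutMs, LongSupplier clock) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        this.idleTimeoutMs = idleTimeoutMs;
        this.clock = clock;
    }

    // Called once per event; remembers the largest event timestamp and
    // the wall-clock time at which the event arrived.
    long onEvent(long eventTimestampMs) {
        if (!seenEvent || eventTimestampMs > maxTimestampSeen) {
            maxTimestampSeen = eventTimestampMs;
        }
        seenEvent = true;
        lastEventWallClock = clock.getAsLong();
        return eventTimestampMs;
    }

    // Called periodically; returns the current watermark in ms.
    long currentWatermark() {
        if (!seenEvent) {
            return lastEmittedWatermark; // nothing to base a watermark on yet
        }
        // Data-driven part: trail the largest timestamp by the bound.
        long candidate = maxTimestampSeen - maxOutOfOrdernessMs;
        // Idle part: after the timeout, advance by the additional idle time.
        long idleFor = clock.getAsLong() - lastEventWallClock;
        if (idleFor > idleTimeoutMs) {
            candidate += idleFor - idleTimeoutMs;
        }
        // Never let the watermark move backwards.
        lastEmittedWatermark = Math.max(lastEmittedWatermark, candidate);
        return lastEmittedWatermark;
    }
}
```

With a 100 ms bound and a 1000 ms idle timeout, an event stamped 5000 yields a watermark of 4900. If nothing else arrives, the watermark stays there for 1000 ms of wall-clock time and then advances one millisecond per millisecond. Events that arrive afterwards with timestamps below the advanced watermark would count as late, which is the trade-off of this approach.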
 

Best,
Aljoscha

> On 16. Jan 2018, at 10:29, Fabian Hueske <fhue...@gmail.com> wrote:
> 
> This depends on the requirements of your application.
> Using the usual watermark generation strategies which are purely data driven, 
> a stream that does not produce data would not advance its watermarks.
> Not advancing the watermarks means that the program cannot make progress. 
> 
> This might also be fine if your program consumes a single stream, because if 
> this stream does not produce data, your program also doesn't have anything to 
> compute (although there might still be data left, such as a window, that has 
> not been computed). 
> The situation becomes trickier if your program has multiple sources that 
> become inactive at some point, or a source where a partition can become 
> inactive.
> 
> AFAIK, there is a mechanism to mark partitions (and maybe complete sources) 
> as inactive.
> @Gordon (in CC) knows more about this feature.
> 
> Best, Fabian
> 
> 2018-01-15 14:51 GMT+01:00 Jayant Ameta <wittyam...@gmail.com>:
> Hi Fabian,
> I want to extract timestamps from my events. However, the event stream can be 
> sparse at times (e.g. 2 days without any events).
> What's the best strategy to create watermarks if I want real-time processing 
> of the events which enter the stream?
> 
> Jayant Ameta
> 
> On Thu, Jan 11, 2018 at 4:53 PM, Fabian Hueske <fhue...@gmail.com> wrote:
> Another thing to point out is that watermarks are usually data-driven, i.e., 
> they depend on the timestamps of the events and not on the clock of the 
> machine.
> Otherwise, you might observe a lot of late data, i.e., events with timestamps 
> smaller than the last watermark.
> 
> If you assign timestamps and watermarks based on the clock of the machine, 
> you might also use ingestion time instead of event time.
> 
> 2018-01-11 11:49 GMT+01:00 Jayant Ameta <wittyam...@gmail.com>:
> Thanks Gary,
> I was only trying with a fixed set of events, so the Watermark was not 
> advancing, like you said.
> 
> 
> Jayant Ameta
> 
> On Thu, Jan 11, 2018 at 3:36 PM, Gary Yao <g...@data-artisans.com> wrote:
> Hi Jayant,
> 
> The difference is that the Watermarks from
> BoundedOutOfOrdernessTimestampExtractor are based on the greatest timestamp of
> all previous events. That is, if you do not receive new events, the Watermark
> will not advance. In contrast, your custom implementation of
> AssignerWithPeriodicWatermarks always advances the Watermark based on the wall
> clock.
> 
> Maybe this will already help you to debug your application. If not, it would
> be great to see a minimal working example.
> 
> Best,
> Gary
> 
> On Wed, Jan 10, 2018 at 4:46 PM, Jayant Ameta <wittyam...@gmail.com> wrote:
> Hi,
> When using a BoundedOutOfOrdernessTimestampExtractor, the trigger is not 
> firing. However, the trigger fires when using a custom timestamp extractor 
> with a similar watermark.
> 
> Sample code below:
> 1. Assigner as an anonymous class, which works fine
> AssignerWithPeriodicWatermarks<Tuple2<Rule, T>> assigner =
>         new AssignerWithPeriodicWatermarks<Tuple2<Rule, T>>() {
>     @Override
>     public long extractTimestamp(Tuple2<Rule, T> element, long previousElementTimestamp) {
>         return System.currentTimeMillis();
>     }
> 
>     @Override
>     public final Watermark getCurrentWatermark() {
>         // the watermark trails the wall clock by 100 ms, so it advances
>         // even when no new events arrive.
>         return new Watermark(System.currentTimeMillis() - 100);
>     }
> };
> 
> 2. BoundedOutOfOrdernessTimestampExtractor assigner, which doesn't work
> AssignerWithPeriodicWatermarks<Tuple2<Rule, T>> assigner =
>         new BoundedOutOfOrdernessTimestampExtractor<Tuple2<Rule, T>>(Time.milliseconds(100)) {
>     @Override
>     public long extractTimestamp(Tuple2<Rule, T> element) {
>         return System.currentTimeMillis();
>     }
> };
> 
> Do you see any difference in the approaches?
> 
> - Jayant