Hi Soheil,

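That is probably related to your parallelism: each parallel instance of the 
extractor keeps its own MAX_TIMESTAMP and computes its own watermarks. An 
instance that has not received any elements yet still has MAX_TIMESTAMP = 0 
and therefore emits 0 - 3000 = -3000. Try printing the max timestamp together 
with the current thread’s name and you will see this.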
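
A minimal plain-Java sketch of the effect, with no Flink dependency. The 
SimulatedExtractor class and the thread names "subtask-0" and "subtask-1" are 
hypothetical stand-ins I made up for illustration; they are not Flink API. 
The sketch only assumes that each parallel instance holds its own copy of the 
field, as in your class:

```java
class WatermarkPerInstanceDemo {

    /** Hypothetical stand-in for the extractor: same state, no Flink types. */
    static class SimulatedExtractor {
        private static final long DELAY = 3000L;
        // One copy per instance; a Java long field defaults to 0.
        private long maxTimestamp;

        long extractTimestamp(long timestamp) {
            maxTimestamp = Math.max(timestamp, maxTimestamp);
            System.out.println(Thread.currentThread().getName()
                    + " Max TimeStamp : " + maxTimestamp);
            return timestamp;
        }

        long getCurrentWatermark() {
            long watermark = maxTimestamp - DELAY;
            System.out.println(Thread.currentThread().getName()
                    + " Current Watermark : " + watermark);
            return watermark;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        SimulatedExtractor busy = new SimulatedExtractor(); // receives an element
        SimulatedExtractor idle = new SimulatedExtractor(); // receives nothing

        // Each instance is driven by its own thread, like a parallel subtask.
        Thread t0 = new Thread(() -> {
            busy.extractTimestamp(1532934243168L);
            busy.getCurrentWatermark();
        }, "subtask-0");
        Thread t1 = new Thread(idle::getCurrentWatermark, "subtask-1");

        t0.start();
        t1.start();
        t0.join();
        t1.join();
    }
}
```

With the thread name in front of every line, it becomes visible that the 
-3000 values come from a different instance than the one printing the large 
max timestamps.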

Best,
Xingcan

> On Jul 30, 2018, at 3:05 PM, Soheil Pourbafrani <soheil.i...@gmail.com> wrote:
> 
> Using Flink's event-time feature, I implemented the 
> AssignerWithPeriodicWatermarks interface as follows:
> 
> public static class SampleTimestampExtractor implements 
> AssignerWithPeriodicWatermarks<Tuple3<String, Long, JSONObject>> {
>     private static final long serialVersionUID = 1L;
>     private long MAX_TIMESTAMP;
>     private final long DELEY = 3000;
> 
> 
>     @Override
>     public long extractTimestamp(Tuple3<String, Long, JSONObject> t, long l) {
>         long timestamp = t.f1 ;
>         MAX_TIMESTAMP =  Math.max(timestamp , MAX_TIMESTAMP);
>         System.out.println("Max TimeStamp : " + MAX_TIMESTAMP);
>         return timestamp ;
>     }
> 
>     @Nullable
>     @Override
>     public Watermark getCurrentWatermark() {
>         System.out.println("Current WatreMark : " + (MAX_TIMESTAMP - DELEY));
>         return new Watermark(MAX_TIMESTAMP - DELEY);
>     }
> }
> In addition, I set the watermark interval to 100 milliseconds:
> env.getConfig().setAutoWatermarkInterval(100);
> But when I check the logs, some watermarks are -3000, so the 
> getCurrentWatermark method seems to treat MAX_TIMESTAMP as zero (0 - 3000 = 
> -3000), even though the logs show that MAX_TIMESTAMP has a value 
> greater than zero!
> Here is a part of the output:
> Max TimeStamp : 1532934243136
> Max TimeStamp : 1532934243136
> Max TimeStamp : 1532934243144
> Max TimeStamp : 1532934243144
> Max TimeStamp : 1532934243152
> Max TimeStamp : 1532934243152
> Max TimeStamp : 1532934243160
> Max TimeStamp : 1532934243160
> Max TimeStamp : 1532934243168
> Max TimeStamp : 1532934243168
> Current WatreMark : 1532934240168
> Current WatreMark : -3000
> Current WatreMark : -3000
> Current WatreMark : 1532934240168
> Max TimeStamp : 1532934243176
> Max TimeStamp : 1532934243176
> Max TimeStamp : 1532934243184
> Max TimeStamp : 1532934243200
> Max TimeStamp : 1532934243208
> Max TimeStamp : 1532934243184
> 
> 
