Once 56000 arrives, isn't a new watermark emitted?

On 2020/4/4 at 5:57 PM, "忝忝向仧" <[email protected]> wrote:

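    Hi all,

    I ran into a problem with a Flink two-stream join and would appreciate an explanation. ds1 and ds2 each read a Kafka topic, use event time and watermarks, and are joined in a 3-second tumbling window, defined as below.

    When the join emits its result, why does the window already fire on the second record? According to the watermark firing condition watermark_time >= window_endtime, the window should only fire after the record with timestamp 1000000057000 arrives, yet it fired as soon as 56000 arrived. Why?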
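Flink's tumbling event-time windows are aligned to the epoch (offset 0 unless one is given), not to the timestamp of the first record, and the default event-time trigger fires once the watermark reaches the window's last millisecond (`end - 1`). The sketch below reproduces that arithmetic in plain Java so it can be checked without a cluster; `TumblingWindowMath` and its methods are hypothetical names, and only the start-of-window formula is meant to match Flink's `TimeWindow.getWindowStartWithOffset`.

```java
/**
 * Hypothetical helper (not part of Flink) that reproduces the arithmetic
 * Flink uses for tumbling event-time windows.
 */
public class TumblingWindowMath {

    /** Same formula as Flink's TimeWindow.getWindowStartWithOffset(timestamp, offset, windowSize). */
    static long windowStart(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    /** Windows are half-open: [start, start + windowSize). */
    static long windowEnd(long timestamp, long offset, long windowSize) {
        return windowStart(timestamp, offset, windowSize) + windowSize;
    }

    /** The event-time trigger fires once the watermark reaches the window's last millisecond, end - 1. */
    static boolean fires(long watermark, long windowEnd) {
        return watermark >= windowEnd - 1;
    }

    public static void main(String[] args) {
        long size = 3000L;               // Time.seconds(3)
        long first = 1000000055000L;     // tom1 and jerry1
        long second = 1000000056000L;    // tom2 and jerry2
        long end = windowEnd(first, 0L, size);

        System.out.println("tom1/jerry1 window: [" + windowStart(first, 0L, size) + ", " + end + ")");
        System.out.println("tom2/jerry2 window starts at " + windowStart(second, 0L, size));
        System.out.println("fires at watermark " + first + "? " + fires(first, end));
        System.out.println("fires at watermark " + second + "? " + fires(second, end));
    }
}
```

With offset 0, records at 1000000055000 land in [1000000053000, 1000000056000), so a watermark of 1000000056000 is already past that window's last millisecond. A window ending at 1000000057000 would only exist with a 1 s offset, e.g. `TumblingEventTimeWindows.of(Time.seconds(3), Time.seconds(1))`, which gives [1000000054000, 1000000057000).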
    
    
    
    The code is as follows:
    // Source 1: topic "stream1"; each record is "<key> <name> <eventTimeMillis>"
    DataStream<String> stream1 = env
            .addSource(new FlinkKafkaConsumer09<String>("stream1", new SimpleStringSchema(), properties)
                    .setStartFromLatest())
            .assignTimestampsAndWatermarks(
                    new AssignerWithPeriodicWatermarks<String>() {
                        long currentTimeStamp = 0L;   // largest event time seen so far
                        long maxDelayAllowed = 0L;    // allowed out-of-orderness
                        long currentWaterMark;        // last watermark returned

                        // Polled periodically (auto-watermark interval), not once per record
                        @Nullable
                        @Override
                        public Watermark getCurrentWatermark() {
                            currentWaterMark = currentTimeStamp - maxDelayAllowed;
                            return new Watermark(currentWaterMark);
                        }

                        @Override
                        public long extractTimestamp(String s, long l) {
                            String[] arr = s.split(" ");
                            long timeStamp = Long.parseLong(arr[2]);
                            currentTimeStamp = Math.max(timeStamp, currentTimeStamp);
                            System.out.println("currentTimeStamp: " + currentTimeStamp + ",Key:" + arr[0]
                                    + ",EventTime:" + timeStamp + ",previous watermark:" + currentWaterMark);
                            return timeStamp;
                        }
                    }
            );
    
    DataStream<Tuple3<String, String, String>> ds1 = stream1.map(new MapFunction<String, Tuple3<String, String, String>>() {
        @Override
        public Tuple3<String, String, String> map(String s1) throws Exception {
            String[] arr = s1.split(" ");
            return Tuple3.of(arr[0], arr[1], arr[2]);
        }
    });
    
    ds1.print();
    // Source 2: topic "stream2"; same record format and watermark logic as stream1
    DataStream<String> stream2 = env
            .addSource(new FlinkKafkaConsumer09<String>("stream2", new SimpleStringSchema(), properties)
                    .setStartFromLatest())
            .assignTimestampsAndWatermarks(
                    new AssignerWithPeriodicWatermarks<String>() {
                        long currentTimeStamp = 0L;
                        long maxDelayAllowed = 0L;
                        long currentWaterMark;

                        @Nullable
                        @Override
                        public Watermark getCurrentWatermark() {
                            currentWaterMark = currentTimeStamp - maxDelayAllowed;
                            return new Watermark(currentWaterMark);
                        }

                        @Override
                        public long extractTimestamp(String s, long l) {
                            String[] arr = s.split(" ");
                            long timeStamp = Long.parseLong(arr[2]);
                            currentTimeStamp = Math.max(timeStamp, currentTimeStamp);
                            System.out.println("currentTimeStamp: " + currentTimeStamp + ",Key:" + arr[0]
                                    + ",EventTime:" + timeStamp + ",previous watermark:" + currentWaterMark);
                            return timeStamp;
                        }
                    }
            );
    
    DataStream<Tuple3<String, String, String>> ds2 = stream2.map(new MapFunction<String, Tuple3<String, String, String>>() {
        @Override
        public Tuple3<String, String, String> map(String s2) throws Exception {
            String[] arr = s2.split(" ");
            return Tuple3.of(arr[0], arr[1], arr[2]);
        }
    });
    ds2.print();
    // Inner join on f0 (the key) within 3 s tumbling event-time windows
    ds1.join(ds2)
            .where(new KeySelector<Tuple3<String, String, String>, String>() {
                @Override
                public String getKey(Tuple3<String, String, String> value) throws Exception {
                    return value.f0;
                }
            })
            .equalTo(new KeySelector<Tuple3<String, String, String>, String>() {
                @Override
                public String getKey(Tuple3<String, String, String> value) throws Exception {
                    return value.f0;
                }
            })
            .window(TumblingEventTimeWindows.of(Time.seconds(3)))
            .apply(new JoinFunction<Tuple3<String, String, String>, Tuple3<String, String, String>, String>() {
                @Override
                public String join(Tuple3<String, String, String> value1,
                                   Tuple3<String, String, String> value2) throws Exception {
                    return value1.f1 + "=" + value2.f1;
                }
            })
            .print();
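A window join is an inner join evaluated per key and per window: when a window fires, the `JoinFunction` is called once for every (left, right) pair that shares the key and falls into that window, and a key seen on only one side produces nothing. The plain-Java sketch below has no Flink dependency; `WindowJoinSketch`, `Event` and `joinWindow` are hypothetical names, and it assumes non-negative timestamps and offset 0. It applies that pairing to the four records from the output to show what each window can produce.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical simulation of a tumbling-window inner join (not Flink code). */
public class WindowJoinSketch {

    record Event(String key, String name, long ts) {}

    /** Epoch-aligned window start for non-negative timestamps and offset 0. */
    static long windowStart(long ts, long size) {
        return ts - ts % size;
    }

    /** Emits name1=name2 for every left/right pair with equal key inside the window starting at `start`. */
    static List<String> joinWindow(List<Event> left, List<Event> right, long start, long size) {
        List<String> out = new ArrayList<>();
        for (Event l : left) {
            if (windowStart(l.ts(), size) != start) continue;
            for (Event r : right) {
                if (windowStart(r.ts(), size) == start && l.key().equals(r.key())) {
                    out.add(l.name() + "=" + r.name()); // same output as the JoinFunction above
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long size = 3000L;
        List<Event> ds1 = List.of(new Event("1", "tom1", 1000000055000L),
                                  new Event("1", "tom2", 1000000056000L));
        List<Event> ds2 = List.of(new Event("1", "jerry1", 1000000055000L),
                                  new Event("1", "jerry2", 1000000056000L));
        System.out.println(joinWindow(ds1, ds2, 1000000053000L, size)); // the window that fired
        System.out.println(joinWindow(ds1, ds2, 1000000056000L, size)); // the next window
    }
}
```

Only the first window had fired when the output was captured, which is why `tom1=jerry1` is the only joined line; `tom2=jerry2` belongs to [1000000056000, 1000000059000) and would only appear once the join's watermark reaches 1000000058999.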
    The output is:
    currentTimeStamp: 1000000055000,Key:1,EventTime:1000000055000,previous watermark:0
    4> (1,tom1,1000000055000)
    currentTimeStamp: 1000000056000,Key:1,EventTime:1000000056000,previous watermark:1000000055000
    4> (1,tom2,1000000056000)
    currentTimeStamp: 1000000055000,Key:1,EventTime:1000000055000,previous watermark:0
    3> (1,jerry1,1000000055000)
    currentTimeStamp: 1000000056000,Key:1,EventTime:1000000056000,previous watermark:1000000055000
    3> (1,jerry2,1000000056000)
    2> tom1=jerry1
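A two-input operator such as this join keeps one watermark per input and advances its own event-time clock to the minimum of them (across parallel input channels it likewise takes the minimum). The sketch below replays the four records in the order they were printed. It rests on two simplifying assumptions: each input is treated as emitting `maxTimestamp - maxDelayAllowed` right after every record (the real periodic assigner is polled every auto-watermark interval, 200 ms by default once event time is enabled), and an input without a watermark yet is shown as `none` (the assigner in the question would emit 0 before its first record, which holds the join back just the same). `TwoInputWatermarkSketch`, `firingRecord` and `show` are hypothetical names.

```java
/**
 * Hypothetical replay (not Flink code) of how a two-input operator such as a
 * window join derives its event-time clock from the watermarks of its inputs.
 */
public class TwoInputWatermarkSketch {

    static String show(long wm) {
        return wm == Long.MIN_VALUE ? "none" : Long.toString(wm);
    }

    /**
     * Replays records in arrival order and returns the name of the record after which
     * a window with the given maxTimestamp (end - 1) fires, or null if it never fires.
     * Simplification: each input emits (max event time - delay) right after every record.
     */
    static String firingRecord(String[] names, int[] inputs, long[] timestamps,
                               long delay, long windowMaxTs) {
        long wm1 = Long.MIN_VALUE; // watermark of input 1 (ds1)
        long wm2 = Long.MIN_VALUE; // watermark of input 2 (ds2)
        for (int i = 0; i < names.length; i++) {
            if (inputs[i] == 1) {
                wm1 = Math.max(wm1, timestamps[i] - delay); // watermarks never go backwards
            } else {
                wm2 = Math.max(wm2, timestamps[i] - delay);
            }
            long joinWm = Math.min(wm1, wm2); // the join follows the slower input
            System.out.println("after " + names[i] + ": ds1=" + show(wm1)
                    + ", ds2=" + show(wm2) + ", join=" + show(joinWm));
            if (joinWm >= windowMaxTs) {
                return names[i];
            }
        }
        return null;
    }

    public static void main(String[] args) {
        String[] names = {"tom1", "tom2", "jerry1", "jerry2"}; // order of the printed output
        int[] inputs = {1, 1, 2, 2};
        long[] ts = {1000000055000L, 1000000056000L, 1000000055000L, 1000000056000L};
        long windowMaxTs = 1000000056000L - 1;                  // window [1000000053000, 1000000056000)
        System.out.println("window fires after: " + firingRecord(names, inputs, ts, 0L, windowMaxTs));
    }
}
```

Under these assumptions the join's watermark first reaches 1000000056000 after jerry2, once 56000 has arrived on both streams. That value is past 1000000055999, the last millisecond of [1000000053000, 1000000056000), so the window fires and prints `tom1=jerry1`.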
