Isn't a new watermark emitted once the 56000 record arrives?
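The reply's point can be checked with a small plain-Java model. This is a hypothetical model, not Flink code, and it rests on four assumptions. First, each assigner's watermark equals the largest event time it has seen, because maxDelayAllowed = 0. Second, the window operator behind ds1.join(ds2) advances to the minimum of the watermarks arriving from ds1 and ds2, which is Flink's usual rule for an operator with several inputs. Third, the 1000000055000 records fall into the 3 s window [1000000053000, 1000000056000). Fourth, EventTimeTrigger fires once the watermark reaches window end minus 1 ms. The records are replayed in the order of the log quoted below.

```java
// Hypothetical model (not Flink code) of when the join window holding the
// 1000000055000 records fires. Assumptions: per-stream watermark = max event time
// seen (maxDelayAllowed = 0); the join's window operator uses the minimum of the
// two input watermarks; the window fires when watermark >= window end - 1.
class CombinedWatermark {

    // Event-time clock of an operator fed by two streams: the smaller watermark.
    static long combined(long wmDs1, long wmDs2) {
        return Math.min(wmDs1, wmDs2);
    }

    // A window [start, end) fires once the watermark reaches end - 1 (its max timestamp).
    static boolean fires(long windowEnd, long watermark) {
        return watermark >= windowEnd - 1;
    }

    public static void main(String[] args) {
        long windowEnd = 1000000056000L; // window [1000000053000, 1000000056000)
        String[] arrivals = {"tom1", "tom2", "jerry1", "jerry2"}; // order in the log
        // Watermark of each stream after the corresponding record (0 before any data).
        long[] wmDs1 = {1000000055000L, 1000000056000L, 1000000056000L, 1000000056000L};
        long[] wmDs2 = {0L, 0L, 1000000055000L, 1000000056000L};
        for (int i = 0; i < arrivals.length; i++) {
            long wm = combined(wmDs1[i], wmDs2[i]);
            System.out.println("after " + arrivals[i] + ": watermark=" + wm
                    + ", window fires=" + fires(windowEnd, wm));
        }
    }
}
```

In this model the window only fires after jerry2. That record lifts ds2's watermark to 1000000056000, so the combined watermark passes 1000000055999 without waiting for a 57000 record.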
On 2020/4/4 at 5:57 PM, "忝忝向仧" <[email protected]> wrote:
Hi all, I've run into a problem with a Flink two-stream join. Could someone explain it? Thanks.
ds1 and ds2 each read one of two Kafka topics. Both use event time with watermarks and are joined over a 3-second tumbling window, defined in the code below.
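Finally, in the join output, why does the window already fire at the second record?
According to the firing condition watermark_time >= window_endtime, the window should only fire after the record with timestamp 1000000057000 arrives. Instead, it fired at 56000. Why?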
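To see where the window boundaries actually fall, the window-assignment arithmetic can be reproduced in plain Java. As far as I know, TumblingEventTimeWindows aligns windows to the epoch rather than to the first record. With the default offset of 0, a window starts at timestamp - (timestamp - offset + size) % size, which is what Flink's TimeWindow.getWindowStartWithOffset computes. EventTimeTrigger then fires when the watermark reaches window.maxTimestamp(), that is, end - 1. The class and method names below are hypothetical; this is a sketch of the formula, not Flink code.

```java
// Plain-Java sketch of tumbling event-time window assignment (not Flink code).
// Assumption: mirrors Flink's TimeWindow.getWindowStartWithOffset; offset 0 is the
// default used by TumblingEventTimeWindows.of(Time.seconds(3)).
class WindowAlignment {
    static final long SIZE = 3000L; // Time.seconds(3) in milliseconds

    // Start of the window containing timestamp; window starts are offset + k * size.
    static long windowStart(long timestamp, long offset, long size) {
        return timestamp - (timestamp - offset + size) % size;
    }

    public static void main(String[] args) {
        long[] timestamps = {1000000055000L, 1000000056000L, 1000000057000L};
        for (long ts : timestamps) {
            long start = windowStart(ts, 0L, SIZE);
            long end = start + SIZE;
            // The window fires when the watermark reaches end - 1 (window.maxTimestamp()).
            System.out.println(ts + " -> [" + start + ", " + end + "), fires at watermark >= " + (end - 1));
        }
    }
}
```

It prints that 1000000055000 belongs to [1000000053000, 1000000056000), which fires at watermark >= 1000000055999. Both 1000000056000 and 1000000057000 belong to the next window, [1000000056000, 1000000059000). A different grid can be chosen with TumblingEventTimeWindows.of(size, offset). For example, by the same formula, an offset of Time.seconds(2) would make 1000000055000 start its own window [1000000055000, 1000000058000).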
The code is as follows:
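// Imports assumed for the Flink APIs used below (FlinkKafkaConsumer09 comes from the Kafka 0.9 connector):
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import javax.annotation.Nullable;

// Inside main(): env (a StreamExecutionEnvironment set to TimeCharacteristic.EventTime)
// and properties (Kafka consumer settings) are assumed to be defined elsewhere.
DataStream<String> stream1 = env
        .addSource(new FlinkKafkaConsumer09<String>("stream1", new SimpleStringSchema(), properties)
                .setStartFromLatest())
        .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<String>() {
            long currentTimeStamp = 0L;  // largest event time seen so far
            long maxDelayAllowed = 0L;   // allowed out-of-orderness: none
            long currentWaterMark;       // most recently emitted watermark

            // Called periodically (every autoWatermarkInterval), not once per record
            @Nullable
            @Override
            public Watermark getCurrentWatermark() {
                currentWaterMark = currentTimeStamp - maxDelayAllowed;
                return new Watermark(currentWaterMark);
            }

            // Each record is "key name timestamp"; the third field is the event time
            @Override
            public long extractTimestamp(String s, long l) {
                String[] arr = s.split(" ");
                long timeStamp = Long.parseLong(arr[2]);
                currentTimeStamp = Math.max(timeStamp, currentTimeStamp);
                // Log the max event time so far and the last watermark emitted before this record
                System.out.println("currentTimeStamp: " + currentTimeStamp + ",Key:" + arr[0]
                        + ",EventTime:" + timeStamp + ",前一条数据的水位线:" + currentWaterMark);
                return timeStamp;
            }
        });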
// Parse "key name timestamp" into a Tuple3 (f0 = join key, f1 = name, f2 = event time)
DataStream<Tuple3<String, String, String>> ds1 = stream1.map(
        new MapFunction<String, Tuple3<String, String, String>>() {
            @Override
            public Tuple3<String, String, String> map(String s1) throws Exception {
                String[] arr = s1.split(" ");
                return Tuple3.of(arr[0], arr[1], arr[2]);
            }
        });
ds1.print();
// Same timestamp/watermark assigner as stream1, reading topic "stream2"
DataStream<String> stream2 = env
        .addSource(new FlinkKafkaConsumer09<String>("stream2", new SimpleStringSchema(), properties)
                .setStartFromLatest())
        .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<String>() {
            long currentTimeStamp = 0L;  // largest event time seen so far
            long maxDelayAllowed = 0L;   // allowed out-of-orderness: none
            long currentWaterMark;       // most recently emitted watermark

            @Nullable
            @Override
            public Watermark getCurrentWatermark() {
                currentWaterMark = currentTimeStamp - maxDelayAllowed;
                return new Watermark(currentWaterMark);
            }

            @Override
            public long extractTimestamp(String s, long l) {
                String[] arr = s.split(" ");
                long timeStamp = Long.parseLong(arr[2]);
                currentTimeStamp = Math.max(timeStamp, currentTimeStamp);
                System.out.println("currentTimeStamp: " + currentTimeStamp + ",Key:" + arr[0]
                        + ",EventTime:" + timeStamp + ",前一条数据的水位线:" + currentWaterMark);
                return timeStamp;
            }
        });
// Parse "key name timestamp" into a Tuple3, as for ds1
DataStream<Tuple3<String, String, String>> ds2 = stream2.map(
        new MapFunction<String, Tuple3<String, String, String>>() {
            @Override
            public Tuple3<String, String, String> map(String s2) throws Exception {
                String[] arr = s2.split(" ");
                return Tuple3.of(arr[0], arr[1], arr[2]);
            }
        });
ds2.print();
// Inner join on f0 within 3 s tumbling event-time windows; each matching pair emits "name1=name2"
ds1.join(ds2)
        .where(new KeySelector<Tuple3<String, String, String>, String>() {
            @Override
            public String getKey(Tuple3<String, String, String> value) throws Exception {
                return value.f0;
            }
        })
        .equalTo(new KeySelector<Tuple3<String, String, String>, String>() {
            @Override
            public String getKey(Tuple3<String, String, String> value) throws Exception {
                return value.f0;
            }
        })
        .window(TumblingEventTimeWindows.of(Time.seconds(3)))
        .apply(new JoinFunction<Tuple3<String, String, String>, Tuple3<String, String, String>, String>() {
            @Override
            public String join(Tuple3<String, String, String> value1,
                               Tuple3<String, String, String> value2) throws Exception {
                return value1.f1 + "=" + value2.f1;
            }
        })
        .print();
// env.execute(...) is still required to run the job
The output was as follows (前一条数据的水位线 means "the watermark emitted before this record"):
currentTimeStamp: 1000000055000,Key:1,EventTime:1000000055000,前一条数据的水位线:0
4> (1,tom1,1000000055000)
currentTimeStamp: 1000000056000,Key:1,EventTime:1000000056000,前一条数据的水位线:1000000055000
4> (1,tom2,1000000056000)
currentTimeStamp: 1000000055000,Key:1,EventTime:1000000055000,前一条数据的水位线:0
3> (1,jerry1,1000000055000)
currentTimeStamp: 1000000056000,Key:1,EventTime:1000000056000,前一条数据的水位线:1000000055000
3> (1,jerry2,1000000056000)
2> tom1=jerry1
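The single result line tom1=jerry1 follows from how a windowed inner join pairs elements. The sketch below is a hypothetical plain-Java model, not Flink's implementation, built on three assumptions. First, windows are 3 s tumbling windows with offset 0. Second, a window counts as fired once its max timestamp (end - 1) is not beyond the watermark. Third, within a fired window each ds1 element is paired with each ds2 element of the same key, producing value1.f1 + "=" + value2.f1 as in the JoinFunction above. The Event class is an illustrative stand-in for Tuple3.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of a windowed inner join (not Flink's implementation).
class WindowJoinSketch {
    static final long SIZE = 3000L; // 3 s tumbling windows, offset 0

    // Stand-in for Tuple3<String, String, String>: f0 = key, f1 = name, f2 = event time.
    static final class Event {
        final String key;
        final String name;
        final long ts;

        Event(String key, String name, long ts) {
            this.key = key;
            this.name = name;
            this.ts = ts;
        }
    }

    static long windowStart(long ts) {
        return ts - (ts + SIZE) % SIZE;
    }

    // All join results from windows that have fired by the given watermark:
    // every same-key pair that shares a window yields "leftName=rightName".
    static List<String> join(List<Event> left, List<Event> right, long watermark) {
        List<String> out = new ArrayList<>();
        for (Event l : left) {
            long start = windowStart(l.ts);
            if (start + SIZE - 1 > watermark) {
                continue; // this window has not fired yet
            }
            for (Event r : right) {
                if (r.key.equals(l.key) && windowStart(r.ts) == start) {
                    out.add(l.name + "=" + r.name);
                }
            }
        }
        return out;
    }

    // The four records from the log above.
    static List<String> demo(long watermark) {
        List<Event> ds1 = Arrays.asList(
                new Event("1", "tom1", 1000000055000L), new Event("1", "tom2", 1000000056000L));
        List<Event> ds2 = Arrays.asList(
                new Event("1", "jerry1", 1000000055000L), new Event("1", "jerry2", 1000000056000L));
        return join(ds1, ds2, watermark);
    }

    public static void main(String[] args) {
        System.out.println(demo(1000000056000L)); // [tom1=jerry1]
        System.out.println(demo(1000000058999L)); // [tom1=jerry1, tom2=jerry2]
    }
}
```

In this model, tom2 and jerry2 sit in [1000000056000, 1000000059000). So tom2=jerry2 would only appear once both streams have seen an event time of at least 1000000058999.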