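We were also a bit surprised when we first noticed this behavior, but on reflection it seems reasonable.

The time range of a two-stream join can be fairly large, for example stream A joining against [-10min, +10min] of stream B.
In that case, when a record arrives on stream A it may join with data from several minutes earlier, and by then the watermark is already greater than that record's event time.

Personally, I think this is a trade-off between producing join results in a more real-time fashion and letting data timestamps fall behind the watermark.
The current default implementation chooses to produce results in a more real-time fashion. There is another possible approach, which guarantees that the watermark never passes the data's timestamp.
With that approach, either the join results are delayed, or the watermark logic has to be changed so that the watermark always stays below
the earliest timestamp among the rows that can still be joined.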
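
The numbers from the original question can be checked with a few lines of plain Java. This is only a sketch of the arithmetic, not Flink code: the class and method names (`IntervalJoinLateness`, `leftCleanUpOffset`, `watermarkDelay`, `watermarkLead`) are made up for illustration, the two formulas are copied from the question quoted below as reported for Flink 1.10.0, and the watermark is assumed to advance smoothly.

```java
public class IntervalJoinLateness {
    static final long HOUR_MS = 60L * 60L * 1000L;

    // Offset (ms) after a left row's rowtime at which an unmatched row is
    // emitted with a null right side. Formula as quoted in the question.
    static long leftCleanUpOffset(long leftRelativeSize, long rightRelativeSize, long allowedLateness) {
        return leftRelativeSize + (leftRelativeSize + rightRelativeSize) / 2 + allowedLateness + 1;
    }

    // Amount (ms) by which the join holds back the watermark it forwards.
    // Formula as quoted in the question.
    static long watermarkDelay(long leftRelativeSize, long rightRelativeSize, long allowedLateness) {
        return Math.max(leftRelativeSize, rightRelativeSize) + allowedLateness;
    }

    // Approximate distance (ms) by which the forwarded watermark is ahead of
    // the rowtime of an unmatched left row at the moment that row is emitted.
    // A positive value means downstream event-time operators may see the row as late.
    static long watermarkLead(long leftRelativeSize, long rightRelativeSize, long allowedLateness) {
        return leftCleanUpOffset(leftRelativeSize, rightRelativeSize, allowedLateness)
                - watermarkDelay(leftRelativeSize, rightRelativeSize, allowedLateness);
    }

    public static void main(String[] args) {
        // b.rowtime BETWEEN a.rowtime AND a.rowtime + INTERVAL '1' HOUR,
        // with allowedLateness assumed to be 0.
        long left = HOUR_MS;
        long right = 0L;
        long lateness = 0L;
        System.out.println("cleanup offset (ms): " + leftCleanUpOffset(left, right, lateness));
        System.out.println("watermark delay (ms): " + watermarkDelay(left, right, lateness));
        System.out.println("watermark lead (ms): " + watermarkLead(left, right, lateness));
    }
}
```

For the 1 hour interval this gives a lead of 1800001 ms, which is roughly the half hour described in the question. Holding the watermark back by the full cleanup offset instead would bring the lead down to 0, at the cost of delaying every downstream result by that extra half hour.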

元始(Bob Hu) <[email protected]> wrote on Sunday, July 5, 2020 at 8:48 PM:

> Thank you for the explanation. This Flink mechanism does feel a bit odd, though.
>
>
> ------------------ Original Message ------------------
> *From:* "Benchao Li"<[email protected]>;
> *Sent:* Sunday, July 5, 2020, 11:58 AM
> *To:* "元始(Bob Hu)"<[email protected]>;
> *Cc:* "user-zh"<[email protected]>;
> *Subject:* Re: Problem with window grouping after a Flink interval join
>
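> Back to your question: I think your observation is correct. The results of a time interval join can indeed behave this way.
> So if you use an event-time time interval join and follow it with an event-time window (or any other operator that uses event time, such as CEP),
> you will run into problems: a lot of data is treated as late and dropped outright.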
>
> 元始(Bob Hu) <[email protected]> wrote on Friday, July 3, 2020 at 3:29 PM:
>
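>> Hello, I would like to ask a question:
>> Is there a problem with doing a window group-by after an interval join of two streaming tables in Flink? Some left-join rows that find no match get dropped.
>> For example, with the join condition select * from a,b where a.id=b.id and b.rowtime between a.rowtime
>> and a.rowtime + INTERVAL '1' HOUR,
>> the source code gives leftRelativeSize = 1 hour and rightRelativeSize = 0, and the left stream's
>> cleanUpTime = rowTime + leftRelativeSize + (leftRelativeSize + rightRelativeSize) / 2 + allowedLateness + 1,
>> so a left-table row that finds no match is emitted after 1.5 hours (with the right table as null).
>> The watermark adjustment, however, is Math.max(leftRelativeSize, rightRelativeSize) + allowedLateness, which is 1 hour.
>> Doesn't that mean that by the time the row is emitted, the watermark is already 0.5 hours ahead of the left row's rowtime?
>> A later group by on the joined stream then drops these rows whose right side is null.
>> Flink version: 1.10.0.
>>
>> Here is my test code: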
>>
>> import org.apache.commons.net.ntp.TimeStamp;
>> import org.apache.flink.api.common.typeinfo.TypeInformation;
>> import org.apache.flink.api.common.typeinfo.Types;
>> import org.apache.flink.api.java.typeutils.RowTypeInfo;
>> import org.apache.flink.streaming.api.TimeCharacteristic;
>> import org.apache.flink.streaming.api.datastream.DataStream;
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>> import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
>> import org.apache.flink.streaming.api.functions.ProcessFunction;
>> import org.apache.flink.streaming.api.functions.source.SourceFunction;
>> import org.apache.flink.streaming.api.watermark.Watermark;
>> import org.apache.flink.table.api.EnvironmentSettings;
>> import org.apache.flink.table.api.Table;
>> import org.apache.flink.table.api.java.StreamTableEnvironment;
>> import org.apache.flink.table.functions.ScalarFunction;
>> import org.apache.flink.types.Row;
>> import org.apache.flink.util.Collector;
>> import org.apache.flink.util.IOUtils;
>>
>> import java.io.BufferedReader;
>> import java.io.InputStreamReader;
>> import java.io.Serializable;
>> import java.net.InetSocketAddress;
>> import java.net.Socket;
>> import java.sql.Timestamp;
>> import java.text.SimpleDateFormat;
>> import java.util.ArrayList;
>> import java.util.Date;
>> import java.util.List;
>>
>> public class TimeBoundedJoin {
>>
>>     public static AssignerWithPeriodicWatermarks<Row> getWatermark(Integer maxIdleTime, long finalMaxOutOfOrderness) {
>>         AssignerWithPeriodicWatermarks<Row> timestampExtractor = new AssignerWithPeriodicWatermarks<Row>() {
>>             private long currentMaxTimestamp = 0;
>>             private long lastMaxTimestamp = 0;
>>             private long lastUpdateTime = 0;
>>             boolean firstWatermark = true;
>> //            Integer maxIdleTime = 30;
>>
>>             @Override
>>             public Watermark getCurrentWatermark() {
>>                 if(firstWatermark) {
>>                     lastUpdateTime = System.currentTimeMillis();
>>                     firstWatermark = false;
>>                 }
>>                 if(currentMaxTimestamp != lastMaxTimestamp) {
>>                     lastMaxTimestamp = currentMaxTimestamp;
>>                     lastUpdateTime = System.currentTimeMillis();
>>                 }
>>                 if(maxIdleTime != null && System.currentTimeMillis() - lastUpdateTime > maxIdleTime * 1000) {
>>                     return new Watermark(new Date().getTime() - finalMaxOutOfOrderness * 1000);
>>                 }
>>                 return new Watermark(currentMaxTimestamp - finalMaxOutOfOrderness * 1000);
>>
>>             }
>>
>>             @Override
>>             public long extractTimestamp(Row row, long previousElementTimestamp) {
>>                 Object value = row.getField(1);
>>                 long timestamp;
>>                 try {
>>                     timestamp = (long)value;
>>                 } catch (Exception e) {
>>                     timestamp = ((Timestamp)value).getTime();
>>                 }
>>                 if(timestamp > currentMaxTimestamp) {
>>                     currentMaxTimestamp = timestamp;
>>                 }
>>                 return timestamp;
>>             }
>>         };
>>         return timestampExtractor;
>>     }
>>
>>     public static void main(String[] args) throws Exception {
>>         StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
>>         EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>>         StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
>>         bsEnv.setParallelism(1);
>>         bsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>
>>
>> //        DataStream<Row> ds1 = bsEnv.addSource(sourceFunction(9000));
>>         SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
>>         List<Row> list = new ArrayList<>();
>>         list.add(Row.of("001",new Timestamp(sdf.parse("2020-05-13 00:00:00").getTime()), 100));
>>         list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13 00:20:00").getTime()), 100));
>>         list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13 00:40:00").getTime()), 100));
>>         list.add(Row.of("002",new Timestamp(sdf.parse("2020-05-13 01:00:01").getTime()), 100));
>>         list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13 02:20:00").getTime()), 100));
>>         list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13 02:30:00").getTime()), 100));
>>         list.add(Row.of("003",new Timestamp(sdf.parse("2020-05-13 02:00:02").getTime()), 100));
>>         list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13 02:20:00").getTime()), 100));
>>         list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13 02:40:00").getTime()), 100));
>>         list.add(Row.of("004",new Timestamp(sdf.parse("2020-05-13 03:00:03").getTime()), 100));
>>         list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13 03:20:00").getTime()), 100));
>>         list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13 03:40:00").getTime()), 100));
>>         list.add(Row.of("005",new Timestamp(sdf.parse("2020-05-13 04:00:04").getTime()), 100));
>>         DataStream<Row> ds1 = bsEnv.addSource(new SourceFunction<Row>() {
>>             @Override
>>             public void run(SourceContext<Row> ctx) throws Exception {
>>                 for(Row row : list) {
>>                     ctx.collect(row);
>>                     Thread.sleep(1000);
>>                 }
>>
>>             }
>>
>>             @Override
>>             public void cancel() {
>>
>>             }
>>         });
>>         ds1 = ds1.assignTimestampsAndWatermarks(getWatermark(null, 0));
>>         ds1.getTransformation().setOutputType((new RowTypeInfo(Types.STRING, Types.SQL_TIMESTAMP, Types.INT)));
>>         bsTableEnv.createTemporaryView("order_info", ds1, "order_id, order_time, fee, rowtime.rowtime");
>>
>>         List<Row> list2 = new ArrayList<>();
>>         list2.add(Row.of("001",new Timestamp(sdf.parse("2020-05-13 01:00:00").getTime())));
>>         list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13 01:20:00").getTime())));
>>         list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13 01:30:00").getTime())));
>>         list2.add(Row.of("002",new Timestamp(sdf.parse("2020-05-13 02:00:00").getTime())));
>>         list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13 02:20:00").getTime())));
>>         list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13 02:40:00").getTime())));
>> //        list2.add(Row.of("003",new Timestamp(sdf.parse("2020-05-13 03:00:03").getTime())));
>>         list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13 03:20:00").getTime())));
>>         list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13 03:40:00").getTime())));
>>         list2.add(Row.of("004",new Timestamp(sdf.parse("2020-05-13 04:00:00").getTime())));
>>         list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13 04:20:00").getTime())));
>>         list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13 04:40:00").getTime())));
>>         list2.add(Row.of("005",new Timestamp(sdf.parse("2020-05-13 05:00:00").getTime())));
>>         list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13 05:20:00").getTime())));
>>         list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13 05:40:00").getTime())));
>>         DataStream<Row> ds2 = bsEnv.addSource(new SourceFunction<Row>() {
>>             @Override
>>             public void run(SourceContext<Row> ctx) throws Exception {
>>                 for(Row row : list2) {
>>                     ctx.collect(row);
>>                     Thread.sleep(1000);
>>                 }
>>
>>             }
>>
>>             @Override
>>             public void cancel() {
>>
>>             }
>>         });
>>         ds2 = ds2.assignTimestampsAndWatermarks(getWatermark(null, 0));
>>         ds2.getTransformation().setOutputType((new RowTypeInfo(Types.STRING, Types.SQL_TIMESTAMP)));
>>         bsTableEnv.createTemporaryView("pay", ds2, "order_id, pay_time, rowtime.rowtime");
>>
>>         Table joinTable =  bsTableEnv.sqlQuery("SELECT a.*,b.order_id from order_info a left join pay b on a.order_id=b.order_id and b.rowtime between a.rowtime and a.rowtime + INTERVAL '1' HOUR where a.order_id <>'000' ");
>>
>>         bsTableEnv.toAppendStream(joinTable, Row.class).process(new ProcessFunction<Row, Object>() {
>>             @Override
>>             public void processElement(Row value, Context ctx, Collector<Object> out) throws Exception {
>>                 SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
>>                 System.err.println("row:" + value + ",rowtime:" + value.getField(3) + ",watermark:" + sdf.format(ctx.timerService().currentWatermark()));
>>             }
>>         });
>>
>>         bsTableEnv.execute("job");
>>     }
>> }
>>
>>
>
> --
>
> Best,
> Benchao Li
>


-- 

Best,
Benchao Li
