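Let me expand the discussion to a few specific scenarios.

1. Inner join scenario.
Is there a way to take the max of the two streams' rowtime?
When using SQL, how would this be done?
For example:
SELECT if(left.rowtime > right.rowtime, left.rowtime, right.rowtime) as rowtime, ...
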
If this were supported, then in this scenario we could still run window computations, CEP, and similar computations downstream.

Tianwang Li <[email protected]> wrote on Sun, Aug 16, 2020 at 10:40 AM:

> Let me expand the discussion to a few specific scenarios.
>
> Benchao Li <[email protected]> wrote on Mon, Jul 6, 2020 at 11:08 PM:
>
>> When we first noticed this behavior we were also somewhat surprised, but after thinking it over it seems reasonable.
>>
>> The time range of a two-stream join can be fairly large, for example stream A within [-10min, +10min] of stream B.
>> In that case, a record arriving on stream A may join with data from several minutes earlier, and by then the
>> watermark is already greater than that data's event time.
>>
>> Personally, I think this is a balance between producing join results in a more timely way and ending up with
>> records whose timestamps are behind the watermark. The current default implementation chooses to produce results
>> more promptly. There is another possible approach: guarantee that the watermark never passes the data's timestamp.
>> In that case either the join results would be delayed, or the watermark logic would have to change so that the
>> watermark is always less than the earliest timestamp among the data that can currently be joined.
>>
>> 元始(Bob Hu) <[email protected]> wrote on Sun, Jul 5, 2020 at 8:48 PM:
>>
>> > Thank you for the answer. This Flink mechanism feels a bit odd, though.
>> >
>> >
>> > ------------------ Original Message ------------------
>> > *From:* "Benchao Li"<[email protected]>;
>> > *Sent:* Sunday, July 5, 2020, 11:58 AM
>> > *To:* "元始(Bob Hu)"<[email protected]>;
>> > *Cc:* "user-zh"<[email protected]>;
>> > *Subject:* Re: Problem with window grouping after a Flink interval join
>> >
>> > Coming back to your question, I think your observation is correct. The results of a time interval join can indeed behave this way.
>> > So if you use an event-time time interval join followed by an event-time window (or another operator that uses event time, such as CEP),
>> > there will be problems: a lot of data is dropped outright as late data.
>> >
>> > 元始(Bob Hu) <[email protected]> wrote on Fri, Jul 3, 2020 at 3:29 PM:
>> >
>> >> Hello, I would like to ask a question:
>> >> Is there a problem with doing a window group after an interval join of two stream tables in Flink? Some left join rows that find no match get dropped.
>> >> For example, with the join condition
>> >> select * from a,b where a.id=b.id and b.rowtime between a.rowtime and a.rowtime + INTERVAL '1' HOUR
>> >> the source code gives leftRelativeSize = 1 hour and rightRelativeSize = 0, and for the left stream
>> >> cleanUpTime = rowTime + leftRelativeSize + (leftRelativeSize + rightRelativeSize) / 2 + allowedLateness + 1,
>> >> so an unmatched left row is emitted after 1.5 hours (with the right side null). The watermark adjustment, however, is
>> >> Math.max(leftRelativeSize, rightRelativeSize) + allowedLateness, which is 1 hour. So by the time the row is emitted,
>> >> isn't the watermark already 0.5 hours ahead of the left row's rowtime? If a group by is then applied to the joined stream,
>> >> these rows with an empty right side are dropped.
>> >> Flink version: 1.10.0.
>> >>
>> >> Below is a piece of my test code:
>> >>
>> >> import org.apache.commons.net.ntp.TimeStamp;
>> >> import org.apache.flink.api.common.typeinfo.TypeInformation;
>> >> import org.apache.flink.api.common.typeinfo.Types;
>> >> import org.apache.flink.api.java.typeutils.RowTypeInfo;
>> >> import org.apache.flink.streaming.api.TimeCharacteristic;
>> >> import org.apache.flink.streaming.api.datastream.DataStream;
>> >> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>> >> import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
>> >> import org.apache.flink.streaming.api.functions.ProcessFunction;
>> >> import org.apache.flink.streaming.api.functions.source.SourceFunction;
>> >> import org.apache.flink.streaming.api.watermark.Watermark;
>> >> import org.apache.flink.table.api.EnvironmentSettings;
>> >> import org.apache.flink.table.api.Table;
>> >> import org.apache.flink.table.api.java.StreamTableEnvironment;
>> >> import org.apache.flink.table.functions.ScalarFunction;
>> >> import org.apache.flink.types.Row;
>> >> import org.apache.flink.util.Collector;
>> >> import org.apache.flink.util.IOUtils;
>> >>
>> >> import java.io.BufferedReader;
>> >> import java.io.InputStreamReader;
>> >> import java.io.Serializable;
>> >> import java.net.InetSocketAddress;
>> >> import java.net.Socket;
>> >> import java.sql.Timestamp;
>> >> import java.text.SimpleDateFormat;
>> >> import java.util.ArrayList;
>> >> import java.util.Date;
>> >> import java.util.List;
>> >>
>> >> public class TimeBoundedJoin {
>> >>
>> >>     // Periodic watermark assigner: watermark = max seen timestamp minus the allowed out-of-orderness (seconds).
>> >>     // If maxIdleTime (seconds) is set and no new max timestamp has arrived for that long, fall back to wall-clock time.
>> >>     public static AssignerWithPeriodicWatermarks<Row> getWatermark(Integer maxIdleTime, long finalMaxOutOfOrderness) {
>> >>         AssignerWithPeriodicWatermarks<Row> timestampExtractor = new AssignerWithPeriodicWatermarks<Row>() {
>> >>             private long currentMaxTimestamp = 0;
>> >>             private long lastMaxTimestamp = 0;
>> >>             private long lastUpdateTime = 0;
>> >>             boolean firstWatermark = true;
>> >>             // Integer maxIdleTime = 30;
>> >>
>> >>             @Override
>> >>             public Watermark getCurrentWatermark() {
>> >>                 if(firstWatermark) {
>> >>                     lastUpdateTime = System.currentTimeMillis();
>> >>                     firstWatermark = false;
>> >>                 }
>> >>                 if(currentMaxTimestamp != lastMaxTimestamp) {
>> >>                     lastMaxTimestamp = currentMaxTimestamp;
>> >>                     lastUpdateTime = System.currentTimeMillis();
>> >>                 }
>> >>                 if(maxIdleTime != null && System.currentTimeMillis() - lastUpdateTime > maxIdleTime * 1000) {
>> >>                     return new Watermark(new Date().getTime() - finalMaxOutOfOrderness * 1000);
>> >>                 }
>> >>                 return new Watermark(currentMaxTimestamp - finalMaxOutOfOrderness * 1000);
>> >>
>> >>             }
>> >>
>> >>             @Override
>> >>             public long extractTimestamp(Row row, long previousElementTimestamp) {
>> >>                 // Field 1 holds the event time, either as a long or as a java.sql.Timestamp.
>> >>                 Object value = row.getField(1);
>> >>                 long timestamp;
>> >>                 try {
>> >>                     timestamp = (long)value;
>> >>                 } catch (Exception e) {
>> >>                     timestamp = ((Timestamp)value).getTime();
>> >>                 }
>> >>                 if(timestamp > currentMaxTimestamp) {
>> >>                     currentMaxTimestamp = timestamp;
>> >>                 }
>> >>                 return timestamp;
>> >>             }
>> >>         };
>> >>         return timestampExtractor;
>> >>     }
>> >>
>> >>     public static void main(String[] args) throws Exception {
>> >>         StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
>> >>         EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>> >>         StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
>> >>         bsEnv.setParallelism(1);
>> >>         bsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>> >>
>> >>
>> >>         // DataStream<Row> ds1 = bsEnv.addSource(sourceFunction(9000));
>> >>         SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
>> >>         // Left stream (order_info): order_id, order_time, fee
>> >>         List<Row> list = new ArrayList<>();
>> >>         list.add(Row.of("001",new Timestamp(sdf.parse("2020-05-13 00:00:00").getTime()), 100));
>> >>         list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13 00:20:00").getTime()), 100));
>> >>         list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13 00:40:00").getTime()), 100));
>> >>         list.add(Row.of("002",new Timestamp(sdf.parse("2020-05-13 01:00:01").getTime()), 100));
>> >>         list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13 02:20:00").getTime()), 100));
>> >>         list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13 02:30:00").getTime()), 100));
>> >>         list.add(Row.of("003",new Timestamp(sdf.parse("2020-05-13 02:00:02").getTime()), 100));
>> >>         list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13 02:20:00").getTime()), 100));
>> >>         list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13 02:40:00").getTime()), 100));
>> >>         list.add(Row.of("004",new Timestamp(sdf.parse("2020-05-13 03:00:03").getTime()), 100));
>> >>         list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13 03:20:00").getTime()), 100));
>> >>         list.add(Row.of("000",new Timestamp(sdf.parse("2020-05-13 03:40:00").getTime()), 100));
>> >>         list.add(Row.of("005",new Timestamp(sdf.parse("2020-05-13 04:00:04").getTime()), 100));
>> >>         DataStream<Row> ds1 = bsEnv.addSource(new SourceFunction<Row>() {
>> >>             @Override
>> >>             public void run(SourceContext<Row> ctx) throws Exception {
>> >>                 for(Row row : list) {
>> >>                     ctx.collect(row);
>> >>                     Thread.sleep(1000);
>> >>                 }
>> >>
>> >>             }
>> >>
>> >>             @Override
>> >>             public void cancel() {
>> >>
>> >>             }
>> >>         });
>> >>         ds1 = ds1.assignTimestampsAndWatermarks(getWatermark(null, 0));
>> >>         ds1.getTransformation().setOutputType((new RowTypeInfo(Types.STRING, Types.SQL_TIMESTAMP, Types.INT)));
>> >>         bsTableEnv.createTemporaryView("order_info", ds1, "order_id, order_time, fee, rowtime.rowtime");
>> >>
>> >>         // Right stream (pay): order_id, pay_time
>> >>         List<Row> list2 = new ArrayList<>();
>> >>         list2.add(Row.of("001",new Timestamp(sdf.parse("2020-05-13 01:00:00").getTime())));
>> >>         list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13 01:20:00").getTime())));
>> >>         list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13 01:30:00").getTime())));
>> >>         list2.add(Row.of("002",new Timestamp(sdf.parse("2020-05-13 02:00:00").getTime())));
>> >>         list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13 02:20:00").getTime())));
>> >>         list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13 02:40:00").getTime())));
>> >>         // list2.add(Row.of("003",new Timestamp(sdf.parse("2020-05-13 03:00:03").getTime())));
>> >>         list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13 03:20:00").getTime())));
>> >>         list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13 03:40:00").getTime())));
>> >>         list2.add(Row.of("004",new Timestamp(sdf.parse("2020-05-13 04:00:00").getTime())));
>> >>         list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13 04:20:00").getTime())));
>> >>         list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13 04:40:00").getTime())));
>> >>         list2.add(Row.of("005",new Timestamp(sdf.parse("2020-05-13 05:00:00").getTime())));
>> >>         list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13 05:20:00").getTime())));
>> >>         list2.add(Row.of("111",new Timestamp(sdf.parse("2020-05-13 05:40:00").getTime())));
>> >>         DataStream<Row> ds2 = bsEnv.addSource(new SourceFunction<Row>() {
>> >>             @Override
>> >>             public void run(SourceContext<Row> ctx) throws Exception {
>> >>                 for(Row row : list2) {
>> >>                     ctx.collect(row);
>> >>                     Thread.sleep(1000);
>> >>                 }
>> >>
>> >>             }
>> >>
>> >>             @Override
>> >>             public void cancel() {
>> >>
>> >>             }
>> >>         });
>> >>         ds2 = ds2.assignTimestampsAndWatermarks(getWatermark(null, 0));
>> >>         ds2.getTransformation().setOutputType((new RowTypeInfo(Types.STRING, Types.SQL_TIMESTAMP)));
>> >>         bsTableEnv.createTemporaryView("pay", ds2, "order_id, pay_time, rowtime.rowtime");
>> >>
>> >>         // Event-time interval left join: pay.rowtime within [order.rowtime, order.rowtime + 1 hour]
>> >>         Table joinTable = bsTableEnv.sqlQuery("SELECT a.*,b.order_id from order_info a left join pay b on a.order_id=b.order_id and b.rowtime between a.rowtime and a.rowtime + INTERVAL '1' HOUR where a.order_id <>'000' ");
>> >>
>> >>         // Print each joined row together with its rowtime and the current watermark.
>> >>         bsTableEnv.toAppendStream(joinTable, Row.class).process(new ProcessFunction<Row, Object>() {
>> >>             @Override
>> >>             public void processElement(Row value, Context ctx, Collector<Object> out) throws Exception {
>> >>                 SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
>> >>                 System.err.println("row:" + value + ",rowtime:" + value.getField(3) + ",watermark:" + sdf.format(ctx.timerService().currentWatermark()));
>> >>             }
>> >>         });
>> >>
>> >>         bsTableEnv.execute("job");
>> >>     }
>> >> }
>> >>
>> >>
>> >
>> > --
>> >
>> > Best,
>> > Benchao Li
>> >
>>
>>
>> --
>>
>> Best,
>> Benchao Li
>>
>
>
> --
> **************************************
> tivanli
> **************************************
>

--
**************************************
tivanli
**************************************
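
As a rough check of the numbers in the original question, the plain-Java sketch below plugs leftRelativeSize = 1 hour and rightRelativeSize = 0 into the two formulas quoted there. It does not use Flink. The class and method names (IntervalJoinLateness, cleanUpDelayMs, watermarkDelayMs, watermarkLeadMs) are hypothetical, allowedLateness = 0 is an assumption because the thread does not give its value, and the sketch assumes an unmatched left row is emitted once the join's input watermark reaches cleanUpTime.

```java
import java.time.Duration;

public class IntervalJoinLateness {

    // Offset of the left stream's cleanUpTime from the row's rowTime, per the formula quoted in the thread:
    // leftRelativeSize + (leftRelativeSize + rightRelativeSize) / 2 + allowedLateness + 1
    static long cleanUpDelayMs(long leftRelativeSize, long rightRelativeSize, long allowedLateness) {
        return leftRelativeSize + (leftRelativeSize + rightRelativeSize) / 2 + allowedLateness + 1;
    }

    // How far the join holds back its output watermark, per the thread:
    // Math.max(leftRelativeSize, rightRelativeSize) + allowedLateness
    static long watermarkDelayMs(long leftRelativeSize, long rightRelativeSize, long allowedLateness) {
        return Math.max(leftRelativeSize, rightRelativeSize) + allowedLateness;
    }

    // Assumption: the unmatched row is emitted when the input watermark equals rowTime + cleanUpDelay,
    // so the output watermark at that moment is rowTime + cleanUpDelay - watermarkDelay.
    // The return value is how far that output watermark is ahead of the row's own rowTime.
    static long watermarkLeadMs(long leftRelativeSize, long rightRelativeSize, long allowedLateness) {
        return cleanUpDelayMs(leftRelativeSize, rightRelativeSize, allowedLateness)
                - watermarkDelayMs(leftRelativeSize, rightRelativeSize, allowedLateness);
    }

    public static void main(String[] args) {
        long left = Duration.ofHours(1).toMillis(); // b.rowtime <= a.rowtime + INTERVAL '1' HOUR
        long right = 0L;                            // b.rowtime >= a.rowtime
        long allowedLateness = 0L;                  // assumed, not stated in the thread

        System.out.println("cleanUp delay (ms):   " + cleanUpDelayMs(left, right, allowedLateness));
        System.out.println("watermark delay (ms): " + watermarkDelayMs(left, right, allowedLateness));
        System.out.println("watermark lead (ms):  " + watermarkLeadMs(left, right, allowedLateness));
    }
}
```

Under these assumptions the lead is about 30 minutes (1,800,001 ms), which matches the 0.5-hour gap described in the question: a downstream event-time window would see the null-padded row as already late.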
