按照这个说法,那么timestamp和watermark其实没有关系。 但是我看到有关帖子里说:双流join里存储的mapstate<Long,StreamRecord>。 而StreamRecord和watermark都是继承于streamelement,Flink会替换StreamRecord 对象中的Timestamp,如果 根据当前事件的Timestamp 生成的Watermark 大于上一次的Watermark,就发出新的Watermark。 具体代码在 TimestampsAndPunctuatedWatermarksOperator.processElement。 @OverridepublicvoidprocessElement(StreamRecord<T> element)throws Exception { finalTvalue= element.getValue(); // 调用 用户实现的 extractTimestamp 获取新的TimestampfinallongnewTimestamp= userFunction.extractTimestamp(value, element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE); // 用新Timestamp 替换StreamRecord中的旧Timestamp output.collect(element.replace(element.getValue(), newTimestamp)); // 调用 用户实现的 checkAndGetNextWatermark 方法获取下一个WatermarkfinalWatermarknextWatermark= userFunction.checkAndGetNextWatermark(value, newTimestamp); // 如果下一个Watermark 大于当前Watermark,就发出新的Watermarkif (nextWatermark != null && nextWatermark.getTimestamp() > currentWatermark) { currentWatermark = nextWatermark.getTimestamp(); output.emitWatermark(nextWatermark); } } 以上是我看见的帖子中的相关内容 如果上述说法不对的话,那么在双流join中,watermark是怎么流转的?
在 2022-07-07 10:03:00,"yidan zhao" <hinobl...@gmail.com> 写道: >timestamp是为每个element(输入的记录)赋值的一个时间戳。 >watermark是从source部分生成的水印个,然后向后传播。 > >以分窗口为例,数据记录的timestamp用于决定数据划分入哪个窗口。 >watermark用于决定窗口啥时候闭合,比如窗口是0-5s,那么当watermark达到5s的时候,窗口就会闭合。 > >考虑数据不一定能及时到达,可以让watermark=max(timestamp)-30s。30s即可容忍给的数据乱序的程度。 > >lxk <lxk7...@163.com> 于2022年7月6日周三 13:36写道: >> >> 在使用interval join的时候有一些疑问,希望大家能够帮忙解答一下 >> https://pic.imgdb.cn/item/62c5015b5be16ec74ac2b23f.png >> 官方文档中说会从两个流中的 timestamp 中取最大值,看了下源码确实是这样 >> https://pic.imgdb.cn/item/62c51e8b5be16ec74ae22880.png >> 我的问题是: >> 1.这里的timestamp和watermark有什么区别? >> 2.interval >> join中watermark是怎么计算的?两个流取最大的timestamp之后,watermark跟这个最大的timestamp是否有某种联系