你代码没格式化,不方便看。 首先我们讨论的都是eventtime场景,processtime场景下watermark没用。
假设Record代表一次请求信息,那么可以把该请求的发生时间作为该Record的eventime,也就是这个Record的timestamp。 现在要统计每5分钟的pv信息,就需要依据这个Record的timestamp决定划分到哪个窗口,比如划分到了 yyyyMMdd 8:30 ~ yyyyMMdd 8:35 这个窗口。 考虑到数据可以迟到,任何时刻,比如即使到了 8.40分,也可能再出现 8.33 分的数据。 但是,flink不可能无限等待下午,8.35的窗口必须在某个时机闭合并输出该窗口的统计结果到下游。 如果采用 EventTimeTrigger 的话,这个决定闭合窗口的时机就是:当watermark达到窗口的maxTimestamp。该窗口的maxTimestamp就是 8.35 分那个点。 watermark则是根据用户选择的策略生成,比如在source部分,根据当前task看到的最大的Record的timestamp,减去一个 maxOutOfOrderness 即为 watermark。这个 maxOutOfOrderness 就是允许数据乱序的程度。 至于watermark的传播,简单说就是向后广播即可。 双流 join 情况的话,需要取小,取2个watermark的更小的那个。 对于从同一个流进入的watermark是取大(这个其实逻辑正确的话,生成端就决定了递增了,接收端做个判定只是保险,避免出现watermark倒退而已)。 lxk <lxk7...@163.com> 于2022年7月7日周四 19:57写道: > > 按照这个说法,那么timestamp和watermark其实没有关系。 > 但是我看到有关帖子里说:双流join里存储的mapstate<Long,StreamRecord>。 > 而StreamRecord和watermark都是继承于streamelement,Flink会替换StreamRecord > 对象中的Timestamp,如果 根据当前事件的Timestamp 生成的Watermark 大于上一次的Watermark,就发出新的Watermark。 > 具体代码在 TimestampsAndPunctuatedWatermarksOperator.processElement。 > @OverridepublicvoidprocessElement(StreamRecord<T> element)throws Exception { > finalTvalue= element.getValue(); // 调用 用户实现的 extractTimestamp > 获取新的TimestampfinallongnewTimestamp= userFunction.extractTimestamp(value, > element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE); // > 用新Timestamp 替换StreamRecord中的旧Timestamp > output.collect(element.replace(element.getValue(), newTimestamp)); // 调用 > 用户实现的 checkAndGetNextWatermark 方法获取下一个WatermarkfinalWatermarknextWatermark= > userFunction.checkAndGetNextWatermark(value, newTimestamp); // 如果下一个Watermark > 大于当前Watermark,就发出新的Watermarkif (nextWatermark != null && > nextWatermark.getTimestamp() > currentWatermark) { currentWatermark = > nextWatermark.getTimestamp(); output.emitWatermark(nextWatermark); } } > 以上是我看见的帖子中的相关内容 > 如果上述说法不对的话,那么在双流join中,watermark是怎么流转的? > > > > > > > > > > > > > > > 在 2022-07-07 10:03:00,"yidan zhao" <hinobl...@gmail.com> 写道: > >timestamp是为每个element(输入的记录)赋值的一个时间戳。 > >watermark是从source部分生成的水印个,然后向后传播。 > > > >以分窗口为例,数据记录的timestamp用于决定数据划分入哪个窗口。 > >watermark用于决定窗口啥时候闭合,比如窗口是0-5s,那么当watermark达到5s的时候,窗口就会闭合。 > > > >考虑数据不一定能及时到达,可以让watermark=max(timestamp)-30s。30s即可容忍给的数据乱序的程度。 > > > >lxk <lxk7...@163.com> 于2022年7月6日周三 13:36写道: > >> > >> 在使用interval join的时候有一些疑问,希望大家能够帮忙解答一下 > >> https://pic.imgdb.cn/item/62c5015b5be16ec74ac2b23f.png > >> 官方文档中说会从两个流中的 timestamp 中取最大值,看了下源码确实是这样 > >> https://pic.imgdb.cn/item/62c51e8b5be16ec74ae22880.png > >> 我的问题是: > >> 1.这里的timestamp和watermark有什么区别? > >> 2.interval > >> join中watermark是怎么计算的?两个流取最大的timestamp之后,watermark跟这个最大的timestamp是否有某种联系 > >