嗯,可以这么理解。 忝忝向仧 <[email protected]> 于2020年4月4日周六 下午11:20写道:
> 额,明白了,意思是说两个流情况下 > 比如,stream1里面晚来的那条 > 1 tom_late 1553503185000的水印是1553503188000 > 但是stream2里面,这条1,jerry1,1553503185000的水印是1553503185000 > 所以取最小的,因此还是会被打印? > 是这么理解么? > > > > > > > ------------------ 原始邮件 ------------------ > 发件人: "libenchao"<[email protected]>; > 发送时间: 2020年4月4日(星期六) 晚上11:04 > 收件人: "user-zh"<[email protected]>; > > 主题: Re: 回复: 回复: Flink双流Join问题 > > > > 两个stream输入的场景,operator的watermark是取两者的最小值。 > 所以虽然这条数据在第一个流里面看起来已经是肯定迟到了,但是有可能看第二个流的watermark它还没有过期。 > > > 忝忝向仧 <[email protected]> 于2020年4月4日周六 下午10:42写道: > > > Hi: > > &nbsp; &nbsp; 还有个疑问,我特意构造了个乱序的消息,还是3秒一个窗口 > > stream1: > > 1 tom1 1553503185000 > > 1 tom2 1553503186000 > > 1 tom3 1553503187000 > > 1 tom4 1553503188000 > > 1 tom_late 1553503185000 > > > > > > > > stream2: > > 1 jerry1 1553503185000 > > 1 jerry2 1553503186000 > > 1 jerry3 1553503187000 > > 1 jerry4 1553503188000 > > > > > > 我代码还是之前的代码,定义水印都是maxDelayAllowed = 0L;也就是没有设置最大延时时间. > > 那么,tom_late这条乱序,在第一个窗口[1553503185000,1553503188000)内不应该被输出吧? > > 但是结果还是输出了,这个是为什么? > > > > > > currentTimeStamp: > 1553503185000,Key:1,EventTime:1553503185000,前一条数据的水位线:0 > > 4&gt; (1,tom1,1553503185000) > > currentTimeStamp: > > 1553503186000,Key:1,EventTime:1553503186000,前一条数据的水位线:1553503185000 > > 4&gt; (1,tom2,1553503186000) > > currentTimeStamp: > > 1553503187000,Key:1,EventTime:1553503187000,前一条数据的水位线:1553503186000 > > 4&gt; (1,tom3,1553503187000) > > currentTimeStamp: > > 1553503188000,Key:1,EventTime:1553503188000,前一条数据的水位线:1553503187000 > > 4&gt; (1,tom4,1553503188000) > > currentTimeStamp: > > 1553503188000,Key:1,EventTime:1553503185000,前一条数据的水位线:1553503188000 > > 4&gt; (1,tom_late,1553503185000) > > currentTimeStamp: > > 1553503189000,Key:2,EventTime:1553503189000,前一条数据的水位线:1553503188000 > > 4&gt; (2,tom5,1553503189000) > > currentTimeStamp: > > 1553503191000,Key:2,EventTime:1553503191000,前一条数据的水位线:1553503189000 > > 4&gt; (2,tom6,1553503191000) > > currentTimeStamp: > > 1553503192000,Key:2,EventTime:1553503192000,前一条数据的水位线:1553503191000 > > 4&gt; (2,tom7,1553503192000) > > currentTimeStamp: > > 1553503193000,Key:2,EventTime:1553503193000,前一条数据的水位线:1553503192000 > > 4&gt; (2,tom8,1553503193000) > > currentTimeStamp: > 1553503185000,Key:1,EventTime:1553503185000,前一条数据的水位线:0 > > 3&gt; (1,jerry1,1553503185000) > > currentTimeStamp: > > 1553503186000,Key:1,EventTime:1553503186000,前一条数据的水位线:1553503185000 > > 3&gt; (1,jerry2,1553503186000) > > currentTimeStamp: > > 1553503187000,Key:1,EventTime:1553503187000,前一条数据的水位线:1553503186000 > > 3&gt; (1,jerry3,1553503187000) > > currentTimeStamp: > > 1553503188000,Key:1,EventTime:1553503188000,前一条数据的水位线:1553503187000 > > 3&gt; (1,jerry4,1553503188000) > > 2&gt; tom1=jerry1 > > 2&gt; tom1=jerry2 > > 2&gt; tom1=jerry3 > > 2&gt; tom2=jerry1 > > 2&gt; tom2=jerry2 > > 2&gt; tom2=jerry3 > > 2&gt; tom3=jerry1 > > 2&gt; tom3=jerry2 > > 2&gt; tom3=jerry3 > > 2&gt; tom_late=jerry1 > > 2&gt; tom_late=jerry2 > > 2&gt; tom_late=jerry3 > > > > > > > > > > > > > > > > > > > > > > > > > > ------------------&nbsp;原始邮件&nbsp;------------------ > > 发件人:&nbsp;"Djeng Lee"<[email protected]&gt;; > > 发送时间:&nbsp;2020年4月4日(星期六) 晚上6:35 > > 收件人:&nbsp;"[email protected]"<[email protected] > &gt;; > > > > 主题:&nbsp;Re: 回复: 回复: Flink双流Join问题 > > > > > > > > 刚刚我说的不严谨 > > > > Start = 1000000055000 - (1000000055000 - 0 + 3000) % 3000 = > 1000000053000 > > End = 1000000053000 + 3000 > > > > //源码位置,所以窗口开端并不是你传入首条记录的作为开端。窗口划分是从0时间戳切过来的。 > > > > 文档说明: > > > https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/event_timestamps_watermarks.html > > > > 相关代码 > > public static long getWindowStartWithOffset(long timestamp, long > offset, > > long windowSize) { > > &nbsp; return timestamp - (timestamp - offset + windowSize) > % windowSize; > > } > > > > 在 2020/4/4 下午6:30,“忝忝向仧”<[email protected]&gt; 写入: > > > > &nbsp;&nbsp;&nbsp; 额,你的意思是滚动3秒的窗口开始和结束应该是 > > &nbsp;&nbsp;&nbsp; 1000000055000 % 3 得出结果再拿到[start,end). > > &nbsp;&nbsp;&nbsp; 比如1000000055000 % 3 > > 的结果是1000000053000,那么窗口是[1000000053000,1000000056000) > > &nbsp;&nbsp;&nbsp; 是这么理解吧 > > &nbsp;&nbsp;&nbsp; > > &nbsp;&nbsp;&nbsp; > > &nbsp;&nbsp;&nbsp; > > ------------------&amp;nbsp;原始邮件&amp;nbsp;------------------ > > &nbsp;&nbsp;&nbsp; 发件人:&amp;nbsp;"lee.roval"< > [email protected]&amp;gt;; > > &nbsp;&nbsp;&nbsp; 发送时间:&amp;nbsp;2020年4月4日(星期六) > 晚上6:25 > > &nbsp;&nbsp;&nbsp; 收件人:&amp;nbsp;" > [email protected]"< > > [email protected]&amp;gt;; > > &nbsp;&nbsp;&nbsp; > > &nbsp;&nbsp;&nbsp; 主题:&amp;nbsp;Re: 回复: Flink双流Join问题 > > &nbsp;&nbsp;&nbsp; > > &nbsp;&nbsp;&nbsp; > > &nbsp;&nbsp;&nbsp; > > &nbsp;&nbsp;&nbsp; 55000的窗口分配,是对windowSize 求模然后拿到start 和 > end。 不是从你首条记录开始算。 > > &nbsp;&nbsp;&nbsp; > > &nbsp;&nbsp;&nbsp; 在 2020/4/4 下午6:23,“忝忝向仧”< > [email protected]&amp;gt; 写入: > > &nbsp;&nbsp;&nbsp; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; 对,我只是回复把前面那串省略了,没写. > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; > > [1000000053000,1000000056000),为什么是1000000053000开始? > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; 我第一条输入的是 > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; 1,tom1,1000000055000 > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; > > > ------------------&amp;amp;nbsp;原始邮件&amp;amp;nbsp;------------------ > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; > > 发件人:&amp;amp;nbsp;"libenchao"<[email protected] > &amp;amp;gt;; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; > > 发送时间:&amp;amp;nbsp;2020年4月4日(星期六) 晚上6:20 > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; > > 收件人:&amp;amp;nbsp;"user-zh"<[email protected] > &amp;amp;gt;; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; 主题:&amp;amp;nbsp;Re: > > Flink双流Join问题 > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; > > 你的watermark不是56000,而是1000000056000吧。所以应该是[1000000053000, > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; 1000000056000)是一个窗口吧。 > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; 忝忝向仧 <[email protected] > &amp;amp;gt; > > 于2020年4月4日周六 下午6:16写道: > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 下发新的? > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > 4是一个kafka的source,3是另外一个kafka的source. > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 如果按照3秒的一个窗口 > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > watermark触发窗口的条件是watermark_time&amp;amp;amp;gt;=window_endtime > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > 也就是说[55000,57000)应该是一个窗口的. > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > 我是这么理解的,但是结果56000后就输出了 > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > > currentTimeStamp:&amp;amp;amp;nbsp;1000000055000,Key:1,EventTime:1000000055000,前一条数据的水位线:0 > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > 4&amp;amp;amp;gt; (1,tom1,1000000055000) > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > > currentTimeStamp:&amp;amp;amp;nbsp;1000000056000,Key:1,EventTime:1000000056000,前一条数据的水位线:1000000055000 > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > 4&amp;amp;amp;gt; (1,tom2,1000000056000) > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > > currentTimeStamp:&amp;amp;amp;nbsp;1000000055000,Key:1,EventTime:1000000055000,前一条数据的水位线:0 > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > 3&amp;amp;amp;gt; (1,jerry1,1000000055000) > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > > currentTimeStamp:&amp;amp;amp;nbsp;1000000056000,Key:1,EventTime:1000000056000,前一条数据的水位线:1000000055000 > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > 3&amp;amp;amp;gt; (1,jerry2,1000000056000) > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > 2&amp;amp;amp;gt; tom1=jerry1 > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > > ------------------&amp;amp;amp;nbsp;原始邮件&amp;amp;amp;nbsp;------------------ > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > 发件人:&amp;amp;amp;nbsp;"lee.roval"<[email protected] > &amp;amp;amp;gt;; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > 发送时间:&amp;amp;amp;nbsp;2020年4月4日(星期六) 晚上6:10 > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > 收件人:&amp;amp;amp;nbsp;"[email protected]"< > [email protected] > > &amp;amp;amp;gt;; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > 主题:&amp;amp;amp;nbsp;Re: Flink双流Join问题 > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > 56000后不是下发新的watermark了嘛 > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 在 2020/4/4 > > 下午5:57,“忝忝向仧”<[email protected]&amp;amp;amp;gt; 写入: > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > > 各位好:&amp;amp;amp;amp;nbsp; &amp;amp;amp;amp;nbsp; > &amp;amp;amp;amp;nbsp; > > &amp;amp;amp;amp;nbsp; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > Flink双流Join遇到一个问题,能否解释下,谢谢. > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > > &amp;amp;amp;amp;nbsp; &amp;amp;amp;amp;nbsp; > &amp;amp;amp;amp;nbsp; > > &amp;amp;amp;amp;nbsp; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > ds1和ds2分别读取kafka两个流数据,使用event time和watermark特性,3s的一个翻滚窗口,定义如下: > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > > &amp;amp;amp;amp;nbsp; &amp;amp;amp;amp;nbsp; > &amp;amp;amp;amp;nbsp; > > &amp;amp;amp;amp;nbsp; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > 最后,join输出的时候,为什么触发窗口的数据第二条就触发了? > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > > &amp;amp;amp;amp;nbsp; &amp;amp;amp;amp;nbsp; > &amp;amp;amp;amp;nbsp; > > &amp;amp;amp;amp;nbsp; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > > 按照水印的触发条件应该是watermark_time&amp;amp;amp;amp;gt;=window_endtime.那么,这里应该是1000000057000这条数据来了后才会触发,但是结果却是56000就触发了.为什么? > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > 定义的代码如下: > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > > DataStream<String&amp;amp;amp;amp;gt; stream1 = env > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > .addSource(new > FlinkKafkaConsumer09<String&amp;amp;amp;amp;gt;("stream1", > > new > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > SimpleStringSchema(), properties).setStartFromLatest()) > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > .assignTimestampsAndWatermarks( > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; new > > AssignerWithPeriodicWatermarks<String&amp;amp;amp;amp;gt;() { > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; long > > currentTimeStamp = 0L; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; long > > maxDelayAllowed = 0L; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; long > > currentWaterMark; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; @Nullable > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; @Override > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; public > > Watermark getCurrentWatermark() { > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > currentWaterMark = currentTimeStamp-maxDelayAllowed; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; return new > > Watermark(currentWaterMark); > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; } > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; @Override > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; public long > > extractTimestamp(String s, long l) { > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; String[] > > arr= s.split(" "); > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; long > > timeStamp = Long.parseLong(arr[2]); > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > currentTimeStamp = Math.max(timeStamp, currentTimeStamp); > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > System.out.println("currentTimeStamp: " +&amp;amp;amp;nbsp; > > currentTimeStamp +",Key:" + > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; arr[0] + > > ",EventTime:" + timeStamp + ",前一条数据的水位线:" + currentWaterMark); > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; return > > timeStamp; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; } > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; } > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > > ); > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > > > DataStream<Tuple3<String,String,String&amp;amp;amp;amp;gt;&amp;amp;amp;amp;gt; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; ds1 = > > stream1.map(new MapFunction<String, > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > > Tuple3<String,String,String&amp;amp;amp;amp;gt;&amp;amp;amp;amp;gt;() > { > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > > @Override > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > > public Tuple3<String, > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > String,String&amp;amp;amp;amp;gt; map(String s1) throws Exception > { > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; String[] > > arr = s1.split(" "); > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > > return > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > Tuple3.of(arr[0],arr[1],arr[2]); > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > > } > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; }); > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > ds1.print(); > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > > DataStream<String&amp;amp;amp;amp;gt; stream2 = env > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > .addSource(new > FlinkKafkaConsumer09<String&amp;amp;amp;amp;gt;("stream2", > > new > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > SimpleStringSchema(), properties).setStartFromLatest()) > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > .assignTimestampsAndWatermarks( > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; new > > AssignerWithPeriodicWatermarks<String&amp;amp;amp;amp;gt;() { > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; long > > currentTimeStamp = 0L; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; long > > maxDelayAllowed = 0L; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; long > > currentWaterMark; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; @Nullable > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; @Override > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; public > > Watermark getCurrentWatermark() { > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > currentWaterMark = currentTimeStamp-maxDelayAllowed; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; return new > > Watermark(currentWaterMark); > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; } > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; @Override > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; public long > > extractTimestamp(String s, long l) { > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; String[] > > arr= s.split(" "); > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; long > > timeStamp = Long.parseLong(arr[2]); > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > currentTimeStamp = Math.max(timeStamp, currentTimeStamp); > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > System.out.println("currentTimeStamp: " +&amp;amp;amp;nbsp; > > currentTimeStamp +",Key:" + > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; arr[0] + > > ",EventTime:" + timeStamp + ",前一条数据的水位线:" + currentWaterMark); > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; return > > timeStamp; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; } > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; } > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > > ); > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > > > DataStream<Tuple3<String,String,String&amp;amp;amp;amp;gt;&amp;amp;amp;amp;gt; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; ds2 > > =stream2.map(new MapFunction<String, > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > > Tuple3<String,String,String&amp;amp;amp;amp;gt;&amp;amp;amp;amp;gt;() > { > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > > @Override > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > > public > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > Tuple3<String,String,String&amp;amp;amp;amp;gt; map(String s2) > throws > > Exception { > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > > String > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; [] arr = > > s2.split(" "); > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > > return > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > Tuple3.of(arr[0],arr[1],arr[2]); > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > > } > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; }); > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > ds2.print(); > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > > ds1.join(ds2).where(new KeySelector<Tuple3<String, > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > > String,String&amp;amp;amp;amp;gt;,String&amp;amp;amp;amp;gt;() { > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > > @Override > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > > public String > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > getKey(Tuple3<String, String,String&amp;amp;amp;amp;gt; value) > throws > > Exception { > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > > return > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; value.f0; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > > } > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > }).equalTo(new > > KeySelector<Tuple3<String, String > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > ,String&amp;amp;amp;amp;gt;, String&amp;amp;amp;amp;gt;() { > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > > @Override > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > > public String > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > getKey(Tuple3<String, String,String&amp;amp;amp;amp;gt; value) > throws > > Exception { > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > > return > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; value.f0; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > > } > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; }) > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > > .window(TumblingEventTimeWindows.of(Time.seconds(3))) > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > .apply(new > > JoinFunction<Tuple3<String, > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > String,String&amp;amp;amp;amp;gt;, Tuple3<String, > > String,String&amp;amp;amp;amp;gt;, > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > String&amp;amp;amp;amp;gt;() { > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > > @Override > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > > public String > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > join(Tuple3<String, String,String&amp;amp;amp;amp;gt; value1, > Tuple3<String, > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > String,String&amp;amp;amp;amp;gt; value2) throws Exception { > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > > return > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; value1.f1 + > > "=" + value2.f1; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > > } > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > }).print(); > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > 结果如下: > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > currentTimeStamp: > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > 1000000055000,Key:1,EventTime:1000000055000,前一条数据的水位线:0 > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > > 4&amp;amp;amp;amp;gt; (1,tom1,1000000055000) > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > currentTimeStamp: > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > 1000000056000,Key:1,EventTime:1000000056000,前一条数据的水位线:1000000055000 > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > > 4&amp;amp;amp;amp;gt; (1,tom2,1000000056000) > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > currentTimeStamp: > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > 1000000055000,Key:1,EventTime:1000000055000,前一条数据的水位线:0 > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > > 3&amp;amp;amp;amp;gt; (1,jerry1,1000000055000) > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > currentTimeStamp: > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > 1000000056000,Key:1,EventTime:1000000056000,前一条数据的水位线:1000000055000 > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > > 3&amp;amp;amp;amp;gt; (1,jerry2,1000000056000) > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; > > &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; > > 2&amp;amp;amp;amp;gt; tom1=jerry1 > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; -- > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; Benchao Li > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; School of Electronics > > Engineering and Computer Science, Peking University > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; Tel:+86-15650713730 > > &nbsp;&nbsp;&nbsp; > &amp;nbsp;&amp;nbsp;&amp;nbsp; Email: > > [email protected]; [email protected] > > > > -- > > Benchao Li > School of Electronics Engineering and Computer Science, Peking University > Tel:+86-15650713730 > Email: [email protected]; [email protected] -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: [email protected]; [email protected]
