两个stream输入的场景,operator的watermark是取两者的最小值。
所以虽然这条数据在第一个流里面看起来已经是肯定迟到了,但是有可能看第二个流的watermark它还没有过期。


忝忝向仧 <[email protected]> 于2020年4月4日周六 下午10:42写道:

> Hi:
> &nbsp; &nbsp; 还有个疑问,我特意构造了个乱序的消息,还是3秒一个窗口
> stream1:
> 1 tom1 1553503185000
> 1 tom2 1553503186000
> 1 tom3 1553503187000
> 1 tom4 1553503188000
> 1 tom_late 1553503185000
>
>
>
> stream2:
> 1 jerry1 1553503185000
> 1 jerry2 1553503186000
> 1 jerry3 1553503187000
> 1 jerry4 1553503188000
>
>
> 我代码还是之前的代码,定义水印都是maxDelayAllowed = 0L;也就是没有设置最大延时时间.
> 那么,tom_late这条乱序,在第一个窗口[1553503185000,1553503188000)内不应该被输出吧?
> 但是结果还是输出了,这个是为什么?
>
>
> currentTimeStamp: 1553503185000,Key:1,EventTime:1553503185000,前一条数据的水位线:0
> 4&gt; (1,tom1,1553503185000)
> currentTimeStamp:
> 1553503186000,Key:1,EventTime:1553503186000,前一条数据的水位线:1553503185000
> 4&gt; (1,tom2,1553503186000)
> currentTimeStamp:
> 1553503187000,Key:1,EventTime:1553503187000,前一条数据的水位线:1553503186000
> 4&gt; (1,tom3,1553503187000)
> currentTimeStamp:
> 1553503188000,Key:1,EventTime:1553503188000,前一条数据的水位线:1553503187000
> 4&gt; (1,tom4,1553503188000)
> currentTimeStamp:
> 1553503188000,Key:1,EventTime:1553503185000,前一条数据的水位线:1553503188000
> 4&gt; (1,tom_late,1553503185000)
> currentTimeStamp:
> 1553503189000,Key:2,EventTime:1553503189000,前一条数据的水位线:1553503188000
> 4&gt; (2,tom5,1553503189000)
> currentTimeStamp:
> 1553503191000,Key:2,EventTime:1553503191000,前一条数据的水位线:1553503189000
> 4&gt; (2,tom6,1553503191000)
> currentTimeStamp:
> 1553503192000,Key:2,EventTime:1553503192000,前一条数据的水位线:1553503191000
> 4&gt; (2,tom7,1553503192000)
> currentTimeStamp:
> 1553503193000,Key:2,EventTime:1553503193000,前一条数据的水位线:1553503192000
> 4&gt; (2,tom8,1553503193000)
> currentTimeStamp: 1553503185000,Key:1,EventTime:1553503185000,前一条数据的水位线:0
> 3&gt; (1,jerry1,1553503185000)
> currentTimeStamp:
> 1553503186000,Key:1,EventTime:1553503186000,前一条数据的水位线:1553503185000
> 3&gt; (1,jerry2,1553503186000)
> currentTimeStamp:
> 1553503187000,Key:1,EventTime:1553503187000,前一条数据的水位线:1553503186000
> 3&gt; (1,jerry3,1553503187000)
> currentTimeStamp:
> 1553503188000,Key:1,EventTime:1553503188000,前一条数据的水位线:1553503187000
> 3&gt; (1,jerry4,1553503188000)
> 2&gt; tom1=jerry1
> 2&gt; tom1=jerry2
> 2&gt; tom1=jerry3
> 2&gt; tom2=jerry1
> 2&gt; tom2=jerry2
> 2&gt; tom2=jerry3
> 2&gt; tom3=jerry1
> 2&gt; tom3=jerry2
> 2&gt; tom3=jerry3
> 2&gt; tom_late=jerry1
> 2&gt; tom_late=jerry2
> 2&gt; tom_late=jerry3
>
>
>
>
>
>
>
>
>
>
>
>
> ------------------&nbsp;原始邮件&nbsp;------------------
> 发件人:&nbsp;"Djeng Lee"<[email protected]&gt;;
> 发送时间:&nbsp;2020年4月4日(星期六) 晚上6:35
> 收件人:&nbsp;"[email protected]"<[email protected]&gt;;
>
> 主题:&nbsp;Re: 回复: 回复: Flink双流Join问题
>
>
>
> 刚刚我说的不严谨
>
> Start = 1000000055000 - (1000000055000 - 0 + 3000) % 3000 = 1000000053000
> End = 1000000053000 + 3000
>
> //源码位置,所以窗口开端并不是你传入首条记录的作为开端。窗口划分是从0时间戳切过来的。
>
> 文档说明:
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/event_timestamps_watermarks.html
>
> 相关代码
> public static long getWindowStartWithOffset(long timestamp, long offset,
> long windowSize) {
> &nbsp;  return timestamp - (timestamp - offset + windowSize) % windowSize;
> }
>
> 在 2020/4/4 下午6:30,“忝忝向仧”<[email protected]&gt; 写入:
>
> &nbsp;&nbsp;&nbsp; 额,你的意思是滚动3秒的窗口开始和结束应该是
> &nbsp;&nbsp;&nbsp; 1000000055000 % 3 得出结果再拿到[start,end).
> &nbsp;&nbsp;&nbsp; 比如1000000055000 % 3
> 的结果是1000000053000,那么窗口是[1000000053000,1000000056000)
> &nbsp;&nbsp;&nbsp; 是这么理解吧
> &nbsp;&nbsp;&nbsp;
> &nbsp;&nbsp;&nbsp;
> &nbsp;&nbsp;&nbsp;
> ------------------&amp;nbsp;原始邮件&amp;nbsp;------------------
> &nbsp;&nbsp;&nbsp; 发件人:&amp;nbsp;"lee.roval"<[email protected]&amp;gt;;
> &nbsp;&nbsp;&nbsp; 发送时间:&amp;nbsp;2020年4月4日(星期六) 晚上6:25
> &nbsp;&nbsp;&nbsp; 收件人:&amp;nbsp;"[email protected]"<
> [email protected]&amp;gt;;
> &nbsp;&nbsp;&nbsp;
> &nbsp;&nbsp;&nbsp; 主题:&amp;nbsp;Re: 回复: Flink双流Join问题
> &nbsp;&nbsp;&nbsp;
> &nbsp;&nbsp;&nbsp;
> &nbsp;&nbsp;&nbsp;
> &nbsp;&nbsp;&nbsp; 55000的窗口分配,是对windowSize 求模然后拿到start 和 end。 不是从你首条记录开始算。
> &nbsp;&nbsp;&nbsp;
> &nbsp;&nbsp;&nbsp; 在 2020/4/4 下午6:23,“忝忝向仧”<[email protected]&amp;gt; 写入:
> &nbsp;&nbsp;&nbsp;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; 对,我只是回复把前面那串省略了,没写.
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp;
> [1000000053000,1000000056000),为什么是1000000053000开始?
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; 我第一条输入的是
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; 1,tom1,1000000055000
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp;
> ------------------&amp;amp;nbsp;原始邮件&amp;amp;nbsp;------------------
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp;
> 发件人:&amp;amp;nbsp;"libenchao"<[email protected]&amp;amp;gt;;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp;
> 发送时间:&amp;amp;nbsp;2020年4月4日(星期六) 晚上6:20
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp;
> 收件人:&amp;amp;nbsp;"user-zh"<[email protected]&amp;amp;gt;;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; 主题:&amp;amp;nbsp;Re:
> Flink双流Join问题
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp;
> 你的watermark不是56000,而是1000000056000吧。所以应该是[1000000053000,
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; 1000000056000)是一个窗口吧。
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; 忝忝向仧 
> <[email protected]&amp;amp;gt;
> 于2020年4月4日周六 下午6:16写道:
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 下发新的?
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> 4是一个kafka的source,3是另外一个kafka的source.
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 如果按照3秒的一个窗口
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> watermark触发窗口的条件是watermark_time&amp;amp;amp;gt;=window_endtime
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> 也就是说[55000,57000)应该是一个窗口的.
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> 我是这么理解的,但是结果56000后就输出了
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> currentTimeStamp:&amp;amp;amp;nbsp;1000000055000,Key:1,EventTime:1000000055000,前一条数据的水位线:0
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> 4&amp;amp;amp;gt; (1,tom1,1000000055000)
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> currentTimeStamp:&amp;amp;amp;nbsp;1000000056000,Key:1,EventTime:1000000056000,前一条数据的水位线:1000000055000
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> 4&amp;amp;amp;gt; (1,tom2,1000000056000)
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> currentTimeStamp:&amp;amp;amp;nbsp;1000000055000,Key:1,EventTime:1000000055000,前一条数据的水位线:0
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> 3&amp;amp;amp;gt; (1,jerry1,1000000055000)
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> currentTimeStamp:&amp;amp;amp;nbsp;1000000056000,Key:1,EventTime:1000000056000,前一条数据的水位线:1000000055000
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> 3&amp;amp;amp;gt; (1,jerry2,1000000056000)
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> 2&amp;amp;amp;gt; tom1=jerry1
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> ------------------&amp;amp;amp;nbsp;原始邮件&amp;amp;amp;nbsp;------------------
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> 发件人:&amp;amp;amp;nbsp;"lee.roval"<[email protected]&amp;amp;amp;gt;;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> 发送时间:&amp;amp;amp;nbsp;2020年4月4日(星期六) 晚上6:10
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> 收件人:&amp;amp;amp;nbsp;"[email protected]"<[email protected]
> &amp;amp;amp;gt;;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> 主题:&amp;amp;amp;nbsp;Re: Flink双流Join问题
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> 56000后不是下发新的watermark了嘛
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 在 2020/4/4
> 下午5:57,“忝忝向仧”<[email protected]&amp;amp;amp;gt; 写入:
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> 各位好:&amp;amp;amp;amp;nbsp; &amp;amp;amp;amp;nbsp; &amp;amp;amp;amp;nbsp;
> &amp;amp;amp;amp;nbsp;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> Flink双流Join遇到一个问题,能否解释下,谢谢.
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> &amp;amp;amp;amp;nbsp; &amp;amp;amp;amp;nbsp; &amp;amp;amp;amp;nbsp;
> &amp;amp;amp;amp;nbsp;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> ds1和ds2分别读取kafka两个流数据,使用event time和watermark特性,3s的一个翻滚窗口,定义如下:
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> &amp;amp;amp;amp;nbsp; &amp;amp;amp;amp;nbsp; &amp;amp;amp;amp;nbsp;
> &amp;amp;amp;amp;nbsp;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> 最后,join输出的时候,为什么触发窗口的数据第二条就触发了?
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> &amp;amp;amp;amp;nbsp; &amp;amp;amp;amp;nbsp; &amp;amp;amp;amp;nbsp;
> &amp;amp;amp;amp;nbsp;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> 按照水印的触发条件应该是watermark_time&amp;amp;amp;amp;gt;=window_endtime.那么,这里应该是1000000057000这条数据来了后才会触发,但是结果却是56000就触发了.为什么?
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 定义的代码如下:
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> DataStream<String&amp;amp;amp;amp;gt; stream1 = env
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> .addSource(new FlinkKafkaConsumer09<String&amp;amp;amp;amp;gt;("stream1",
> new
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> SimpleStringSchema(), properties).setStartFromLatest())
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> .assignTimestampsAndWatermarks(
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; new
> AssignerWithPeriodicWatermarks<String&amp;amp;amp;amp;gt;() {
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; long
> currentTimeStamp = 0L;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; long
> maxDelayAllowed = 0L;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; long
> currentWaterMark;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; @Nullable
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; @Override
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; public
> Watermark getCurrentWatermark() {
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> currentWaterMark = currentTimeStamp-maxDelayAllowed;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; return new
> Watermark(currentWaterMark);
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; }
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; @Override
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; public long
> extractTimestamp(String s, long l) {
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; String[]
> arr= s.split(" ");
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; long
> timeStamp = Long.parseLong(arr[2]);
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> currentTimeStamp = Math.max(timeStamp, currentTimeStamp);
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> System.out.println("currentTimeStamp: " +&amp;amp;amp;nbsp;
> currentTimeStamp +",Key:" +
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; arr[0] +
> ",EventTime:" + timeStamp + ",前一条数据的水位线:" + currentWaterMark);
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; return
> timeStamp;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; }
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; }
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> );
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> DataStream<Tuple3<String,String,String&amp;amp;amp;amp;gt;&amp;amp;amp;amp;gt;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; ds1 =
> stream1.map(new MapFunction<String,
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> Tuple3<String,String,String&amp;amp;amp;amp;gt;&amp;amp;amp;amp;gt;() {
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> @Override
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> public Tuple3<String,
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> String,String&amp;amp;amp;amp;gt; map(String s1) throws Exception {
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; String[]
> arr = s1.split(" ");
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> return
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> Tuple3.of(arr[0],arr[1],arr[2]);
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> }
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; });
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; ds1.print();
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> DataStream<String&amp;amp;amp;amp;gt; stream2 = env
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> .addSource(new FlinkKafkaConsumer09<String&amp;amp;amp;amp;gt;("stream2",
> new
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> SimpleStringSchema(), properties).setStartFromLatest())
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> .assignTimestampsAndWatermarks(
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; new
> AssignerWithPeriodicWatermarks<String&amp;amp;amp;amp;gt;() {
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; long
> currentTimeStamp = 0L;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; long
> maxDelayAllowed = 0L;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; long
> currentWaterMark;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; @Nullable
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; @Override
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; public
> Watermark getCurrentWatermark() {
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> currentWaterMark = currentTimeStamp-maxDelayAllowed;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; return new
> Watermark(currentWaterMark);
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; }
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; @Override
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; public long
> extractTimestamp(String s, long l) {
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; String[]
> arr= s.split(" ");
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; long
> timeStamp = Long.parseLong(arr[2]);
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> currentTimeStamp = Math.max(timeStamp, currentTimeStamp);
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> System.out.println("currentTimeStamp: " +&amp;amp;amp;nbsp;
> currentTimeStamp +",Key:" +
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; arr[0] +
> ",EventTime:" + timeStamp + ",前一条数据的水位线:" + currentWaterMark);
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; return
> timeStamp;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; }
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; }
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> );
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> DataStream<Tuple3<String,String,String&amp;amp;amp;amp;gt;&amp;amp;amp;amp;gt;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; ds2
> =stream2.map(new MapFunction<String,
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> Tuple3<String,String,String&amp;amp;amp;amp;gt;&amp;amp;amp;amp;gt;() {
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> @Override
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> public
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> Tuple3<String,String,String&amp;amp;amp;amp;gt; map(String s2) throws
> Exception {
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> String
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; [] arr =
> s2.split(" ");
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> return
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> Tuple3.of(arr[0],arr[1],arr[2]);
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> }
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; });
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; ds2.print();
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> ds1.join(ds2).where(new KeySelector<Tuple3<String,
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> String,String&amp;amp;amp;amp;gt;,String&amp;amp;amp;amp;gt;() {
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> @Override
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> public String
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> getKey(Tuple3<String, String,String&amp;amp;amp;amp;gt; value) throws
> Exception {
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> return
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; value.f0;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> }
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; }).equalTo(new
> KeySelector<Tuple3<String, String
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> ,String&amp;amp;amp;amp;gt;, String&amp;amp;amp;amp;gt;() {
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> @Override
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> public String
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> getKey(Tuple3<String, String,String&amp;amp;amp;amp;gt; value) throws
> Exception {
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> return
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; value.f0;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> }
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; })
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> .window(TumblingEventTimeWindows.of(Time.seconds(3)))
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; .apply(new
> JoinFunction<Tuple3<String,
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> String,String&amp;amp;amp;amp;gt;, Tuple3<String,
> String,String&amp;amp;amp;amp;gt;,
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> String&amp;amp;amp;amp;gt;() {
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> @Override
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> public String
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> join(Tuple3<String, String,String&amp;amp;amp;amp;gt; value1, Tuple3<String,
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> String,String&amp;amp;amp;amp;gt; value2) throws Exception {
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> return
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; value1.f1 +
> "=" + value2.f1;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> }
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; }).print();
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 结果如下:
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; currentTimeStamp:
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> 1000000055000,Key:1,EventTime:1000000055000,前一条数据的水位线:0
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> 4&amp;amp;amp;amp;gt; (1,tom1,1000000055000)
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; currentTimeStamp:
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> 1000000056000,Key:1,EventTime:1000000056000,前一条数据的水位线:1000000055000
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> 4&amp;amp;amp;amp;gt; (1,tom2,1000000056000)
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; currentTimeStamp:
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> 1000000055000,Key:1,EventTime:1000000055000,前一条数据的水位线:0
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> 3&amp;amp;amp;amp;gt; (1,jerry1,1000000055000)
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; currentTimeStamp:
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> 1000000056000,Key:1,EventTime:1000000056000,前一条数据的水位线:1000000055000
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> 3&amp;amp;amp;amp;gt; (1,jerry2,1000000056000)
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> 2&amp;amp;amp;amp;gt; tom1=jerry1
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; --
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp;
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; Benchao Li
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; School of Electronics
> Engineering and Computer Science, Peking University
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; Tel:+86-15650713730
> &nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; Email:
> [email protected]; [email protected]



-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: [email protected]; [email protected]

回复