对,我只是回复把前面那串省略了,没写. [1000000053000,1000000056000),为什么是1000000053000开始? 我第一条输入的是 1,tom1,1000000055000
------------------ 原始邮件 ------------------ 发件人: "libenchao"<[email protected]>; 发送时间: 2020年4月4日(星期六) 晚上6:20 收件人: "user-zh"<[email protected]>; 主题: Re: Flink双流Join问题 你的watermark不是56000,而是1000000056000吧。所以应该是[1000000053000, 1000000056000)是一个窗口吧。 忝忝向仧 <[email protected]> 于2020年4月4日周六 下午6:16写道: > 下发新的? > > > 4是一个kafka的source,3是另外一个kafka的source. > 如果按照3秒的一个窗口 > watermark触发窗口的条件是watermark_time&gt;=window_endtime > > > 也就是说[55000,57000)应该是一个窗口的. > > > 我是这么理解的,但是结果56000后就输出了 > > > > currentTimeStamp:&nbsp;1000000055000,Key:1,EventTime:1000000055000,前一条数据的水位线:0 > 4&gt; (1,tom1,1000000055000) > > currentTimeStamp:&nbsp;1000000056000,Key:1,EventTime:1000000056000,前一条数据的水位线:1000000055000 > 4&gt; (1,tom2,1000000056000) > > currentTimeStamp:&nbsp;1000000055000,Key:1,EventTime:1000000055000,前一条数据的水位线:0 > 3&gt; (1,jerry1,1000000055000) > > currentTimeStamp:&nbsp;1000000056000,Key:1,EventTime:1000000056000,前一条数据的水位线:1000000055000 > 3&gt; (1,jerry2,1000000056000) > 2&gt; tom1=jerry1 > > > > > > > > ------------------&nbsp;原始邮件&nbsp;------------------ > 发件人:&nbsp;"lee.roval"<[email protected]&gt;; > 发送时间:&nbsp;2020年4月4日(星期六) 晚上6:10 > 收件人:&nbsp;"[email protected]"<[email protected]&gt;; > > 主题:&nbsp;Re: Flink双流Join问题 > > > > 56000后不是下发新的watermark了嘛 > > 在 2020/4/4 下午5:57,“忝忝向仧”<[email protected]&gt; 写入: > > &nbsp;&nbsp;&nbsp; 各位好:&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; > Flink双流Join遇到一个问题,能否解释下,谢谢. > &nbsp;&nbsp;&nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; > ds1和ds2分别读取kafka两个流数据,使用event time和watermark特性,3s的一个翻滚窗口,定义如下: > &nbsp;&nbsp;&nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; > 最后,join输出的时候,为什么触发窗口的数据第二条就触发了? > &nbsp;&nbsp;&nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; > 按照水印的触发条件应该是watermark_time&amp;gt;=window_endtime.那么,这里应该是1000000057000这条数据来了后才会触发,但是结果却是56000就触发了.为什么? > &nbsp;&nbsp;&nbsp; > &nbsp;&nbsp;&nbsp; > &nbsp;&nbsp;&nbsp; > &nbsp;&nbsp;&nbsp; 定义的代码如下: > &nbsp;&nbsp;&nbsp; DataStream<String&amp;gt; stream1 = env > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > .addSource(new FlinkKafkaConsumer09<String&amp;gt;("stream1", new > SimpleStringSchema(), properties).setStartFromLatest()) > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > .assignTimestampsAndWatermarks( > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > new AssignerWithPeriodicWatermarks<String&amp;gt;() { > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > long currentTimeStamp = 0L; > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > long maxDelayAllowed = 0L; > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > long currentWaterMark; > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > @Nullable > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > @Override > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > public Watermark getCurrentWatermark() { > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > currentWaterMark = currentTimeStamp-maxDelayAllowed; > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > return new Watermark(currentWaterMark); > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > } > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > @Override > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > public long extractTimestamp(String s, long l) { > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > String[] arr= s.split(" "); > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > long timeStamp = Long.parseLong(arr[2]); > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > currentTimeStamp = Math.max(timeStamp, currentTimeStamp); > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > System.out.println("currentTimeStamp: " +&nbsp; currentTimeStamp +",Key:" + > arr[0] + ",EventTime:" + timeStamp + ",前一条数据的水位线:" + currentWaterMark); > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > return timeStamp; > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > } > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > } > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; ); > &nbsp;&nbsp;&nbsp; > &nbsp;&nbsp;&nbsp; DataStream<Tuple3<String,String,String&amp;gt;&amp;gt; > ds1 = stream1.map(new MapFunction<String, > Tuple3<String,String,String&amp;gt;&amp;gt;() { > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; @Override > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; public Tuple3<String, > String,String&amp;gt; map(String s1) throws Exception { > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > String[] arr = s1.split(" "); > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; return > Tuple3.of(arr[0],arr[1],arr[2]); > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; } > &nbsp;&nbsp;&nbsp; }); > &nbsp;&nbsp;&nbsp; > &nbsp;&nbsp;&nbsp; ds1.print(); > &nbsp;&nbsp;&nbsp; DataStream<String&amp;gt; stream2 = env > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > .addSource(new FlinkKafkaConsumer09<String&amp;gt;("stream2", new > SimpleStringSchema(), properties).setStartFromLatest()) > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > .assignTimestampsAndWatermarks( > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > new AssignerWithPeriodicWatermarks<String&amp;gt;() { > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > long currentTimeStamp = 0L; > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > long maxDelayAllowed = 0L; > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > long currentWaterMark; > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > @Nullable > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > @Override > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > public Watermark getCurrentWatermark() { > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > currentWaterMark = currentTimeStamp-maxDelayAllowed; > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > return new Watermark(currentWaterMark); > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > } > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > @Override > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > public long extractTimestamp(String s, long l) { > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > String[] arr= s.split(" "); > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > long timeStamp = Long.parseLong(arr[2]); > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > currentTimeStamp = Math.max(timeStamp, currentTimeStamp); > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > System.out.println("currentTimeStamp: " +&nbsp; currentTimeStamp +",Key:" + > arr[0] + ",EventTime:" + timeStamp + ",前一条数据的水位线:" + currentWaterMark); > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > return timeStamp; > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > } > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > } > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; ); > &nbsp;&nbsp;&nbsp; > &nbsp;&nbsp;&nbsp; DataStream<Tuple3<String,String,String&amp;gt;&amp;gt; > ds2 =stream2.map(new MapFunction<String, > Tuple3<String,String,String&amp;gt;&amp;gt;() { > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; @Override > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; public > Tuple3<String,String,String&amp;gt; map(String s2) throws Exception { > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; String > [] arr = s2.split(" "); > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; return > Tuple3.of(arr[0],arr[1],arr[2]); > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; } > &nbsp;&nbsp;&nbsp; }); > &nbsp;&nbsp;&nbsp; ds2.print(); > &nbsp;&nbsp;&nbsp; ds1.join(ds2).where(new KeySelector<Tuple3<String, > String,String&amp;gt;,String&amp;gt;() { > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; @Override > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; public String > getKey(Tuple3<String, String,String&amp;gt; value) throws Exception { > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; return > value.f0; > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; } > &nbsp;&nbsp;&nbsp; }).equalTo(new KeySelector<Tuple3<String, String > ,String&amp;gt;, String&amp;gt;() { > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; @Override > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; public String > getKey(Tuple3<String, String,String&amp;gt; value) throws Exception { > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; return > value.f0; > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; } > &nbsp;&nbsp;&nbsp; }) > &nbsp;&nbsp;&nbsp; .window(TumblingEventTimeWindows.of(Time.seconds(3))) > &nbsp;&nbsp;&nbsp; .apply(new JoinFunction<Tuple3<String, > String,String&amp;gt;, Tuple3<String, String,String&amp;gt;, > String&amp;gt;() { > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; @Override > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; public String > join(Tuple3<String, String,String&amp;gt; value1, Tuple3<String, > String,String&amp;gt; value2) throws Exception { > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; return > value1.f1 + "=" + value2.f1; > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; } > &nbsp;&nbsp;&nbsp; }).print(); > &nbsp;&nbsp;&nbsp; 结果如下: > &nbsp;&nbsp;&nbsp; currentTimeStamp: > 1000000055000,Key:1,EventTime:1000000055000,前一条数据的水位线:0 > &nbsp;&nbsp;&nbsp; 4&amp;gt; (1,tom1,1000000055000) > &nbsp;&nbsp;&nbsp; currentTimeStamp: > 1000000056000,Key:1,EventTime:1000000056000,前一条数据的水位线:1000000055000 > &nbsp;&nbsp;&nbsp; 4&amp;gt; (1,tom2,1000000056000) > &nbsp;&nbsp;&nbsp; currentTimeStamp: > 1000000055000,Key:1,EventTime:1000000055000,前一条数据的水位线:0 > &nbsp;&nbsp;&nbsp; 3&amp;gt; (1,jerry1,1000000055000) > &nbsp;&nbsp;&nbsp; currentTimeStamp: > 1000000056000,Key:1,EventTime:1000000056000,前一条数据的水位线:1000000055000 > &nbsp;&nbsp;&nbsp; 3&amp;gt; (1,jerry2,1000000056000) > &nbsp;&nbsp;&nbsp; 2&amp;gt; tom1=jerry1 -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: [email protected]; [email protected]
