public static long getWindowStartWithOffset(long timestamp, long offset, long
windowSize) {
return timestamp - (timestamp - offset + windowSize) % windowSize;
}
在 2020/4/4 下午6:23,“忝忝向仧”<[email protected]> 写入:
对,我只是回复把前面那串省略了,没写.
[1000000053000,1000000056000),为什么是1000000053000开始?
我第一条输入的是
1,tom1,1000000055000
------------------ 原始邮件 ------------------
发件人: "libenchao"<[email protected]>;
发送时间: 2020年4月4日(星期六) 晚上6:20
收件人: "user-zh"<[email protected]>;
主题: Re: Flink双流Join问题
你的watermark不是56000,而是1000000056000吧。所以应该是[1000000053000,
1000000056000)是一个窗口吧。
忝忝向仧 <[email protected]> 于2020年4月4日周六 下午6:16写道:
> 下发新的?
>
>
> 4是一个kafka的source,3是另外一个kafka的source.
> 如果按照3秒的一个窗口
> watermark触发窗口的条件是watermark_time&gt;=window_endtime
>
>
> 也就是说[55000,57000)应该是一个窗口的.
>
>
> 我是这么理解的,但是结果56000后就输出了
>
>
>
>
currentTimeStamp:&nbsp;1000000055000,Key:1,EventTime:1000000055000,前一条数据的水位线:0
> 4&gt; (1,tom1,1000000055000)
>
>
currentTimeStamp:&nbsp;1000000056000,Key:1,EventTime:1000000056000,前一条数据的水位线:1000000055000
> 4&gt; (1,tom2,1000000056000)
>
>
currentTimeStamp:&nbsp;1000000055000,Key:1,EventTime:1000000055000,前一条数据的水位线:0
> 3&gt; (1,jerry1,1000000055000)
>
>
currentTimeStamp:&nbsp;1000000056000,Key:1,EventTime:1000000056000,前一条数据的水位线:1000000055000
> 3&gt; (1,jerry2,1000000056000)
> 2&gt; tom1=jerry1
>
>
>
>
>
>
>
> ------------------&nbsp;原始邮件&nbsp;------------------
> 发件人:&nbsp;"lee.roval"<[email protected]&gt;;
> 发送时间:&nbsp;2020年4月4日(星期六) 晚上6:10
>
收件人:&nbsp;"[email protected]"<[email protected]&gt;;
>
> 主题:&nbsp;Re: Flink双流Join问题
>
>
>
> 56000后不是下发新的watermark了嘛
>
> 在 2020/4/4 下午5:57,“忝忝向仧”<[email protected]&gt; 写入:
>
> &nbsp;&nbsp;&nbsp; 各位好:&amp;nbsp; &amp;nbsp;
&amp;nbsp; &amp;nbsp;
> Flink双流Join遇到一个问题,能否解释下,谢谢.
> &nbsp;&nbsp;&nbsp; &amp;nbsp; &amp;nbsp;
&amp;nbsp; &amp;nbsp;
> ds1和ds2分别读取kafka两个流数据,使用event time和watermark特性,3s的一个翻滚窗口,定义如下:
> &nbsp;&nbsp;&nbsp; &amp;nbsp; &amp;nbsp;
&amp;nbsp; &amp;nbsp;
> 最后,join输出的时候,为什么触发窗口的数据第二条就触发了?
> &nbsp;&nbsp;&nbsp; &amp;nbsp; &amp;nbsp;
&amp;nbsp; &amp;nbsp;
>
按照水印的触发条件应该是watermark_time&amp;gt;=window_endtime.那么,这里应该是1000000057000这条数据来了后才会触发,但是结果却是56000就触发了.为什么?
> &nbsp;&nbsp;&nbsp;
> &nbsp;&nbsp;&nbsp;
> &nbsp;&nbsp;&nbsp;
> &nbsp;&nbsp;&nbsp; 定义的代码如下:
> &nbsp;&nbsp;&nbsp; DataStream<String&amp;gt; stream1 =
env
>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> .addSource(new FlinkKafkaConsumer09<String&amp;gt;("stream1", new
> SimpleStringSchema(), properties).setStartFromLatest())
>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> .assignTimestampsAndWatermarks(
>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> new AssignerWithPeriodicWatermarks<String&amp;gt;() {
>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> long currentTimeStamp = 0L;
>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> long maxDelayAllowed = 0L;
>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> long currentWaterMark;
>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> @Nullable
>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> @Override
>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> public Watermark getCurrentWatermark() {
>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> currentWaterMark = currentTimeStamp-maxDelayAllowed;
>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> return new Watermark(currentWaterMark);
>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> }
>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> @Override
>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> public long extractTimestamp(String s, long l) {
>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> String[] arr= s.split(" ");
>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> long timeStamp = Long.parseLong(arr[2]);
>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> currentTimeStamp = Math.max(timeStamp, currentTimeStamp);
>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> System.out.println("currentTimeStamp: " +&nbsp; currentTimeStamp
+",Key:" +
> arr[0] + ",EventTime:" + timeStamp + ",前一条数据的水位线:" + currentWaterMark);
>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> return timeStamp;
>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> }
>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> }
>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
);
> &nbsp;&nbsp;&nbsp;
> &nbsp;&nbsp;&nbsp;
DataStream<Tuple3<String,String,String&amp;gt;&amp;gt;
> ds1 = stream1.map(new MapFunction<String,
> Tuple3<String,String,String&amp;gt;&amp;gt;() {
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
@Override
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
public Tuple3<String,
> String,String&amp;gt; map(String s1) throws Exception {
>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> String[] arr = s1.split(" ");
>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
return
> Tuple3.of(arr[0],arr[1],arr[2]);
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
}
> &nbsp;&nbsp;&nbsp; });
> &nbsp;&nbsp;&nbsp;
> &nbsp;&nbsp;&nbsp; ds1.print();
> &nbsp;&nbsp;&nbsp; DataStream<String&amp;gt; stream2 =
env
>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> .addSource(new FlinkKafkaConsumer09<String&amp;gt;("stream2", new
> SimpleStringSchema(), properties).setStartFromLatest())
>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> .assignTimestampsAndWatermarks(
>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> new AssignerWithPeriodicWatermarks<String&amp;gt;() {
>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> long currentTimeStamp = 0L;
>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> long maxDelayAllowed = 0L;
>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> long currentWaterMark;
>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> @Nullable
>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> @Override
>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> public Watermark getCurrentWatermark() {
>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> currentWaterMark = currentTimeStamp-maxDelayAllowed;
>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> return new Watermark(currentWaterMark);
>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> }
>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> @Override
>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> public long extractTimestamp(String s, long l) {
>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> String[] arr= s.split(" ");
>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> long timeStamp = Long.parseLong(arr[2]);
>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> currentTimeStamp = Math.max(timeStamp, currentTimeStamp);
>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> System.out.println("currentTimeStamp: " +&nbsp; currentTimeStamp
+",Key:" +
> arr[0] + ",EventTime:" + timeStamp + ",前一条数据的水位线:" + currentWaterMark);
>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> return timeStamp;
>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> }
>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> }
>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
);
> &nbsp;&nbsp;&nbsp;
> &nbsp;&nbsp;&nbsp;
DataStream<Tuple3<String,String,String&amp;gt;&amp;gt;
> ds2 =stream2.map(new MapFunction<String,
> Tuple3<String,String,String&amp;gt;&amp;gt;() {
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
@Override
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
public
> Tuple3<String,String,String&amp;gt; map(String s2) throws
Exception {
>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
String
> [] arr = s2.split(" ");
>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
return
> Tuple3.of(arr[0],arr[1],arr[2]);
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
}
> &nbsp;&nbsp;&nbsp; });
> &nbsp;&nbsp;&nbsp; ds2.print();
> &nbsp;&nbsp;&nbsp; ds1.join(ds2).where(new
KeySelector<Tuple3<String,
> String,String&amp;gt;,String&amp;gt;() {
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
@Override
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
public String
> getKey(Tuple3<String, String,String&amp;gt; value) throws
Exception {
>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
return
> value.f0;
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
}
> &nbsp;&nbsp;&nbsp; }).equalTo(new
KeySelector<Tuple3<String, String
> ,String&amp;gt;, String&amp;gt;() {
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
@Override
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
public String
> getKey(Tuple3<String, String,String&amp;gt; value) throws
Exception {
>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
return
> value.f0;
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
}
> &nbsp;&nbsp;&nbsp; })
> &nbsp;&nbsp;&nbsp;
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
> &nbsp;&nbsp;&nbsp; .apply(new JoinFunction<Tuple3<String,
> String,String&amp;gt;, Tuple3<String, String,String&amp;gt;,
> String&amp;gt;() {
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
@Override
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
public String
> join(Tuple3<String, String,String&amp;gt; value1, Tuple3<String,
> String,String&amp;gt; value2) throws Exception {
>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
return
> value1.f1 + "=" + value2.f1;
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
}
> &nbsp;&nbsp;&nbsp; }).print();
> &nbsp;&nbsp;&nbsp; 结果如下:
> &nbsp;&nbsp;&nbsp; currentTimeStamp:
> 1000000055000,Key:1,EventTime:1000000055000,前一条数据的水位线:0
> &nbsp;&nbsp;&nbsp; 4&amp;gt; (1,tom1,1000000055000)
> &nbsp;&nbsp;&nbsp; currentTimeStamp:
> 1000000056000,Key:1,EventTime:1000000056000,前一条数据的水位线:1000000055000
> &nbsp;&nbsp;&nbsp; 4&amp;gt; (1,tom2,1000000056000)
> &nbsp;&nbsp;&nbsp; currentTimeStamp:
> 1000000055000,Key:1,EventTime:1000000055000,前一条数据的水位线:0
> &nbsp;&nbsp;&nbsp; 3&amp;gt; (1,jerry1,1000000055000)
> &nbsp;&nbsp;&nbsp; currentTimeStamp:
> 1000000056000,Key:1,EventTime:1000000056000,前一条数据的水位线:1000000055000
> &nbsp;&nbsp;&nbsp; 3&amp;gt; (1,jerry2,1000000056000)
> &nbsp;&nbsp;&nbsp; 2&amp;gt; tom1=jerry1
--
Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: [email protected]; [email protected]