Flink 1.10, Windows 10: verifying Flink API behavior
The code is as follows:
```
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.IngestionTimeExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.util.ArrayList;
import java.util.List;

public class KeyedStreamJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
        // env.setParallelism(3);

        Tuple2<String, Integer> item = null;
        List<Tuple2<String, Integer>> items = new ArrayList<>();
        item = new Tuple2<>("k1", 1);
        items.add(item);
        item = new Tuple2<>("k3", 10);
        items.add(item);
        item = new Tuple2<>("k1", 10);
        items.add(item);
        item = new Tuple2<>("k2", 2);
        items.add(item);
        item = new Tuple2<>("k1", 11);
        items.add(item);
        item = new Tuple2<>("k2", 20);
        items.add(item);

        DataStreamSource<Tuple2<String, Integer>> streamSource = env.fromCollection(items);
        streamSource
                //by 1
                //.assignTimestampsAndWatermarks(new IngestionTimeExtractor())
                .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Integer> value) throws Exception {
                        return value.f0;
                    }
                })
                .window(TumblingEventTimeWindows.of(Time.milliseconds(10L)))
                .sum(1)
                .print("+++++++++++++++++++++++++++");
        env.execute("keyedSteamJob");
    }
}
```
Output:
```
+++++++++++++++++++++++++++:1> (k3,10)
+++++++++++++++++++++++++++:2> (k1,1)
+++++++++++++++++++++++++++:8> (k2,22)
+++++++++++++++++++++++++++:2> (k1,21)
```
If I change
.window(TumblingEventTimeWindows.of(Time.milliseconds(10L)))
to
.window(TumblingEventTimeWindows.of(Time.seconds(10L)))
the output is:
```
+++++++++++++++++++++++++++:8> (k2,22)
+++++++++++++++++++++++++++:1> (k3,10)
+++++++++++++++++++++++++++:2> (k1,22)
```
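With the two different window sizes, the first run does not aggregate key 'k1' into a single result, while the second run does.
Why does this happen, and how can I verify which processing steps lead to this difference?
If k1=1 is already held in ValueState (the `2> (k1,1)` line), then when the operator later processes currentKey=k1, the ValueState value should be 1, so the output should be 10+11+1 rather than 10+11.
If the window is changed to 1 second, the output is also the expected one.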
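One hypothesis worth checking (not confirmed here): with `TimeCharacteristic.IngestionTime` the source stamps each record with the wall-clock time at which it is emitted, so records from the same collection can carry timestamps a few milliseconds apart. A 10 ms tumbling window can then split one key across two windows, while a 1 s or 10 s window usually cannot. The plain-Java sketch below (no Flink dependency) replays tumbling-window assignment for the three sizes. The timestamps are made up for illustration, and `windowStart` mirrors the arithmetic of Flink's `TimeWindow.getWindowStartWithOffset` with offset 0 rather than calling it.

```java
public class WindowAssignmentSketch {

    // Start of the tumbling window that contains `timestamp`. Same arithmetic as
    // Flink's TimeWindow.getWindowStartWithOffset(timestamp, offset, size) with
    // offset = 0, which is what TumblingEventTimeWindows.of(size) uses.
    static long windowStart(long timestamp, long size) {
        return timestamp - (timestamp + size) % size;
    }

    public static void main(String[] args) {
        // HYPOTHETICAL ingestion timestamps in ms, in emission order.
        // Real values depend on the wall clock when the job runs.
        String[] labels = {"k1=1", "k3=10", "k1=10", "k2=2", "k1=11", "k2=20"};
        long[] ts = {1_000_008L, 1_000_009L, 1_000_011L, 1_000_011L, 1_000_012L, 1_000_012L};
        long[] sizes = {10L, 1_000L, 10_000L}; // 10 ms, 1 s, 10 s

        for (long size : sizes) {
            System.out.println("window size " + size + " ms:");
            for (int i = 0; i < ts.length; i++) {
                long start = windowStart(ts[i], size);
                System.out.println("  " + labels[i] + " @" + ts[i]
                        + " -> [" + start + ", " + (start + size) + ")");
            }
        }
    }
}
```

With these assumed timestamps, `k1=1` falls into `[1000000, 1000010)` while `k1=10` and `k1=11` fall into `[1000010, 1000020)` for the 10 ms size; for 1 s and 10 s all records share one window. In the real job, printing the window bounds from a `ProcessWindowFunction` would show the same thing directly.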
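On the ValueState question: as far as I understand Flink's `WindowOperator`, window contents are held in keyed state that is additionally scoped by the window itself (the state namespace). Under that reading, k1's accumulator for one 10 ms window is separate from its accumulator for the next window, and the next one starts empty, so 1 would not be added to 10+11. The sketch below is a simplified model of that scoping, not Flink's implementation. It keys a map by a hypothetical `"key@windowStart"` string and reuses the same made-up timestamps.

```java
import java.util.Map;
import java.util.TreeMap;

public class PerWindowSumSketch {

    // Tumbling-window start with offset 0 (same arithmetic as
    // Flink's TimeWindow.getWindowStartWithOffset).
    static long windowStart(long timestamp, long size) {
        return timestamp - (timestamp + size) % size;
    }

    // Models state scoped to (key, window): every "key@windowStart" entry is an
    // independent accumulator, so a new window for the same key starts from zero.
    static Map<String, Integer> sumPerKeyAndWindow(String[] keys, int[] values,
                                                   long[] ts, long size) {
        Map<String, Integer> state = new TreeMap<>();
        for (int i = 0; i < keys.length; i++) {
            String scope = keys[i] + "@" + windowStart(ts[i], size);
            state.merge(scope, values[i], Integer::sum);
        }
        return state;
    }

    public static void main(String[] args) {
        String[] keys = {"k1", "k3", "k1", "k2", "k1", "k2"};
        int[] values = {1, 10, 10, 2, 11, 20};
        // HYPOTHETICAL ingestion timestamps in ms.
        long[] ts = {1_000_008L, 1_000_009L, 1_000_011L, 1_000_011L, 1_000_012L, 1_000_012L};

        System.out.println("10 ms: " + sumPerKeyAndWindow(keys, values, ts, 10L));
        System.out.println("10 s:  " + sumPerKeyAndWindow(keys, values, ts, 10_000L));
    }
}
```

With these timestamps the 10 ms run gives k1 sums of 1 and 21 in two different windows, and the 10 s run gives a single k1 sum of 22. This matches the two outputs above, but only under the assumed timestamps.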