I have a pretty big but finite stream, and I need to window it by number of elements. From my observations, Flink can 'skip' the last chunk of data if it contains fewer elements than the window size:
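```java
import com.google.common.base.Joiner;
import java.util.stream.LongStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Finite source: emits 0..34 (35 elements) and then returns.
DataStreamSource<Long> source = env.addSource(new SourceFunction<Long>() {
    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        LongStream.range(0, 35).forEach(ctx::collect);
    }

    @Override
    public void cancel() {
    }
});

// Count window of 10 elements over the whole (non-keyed) stream.
source.countWindowAll(10).apply(new AllWindowFunction<Long, Long, GlobalWindow>() {
    @Override
    public void apply(GlobalWindow window, Iterable<Long> values, Collector<Long> out) throws Exception {
        System.out.println(Joiner.on(',').join(values));
    }
}).print();

env.execute("yoyoyo");
```

Output:

```
0,1,2,3,4,5,6,7,8,9
10,11,12,13,14,15,16,17,18,19
20,21,22,23,24,25,26,27,28,29
```

That is, elements 30 to 34 (the last, incomplete window) are never processed.

Does it make sense to have a count-OR-timeout window, which fires the window when either the number of elements reaches a threshold or a collection timeout expires?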
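To make the proposed count-OR-timeout semantics concrete, here is a minimal plain-Java model with no Flink dependency. The class names `CountOrTimeoutBatcher` and `CountOrTimeoutDemo`, their methods, the simulated millisecond clock, and the 1-second timeout are all hypothetical, chosen only for illustration. The batcher buffers elements and fires when the buffer reaches `maxCount`, when a clock tick finds that the oldest buffered element has waited at least `timeoutMillis`, or when `close()` is called at the end of a finite input. In Flink itself, this logic would roughly correspond to a custom `Trigger` (counting in `onElement`, registering a processing-time timer, firing in `onProcessingTime`), but that mapping is a sketch under assumptions, not a tested implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collectors;

// Demo: 35 elements, windows of 10, 1-second timeout (hypothetical values).
class CountOrTimeoutDemo {
    static List<List<Long>> run() {
        List<List<Long>> fired = new ArrayList<>();
        CountOrTimeoutBatcher<Long> batcher = new CountOrTimeoutBatcher<>(10, 1_000L, fired::add);
        long now = 0;
        for (long i = 0; i < 35; i++) {
            now = i; // simulated arrival time: one element per millisecond
            batcher.add(i, now);
        }
        // Input exhausted: a timer tick one timeout later flushes the partial batch 30..34.
        batcher.onTime(now + 1_000L);
        batcher.close(); // no-op here, the buffer is already empty
        return fired;
    }

    public static void main(String[] args) {
        for (List<Long> batch : run()) {
            System.out.println(batch.stream().map(String::valueOf).collect(Collectors.joining(",")));
        }
    }
}

// Hypothetical plain-Java model of a "count OR timeout" window; not a Flink API.
class CountOrTimeoutBatcher<T> {
    private final int maxCount;
    private final long timeoutMillis;
    private final Consumer<List<T>> sink;
    private List<T> buffer = new ArrayList<>();
    private long batchStart = -1; // arrival time of the first element in the current batch

    CountOrTimeoutBatcher(int maxCount, long timeoutMillis, Consumer<List<T>> sink) {
        this.maxCount = maxCount;
        this.timeoutMillis = timeoutMillis;
        this.sink = sink;
    }

    // Adds an element at the given time; fires as soon as the count threshold is reached.
    void add(T element, long nowMillis) {
        onTime(nowMillis); // first fire a pending batch whose timeout has already elapsed
        if (buffer.isEmpty()) {
            batchStart = nowMillis;
        }
        buffer.add(element);
        if (buffer.size() >= maxCount) {
            fire();
        }
    }

    // Clock/timer tick: fires the pending batch if its oldest element has waited long enough.
    void onTime(long nowMillis) {
        if (!buffer.isEmpty() && nowMillis - batchStart >= timeoutMillis) {
            fire();
        }
    }

    // End of a finite input: emit whatever is left so the last partial chunk is not lost.
    void close() {
        if (!buffer.isEmpty()) {
            fire();
        }
    }

    private void fire() {
        sink.accept(buffer);
        buffer = new ArrayList<>(); // start a fresh buffer; the emitted list is never mutated again
        batchStart = -1;
    }
}
```

Running `CountOrTimeoutDemo` prints four lines: the three full windows from the output above, followed by `30,31,32,33,34`, which the timeout (or, failing that, `close()`) flushes instead of dropping.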