I have a fairly large but finite stream, and I need to window it by
number of elements.
From what I observe, Flink can 'skip' the last chunk of data when it
contains fewer elements than the window size:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStreamSource<Long> source = env.addSource(new SourceFunction<Long>() {

      @Override
      public void run(SourceContext<Long> ctx) throws Exception {
        LongStream.range(0, 35).forEach(ctx::collect);
      }

      @Override
      public void cancel() {

      }
    });

    source.countWindowAll(10).apply(new AllWindowFunction<Long, Long, GlobalWindow>() {
      @Override
      public void apply(GlobalWindow window, Iterable<Long> values, Collector<Long> out) throws Exception {
        System.out.println(Joiner.on(',').join(values));
      }
    }).print();

    env.execute("yoyoyo");


Output:
0,1,2,3,4,5,6,7,8,9
10,11,12,13,14,15,16,17,18,19
20,21,22,23,24,25,26,27,28,29

I.e. the last five elements (30 to 34) are never processed, because the
final window never reaches 10 elements.
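
To make the arithmetic explicit: 35 elements with a window size of 10 give three full windows and 5 leftover elements. The following is a minimal plain-Java sketch of that behaviour, not Flink code; the class and method names (`CountWindowSketch`, `fullWindows`) are made up for illustration, and it only assumes that a count window fires when it holds exactly `size` elements.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; this is not part of the Flink API.
class CountWindowSketch {

    // Groups the range [fromInclusive, toExclusive) into windows of exactly
    // `size` elements. A trailing partial window is never returned.
    static List<List<Long>> fullWindows(long fromInclusive, long toExclusive, int size) {
        List<List<Long>> fired = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        for (long v = fromInclusive; v < toExclusive; v++) {
            current.add(v);
            if (current.size() == size) {
                // The window is full, so it fires and a new one starts.
                fired.add(current);
                current = new ArrayList<>();
            }
        }
        // Anything still in `current` here never reached `size` and is dropped.
        return fired;
    }
}
```

Calling `CountWindowSketch.fullWindows(0, 35, 10)` returns three lists, the last of which ends at 29, which matches the output above.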

Would it make sense to have a count-OR-timeout window, which emits the
current window either when the number of elements reaches a threshold
or when a collection timeout expires?
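
To illustrate the semantics I have in mind, here is a minimal plain-Java sketch of a count-or-timeout buffer. It is not Flink code: `CountOrTimeoutBuffer`, `add` and `onTimer` are hypothetical names, timestamps are passed in explicitly to keep the example deterministic, and in Flink this logic would presumably live in a custom `Trigger` instead.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; this is not part of the Flink API.
class CountOrTimeoutBuffer<T> {
    private final int maxCount;
    private final long timeoutMillis;
    private final List<T> buffer = new ArrayList<>();
    private long firstElementTime;

    CountOrTimeoutBuffer(int maxCount, long timeoutMillis) {
        this.maxCount = maxCount;
        this.timeoutMillis = timeoutMillis;
    }

    // Adds an element. Returns the full batch once maxCount is reached,
    // otherwise null.
    List<T> add(T element, long nowMillis) {
        if (buffer.isEmpty()) {
            // The timeout is measured from the first element of each batch.
            firstElementTime = nowMillis;
        }
        buffer.add(element);
        return buffer.size() >= maxCount ? flush() : null;
    }

    // Meant to be called periodically. Returns the partial batch if the
    // timeout has elapsed since its first element, otherwise null.
    List<T> onTimer(long nowMillis) {
        boolean expired = !buffer.isEmpty()
                && nowMillis - firstElementTime >= timeoutMillis;
        return expired ? flush() : null;
    }

    // Hands out a copy of the buffered elements and starts a new batch.
    private List<T> flush() {
        List<T> batch = new ArrayList<>(buffer);
        buffer.clear();
        return batch;
    }
}
```

With `maxCount = 10` and the 35 elements from the example, `add` would return three full batches, and a later `onTimer` call after the timeout would return the remaining elements 30 to 34 instead of dropping them.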
