People have wondered about that a few times, yes. My opinion is that a stream is potentially infinite and processing only stops for anomalous reasons: when the job crashes, or when a job is stopped so it can be redeployed later. In those cases you would not want to flush out your data but keep it, and restart from the same state when the job is restarted.
You can implement the behavior by writing a custom Trigger that behaves like the count trigger but also fires when receiving a Long.MAX_VALUE watermark. A watermark of Long.MAX_VALUE signifies that a source has stopped processing for natural reasons.

Cheers,
Aljoscha

On Thu, 21 Apr 2016 at 14:42 Kostya Kulagin <kkula...@gmail.com> wrote:

> Thanks,
>
> I wonder whether it would be good to have such functionality built in. At
> least when the incoming stream is finished, flush the remaining elements.
>
> On Thu, Apr 21, 2016 at 4:47 AM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
>> Hi,
>> yes, you can achieve this by writing a custom Trigger that can trigger
>> both on the count or after a long-enough timeout. It would be a
>> combination of CountTrigger and EventTimeTrigger (or
>> ProcessingTimeTrigger), so you could look at those to get started.
>>
>> Cheers,
>> Aljoscha
>>
>> On Wed, 20 Apr 2016 at 23:44 Kostya Kulagin <kkula...@gmail.com> wrote:
>>
>>> I have a pretty big but finite stream and I need to be able to window
>>> it by a number of elements.
>>> In this case, from my observations, Flink can 'skip' the latest chunk
>>> of data if it has fewer elements than the window size:
>>>
>>> StreamExecutionEnvironment env =
>>>     StreamExecutionEnvironment.getExecutionEnvironment();
>>>
>>> DataStreamSource<Long> source = env.addSource(new SourceFunction<Long>() {
>>>
>>>   @Override
>>>   public void run(SourceContext<Long> ctx) throws Exception {
>>>     LongStream.range(0, 35).forEach(ctx::collect);
>>>   }
>>>
>>>   @Override
>>>   public void cancel() {
>>>   }
>>> });
>>>
>>> source.countWindowAll(10).apply(new AllWindowFunction<Long, Long, GlobalWindow>() {
>>>   @Override
>>>   public void apply(GlobalWindow window, Iterable<Long> values,
>>>                     Collector<Long> out) throws Exception {
>>>     System.out.println(Joiner.on(',').join(values));
>>>   }
>>> }).print();
>>>
>>> env.execute("yoyoyo");
>>>
>>> Output:
>>> 0,1,2,3,4,5,6,7,8,9
>>> 10,11,12,13,14,15,16,17,18,19
>>> 20,21,22,23,24,25,26,27,28,29
>>>
>>> I.e. elements from 30 to 34 are not being processed.
>>>
>>> Does it make sense to have a count OR timeout window, which emits a
>>> window either when the number of elements reaches a threshold OR when a
>>> collecting timeout occurs?
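A minimal sketch of the custom Trigger described above, modeled on Flink's own CountTrigger. This is an illustration, not a tested implementation: the class name `CountOrEndOfStreamTrigger` is made up, and the exact Trigger API (e.g. the `clear` method) varies between Flink versions:

```java
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

// Fires when the window reaches maxCount elements, or when the
// Long.MAX_VALUE watermark arrives (i.e. the finite source has ended),
// so the last partial window is not silently dropped.
public class CountOrEndOfStreamTrigger extends Trigger<Object, GlobalWindow> {

    private final long maxCount;

    private final ReducingStateDescriptor<Long> countDesc =
            new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE);

    public CountOrEndOfStreamTrigger(long maxCount) {
        this.maxCount = maxCount;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp,
                                   GlobalWindow window, TriggerContext ctx) throws Exception {
        // GlobalWindow.maxTimestamp() is Long.MAX_VALUE, so this timer
        // fires exactly when the end-of-stream watermark arrives.
        ctx.registerEventTimeTimer(window.maxTimestamp());

        ReducingState<Long> count = ctx.getPartitionedState(countDesc);
        count.add(1L);
        if (count.get() >= maxCount) {
            count.clear();
            return TriggerResult.FIRE_AND_PURGE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) {
        // End-of-stream watermark: flush whatever is left in the window.
        return TriggerResult.FIRE_AND_PURGE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {
        ctx.getPartitionedState(countDesc).clear();
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }

    private static class Sum implements ReduceFunction<Long> {
        @Override
        public Long reduce(Long a, Long b) {
            return a + b;
        }
    }
}
```

It would then be attached explicitly instead of relying on countWindowAll's default trigger, along the lines of `source.windowAll(GlobalWindows.create()).trigger(new CountOrEndOfStreamTrigger(10)).apply(...)`.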