Thanks. I wonder, wouldn't it be good to have such functionality built in? At least when the incoming stream is finished, flush the remaining elements.
On Thu, Apr 21, 2016 at 4:47 AM, Aljoscha Krettek <aljos...@apache.org> wrote:

> Hi,
> yes, you can achieve this by writing a custom Trigger that fires either on
> the count or after a long-enough timeout. It would be a combination of
> CountTrigger and EventTimeTrigger (or ProcessingTimeTrigger), so you could
> look to those to get started.
>
> Cheers,
> Aljoscha
>
> On Wed, 20 Apr 2016 at 23:44 Kostya Kulagin <kkula...@gmail.com> wrote:
>
>> I have a pretty big but finite stream and I need to be able to window it
>> by number of elements.
>> In this case, from my observations, Flink can 'skip' the last chunk of
>> data if it has fewer elements than the window size:
>>
>> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>> DataStreamSource<Long> source = env.addSource(new SourceFunction<Long>() {
>>
>>   @Override
>>   public void run(SourceContext<Long> ctx) throws Exception {
>>     LongStream.range(0, 35).forEach(ctx::collect);
>>   }
>>
>>   @Override
>>   public void cancel() {
>>   }
>> });
>>
>> source.countWindowAll(10).apply(new AllWindowFunction<Long, Long, GlobalWindow>() {
>>   @Override
>>   public void apply(GlobalWindow window, Iterable<Long> values, Collector<Long> out) throws Exception {
>>     System.out.println(Joiner.on(',').join(values));
>>   }
>> }).print();
>>
>> env.execute("yoyoyo");
>>
>> Output:
>> 0,1,2,3,4,5,6,7,8,9
>> 10,11,12,13,14,15,16,17,18,19
>> 20,21,22,23,24,25,26,27,28,29
>>
>> I.e. elements 30 to 34 are not being processed.
>>
>> Does it make sense to have a count OR timeout window, which would emit a
>> window when the number of elements reaches a threshold OR a collection
>> timeout occurs?
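For reference, here is a rough sketch of the count-or-timeout Trigger that Aljoscha describes, written against the Flink 1.x Trigger API. The class name CountOrTimeoutTrigger, the parameters maxCount/timeoutMs, and the state handling are my own, and details such as getPartitionedState() and clear() differ slightly between releases, so treat it as an illustration rather than a drop-in class. It fires when the pane reaches maxCount elements, when a processing-time timeout elapses, or when the final Long.MAX_VALUE watermark of a finite stream arrives, which should also cover the "flush remaining elements when the stream is finished" case:

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

// Fires and purges the pane when maxCount elements have arrived, when
// timeoutMs of processing time has passed since an element was seen, or
// when the final Long.MAX_VALUE watermark of a finite stream arrives.
public class CountOrTimeoutTrigger<T> extends Trigger<T, GlobalWindow> {

    private final long maxCount;
    private final long timeoutMs;

    private final ReducingStateDescriptor<Long> countDesc =
        new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE);

    public CountOrTimeoutTrigger(long maxCount, long timeoutMs) {
        this.maxCount = maxCount;
        this.timeoutMs = timeoutMs;
    }

    @Override
    public TriggerResult onElement(T element, long timestamp, GlobalWindow window,
                                   TriggerContext ctx) throws Exception {
        // end-of-stream flush: a finite source ends with a Long.MAX_VALUE watermark,
        // so an event-time timer at Long.MAX_VALUE fires once the input is exhausted
        ctx.registerEventTimeTimer(Long.MAX_VALUE);
        // timeout flush: fire at the latest timeoutMs after this element
        ctx.registerProcessingTimeTimer(ctx.getCurrentProcessingTime() + timeoutMs);

        ReducingState<Long> count = ctx.getPartitionedState(countDesc);
        count.add(1L);
        if (count.get() >= maxCount) {
            count.clear();
            return TriggerResult.FIRE_AND_PURGE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, GlobalWindow window,
                                          TriggerContext ctx) throws Exception {
        ctx.getPartitionedState(countDesc).clear();
        return TriggerResult.FIRE_AND_PURGE;
    }

    @Override
    public TriggerResult onEventTime(long time, GlobalWindow window,
                                     TriggerContext ctx) throws Exception {
        ctx.getPartitionedState(countDesc).clear();
        return TriggerResult.FIRE_AND_PURGE;
    }

    @Override
    public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {
        ctx.getPartitionedState(countDesc).clear();
        ctx.deleteEventTimeTimer(Long.MAX_VALUE);
    }

    private static class Sum implements ReduceFunction<Long> {
        @Override
        public Long reduce(Long a, Long b) {
            return a + b;
        }
    }
}

Plugged into the example above (with hypothetical values of 10 elements or 5 seconds), it would replace countWindowAll roughly like this:

source
    .windowAll(GlobalWindows.create())
    .trigger(new CountOrTimeoutTrigger<Long>(10, 5000L))
    .apply(new AllWindowFunction<Long, Long, GlobalWindow>() {
      @Override
      public void apply(GlobalWindow window, Iterable<Long> values, Collector<Long> out) throws Exception {
        System.out.println(Joiner.on(',').join(values));
      }
    })
    .print();

Two notes on the design: FIRE_AND_PURGE keeps the sketch self-contained, whereas countWindowAll achieves the same by wrapping CountTrigger in a PurgingTrigger; and a stale timeout timer can still go off after a pane was already purged on count, so if that matters the timer's timestamp can be kept in state and deleted when the trigger fires.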