Thanks! Now I can call myself a super Flink developer :)
As for the issue - I am still trying to figure out ways to do that. I've
raised a question in this thread:
http://mail-archives.apache.org/mod_mbox/flink-user/201604.mbox/%3CCAJ746X69L%2ByARu3pq74peov1TxyfUPhtQWg3ffLJ5SQk4OmTAg%40mail.gmail.com%3E

Thanks for your help!

On Mon, Apr 25, 2016 at 9:26 AM, Aljoscha Krettek <aljos...@apache.org> wrote:

> Yes, this looks correct for a counting Trigger that flushes when the
> sources finish. Could you also solve your filtering problem with this, or
> is this still an open issue?
>
> Cheers,
> Aljoscha
>
> On Sat, 23 Apr 2016 at 16:57 Konstantin Kulagin <kkula...@gmail.com> wrote:
>
>> I finally was able to do that. Kinda ugly, but it works:
>>
>> https://gist.github.com/krolen/ed1344e4d7be5b2116061685268651f5
>>
>> On Fri, Apr 22, 2016 at 6:14 PM, Konstantin Kulagin <kkula...@gmail.com> wrote:
>>
>>> I was trying to implement this (force Flink to handle all values from
>>> the input) but had no success...
>>> Probably I am not getting something about Flink's windowing mechanism.
>>> I've created my 'finishing' trigger, which is basically a copy of
>>> PurgingTrigger, but I was not able to make it work:
>>>
>>> https://gist.github.com/krolen/9e6ba8b14c54554bfbc10fdfa6fe7308
>>>
>>> I was never able to see the numbers from 30 to 34 in the result.
>>> What am I doing wrong?
>>>
>>> On Thu, Apr 21, 2016 at 8:54 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>
>>>> People have wondered about that a few times, yes. My opinion is that a
>>>> stream is potentially infinite and processing only stops for anomalous
>>>> reasons: when the job crashes, or when stopping a job to later redeploy
>>>> it. In those cases you would not want to flush out your data but keep
>>>> it and restart from the same state when the job is restarted.
>>>>
>>>> You can implement the behavior by writing a custom Trigger that behaves
>>>> like the count trigger but also fires when receiving a Long.MAX_VALUE
>>>> watermark. A watermark of Long.MAX_VALUE signifies that a source has
>>>> stopped processing for natural reasons.
>>>>
>>>> Cheers,
>>>> Aljoscha
>>>>
>>>> On Thu, 21 Apr 2016 at 14:42 Kostya Kulagin <kkula...@gmail.com> wrote:
>>>>
>>>>> Thanks,
>>>>>
>>>>> I wonder, wouldn't it be good to have such functionality built in? At
>>>>> least when the incoming stream is finished - flush the remaining
>>>>> elements.
>>>>>
>>>>> On Thu, Apr 21, 2016 at 4:47 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>>
>>>>>> Hi,
>>>>>> yes, you can achieve this by writing a custom Trigger that can fire
>>>>>> either on the count or after a long-enough timeout. It would be a
>>>>>> combination of CountTrigger and EventTimeTrigger (or
>>>>>> ProcessingTimeTrigger), so you could look at those to get started.
>>>>>>
>>>>>> Cheers,
>>>>>> Aljoscha
>>>>>>
>>>>>> On Wed, 20 Apr 2016 at 23:44 Kostya Kulagin <kkula...@gmail.com> wrote:
>>>>>>
>>>>>>> I have a pretty big but finite stream and I need to be able to
>>>>>>> window it by number of elements.
>>>>>>> In this case, from my observations, Flink can 'skip' the last chunk
>>>>>>> of data if it has fewer elements than the window size:
>>>>>>>
>>>>>>> StreamExecutionEnvironment env =
>>>>>>>     StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>> DataStreamSource<Long> source = env.addSource(new SourceFunction<Long>() {
>>>>>>>
>>>>>>>   @Override
>>>>>>>   public void run(SourceContext<Long> ctx) throws Exception {
>>>>>>>     LongStream.range(0, 35).forEach(ctx::collect);
>>>>>>>   }
>>>>>>>
>>>>>>>   @Override
>>>>>>>   public void cancel() {
>>>>>>>   }
>>>>>>> });
>>>>>>>
>>>>>>> source.countWindowAll(10).apply(new AllWindowFunction<Long, Long, GlobalWindow>() {
>>>>>>>   @Override
>>>>>>>   public void apply(GlobalWindow window, Iterable<Long> values,
>>>>>>>       Collector<Long> out) throws Exception {
>>>>>>>     System.out.println(Joiner.on(',').join(values));
>>>>>>>   }
>>>>>>> }).print();
>>>>>>>
>>>>>>> env.execute("yoyoyo");
>>>>>>>
>>>>>>> Output:
>>>>>>> 0,1,2,3,4,5,6,7,8,9
>>>>>>> 10,11,12,13,14,15,16,17,18,19
>>>>>>> 20,21,22,23,24,25,26,27,28,29
>>>>>>>
>>>>>>> I.e. elements from 30 to 34 are not being processed.
>>>>>>>
>>>>>>> Does it make sense to have a count OR timeout window, which would
>>>>>>> emit the window when the number of elements reaches a threshold OR a
>>>>>>> collection timeout occurs?
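A minimal sketch of the trigger Aljoscha describes - fire every N elements, but also flush the remainder when the final Long.MAX_VALUE watermark arrives - might look like the following. This is written against the newer (Flink 1.2+) Trigger API rather than the 1.0-era API used in the thread, the class name is invented for illustration, and Konstantin's actual working version is in the gist linked above:

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;

// Hypothetical name. Behaves like CountTrigger, but also fires when the
// sources finish (signalled by the Long.MAX_VALUE watermark).
public class CountWithEndOfStreamTrigger<W extends Window> extends Trigger<Object, W> {

  private final long maxCount;

  private final ReducingStateDescriptor<Long> countDesc =
      new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE);

  public CountWithEndOfStreamTrigger(long maxCount) {
    this.maxCount = maxCount;
  }

  @Override
  public TriggerResult onElement(Object element, long timestamp, W window,
      TriggerContext ctx) throws Exception {
    // An event-time timer at Long.MAX_VALUE fires exactly when the final
    // watermark (also Long.MAX_VALUE) arrives from the finished sources.
    ctx.registerEventTimeTimer(Long.MAX_VALUE);

    ReducingState<Long> count = ctx.getPartitionedState(countDesc);
    count.add(1L);
    if (count.get() >= maxCount) {
      count.clear();
      return TriggerResult.FIRE_AND_PURGE;
    }
    return TriggerResult.CONTINUE;
  }

  @Override
  public TriggerResult onEventTime(long time, W window, TriggerContext ctx)
      throws Exception {
    if (time == Long.MAX_VALUE) {
      // End of stream: flush whatever is left in the window.
      ctx.getPartitionedState(countDesc).clear();
      return TriggerResult.FIRE_AND_PURGE;
    }
    return TriggerResult.CONTINUE;
  }

  @Override
  public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) {
    return TriggerResult.CONTINUE;
  }

  @Override
  public void clear(W window, TriggerContext ctx) throws Exception {
    ctx.getPartitionedState(countDesc).clear();
    ctx.deleteEventTimeTimer(Long.MAX_VALUE);
  }

  private static class Sum implements ReduceFunction<Long> {
    @Override
    public Long reduce(Long a, Long b) {
      return a + b;
    }
  }
}

To plug it into the example from the thread, replace source.countWindowAll(10) with source.windowAll(GlobalWindows.create()).trigger(new CountWithEndOfStreamTrigger<>(10)); with the numbers 0-34 this should print the partial window 30,31,32,33,34 as a fourth line. Note that the final watermark has to actually reach the operator - depending on the Flink version and time characteristic it may not be forwarded in pure processing-time setups.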
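The count-or-timeout combination suggested earlier in the thread can be sketched the same way (same API assumptions; the class name is again invented). It fires when the window reaches maxCount elements OR when timeoutMillis of processing time has passed since the window's first element, whichever comes first:

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;

// Hypothetical name. A combination of CountTrigger and ProcessingTimeTrigger.
public class CountOrTimeoutTrigger<W extends Window> extends Trigger<Object, W> {

  private final long maxCount;
  private final long timeoutMillis;

  private final ReducingStateDescriptor<Long> countDesc =
      new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE);
  private final ValueStateDescriptor<Long> deadlineDesc =
      new ValueStateDescriptor<>("deadline", LongSerializer.INSTANCE);

  public CountOrTimeoutTrigger(long maxCount, long timeoutMillis) {
    this.maxCount = maxCount;
    this.timeoutMillis = timeoutMillis;
  }

  @Override
  public TriggerResult onElement(Object element, long timestamp, W window,
      TriggerContext ctx) throws Exception {
    ValueState<Long> deadline = ctx.getPartitionedState(deadlineDesc);
    if (deadline.value() == null) {
      // First element of this window: start the timeout clock.
      long end = ctx.getCurrentProcessingTime() + timeoutMillis;
      deadline.update(end);
      ctx.registerProcessingTimeTimer(end);
    }
    ReducingState<Long> count = ctx.getPartitionedState(countDesc);
    count.add(1L);
    if (count.get() >= maxCount) {
      clear(window, ctx);
      return TriggerResult.FIRE_AND_PURGE;
    }
    return TriggerResult.CONTINUE;
  }

  @Override
  public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx)
      throws Exception {
    // Timeout expired before the count was reached: flush what we have.
    clear(window, ctx);
    return TriggerResult.FIRE_AND_PURGE;
  }

  @Override
  public TriggerResult onEventTime(long time, W window, TriggerContext ctx) {
    return TriggerResult.CONTINUE;
  }

  @Override
  public void clear(W window, TriggerContext ctx) throws Exception {
    ValueState<Long> deadline = ctx.getPartitionedState(deadlineDesc);
    Long end = deadline.value();
    if (end != null) {
      ctx.deleteProcessingTimeTimer(end);
    }
    deadline.clear();
    ctx.getPartitionedState(countDesc).clear();
  }

  private static class Sum implements ReduceFunction<Long> {
    @Override
    public Long reduce(Long a, Long b) {
      return a + b;
    }
  }
}

Used as source.windowAll(GlobalWindows.create()).trigger(new CountOrTimeoutTrigger<>(10, 5000)), this emits full windows of 10 elements and flushes a partial window 5 seconds after its first element arrived - which also covers the end-of-stream case, as long as the job stays up long enough for the last timer to fire.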