Thanks,

I wonder whether it would be good to have such functionality built in, at
least to flush the remaining elements when the incoming stream finishes.
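For reference, here is a minimal sketch of the count-OR-timeout behaviour, including a flush of the leftover elements when the input ends. It is plain Java rather than Flink's Trigger API: CountOrTimeoutBatcher and its methods are hypothetical names, and time is passed in explicitly so the logic stays deterministic. In Flink the same decisions would presumably live in a custom Trigger's onElement and onProcessingTime callbacks (returning TriggerResult.FIRE_AND_PURGE), as suggested below.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Hypothetical helper (not a Flink class): emits a batch when maxCount
 * elements have arrived OR timeoutMillis have passed since the first
 * element of the current batch, and flushes any leftovers on close().
 */
final class CountOrTimeoutBatcher<T> {
    private final int maxCount;
    private final long timeoutMillis;
    private final Consumer<List<T>> sink;
    private final List<T> buffer = new ArrayList<>();
    private long batchStartMillis; // arrival time of the first buffered element

    CountOrTimeoutBatcher(int maxCount, long timeoutMillis, Consumer<List<T>> sink) {
        if (maxCount <= 0 || timeoutMillis <= 0) {
            throw new IllegalArgumentException("maxCount and timeoutMillis must be positive");
        }
        this.maxCount = maxCount;
        this.timeoutMillis = timeoutMillis;
        this.sink = sink;
    }

    /** Per-element hook (compare Trigger#onElement): fire on count. */
    void onElement(T element, long nowMillis) {
        if (buffer.isEmpty()) {
            batchStartMillis = nowMillis; // a new batch starts its timeout clock
        }
        buffer.add(element);
        if (buffer.size() >= maxCount) {
            flush();
        }
    }

    /** Timer hook (compare Trigger#onProcessingTime): fire on timeout. */
    void onTime(long nowMillis) {
        if (!buffer.isEmpty() && nowMillis - batchStartMillis >= timeoutMillis) {
            flush();
        }
    }

    /** End-of-input hook: emit whatever is left instead of dropping it. */
    void close() {
        if (!buffer.isEmpty()) {
            flush();
        }
    }

    private void flush() {
        sink.accept(new ArrayList<>(buffer)); // hand out a copy, then purge
        buffer.clear();
    }
}
```

With maxCount = 10 and the 35-element input from the example below, onElement emits 0-9, 10-19 and 20-29, and close() emits the remaining 30-34 instead of dropping them.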

On Thu, Apr 21, 2016 at 4:47 AM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi,
> yes, you can achieve this by writing a custom Trigger that fires either
> when the count is reached or after a long-enough timeout. It would be a
> combination of CountTrigger and EventTimeTrigger (or ProcessingTimeTrigger),
> so you could look at those to get started.
>
> Cheers,
> Aljoscha
>
> On Wed, 20 Apr 2016 at 23:44 Kostya Kulagin <kkula...@gmail.com> wrote:
>
>> I have a pretty big but finite stream and I need to be able to window it
>> by number of elements.
>> In this case, from what I observe, Flink can 'skip' the last chunk of
>> data if it has fewer elements than the window size:
>>
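>> import com.google.common.base.Joiner;
>> import java.util.stream.LongStream;
>> import org.apache.flink.streaming.api.datastream.DataStreamSource;
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>> import org.apache.flink.streaming.api.functions.source.SourceFunction;
>> import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
>> import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
>> import org.apache.flink.util.Collector;
>>
>> // The class name is a placeholder; the original snippet ran inside a main method.
>> public class CountWindowTail {
>>   public static void main(String[] args) throws Exception {
>>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>
>>     // Finite source: emits 0..34 (35 elements), then returns, which ends the stream.
>>     DataStreamSource<Long> source = env.addSource(new SourceFunction<Long>() {
>>
>>       @Override
>>       public void run(SourceContext<Long> ctx) throws Exception {
>>         LongStream.range(0, 35).forEach(ctx::collect);
>>       }
>>
>>       @Override
>>       public void cancel() {
>>         // Nothing to stop: run() returns on its own once all elements are emitted.
>>       }
>>     });
>>
>>     // Non-keyed count window: fires (and is cleared) every 10 elements.
>>     source.countWindowAll(10).apply(new AllWindowFunction<Long, Long, GlobalWindow>() {
>>       @Override
>>       public void apply(GlobalWindow window, Iterable<Long> values, Collector<Long> out) throws Exception {
>>         // Prints each window's contents directly; nothing is emitted via `out`.
>>         System.out.println(Joiner.on(',').join(values));
>>       }
>>     }).print(); // print() adds a sink, but it stays silent because nothing is collected
>>
>>     env.execute("yoyoyo");
>>   }
>> }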
>>
>>
>> Output:
>> 0,1,2,3,4,5,6,7,8,9
>> 10,11,12,13,14,15,16,17,18,19
>> 20,21,22,23,24,25,26,27,28,29
>>
>> I.e. elements 30 to 34 (the last five) are never processed.
>>
>> Does it make sense to have a count-OR-timeout window, which would fire
>> when the number of elements reaches a threshold OR when a collection
>> timeout expires?
>>
>
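The missing tail follows from how a count-only window fires. countWindowAll(10) puts every element into one global window whose trigger fires and purges it each time it holds 10 elements; the 35 inputs (0 to 34) fill three windows, and the last 5 (30 to 34) wait for a tenth element that never arrives. The plain-Java model below reproduces that arithmetic. CountOnlyModel and firedWindows are hypothetical names, and the model ignores everything else Flink does (parallelism, state, shutdown).

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of a count-only window: fire and purge every n elements, no end-of-input hook. */
final class CountOnlyModel {
    /** Returns the windows that fire for inputs 0..total-1 with window size n. */
    static List<List<Long>> firedWindows(long total, int n) {
        List<List<Long>> fired = new ArrayList<>();
        List<Long> buffer = new ArrayList<>();
        for (long i = 0; i < total; i++) {
            buffer.add(i);
            if (buffer.size() == n) { // count reached: fire, then purge
                fired.add(new ArrayList<>(buffer));
                buffer.clear();
            }
        }
        // Input ends here; nothing ever fires the remaining buffer,
        // so its elements are silently dropped.
        return fired;
    }
}
```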
