Thanks!

Now I can call myself a super Flink developer :)

As for the issue - I am still trying to figure out ways to do that. I've
raised a question in this thread:

http://mail-archives.apache.org/mod_mbox/flink-user/201604.mbox/%3CCAJ746X69L%2ByARu3pq74peov1TxyfUPhtQWg3ffLJ5SQk4OmTAg%40mail.gmail.com%3E

Thanks for your help!


On Mon, Apr 25, 2016 at 9:26 AM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Yes, this looks correct for a counting trigger that flushes when the
> sources finish. Could you also solve your filtering problem with this, or
> is this still an open issue?
>
> Cheers,
> Aljoscha
>
> On Sat, 23 Apr 2016 at 16:57 Konstantin Kulagin <kkula...@gmail.com>
> wrote:
>
>> I was finally able to do that. It's kinda ugly, but it works:
>>
>> https://gist.github.com/krolen/ed1344e4d7be5b2116061685268651f5
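>>
>> In case it helps: the wiring just replaces countWindowAll with an explicit
>> global window plus the custom trigger. Roughly (FinishingCountTrigger is a
>> placeholder name for the class in the gist):
>>
>>     // "FinishingCountTrigger" is a placeholder for the trigger in the gist
>>     source.windowAll(GlobalWindows.create())
>>         .trigger(new FinishingCountTrigger(10))
>>         .apply(new AllWindowFunction<Long, Long, GlobalWindow>() {
>>           @Override
>>           public void apply(GlobalWindow window, Iterable<Long> values, Collector<Long> out) throws Exception {
>>             System.out.println(Joiner.on(',').join(values));
>>           }
>>         }).print();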
>>
>>
>>
>> On Fri, Apr 22, 2016 at 6:14 PM, Konstantin Kulagin <kkula...@gmail.com>
>> wrote:
>>
>>> I was trying to implement this (force Flink to handle all values from the
>>> input) but had no success...
>>> I am probably missing something about Flink's windowing mechanism.
>>> I've created my 'finishing' trigger, which is basically a copy of the
>>> purging trigger, but I was not able to make it work:
>>>
>>> https://gist.github.com/krolen/9e6ba8b14c54554bfbc10fdfa6fe7308
>>>
>>> I was never able to see the numbers from 30 to 34 in the result.
>>> What am I doing wrong?
>>>
>>>
>>> On Thu, Apr 21, 2016 at 8:54 AM, Aljoscha Krettek <aljos...@apache.org>
>>> wrote:
>>>
>>>> People have wondered about that a few times, yes. My opinion is that a
>>>> stream is potentially infinite and processing only stops for anomalous
>>>> reasons: when the job crashes, or when a job is stopped to be redeployed
>>>> later. In those cases you would not want to flush out your data but keep
>>>> it and restart from the same state when the job is restarted.
>>>>
>>>> You can implement this behavior by writing a custom Trigger that behaves
>>>> like the count trigger but also fires when it receives a Long.MAX_VALUE
>>>> watermark. A watermark of Long.MAX_VALUE signifies that a source has
>>>> finished for natural reasons.
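>>>>
>>>> Roughly, such a trigger could look like the following. This is an
>>>> untested sketch against the 1.0-era Trigger API (in later versions the
>>>> state access goes through getPartitionedState rather than
>>>> getKeyValueState), and it purges on firing, like the trigger that
>>>> countWindowAll installs:
>>>>
>>>> import org.apache.flink.api.common.state.ValueState;
>>>> import org.apache.flink.streaming.api.windowing.triggers.Trigger;
>>>> import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
>>>> import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
>>>>
>>>> public class EndOfStreamCountTrigger extends Trigger<Object, GlobalWindow> {
>>>>
>>>>   private final long maxCount;
>>>>
>>>>   public EndOfStreamCountTrigger(long maxCount) {
>>>>     this.maxCount = maxCount;
>>>>   }
>>>>
>>>>   @Override
>>>>   public TriggerResult onElement(Object element, long timestamp, GlobalWindow window, TriggerContext ctx) throws Exception {
>>>>     // GlobalWindow.maxTimestamp() is Long.MAX_VALUE, so this timer only
>>>>     // fires when the final watermark arrives, i.e. when the sources finish.
>>>>     ctx.registerEventTimeTimer(window.maxTimestamp());
>>>>
>>>>     ValueState<Long> count = ctx.getKeyValueState("count", Long.class, 0L);
>>>>     long current = count.value() + 1;
>>>>     if (current >= maxCount) {
>>>>       count.update(0L);
>>>>       return TriggerResult.FIRE_AND_PURGE;
>>>>     }
>>>>     count.update(current);
>>>>     return TriggerResult.CONTINUE;
>>>>   }
>>>>
>>>>   @Override
>>>>   public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) {
>>>>     // Only reached for the Long.MAX_VALUE timer: flush whatever is left.
>>>>     return TriggerResult.FIRE_AND_PURGE;
>>>>   }
>>>>
>>>>   @Override
>>>>   public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) {
>>>>     return TriggerResult.CONTINUE;
>>>>   }
>>>> }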
>>>>
>>>> Cheers,
>>>> Aljoscha
>>>>
>>>> On Thu, 21 Apr 2016 at 14:42 Kostya Kulagin <kkula...@gmail.com> wrote:
>>>>
>>>>> Thanks,
>>>>>
>>>>> I wonder, wouldn't it be good to have such functionality built in? At
>>>>> least when the incoming stream is finished, flush the remaining elements.
>>>>>
>>>>> On Thu, Apr 21, 2016 at 4:47 AM, Aljoscha Krettek <aljos...@apache.org
>>>>> > wrote:
>>>>>
>>>>>> Hi,
>>>>>> yes, you can achieve this by writing a custom Trigger that can fire
>>>>>> either on the count or after a long-enough timeout. It would be a
>>>>>> combination of CountTrigger and EventTimeTrigger (or ProcessingTimeTrigger),
>>>>>> so you could look at those to get started.
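>>>>>>
>>>>>> A rough, untested sketch of such a combined trigger (processing-time
>>>>>> flavor, again against the 1.0-era API; names may differ in other
>>>>>> versions):
>>>>>>
>>>>>> import org.apache.flink.api.common.state.ValueState;
>>>>>> import org.apache.flink.streaming.api.windowing.triggers.Trigger;
>>>>>> import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
>>>>>> import org.apache.flink.streaming.api.windowing.windows.Window;
>>>>>>
>>>>>> public class CountOrTimeoutTrigger<W extends Window> extends Trigger<Object, W> {
>>>>>>
>>>>>>   private final long maxCount;
>>>>>>   private final long timeoutMillis;
>>>>>>
>>>>>>   public CountOrTimeoutTrigger(long maxCount, long timeoutMillis) {
>>>>>>     this.maxCount = maxCount;
>>>>>>     this.timeoutMillis = timeoutMillis;
>>>>>>   }
>>>>>>
>>>>>>   @Override
>>>>>>   public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
>>>>>>     ValueState<Long> count = ctx.getKeyValueState("count", Long.class, 0L);
>>>>>>     long current = count.value() + 1;
>>>>>>     if (current == 1) {
>>>>>>       // First element of the current batch: start the timeout clock.
>>>>>>       ctx.registerProcessingTimeTimer(System.currentTimeMillis() + timeoutMillis);
>>>>>>     }
>>>>>>     if (current >= maxCount) {
>>>>>>       count.update(0L);
>>>>>>       return TriggerResult.FIRE_AND_PURGE;
>>>>>>     }
>>>>>>     count.update(current);
>>>>>>     return TriggerResult.CONTINUE;
>>>>>>   }
>>>>>>
>>>>>>   @Override
>>>>>>   public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
>>>>>>     // Timeout expired before maxCount elements arrived: flush what we have.
>>>>>>     // Caveat: if the count fired first, this stale timer can still fire an
>>>>>>     // (almost) empty window; a real implementation should guard against that.
>>>>>>     ctx.getKeyValueState("count", Long.class, 0L).update(0L);
>>>>>>     return TriggerResult.FIRE_AND_PURGE;
>>>>>>   }
>>>>>>
>>>>>>   @Override
>>>>>>   public TriggerResult onEventTime(long time, W window, TriggerContext ctx) {
>>>>>>     return TriggerResult.CONTINUE;
>>>>>>   }
>>>>>> }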
>>>>>>
>>>>>> Cheers,
>>>>>> Aljoscha
>>>>>>
>>>>>> On Wed, 20 Apr 2016 at 23:44 Kostya Kulagin <kkula...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> I have a pretty big but finite stream and I need to be able to window
>>>>>>> it by the number of elements.
>>>>>>> In this case, from my observations, Flink can 'skip' the last chunk of
>>>>>>> data if it has fewer elements than the window size:
>>>>>>>
>>>>>>>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>>     DataStreamSource<Long> source = env.addSource(new SourceFunction<Long>() {
>>>>>>>
>>>>>>>       @Override
>>>>>>>       public void run(SourceContext<Long> ctx) throws Exception {
>>>>>>>         // finite source: emits 0..34 and then finishes
>>>>>>>         LongStream.range(0, 35).forEach(ctx::collect);
>>>>>>>       }
>>>>>>>
>>>>>>>       @Override
>>>>>>>       public void cancel() {
>>>>>>>       }
>>>>>>>     });
>>>>>>>
>>>>>>>     source.countWindowAll(10).apply(new AllWindowFunction<Long, Long, GlobalWindow>() {
>>>>>>>       @Override
>>>>>>>       public void apply(GlobalWindow window, Iterable<Long> values, Collector<Long> out) throws Exception {
>>>>>>>         System.out.println(Joiner.on(',').join(values));
>>>>>>>       }
>>>>>>>     }).print();
>>>>>>>
>>>>>>>     env.execute("yoyoyo");
>>>>>>>
>>>>>>>
>>>>>>> Output:
>>>>>>> 0,1,2,3,4,5,6,7,8,9
>>>>>>> 10,11,12,13,14,15,16,17,18,19
>>>>>>> 20,21,22,23,24,25,26,27,28,29
>>>>>>>
>>>>>>> I.e. elements from 30 to 34 are not being processed.
>>>>>>>
>>>>>>> Does it make sense to have a count-OR-timeout window, which will emit a
>>>>>>> window when the number of elements reaches a threshold OR a collection
>>>>>>> timeout occurs?
>>>>>>>
>>>>>>
>>>>>
>>>
>>
