Re: Weird serialization bug?

2017-04-29 Thread Ted Yu
Have you tried making fooo static?

Cheers

On Sat, Apr 29, 2017 at 4:26 AM, Sebastian Neef <
gehax...@mailbox.tu-berlin.de> wrote:

> Hello Apache Flink users,
>
> I implemented a FilterFunction some months ago that worked quite well
> back then. However, I wanted to check it out right now and it somehow
> broke in the sense that Flink can't serialize it anymore. I might be
> mistaken, but afaik I didn't touch the code at all.
>
> I think that I've tracked down the problem to the following minimal
> working PoC:
>
> - A simple interface:
>
> > interface testFunc extends Serializable {
> > boolean filter();
> > }
>
> - A TestFilterFunction which is applied on a DataSet:
>
> >  public void doSomeFiltering() {
> > class fooo implements testFunc {
> > public boolean filter() {
> > return false;
> > }
> > }
> >
> > class TestFilterFunction implements FilterFunction<IPage> {
> >
> > testFunc filter;
> >
> > class fooo2 implements testFunc {
> > public boolean filter() {
> > return false;
> > }
> > }
> >
> > TestFilterFunction() {
> > // WORKS with fooo2()
> > // DOES NOT WORK with fooo()
> > this.filter = new fooo2();
> > }
> > @Override
> > public boolean filter(IPage iPage) throws Exception {
> > return filter.filter();
> > }
> > }
> >   filteredDataSet = dataSet.filter(new TestFilterFunction());
> > }
>
> Flink will work fine when the "fooo2" class is used. However, when using
> the "fooo()" class, I get the following error:
>
> > 
> >  The program finished with the following exception:
> >
> > The implementation of the FilterFunction is not serializable. The object
> probably contains or references non serializable fields.
> >   org.apache.flink.api.java.ClosureCleaner.clean(
> ClosureCleaner.java:100)
> >   org.apache.flink.api.java.DataSet.clean(DataSet.java:186)
> >   org.apache.flink.api.java.DataSet.filter(DataSet.java:287)
> >   testpackage.testclass.applyFilters(testclass.java:105)
>
> I'm a little bit confused as to why Flink manages to serialize the "fooo2"
> class, but not the "fooo" class. Is this a bug or am I missing something
> here?
>
> Cheers,
> Sebastian
>
>
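For reference, a minimal sketch of what Ted's suggestion amounts to. A class declared locally inside an instance method (like "fooo") keeps a hidden reference to the enclosing instance, which is most likely what pulls the non-serializable outer class into the closure; declaring the helper as a static nested class removes that hidden reference. Names and the element type below are illustrative:

import java.io.Serializable;

import org.apache.flink.api.common.functions.FilterFunction;

public class FilterExample {

    interface TestFunc extends Serializable {
        boolean filter();
    }

    // Static nested class: no hidden reference to an enclosing instance,
    // so it can be serialized on its own.
    static class Fooo implements TestFunc {
        @Override
        public boolean filter() {
            return false;
        }
    }

    static class TestFilter<T> implements FilterFunction<T> {

        private final TestFunc delegate = new Fooo();

        @Override
        public boolean filter(T value) throws Exception {
            return delegate.filter();
        }
    }
}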


Re: CEP timeout occurs even for a successful match when using followedBy

2017-04-29 Thread Kostas Kloudas
Hi Moiz,

Here are the instructions on how to build Flink from source:

https://ci.apache.org/projects/flink/flink-docs-release-1.2/setup/building.html 


Kostas

> On Apr 29, 2017, at 7:15 PM, Moiz S Jinia  wrote:
> 
> I meant maven dependencies that i can use by generating them from sources.
> 
> On Sat, Apr 29, 2017 at 10:31 PM, Moiz S Jinia  > wrote:
> Ok I'll try that. Its just that I'd rather use a stable version.
> Are there any instructions for building binaries from latest sources?
> 
> Moiz
> 
> On Sat, Apr 29, 2017 at 10:09 PM, Kostas Kloudas  > wrote:
> Hi Moiz,
> 
> The skip-till-next is a big change and backporting it does not seem feasible. 
> Also this would require more general changes to the 1.2 to make it compatible 
> with the previous 1.2 versions.
> 
> If you want you can already use the 1.3 version by downloading the master 
> branch and writing your 
> use-case against that. The changes until the final release are going to be 
> minor hopefully and we can
> always help you adjust your program accordingly.
> 
> Hope this helps,
> Kostas
> 
>> On Apr 29, 2017, at 6:23 PM, Moiz S Jinia > > wrote:
>> 
>> Oh ok thats a bit far off. Is there any chance of a backport of 
>> https://issues.apache.org/jira/browse/FLINK-6208 
>>  to the 1.2 branch? I 
>> require the SKIP_TILL_NEXT behaviour for a production use case that we want 
>> to use Flink for.
>> 
>> Moiz
>> 
>> On Sat, Apr 29, 2017 at 9:49 PM, Kostas Kloudas > > wrote:
>> The 1.3 is scheduled for the beginning of June.
>> 
>> Cheers,
>> Kostas
>> 
>>> On Apr 29, 2017, at 6:16 PM, Moiz S Jinia >> > wrote:
>>> 
>>> Thanks Dawid! 
>>> Yes thats what i was expecting. I'll give it a try.
>>> 
>>> When do you expect 1.3.0 stable to be out?
>>> 
>>> Moiz
>>> 
>>> On Sat, Apr 29, 2017 at 9:20 PM, Dawid Wysakowicz 
>>> > wrote:
>>> Hi,
>>> 
>>> This is an expected behaviour. After the "ar" event there still may occur 
>>> other "ar" event that will also trigger a match.
>>> To be more generic in all versions prior to 1.3.0 there are two different 
>>> consuming strategies:
>>> STRICT (the next operator) - that accepts only if the event occurs directly 
>>> after the previous 
>>> SKIP TILL ANY (the followedBy operator) - it accepts any matching event 
>>> following event if there were already an event that matched this pattern
>>> Because after "ni" event we could match with some other "ar" events, the 
>>> match is timeouted after 5 seconds.
>>> 
>>> In FLINK-6208  we 
>>> introduced third consuming strategy:
>>> SKIP TILL NEXT(this is the strategy for followedBy right now) - the event 
>>> does not have to occur directly after the previous one but only one event 
>>> can be matched
>>> and you can still use SKIP TILL ANY by using followedByAny. I believe the 
>>> SKIP TILL NEXT strategy is the one you expected. 
>>> You can check it on master branch. We did introduce lots of new features 
>>> and bugfixes to CEP for 1.3.0 version so any comments,
>>> tests or suggestions are welcome.
>>> 
>>> 
>>> Z pozdrowieniami! / Cheers!
>>> 
>>> Dawid Wysakowicz
>>> Data/Software Engineer
>>> Skype: dawid_wys | Twitter: @OneMoreCoder
>>>  
>>> 
>>> 2017-04-29 12:14 GMT+02:00 Moiz S Jinia >> >:
>>> When using "next", this pattern works fine for the both a match as well as 
>>> a timeout:
>>> 
>>> Pattern pattern = Pattern.begin("start")
>>> .where(evt -> evt.value.equals("ni"))
>>> .next("last").where(evt -> 
>>> evt.value.equals("ar")).within(Time.seconds(5));
>>> 
>>> 1. "ni" then "ar" within 5 seconds - triggers match
>>> 2. "ni" then no "ar" within 5 seconds - triggers timeout
>>> 
>>> But with "followedBy", this does not behave as expected:
>>> 
>>> Pattern pattern = Pattern.begin("start")
>>> .where(evt -> evt.value.equals("ni"))
>>> .followedBy("last").where(evt -> 
>>> evt.value.equals("ar")).within(Time.seconds(5));
>>> 
>>> "ni" then "ar" within 5 seconds - triggers match and also triggers timeout.
>>> 
>>> Why is the timeout triggered when using followedBy (when there is a match)?
>>> 
>>> Version - 1.1.5.
>>> 
>>> 
>> 
>> 
> 
> 
> 



Re: CEP timeout occurs even for a successful match when using followedBy

2017-04-29 Thread Moiz S Jinia
I meant Maven dependencies that I can use by generating them from the sources.

On Sat, Apr 29, 2017 at 10:31 PM, Moiz S Jinia  wrote:

> Ok I'll try that. Its just that I'd rather use a stable version.
> Are there any instructions for building binaries from latest sources?
>
> Moiz
>
> On Sat, Apr 29, 2017 at 10:09 PM, Kostas Kloudas <
> k.klou...@data-artisans.com> wrote:
>
>> Hi Moiz,
>>
>> The skip-till-next is a big change and backporting it does not seem
>> feasible.
>> Also this would require more general changes to the 1.2 to make it
>> compatible with the previous 1.2 versions.
>>
>> If you want you can already use the 1.3 version by downloading the master
>> branch and writing your
>> use-case against that. The changes until the final release are going to
>> be minor hopefully and we can
>> always help you adjust your program accordingly.
>>
>> Hope this helps,
>> Kostas
>>
>> On Apr 29, 2017, at 6:23 PM, Moiz S Jinia  wrote:
>>
>> Oh ok thats a bit far off. Is there any chance of a backport of
>> https://issues.apache.org/jira/browse/FLINK-6208 to the 1.2 branch? I
>> require the SKIP_TILL_NEXT behaviour for a production use case that we want
>> to use Flink for.
>>
>> Moiz
>>
>> On Sat, Apr 29, 2017 at 9:49 PM, Kostas Kloudas <
>> k.klou...@data-artisans.com> wrote:
>>
>>> The 1.3 is scheduled for the beginning of June.
>>>
>>> Cheers,
>>> Kostas
>>>
>>> On Apr 29, 2017, at 6:16 PM, Moiz S Jinia  wrote:
>>>
>>> Thanks Dawid!
>>> Yes thats what i was expecting. I'll give it a try.
>>>
>>> When do you expect 1.3.0 stable to be out?
>>>
>>> Moiz
>>>
>>> On Sat, Apr 29, 2017 at 9:20 PM, Dawid Wysakowicz <
>>> wysakowicz.da...@gmail.com> wrote:
>>>
 Hi,

 This is an expected behaviour. After the "ar" event there still may
 occur other "ar" event that will also trigger a match.
 To be more generic in all versions prior to 1.3.0 there are two
 different consuming strategies:

- STRICT (the next operator) - that accepts only if the event
occurs directly after the previous
- SKIP TILL ANY (the followedBy operator) - it accepts any matching
event following event if there were already an event that matched this
pattern

 Because after "ni" event we could match with some other "ar" events,
 the match is timeouted after 5 seconds.

 In FLINK-6208  we
 introduced third consuming strategy:

- SKIP TILL NEXT(this is the strategy for followedBy right now) -
the event does not have to occur directly after the previous one but 
 only
one event can be matched

 and you can still use SKIP TILL ANY by using followedByAny. I believe
 the SKIP TILL NEXT strategy is the one you expected.
 You can check it on master branch. We did introduce lots of new
 features and bugfixes to CEP for 1.3.0 version so any comments,
 tests or suggestions are welcome.


 Z pozdrowieniami! / Cheers!

 Dawid Wysakowicz
 *Data/Software Engineer*
 Skype: dawid_wys | Twitter: @OneMoreCoder
 

 2017-04-29 12:14 GMT+02:00 Moiz S Jinia :

> When using "next", this pattern works fine for the both a match as
> well as a timeout:
>
> Pattern pattern = Pattern.begin("start")
> .where(evt -> evt.value.equals("ni"))
> .next("last").where(evt -> evt.value.equals("ar")).within
> (Time.seconds(5));
>
> 1. "ni" then "ar" within 5 seconds - triggers match
> 2. "ni" then no "ar" within 5 seconds - triggers timeout
>
> But with "followedBy", this does not behave as expected:
>
> Pattern pattern = Pattern.begin("start")
> .where(evt -> evt.value.equals("ni"))
> .followedBy("last").where(evt -> evt.value.equals("ar")).within
> (Time.seconds(5));
>
> "ni" then "ar" within 5 seconds - triggers match and also triggers
> timeout.
>
> Why is the timeout triggered when using followedBy (when there is a
> match)?
>
> Version - 1.1.5.
>


>>>
>>>
>>
>>
>


Re: CEP timeout occurs even for a successful match when using followedBy

2017-04-29 Thread Moiz S Jinia
OK, I'll try that. It's just that I'd rather use a stable version.
Are there any instructions for building binaries from the latest sources?

Moiz

On Sat, Apr 29, 2017 at 10:09 PM, Kostas Kloudas <
k.klou...@data-artisans.com> wrote:

> Hi Moiz,
>
> The skip-till-next is a big change and backporting it does not seem
> feasible.
> Also this would require more general changes to the 1.2 to make it
> compatible with the previous 1.2 versions.
>
> If you want you can already use the 1.3 version by downloading the master
> branch and writing your
> use-case against that. The changes until the final release are going to be
> minor hopefully and we can
> always help you adjust your program accordingly.
>
> Hope this helps,
> Kostas
>
> On Apr 29, 2017, at 6:23 PM, Moiz S Jinia  wrote:
>
> Oh ok thats a bit far off. Is there any chance of a backport of
> https://issues.apache.org/jira/browse/FLINK-6208 to the 1.2 branch? I
> require the SKIP_TILL_NEXT behaviour for a production use case that we want
> to use Flink for.
>
> Moiz
>
> On Sat, Apr 29, 2017 at 9:49 PM, Kostas Kloudas <
> k.klou...@data-artisans.com> wrote:
>
>> The 1.3 is scheduled for the beginning of June.
>>
>> Cheers,
>> Kostas
>>
>> On Apr 29, 2017, at 6:16 PM, Moiz S Jinia  wrote:
>>
>> Thanks Dawid!
>> Yes thats what i was expecting. I'll give it a try.
>>
>> When do you expect 1.3.0 stable to be out?
>>
>> Moiz
>>
>> On Sat, Apr 29, 2017 at 9:20 PM, Dawid Wysakowicz <
>> wysakowicz.da...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> This is an expected behaviour. After the "ar" event there still may
>>> occur other "ar" event that will also trigger a match.
>>> To be more generic in all versions prior to 1.3.0 there are two
>>> different consuming strategies:
>>>
>>>- STRICT (the next operator) - that accepts only if the event occurs
>>>directly after the previous
>>>- SKIP TILL ANY (the followedBy operator) - it accepts any matching
>>>event following event if there were already an event that matched this
>>>pattern
>>>
>>> Because after "ni" event we could match with some other "ar" events, the
>>> match is timeouted after 5 seconds.
>>>
>>> In FLINK-6208  we
>>> introduced third consuming strategy:
>>>
>>>- SKIP TILL NEXT(this is the strategy for followedBy right now) -
>>>the event does not have to occur directly after the previous one but only
>>>one event can be matched
>>>
>>> and you can still use SKIP TILL ANY by using followedByAny. I believe
>>> the SKIP TILL NEXT strategy is the one you expected.
>>> You can check it on master branch. We did introduce lots of new features
>>> and bugfixes to CEP for 1.3.0 version so any comments,
>>> tests or suggestions are welcome.
>>>
>>>
>>> Z pozdrowieniami! / Cheers!
>>>
>>> Dawid Wysakowicz
>>> *Data/Software Engineer*
>>> Skype: dawid_wys | Twitter: @OneMoreCoder
>>> 
>>>
>>> 2017-04-29 12:14 GMT+02:00 Moiz S Jinia :
>>>
 When using "next", this pattern works fine for the both a match as well
 as a timeout:

 Pattern pattern = Pattern.begin("start")
 .where(evt -> evt.value.equals("ni"))
 .next("last").where(evt -> evt.value.equals("ar")).within
 (Time.seconds(5));

 1. "ni" then "ar" within 5 seconds - triggers match
 2. "ni" then no "ar" within 5 seconds - triggers timeout

 But with "followedBy", this does not behave as expected:

 Pattern pattern = Pattern.begin("start")
 .where(evt -> evt.value.equals("ni"))
 .followedBy("last").where(evt -> evt.value.equals("ar")).within
 (Time.seconds(5));

 "ni" then "ar" within 5 seconds - triggers match and also triggers
 timeout.

 Why is the timeout triggered when using followedBy (when there is a
 match)?

 Version - 1.1.5.

>>>
>>>
>>
>>
>
>


Re: CEP timeout occurs even for a successful match when using followedBy

2017-04-29 Thread Kostas Kloudas
Hi Moiz,

The skip-till-next change is a big one and backporting it does not seem feasible.
It would also require more general changes to the 1.2 branch to keep it
compatible with the previous 1.2 releases.

If you want, you can already use the 1.3 version by building the master branch
and writing your use case against that. The changes until the final release
will hopefully be minor, and we can always help you adjust your program
accordingly.

Hope this helps,
Kostas

> On Apr 29, 2017, at 6:23 PM, Moiz S Jinia  wrote:
> 
> Oh ok thats a bit far off. Is there any chance of a backport of 
> https://issues.apache.org/jira/browse/FLINK-6208 
>  to the 1.2 branch? I 
> require the SKIP_TILL_NEXT behaviour for a production use case that we want 
> to use Flink for.
> 
> Moiz
> 
> On Sat, Apr 29, 2017 at 9:49 PM, Kostas Kloudas  > wrote:
> The 1.3 is scheduled for the beginning of June.
> 
> Cheers,
> Kostas
> 
>> On Apr 29, 2017, at 6:16 PM, Moiz S Jinia > > wrote:
>> 
>> Thanks Dawid! 
>> Yes thats what i was expecting. I'll give it a try.
>> 
>> When do you expect 1.3.0 stable to be out?
>> 
>> Moiz
>> 
>> On Sat, Apr 29, 2017 at 9:20 PM, Dawid Wysakowicz 
>> > wrote:
>> Hi,
>> 
>> This is an expected behaviour. After the "ar" event there still may occur 
>> other "ar" event that will also trigger a match.
>> To be more generic in all versions prior to 1.3.0 there are two different 
>> consuming strategies:
>> STRICT (the next operator) - that accepts only if the event occurs directly 
>> after the previous 
>> SKIP TILL ANY (the followedBy operator) - it accepts any matching event 
>> following event if there were already an event that matched this pattern
>> Because after "ni" event we could match with some other "ar" events, the 
>> match is timeouted after 5 seconds.
>> 
>> In FLINK-6208  we 
>> introduced third consuming strategy:
>> SKIP TILL NEXT(this is the strategy for followedBy right now) - the event 
>> does not have to occur directly after the previous one but only one event 
>> can be matched
>> and you can still use SKIP TILL ANY by using followedByAny. I believe the 
>> SKIP TILL NEXT strategy is the one you expected. 
>> You can check it on master branch. We did introduce lots of new features and 
>> bugfixes to CEP for 1.3.0 version so any comments,
>> tests or suggestions are welcome.
>> 
>> 
>> Z pozdrowieniami! / Cheers!
>> 
>> Dawid Wysakowicz
>> Data/Software Engineer
>> Skype: dawid_wys | Twitter: @OneMoreCoder
>>  
>> 
>> 2017-04-29 12:14 GMT+02:00 Moiz S Jinia > >:
>> When using "next", this pattern works fine for the both a match as well as a 
>> timeout:
>> 
>> Pattern pattern = Pattern.begin("start")
>> .where(evt -> evt.value.equals("ni"))
>> .next("last").where(evt -> 
>> evt.value.equals("ar")).within(Time.seconds(5));
>> 
>> 1. "ni" then "ar" within 5 seconds - triggers match
>> 2. "ni" then no "ar" within 5 seconds - triggers timeout
>> 
>> But with "followedBy", this does not behave as expected:
>> 
>> Pattern pattern = Pattern.begin("start")
>> .where(evt -> evt.value.equals("ni"))
>> .followedBy("last").where(evt -> 
>> evt.value.equals("ar")).within(Time.seconds(5));
>> 
>> "ni" then "ar" within 5 seconds - triggers match and also triggers timeout.
>> 
>> Why is the timeout triggered when using followedBy (when there is a match)?
>> 
>> Version - 1.1.5.
>> 
>> 
> 
> 



Re: CEP timeout occurs even for a successful match when using followedBy

2017-04-29 Thread Moiz S Jinia
Oh OK, that's a bit far off. Is there any chance of a backport of
https://issues.apache.org/jira/browse/FLINK-6208 to the 1.2 branch? I
require the SKIP_TILL_NEXT behaviour for a production use case that we want
to use Flink for.

Moiz

On Sat, Apr 29, 2017 at 9:49 PM, Kostas Kloudas  wrote:

> The 1.3 is scheduled for the beginning of June.
>
> Cheers,
> Kostas
>
> On Apr 29, 2017, at 6:16 PM, Moiz S Jinia  wrote:
>
> Thanks Dawid!
> Yes thats what i was expecting. I'll give it a try.
>
> When do you expect 1.3.0 stable to be out?
>
> Moiz
>
> On Sat, Apr 29, 2017 at 9:20 PM, Dawid Wysakowicz <
> wysakowicz.da...@gmail.com> wrote:
>
>> Hi,
>>
>> This is an expected behaviour. After the "ar" event there still may occur
>> other "ar" event that will also trigger a match.
>> To be more generic in all versions prior to 1.3.0 there are two different
>> consuming strategies:
>>
>>- STRICT (the next operator) - that accepts only if the event occurs
>>directly after the previous
>>- SKIP TILL ANY (the followedBy operator) - it accepts any matching
>>event following event if there were already an event that matched this
>>pattern
>>
>> Because after "ni" event we could match with some other "ar" events, the
>> match is timeouted after 5 seconds.
>>
>> In FLINK-6208  we
>> introduced third consuming strategy:
>>
>>- SKIP TILL NEXT(this is the strategy for followedBy right now) - the
>>event does not have to occur directly after the previous one but only one
>>event can be matched
>>
>> and you can still use SKIP TILL ANY by using followedByAny. I believe the
>> SKIP TILL NEXT strategy is the one you expected.
>> You can check it on master branch. We did introduce lots of new features
>> and bugfixes to CEP for 1.3.0 version so any comments,
>> tests or suggestions are welcome.
>>
>>
>> Z pozdrowieniami! / Cheers!
>>
>> Dawid Wysakowicz
>> *Data/Software Engineer*
>> Skype: dawid_wys | Twitter: @OneMoreCoder
>> 
>>
>> 2017-04-29 12:14 GMT+02:00 Moiz S Jinia :
>>
>>> When using "next", this pattern works fine for the both a match as well
>>> as a timeout:
>>>
>>> Pattern pattern = Pattern.begin("start")
>>> .where(evt -> evt.value.equals("ni"))
>>> .next("last").where(evt -> evt.value.equals("ar")).within
>>> (Time.seconds(5));
>>>
>>> 1. "ni" then "ar" within 5 seconds - triggers match
>>> 2. "ni" then no "ar" within 5 seconds - triggers timeout
>>>
>>> But with "followedBy", this does not behave as expected:
>>>
>>> Pattern pattern = Pattern.begin("start")
>>> .where(evt -> evt.value.equals("ni"))
>>> .followedBy("last").where(evt -> evt.value.equals("ar")).within
>>> (Time.seconds(5));
>>>
>>> "ni" then "ar" within 5 seconds - triggers match and also triggers
>>> timeout.
>>>
>>> Why is the timeout triggered when using followedBy (when there is a
>>> match)?
>>>
>>> Version - 1.1.5.
>>>
>>
>>
>
>


Re: CEP timeout occurs even for a successful match when using followedBy

2017-04-29 Thread Kostas Kloudas
The 1.3 release is scheduled for the beginning of June.

Cheers,
Kostas

> On Apr 29, 2017, at 6:16 PM, Moiz S Jinia  wrote:
> 
> Thanks Dawid! 
> Yes thats what i was expecting. I'll give it a try.
> 
> When do you expect 1.3.0 stable to be out?
> 
> Moiz
> 
> On Sat, Apr 29, 2017 at 9:20 PM, Dawid Wysakowicz  > wrote:
> Hi,
> 
> This is an expected behaviour. After the "ar" event there still may occur 
> other "ar" event that will also trigger a match.
> To be more generic in all versions prior to 1.3.0 there are two different 
> consuming strategies:
> STRICT (the next operator) - that accepts only if the event occurs directly 
> after the previous 
> SKIP TILL ANY (the followedBy operator) - it accepts any matching event 
> following event if there were already an event that matched this pattern
> Because after "ni" event we could match with some other "ar" events, the 
> match is timeouted after 5 seconds.
> 
> In FLINK-6208  we 
> introduced third consuming strategy:
> SKIP TILL NEXT(this is the strategy for followedBy right now) - the event 
> does not have to occur directly after the previous one but only one event can 
> be matched
> and you can still use SKIP TILL ANY by using followedByAny. I believe the 
> SKIP TILL NEXT strategy is the one you expected. 
> You can check it on master branch. We did introduce lots of new features and 
> bugfixes to CEP for 1.3.0 version so any comments,
> tests or suggestions are welcome.
> 
> 
> Z pozdrowieniami! / Cheers!
> 
> Dawid Wysakowicz
> Data/Software Engineer
> Skype: dawid_wys | Twitter: @OneMoreCoder
>  
> 
> 2017-04-29 12:14 GMT+02:00 Moiz S Jinia  >:
> When using "next", this pattern works fine for the both a match as well as a 
> timeout:
> 
> Pattern pattern = Pattern.begin("start")
> .where(evt -> evt.value.equals("ni"))
> .next("last").where(evt -> 
> evt.value.equals("ar")).within(Time.seconds(5));
> 
> 1. "ni" then "ar" within 5 seconds - triggers match
> 2. "ni" then no "ar" within 5 seconds - triggers timeout
> 
> But with "followedBy", this does not behave as expected:
> 
> Pattern pattern = Pattern.begin("start")
> .where(evt -> evt.value.equals("ni"))
> .followedBy("last").where(evt -> 
> evt.value.equals("ar")).within(Time.seconds(5));
> 
> "ni" then "ar" within 5 seconds - triggers match and also triggers timeout.
> 
> Why is the timeout triggered when using followedBy (when there is a match)?
> 
> Version - 1.1.5.
> 
> 



Re: CEP timeout occurs even for a successful match when using followedBy

2017-04-29 Thread Moiz S Jinia
Thanks Dawid!
Yes, that's what I was expecting. I'll give it a try.

When do you expect 1.3.0 stable to be out?

Moiz

On Sat, Apr 29, 2017 at 9:20 PM, Dawid Wysakowicz <
wysakowicz.da...@gmail.com> wrote:

> Hi,
>
> This is an expected behaviour. After the "ar" event there still may occur
> other "ar" event that will also trigger a match.
> To be more generic in all versions prior to 1.3.0 there are two different
> consuming strategies:
>
>- STRICT (the next operator) - that accepts only if the event occurs
>directly after the previous
>- SKIP TILL ANY (the followedBy operator) - it accepts any matching
>event following event if there were already an event that matched this
>pattern
>
> Because after "ni" event we could match with some other "ar" events, the
> match is timeouted after 5 seconds.
>
> In FLINK-6208  we
> introduced third consuming strategy:
>
>- SKIP TILL NEXT(this is the strategy for followedBy right now) - the
>event does not have to occur directly after the previous one but only one
>event can be matched
>
> and you can still use SKIP TILL ANY by using followedByAny. I believe the
> SKIP TILL NEXT strategy is the one you expected.
> You can check it on master branch. We did introduce lots of new features
> and bugfixes to CEP for 1.3.0 version so any comments,
> tests or suggestions are welcome.
>
>
> Z pozdrowieniami! / Cheers!
>
> Dawid Wysakowicz
>
> *Data/Software Engineer*
>
> Skype: dawid_wys | Twitter: @OneMoreCoder
>
> 
>
> 2017-04-29 12:14 GMT+02:00 Moiz S Jinia :
>
>> When using "next", this pattern works fine for the both a match as well
>> as a timeout:
>>
>> Pattern pattern = Pattern.begin("start")
>> .where(evt -> evt.value.equals("ni"))
>> .next("last").where(evt -> evt.value.equals("ar")).within
>> (Time.seconds(5));
>>
>> 1. "ni" then "ar" within 5 seconds - triggers match
>> 2. "ni" then no "ar" within 5 seconds - triggers timeout
>>
>> But with "followedBy", this does not behave as expected:
>>
>> Pattern pattern = Pattern.begin("start")
>> .where(evt -> evt.value.equals("ni"))
>> .followedBy("last").where(evt -> evt.value.equals("ar")).within
>> (Time.seconds(5));
>>
>> "ni" then "ar" within 5 seconds - triggers match and also triggers
>> timeout.
>>
>> Why is the timeout triggered when using followedBy (when there is a
>> match)?
>>
>> Version - 1.1.5.
>>
>
>


Re: Periodic flush sink?

2017-04-29 Thread Niels Basjes
Thanks.

The specific table I have here is used for debugging purposes, so at the
HBase level I set a TTL of 12 hours on the data.
So I'm not worried about the HFiles.
Doing a lot of 'small' calls has an impact on HBase as a whole (not just
this table), so I do want buffering.
However, with a buffer that can hold 1000 events, and at times only 10 events
created by a single page while I'm the only one on the site (at that moment),
the events would be buffered for far too long.

I did a quick test and this seems to work for my case.
In what situations do you guys expect this code construct to fail? Any edge
cases I missed?

Niels

private transient BufferedMutator mutator = null;
private transient Timer timer = null;

@Override
public void open(Configuration parameters) throws Exception {
  org.apache.hadoop.conf.Configuration hbaseConfig =
HBaseConfiguration.create();
  Connection connection = ConnectionFactory.createConnection(hbaseConfig);

  mutator = connection.getBufferedMutator(
new BufferedMutatorParams(TableName.valueOf(tableName))
  .pool(getDefaultExecutor(hbaseConfig))
  .writeBufferSize(HBASE_BUFFER_SIZE)
  );

  timer = new Timer();
  // Periodically flush the BufferedMutator so that writes also reach HBase
  // during low-traffic periods instead of waiting for the buffer to fill up.
  timer.schedule(new TimerTask(){
@Override
public void run() {
  try {
MySink.this.mutator.flush();
  } catch (Exception e) {
// Ignore
  }
}}, HBASE_BUFFER_AUTO_FLUSH_INTERVAL, HBASE_BUFFER_AUTO_FLUSH_INTERVAL);
}

@Override
public void close() throws IOException {
  timer.cancel();
  mutator.close();
}
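One edge case worth calling out in the construct above: the TimerTask swallows the flush exception, so a failed background flush stays invisible to Flink and the records of that flush can be lost without the job ever failing. A sketch of one way to surface it; everything except the HBase and JDK calls is an assumption about the surrounding sink (e.g. MyEvent and toPut()):

import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;

// Created in open(), next to the mutator and the timer.
private transient AtomicReference<Exception> asyncFlushError;

// In the TimerTask, remember the failure instead of ignoring it:
//   catch (Exception e) { asyncFlushError.compareAndSet(null, e); }

@Override
public void invoke(MyEvent event) throws Exception {
  Exception flushError = asyncFlushError.getAndSet(null);
  if (flushError != null) {
    // Fail the task so Flink restarts from the last checkpoint rather than
    // silently dropping the records of the failed background flush.
    throw new IOException("Periodic HBase flush failed", flushError);
  }
  mutator.mutate(toPut(event)); // however the sink builds its Put today
}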





On Sat, Apr 29, 2017 at 4:57 PM, Ted Yu  wrote:

> I expect Flink expert to answer your question.
>
> bq. I get a flush of the buffers atleast every few seconds
>
> From hbase point of view, during low traffic period, the above may result
> in many small hfiles, leading to more work for the compaction.
>
> FYI
>
> On Sat, Apr 29, 2017 at 7:32 AM, Niels Basjes  wrote:
>
>> Hi,
>>
>> I have a sink that writes my records into HBase.
>>
>> The data stream is attached to measurements from an internal testing
>> instance of the website.
>> As a consequence there are periods of really high load (someone is doing
>> a load test) and really low load (only a hand full of people are testing
>> stuff).
>>
>> I read the records from Kafka and I want to write the records into HBase.
>> Because under high load it is more efficient to buffer the writes between
>> the client and the server and as indicated by HBase I use a BufferedMutator.
>>
>> This BufferedMutator works with a 'fixed size' buffer and under high load
>> setting it to a few MiB improves the performance writing to HBase greatly.
>> However under low load you have to wait until the buffer is full and that
>> can be a LONG time (hours) when the load is really low.
>>
>> I want to fire a periodic event into my sink to ensure I get a flush of
>> the buffers atleast every few seconds.
>>
>> Simply implement a standard Java  TimerTask and fire that using a Timer?
>> Or is there a better way of doing that in Flink?
>>
>>
>> --
>> Best regards / Met vriendelijke groeten,
>>
>> Niels Basjes
>>
>
>


-- 
Best regards / Met vriendelijke groeten,

Niels Basjes


Re: CEP timeout occurs even for a successful match when using followedBy

2017-04-29 Thread Dawid Wysakowicz
Hi,

This is expected behaviour. After the "ar" event, other "ar" events may still
occur that would also trigger a match.
More generally, in all versions prior to 1.3.0 there are two different
consuming strategies:

   - STRICT (the next operator) - accepts an event only if it occurs
   directly after the previous one
   - SKIP TILL ANY (the followedBy operator) - accepts any matching event
   that follows, even if an earlier event already matched this part of the
   pattern

Because after the "ni" event we could still match some other "ar" events, the
match is also timed out after 5 seconds.

In FLINK-6208 we introduced a third consuming strategy:

   - SKIP TILL NEXT (this is the strategy for followedBy right now) - the
   event does not have to occur directly after the previous one, but only one
   event can be matched

and you can still get SKIP TILL ANY by using followedByAny. I believe the
SKIP TILL NEXT strategy is the one you expected.
You can check it out on the master branch. We introduced lots of new features
and bugfixes to CEP for the 1.3.0 version, so any comments,
tests or suggestions are welcome.


Z pozdrowieniami! / Cheers!

Dawid Wysakowicz

*Data/Software Engineer*

Skype: dawid_wys | Twitter: @OneMoreCoder



2017-04-29 12:14 GMT+02:00 Moiz S Jinia :

> When using "next", this pattern works fine for the both a match as well as
> a timeout:
>
> Pattern pattern = Pattern.begin("start")
> .where(evt -> evt.value.equals("ni"))
> .next("last").where(evt -> evt.value.equals("ar")).
> within(Time.seconds(5));
>
> 1. "ni" then "ar" within 5 seconds - triggers match
> 2. "ni" then no "ar" within 5 seconds - triggers timeout
>
> But with "followedBy", this does not behave as expected:
>
> Pattern pattern = Pattern.begin("start")
> .where(evt -> evt.value.equals("ni"))
> .followedBy("last").where(evt -> evt.value.equals("ar")).
> within(Time.seconds(5));
>
> "ni" then "ar" within 5 seconds - triggers match and also triggers timeout.
>
> Why is the timeout triggered when using followedBy (when there is a match)?
>
> Version - 1.1.5.
>
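For reference, a compact sketch of the two strategies described above, written against the 1.3-era master-branch API (package locations are to the best of my recollection and the Event POJO is assumed):

import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.windowing.time.Time;

public class ConsumingStrategies {

    public static class Event {
        public String value;
    }

    private static SimpleCondition<Event> valueIs(final String expected) {
        return new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event evt) {
                return expected.equals(evt.value);
            }
        };
    }

    // SKIP TILL NEXT: in 1.3, followedBy matches at most one "ar" per "ni".
    static Pattern<Event, ?> skipTillNext() {
        return Pattern.<Event>begin("start").where(valueIs("ni"))
                .followedBy("last").where(valueIs("ar"))
                .within(Time.seconds(5));
    }

    // SKIP TILL ANY: the pre-1.3 followedBy behaviour, available via followedByAny.
    static Pattern<Event, ?> skipTillAny() {
        return Pattern.<Event>begin("start").where(valueIs("ni"))
                .followedByAny("last").where(valueIs("ar"))
                .within(Time.seconds(5));
    }
}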


Re: Periodic flush sink?

2017-04-29 Thread Ted Yu
I expect a Flink expert to answer your question.

bq. I get a flush of the buffers atleast every few seconds

From HBase's point of view, during low-traffic periods the above may result
in many small HFiles, leading to more work for compaction.

FYI

On Sat, Apr 29, 2017 at 7:32 AM, Niels Basjes  wrote:

> Hi,
>
> I have a sink that writes my records into HBase.
>
> The data stream is attached to measurements from an internal testing
> instance of the website.
> As a consequence there are periods of really high load (someone is doing a
> load test) and really low load (only a hand full of people are testing
> stuff).
>
> I read the records from Kafka and I want to write the records into HBase.
> Because under high load it is more efficient to buffer the writes between
> the client and the server and as indicated by HBase I use a BufferedMutator.
>
> This BufferedMutator works with a 'fixed size' buffer and under high load
> setting it to a few MiB improves the performance writing to HBase greatly.
> However under low load you have to wait until the buffer is full and that
> can be a LONG time (hours) when the load is really low.
>
> I want to fire a periodic event into my sink to ensure I get a flush of
> the buffers atleast every few seconds.
>
> Simply implement a standard Java  TimerTask and fire that using a Timer?
> Or is there a better way of doing that in Flink?
>
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes
>


Re: Disk I/O in Flink

2017-04-29 Thread Robert Schmidtke
Hey Martin,

I'm still on it. I have switched to analyzing the flink-runtime tests, as I
observe similar divergence there. I'm not sure how long it'll take, but if
I find something I'll make sure to let you all know :)

Robert

On Sat, Apr 29, 2017 at 3:12 PM, Martin Eden 
wrote:

> Hi Robert,
>
> Any updates on the below for the community?
>
> Thanks,
> M
>
> On Tue, Apr 25, 2017 at 8:50 AM, Robert Schmidtke 
> wrote:
>
>> Hi Ufuk, thanks for coming back to me on this.
>>
>> The records are 100 bytes in size, the benchmark being TeraSort, so that
>> should not be an issue. I have played around with the input size, and here
>> are my observations:
>>
>> 128 GiB input: 0 Spilling in Flink.
>> 256 GiB input: 88 GiB Spilling in Flink (so 88 GiB of reads, 88 GiB of
>> writes), and my instrumentation covers all of it.
>> 384 GiB input: 391 GiB Spilling in Flink, and I cover all of it.
>> 512 GiB input: 522 GiB Spilling in Flink, but I miss 140 GiB of it.
>> 640 GiB input: 653 GiB Spilling in Flink, but I miss 281 GiB of it.
>> 768 GiB input: 784 GiB Spilling in Flink, but I miss 490 GiB of it.
>> 896 GiB input: 914 GiB Spilling in Flink, but I miss 662 GiB of it.
>> 1024 GiB input: 1045 GiB Spilling in Flink, but I miss 968 GiB of it.
>>
>> So regardless of how well configured my system is and spilling is even
>> necessary, it seems that with larger spilling amounts, the way the data is
>> spilled changes (and I start missing larger and larger portions of I/O
>> until almost 100%).
>> Now since I have written the instrumentation myself, I cannot guarantee
>> that it is flawless and I might have missed something.
>> I'm currently looking into how the file channels are being accessed in
>> parallel by multiple threads, which I cover as well and my tests verify it,
>> but maybe there are special access patterns here.
>>
>> Robert
>>
>> On Mon, Apr 24, 2017 at 2:25 PM, Ufuk Celebi  wrote:
>>
>>> Hey Robert,
>>>
>>> for batch that should cover the relevant spilling code. If the records
>>> are >= 5 MB, the SpillingAdaptiveSpanningRecordDeserializer will spill
>>> incoming records as well. But that should be covered by the
>>> FileChannel instrumentation as well?
>>>
>>> – Ufuk
>>>
>>>
>>> On Tue, Apr 18, 2017 at 3:57 PM, Robert Schmidtke
>>>  wrote:
>>> > Hi,
>>> >
>>> > I have already looked at the UnilateralSortMerger, concluding that all
>>> I/O
>>> > eventually goes via SegmentReadRequest and SegmentWriteRequest (which
>>> in
>>> > turn use java.nio.channels.FileChannel) in AsynchronousFileIOChannel.
>>> Are
>>> > there more interaction points between Flink and the underlying file
>>> system
>>> > that I might want to consider?
>>> >
>>> > Thanks!
>>> > Robert
>>> >
>>> > On Fri, Apr 7, 2017 at 5:02 PM, Kurt Young  wrote:
>>> >>
>>> >> Hi,
>>> >>
>>> >> You probably want check out UnilateralSortMerger.java, this is the
>>> class
>>> >> which is responsible for external sort for flink. Here is a short
>>> >> description for how it works: there are totally 3 threads working
>>> together,
>>> >> one for reading, one for sorting partial data in memory, and the last
>>> one is
>>> >> responsible for spilling. Flink will first figure out how many memory
>>> it can
>>> >> use during the in-memory sort, and manage them as MemorySegments.
>>> Once these
>>> >> memory runs out, the sorting thread will take over these memory and
>>> do the
>>> >> in-memory sorting (For more details about in-memory sorting, you can
>>> see
>>> >> NormalizedKeySorter). After this, the spilling thread will write this
>>> sorted
>>> >> data to disk and make these memory available again for reading. This
>>> will
>>> >> repeated until all data has been processed.
>>> >> Normally, the data will be read twice (one from source, and one from
>>> disk)
>>> >> and write once, but if you spilled too much files, flink will first
>>> merge
>>> >> some all the files and make sure the last merge step will not exceed
>>> some
>>> >> limit (default 128). Hope this can help you.
>>> >>
>>> >> Best,
>>> >> Kurt
>>> >>
>>> >> On Fri, Apr 7, 2017 at 4:20 PM, Robert Schmidtke <
>>> ro.schmid...@gmail.com>
>>> >> wrote:
>>> >>>
>>> >>> Hi,
>>> >>>
>>> >>> I'm currently examining the I/O patterns of Flink, and I'd like to
>>> know
>>> >>> when/how Flink goes to disk. Let me give an introduction of what I
>>> have done
>>> >>> so far.
>>> >>>
>>> >>> I am running TeraGen (from the Hadoop examples package) + TeraSort
>>> >>> (https://github.com/robert-schmidtke/terasort) on a 16 node
>>> cluster, each
>>> >>> node with 64 GiB of memory, 2x32 cores, and roughly half a terabyte
>>> of disk.
>>> >>> I'm using YARN and HDFS. The underlying file system is XFS.
>>> >>>
>>> >>> Now before running TeraGen and TeraSort, I reset the XFS counters to
>>> >>> zero, and after TeraGen + TeraSort are finished, I dump the XFS
>>> counters
>>> >>> again. Accumulated over the 
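For readers following Kurt's description above of how the sorter reads, sorts chunks in memory, spills them, and finally merges the spill files: a toy, self-contained illustration of that general shape. This is not Flink's UnilateralSortMerger; it works on plain text lines instead of managed memory segments:

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** Toy external sort: read -> sort chunks in memory -> spill -> k-way merge. */
public class ToyExternalSort {

    /** One spill file together with its current (smallest unread) line. */
    private static final class Head {
        final BufferedReader reader;
        String line;

        Head(BufferedReader reader) throws IOException {
            this.reader = reader;
            this.line = reader.readLine();
        }
    }

    public static void sort(Path input, Path output, int linesPerChunk) throws IOException {
        List<Path> spills = new ArrayList<>();

        // Phase 1: read fixed-size chunks, sort each chunk in memory, spill it to a temp file.
        try (BufferedReader in = Files.newBufferedReader(input)) {
            List<String> chunk = new ArrayList<>(linesPerChunk);
            String line;
            while ((line = in.readLine()) != null) {
                chunk.add(line);
                if (chunk.size() == linesPerChunk) {
                    spills.add(spill(chunk));
                    chunk.clear();
                }
            }
            if (!chunk.isEmpty()) {
                spills.add(spill(chunk));
            }
        }

        // Phase 2: k-way merge of the sorted spill files via a priority queue of file heads.
        PriorityQueue<Head> queue = new PriorityQueue<>(Comparator.comparing((Head h) -> h.line));
        for (Path spillFile : spills) {
            Head head = new Head(Files.newBufferedReader(spillFile));
            if (head.line != null) {
                queue.add(head);
            }
        }
        try (BufferedWriter out = Files.newBufferedWriter(output)) {
            while (!queue.isEmpty()) {
                Head smallest = queue.poll();
                out.write(smallest.line);
                out.newLine();
                smallest.line = smallest.reader.readLine();
                if (smallest.line != null) {
                    queue.add(smallest); // this file still has data: back into the queue
                } else {
                    smallest.reader.close();
                }
            }
        }
    }

    private static Path spill(List<String> chunk) throws IOException {
        Collections.sort(chunk);
        Path file = Files.createTempFile("spill", ".txt");
        Files.write(file, chunk); // write the sorted chunk as one spill file
        return file;
    }
}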

Periodic flush sink?

2017-04-29 Thread Niels Basjes
Hi,

I have a sink that writes my records into HBase.

The data stream is attached to measurements from an internal testing
instance of the website.
As a consequence there are periods of really high load (someone is doing a
load test) and periods of really low load (only a handful of people are
testing stuff).

I read the records from Kafka and want to write them into HBase.
Because under high load it is more efficient to buffer the writes between
the client and the server, I use a BufferedMutator, as recommended by HBase.

This BufferedMutator works with a 'fixed size' buffer and under high load
setting it to a few MiB improves the performance writing to HBase greatly.
However under low load you have to wait until the buffer is full and that
can be a LONG time (hours) when the load is really low.

I want to fire a periodic event into my sink to ensure I get a flush of the
buffers at least every few seconds.

Should I simply implement a standard Java TimerTask and fire it using a Timer?
Or is there a better way of doing that in Flink?


-- 
Best regards / Met vriendelijke groeten,

Niels Basjes


Re: Disk I/O in Flink

2017-04-29 Thread Martin Eden
Hi Robert,

Any updates on the below for the community?

Thanks,
M

On Tue, Apr 25, 2017 at 8:50 AM, Robert Schmidtke 
wrote:

> Hi Ufuk, thanks for coming back to me on this.
>
> The records are 100 bytes in size, the benchmark being TeraSort, so that
> should not be an issue. I have played around with the input size, and here
> are my observations:
>
> 128 GiB input: 0 Spilling in Flink.
> 256 GiB input: 88 GiB Spilling in Flink (so 88 GiB of reads, 88 GiB of
> writes), and my instrumentation covers all of it.
> 384 GiB input: 391 GiB Spilling in Flink, and I cover all of it.
> 512 GiB input: 522 GiB Spilling in Flink, but I miss 140 GiB of it.
> 640 GiB input: 653 GiB Spilling in Flink, but I miss 281 GiB of it.
> 768 GiB input: 784 GiB Spilling in Flink, but I miss 490 GiB of it.
> 896 GiB input: 914 GiB Spilling in Flink, but I miss 662 GiB of it.
> 1024 GiB input: 1045 GiB Spilling in Flink, but I miss 968 GiB of it.
>
> So regardless of how well configured my system is and spilling is even
> necessary, it seems that with larger spilling amounts, the way the data is
> spilled changes (and I start missing larger and larger portions of I/O
> until almost 100%).
> Now since I have written the instrumentation myself, I cannot guarantee
> that it is flawless and I might have missed something.
> I'm currently looking into how the file channels are being accessed in
> parallel by multiple threads, which I cover as well and my tests verify it,
> but maybe there are special access patterns here.
>
> Robert
>
> On Mon, Apr 24, 2017 at 2:25 PM, Ufuk Celebi  wrote:
>
>> Hey Robert,
>>
>> for batch that should cover the relevant spilling code. If the records
>> are >= 5 MB, the SpillingAdaptiveSpanningRecordDeserializer will spill
>> incoming records as well. But that should be covered by the
>> FileChannel instrumentation as well?
>>
>> – Ufuk
>>
>>
>> On Tue, Apr 18, 2017 at 3:57 PM, Robert Schmidtke
>>  wrote:
>> > Hi,
>> >
>> > I have already looked at the UnilateralSortMerger, concluding that all
>> I/O
>> > eventually goes via SegmentReadRequest and SegmentWriteRequest (which in
>> > turn use java.nio.channels.FileChannel) in AsynchronousFileIOChannel.
>> Are
>> > there more interaction points between Flink and the underlying file
>> system
>> > that I might want to consider?
>> >
>> > Thanks!
>> > Robert
>> >
>> > On Fri, Apr 7, 2017 at 5:02 PM, Kurt Young  wrote:
>> >>
>> >> Hi,
>> >>
>> >> You probably want check out UnilateralSortMerger.java, this is the
>> class
>> >> which is responsible for external sort for flink. Here is a short
>> >> description for how it works: there are totally 3 threads working
>> together,
>> >> one for reading, one for sorting partial data in memory, and the last
>> one is
>> >> responsible for spilling. Flink will first figure out how many memory
>> it can
>> >> use during the in-memory sort, and manage them as MemorySegments. Once
>> these
>> >> memory runs out, the sorting thread will take over these memory and do
>> the
>> >> in-memory sorting (For more details about in-memory sorting, you can
>> see
>> >> NormalizedKeySorter). After this, the spilling thread will write this
>> sorted
>> >> data to disk and make these memory available again for reading. This
>> will
>> >> repeated until all data has been processed.
>> >> Normally, the data will be read twice (one from source, and one from
>> disk)
>> >> and write once, but if you spilled too much files, flink will first
>> merge
>> >> some all the files and make sure the last merge step will not exceed
>> some
>> >> limit (default 128). Hope this can help you.
>> >>
>> >> Best,
>> >> Kurt
>> >>
>> >> On Fri, Apr 7, 2017 at 4:20 PM, Robert Schmidtke <
>> ro.schmid...@gmail.com>
>> >> wrote:
>> >>>
>> >>> Hi,
>> >>>
>> >>> I'm currently examining the I/O patterns of Flink, and I'd like to
>> know
>> >>> when/how Flink goes to disk. Let me give an introduction of what I
>> have done
>> >>> so far.
>> >>>
>> >>> I am running TeraGen (from the Hadoop examples package) + TeraSort
>> >>> (https://github.com/robert-schmidtke/terasort) on a 16 node cluster,
>> each
>> >>> node with 64 GiB of memory, 2x32 cores, and roughly half a terabyte
>> of disk.
>> >>> I'm using YARN and HDFS. The underlying file system is XFS.
>> >>>
>> >>> Now before running TeraGen and TeraSort, I reset the XFS counters to
>> >>> zero, and after TeraGen + TeraSort are finished, I dump the XFS
>> counters
>> >>> again. Accumulated over the entire cluster I get 3 TiB of writes and
>> 3.2 TiB
>> >>> of reads. What I'd have expected would be 2 TiB of writes (1 for
>> TeraGen, 1
>> >>> for TeraSort) and 1 TiB of reads (during TeraSort).
>> >>>
>> >>> Unsatisfied by the coarseness of these numbers I developed an HDFS
>> >>> wrapper that logs file system statistics for each call to hdfs://...,
>> such
>> >>> as start time/end time, no. of bytes read/written etc. I can plot
>> 

Weird serialization bug?

2017-04-29 Thread Sebastian Neef
Hello Apache Flink users,

I implemented a FilterFunction some months ago that worked quite well
back then. However, I wanted to check it out right now and it somehow
broke in the sense that Flink can't serialize it anymore. I might be
mistaken, but afaik I didn't touch the code at all.

I think that I've tracked down the problem to the following minimal
working PoC:

- A simple interface:

> interface testFunc extends Serializable {
> boolean filter();
> }

- A TestFilterFunction which is applied on a DataSet:

>  public void doSomeFiltering() {
> class fooo implements testFunc {
> public boolean filter() {
> return false;
> }
> }
> 
> class TestFilterFunction implements FilterFunction<IPage> {
> 
> testFunc filter;
> 
> class fooo2 implements testFunc {
> public boolean filter() {
> return false;
> }
> }
> 
> TestFilterFunction() {
> // WORKS with fooo2()
> // DOES NOT WORK with fooo()
> this.filter = new fooo2();
> }
> @Override
> public boolean filter(IPage iPage) throws Exception {
> return filter.filter();
> }
> }
>   filteredDataSet = dataSet.filter(new TestFilterFunction());
> }

Flink will work fine when the "fooo2" class is used. However, when using
the "fooo()" class, I get the following error:

> 
>  The program finished with the following exception:
> 
> The implementation of the FilterFunction is not serializable. The object 
> probably contains or references non serializable fields.
>   org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:100)
>   org.apache.flink.api.java.DataSet.clean(DataSet.java:186)
>   org.apache.flink.api.java.DataSet.filter(DataSet.java:287)
>   testpackage.testclass.applyFilters(testclass.java:105)

I'm a little bit confused as to why Flink manages to serialize the "fooo2"
class, but not the "fooo" class. Is this a bug or am I missing something
here?

Cheers,
Sebastian



CEP timeout occurs even for a successful match when using followedBy

2017-04-29 Thread Moiz S Jinia
When using "next", this pattern works fine for the both a match as well as
a timeout:

Pattern pattern = Pattern.begin("start")
.where(evt -> evt.value.equals("ni"))
.next("last").where(evt ->
evt.value.equals("ar")).within(Time.seconds(5));

1. "ni" then "ar" within 5 seconds - triggers match
2. "ni" then no "ar" within 5 seconds - triggers timeout

But with "followedBy", this does not behave as expected:

Pattern pattern = Pattern.begin("start")
.where(evt -> evt.value.equals("ni"))
.followedBy("last").where(evt ->
evt.value.equals("ar")).within(Time.seconds(5));

"ni" then "ar" within 5 seconds - triggers match and also triggers timeout.

Why is the timeout triggered when using followedBy (when there is a match)?

Version - 1.1.5.


Batch source improvement

2017-04-29 Thread Flavio Pompermaier
Hi to all,
we're still using Flink as a batch processor and, despite not being very
advertised, it is still doing great.
However, there's one thing I always wanted to ask: when reading data from a
source, the job manager computes the splits and assigns a set of them to
every instance of the InputFormat. This works fine as long as the data is
perfectly balanced, but in my experience most of the time this is not true,
and some instances complete very quickly while others keep reading
data (sometimes for a long time).

Couldn't this be enhanced by buffering splits in a shared place so that tasks
could ask for a "free" split as soon as they finish reading their
assigned one? Would it be complicated to implement such logic?

Best,
Flavio


Collector.collect

2017-04-29 Thread Gaurav Khandelwal
Hello

I am working on a RichProcessFunction and I want to emit multiple records at
a time. To achieve this, I am currently doing:

while (condition) {
    out.collect(new Tuple2<>(/* ... */));
}

I was wondering, is this the correct way or is there another alternative?
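Calling collect() several times per input element is indeed the intended way to emit multiple records from a process function. A minimal sketch for illustration (the WordExploder name and word-splitting logic are made up, and the imports follow the 1.2-era API as I recall it, so adjust the base class to your version):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.RichProcessFunction;
import org.apache.flink.util.Collector;

// Emits one (word, 1) record per word of each incoming line.
public class WordExploder extends RichProcessFunction<String, Tuple2<String, Integer>> {

    @Override
    public void processElement(String line, Context ctx, Collector<Tuple2<String, Integer>> out)
            throws Exception {
        for (String word : line.split("\\s+")) {
            // Each collect() call emits one record downstream; call it as often as needed.
            out.collect(new Tuple2<>(word, 1));
        }
    }
}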


Re: Fault tolerance & idempotency on window functions

2017-04-29 Thread Kamil Dziublinski
Big thanks for replying, Aljoscha. I spent quite some time thinking about how
to solve this problem and came to some conclusions. It would be great if you
could verify whether my logic is correct.

I decided that if I partition the data in Kafka the same way I partition my
window with keyBy - a tenant/user combination (I would still use a hash of it
in the Kafka producer) - and switch processing to event time (currently it is
processing time), then during a replay I could be 100% sure that the first
element will always be first, and the watermark triggering the window would
also come at the same moment. This would give me idempotent writes of the
batched object to HBase.

For late events (by configuring allowed lateness on the window itself) I would
configure the trigger to fire & purge, so that it doesn't hold already-fired
data. This way, if a late event arrives, I could fire it with a different
timestamp and treat it in HBase as a completely separate increment, not
overriding my previous data.
The reason I want to purge data on firing is that I would need an allowed
lateness of at least 2 months on the window, and holding all data for 2 months
after firing would be too costly.
An additional question: is there any cost to a very high allowed lateness
(like 2 months) if we configure the trigger to fire & purge - for example any
additional state or metadata that Flink needs to maintain that would take a
lot of memory from the cluster? Would I have to consider RocksDB for state
here, or could FS state still work?
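A sketch of how that window wiring might look (a fragment only; the Event and Stat types, their getters, the fold logic, and the upstream event-time/watermark assignment are all assumed):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;

// events is the event-time stream read from Kafka; Stat is the aggregate written to HBase.
DataStream<Stat> aggregated = events
        .keyBy(e -> e.getTenant() + "|" + e.getUser())          // same key as the Kafka partitioning
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        .allowedLateness(Time.days(60))                          // roughly the 2 months of lateness
        .trigger(PurgingTrigger.of(EventTimeTrigger.create()))   // fire & purge: drop window contents after firing
        .fold(new Stat(), (acc, e) -> acc.add(e));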

On Fri, Apr 28, 2017 at 5:54 PM Aljoscha Krettek 
wrote:

> Hi,
> Yes, your analysis is correct: Flink will not retry for individual
> elements but will restore from the latest consistent checkpoint in case of
> failure. This also means that you can get different window results based on
> which element arrives first, i.e. you have a different timestamp on your
> output in that case.
>
> One simple mitigation for the timestamp problem is to use the largest
> timestamp of elements within a window instead of the first timestamp. This
> will be stable across restores even if the order of arrival of elements
> changes. You can still get problems when it comes to late data and window
> triggering, if you cannot guarantee that your watermark is 100 % correct,
> though. I.e. it might be that, upon restore, an element with an even larger
> timestamp arrives late that was not considered when doing the first
> processing that failed.
>
> Best,
> Aljoscha
> > On 25. Apr 2017, at 19:54, Kamil Dziublinski <
> kamil.dziublin...@gmail.com> wrote:
> >
> > Hi guys,
> >
> > I have a flink streaming job that reads from kafka, creates some
> statistics increments and stores this in hbase (using normal puts).
> > I'm using fold function here of with window of few seconds.
> >
> > My tests showed me that restoring state with window functions is not
> exactly working how I expected.
> > I thought that if my window functions emits an aggregated object to a
> sink, and that object fails in a sink, this write to hbase will be
> replayed. So even if it actually got written to HBase, but flink thought it
> didnt (for instance during network problem) I could be sure of idempotent
> writes. I wanted to enforce that by using the timestamp of the first event
> used in that window for aggregation.
> >
> > Now correct me if I'm wrong but it seems that in the case of failure
> (even if its in sink) whole flow is getting replayed from last checkpoint
> which means that my window function might evict aggregated object in a
> different form. For instance not only having tuples that failed but also
> other ones, which would break my idempotency her and I might end up with
> having higher counters than I should have.
> >
> > Do you have any suggestion on how to solve/workaround such problem in
> flink?
> >
> > Thanks,
> > Kamil.
> >
> >
>
>


Re: Behavior of the cancel command

2017-04-29 Thread Jürgen Thomann

Hi Aljoscha,

In my case the valid-length file that gets created contains a value which
says, for example, that 100 MB are valid to read for exactly-once, but the
file with the data is only 95 MB large. As I understand it, the valid-length
file contains the length of the file at the checkpoint. This also does not
happen for all files (3 HDFS sinks, each with a parallelism of 2); for some
parts the file size and the value in the valid-length file match exactly.


After looking over the checkpoint code in BucketingSink, I looked into the
hsync behavior again and found the following page:
http://stackoverflow.com/questions/32231105/why-is-hsync-not-flushing-my-hdfs-file
After this I downloaded the file with the hdfs dfs tool, and the file is
actually even larger than the valid-length value. I checked this against the
things I did before (Impala and Hive select count queries, and a Hue download
of the files plus wc -l); these 3 ways give the same number of lines, but
hdfs dfs -cat  | wc -l gives a much larger value.


So my conclusion would be that the data is written and not exactly lost as I
thought, but it is not visible for my use case because the files are not
properly closed during cancel and the NameNode is not aware of the flushed
data. I could imagine 2 ways out of this: 1. implement hsync as described on
the Stack Overflow page, or 2. ensure that files are properly closed during
cancel.
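For option 1, the Stack Overflow page boils down to syncing with the UPDATE_LENGTH flag so that the NameNode also learns the new file length. A sketch of what that call looks like on an HDFS output stream (the helper class is an assumption; where to hook it into the BucketingSink is exactly the open question):

import java.io.IOException;
import java.util.EnumSet;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;

public final class HdfsSyncUtil {

    // Flush to the datanodes and also update the file length on the NameNode,
    // so that readers (Hive, Impala, hdfs dfs) see the flushed bytes immediately.
    public static void hsyncWithLength(FSDataOutputStream out) throws IOException {
        if (out instanceof HdfsDataOutputStream) {
            ((HdfsDataOutputStream) out).hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
        } else {
            out.hsync(); // non-HDFS streams: plain hsync as a best effort
        }
    }

    private HdfsSyncUtil() {
    }
}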


Best,
Jürgen

On 28.04.2017 17:38, Aljoscha Krettek wrote:

Hi Jürgen,
Is there missing data with respect to what should have been written at 
the time of the cancel or when the last checkpoint (or in that case, 
the savepoint) was performed. I’m asking because the cancel command is 
only sent out once the savepoint has been completed, as can be seen at 
[1]. If the savepoint is complete this also means that the snapshot 
method of the BucketingSink must have done it’s work, i.e. that it 
also flushed all files, which is done in [2]. There’s always the 
possibility of a bug, however, so we’ll have to look into this together.


Best,
Aljoscha

[1] 
https://github.com/apache/flink/blob/c22efce098c14e8f08bad1e0065dbd02df6e4dbb/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala#L607-L607


[2] 
https://github.com/apache/flink/blob/b4c60a942fe07e355dd49ed2aab3c0a7ae94285d/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java


On 27. Apr 2017, at 10:27, Jürgen Thomann 
> wrote:


Hi,

I had some time ago problems with writing data to Hadoop with the 
BucketingSink and losing data in case of cancel with savepoint 
because flush/sync command was interrupted. I tried changing Hadoop 
settings as suggested but had no luck at the end and looked into the 
Flink code. If I understand the code correctly it behaves the 
following way:


1. Start a Watchdog thread if we have a cancellation timeout set
2. invoke cancel on the sink/task, but do not wait for it to finish
3. destroy buffer pool and a release resources
4. send initial interrupt to the sink/task
5. call join on the sink/task and ignore InterruptedException
6. let the watchdog send more interrupts if needed and throw fatal 
error if timeout is reached


In my case the BucketingSink does not has enough time to flush 
everything before the initial interrupt is sent and some files are 
not closed properly which causes the missing data in Hadoop in my 
understanding.


Is my understanding correct and if yes, do you know a way to get 
around this behavior to let the close function finish the sync for 
all files?


Best,
Jürgen