Re: Java 8 lambdas for CEP patterns won't compile
Thank you! Hoping to see lambda support added back in soon as well.

Regards,
David

On Mon, Jun 12, 2017 at 1:57 PM, Kostas Kloudas wrote:
> Done.
>
> On Jun 12, 2017, at 12:24 PM, Ted Yu wrote:
>
> Can you add a link to this thread in the JIRA?
>
> Cheers
>
> On Mon, Jun 12, 2017 at 3:15 AM, Kostas Kloudas <k.klou...@data-artisans.com> wrote:
>
>> Unfortunately, there was no discussion, as this regression came about as a
>> side effect of the addition of the IterativeConditions, but it will be fixed.
>>
>> This is the JIRA to track it:
>> https://issues.apache.org/jira/browse/FLINK-6897
>>
>> Kostas
>>
>> On Jun 12, 2017, at 11:51 AM, Ted Yu wrote:
>>
>> Do you know which JIRA / discussion thread had the context for this decision?
>>
>> I did a quick search in JIRA but only found FLINK-3681.
>>
>> Cheers
>>
>> On Mon, Jun 12, 2017 at 1:48 AM, Kostas Kloudas <k.klou...@data-artisans.com> wrote:
>>
>>> Hi David and Ted,
>>>
>>> The documentation is outdated. I will update it today.
>>> Java 8 lambdas are NOT supported by CEP in Flink 1.3.
>>>
>>> Hopefully this will change soon. I will open a JIRA for this.
>>>
>>> Cheers,
>>> Kostas
>>>
>>> On Jun 11, 2017, at 11:55 PM, Ted Yu wrote:
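Kostas ties the compile failure to the addition of the IterativeConditions. That is consistent with conditions being passed as subclasses of an abstract class rather than through a functional interface, and Java lambdas can only target functional interfaces. The plain-Java sketch below uses hypothetical stand-ins (`Condition`, `SimplePredicate`, `PatternSketch`) that only mimic that shape; they are not Flink classes. It shows the anonymous-subclass workaround and a small adapter that wraps a lambda, under the assumption that the real condition type behaves like an abstract class here.

```java
import java.io.Serializable;

// Hypothetical stand-in mimicking an abstract condition class (NOT Flink's
// IterativeCondition). Because it is an abstract class rather than an interface,
// javac rejects lambdas passed where a Condition is expected.
abstract class Condition<T> implements Serializable {
    abstract boolean filter(T value);
}

// Hypothetical lambda-friendly interface: exactly one abstract method.
@FunctionalInterface
interface SimplePredicate<T> extends Serializable {
    boolean test(T value);
}

// Hypothetical pattern holder exposing a where(...) entry point.
final class PatternSketch<T> {
    private final Condition<T> condition;

    private PatternSketch(Condition<T> condition) {
        this.condition = condition;
    }

    // Accepts only Condition subclasses, e.g. anonymous inner classes.
    static <T> PatternSketch<T> where(Condition<T> condition) {
        return new PatternSketch<>(condition);
    }

    // Adapter: wraps a lambda in an anonymous Condition subclass. Created in a
    // static method, so no enclosing instance is captured, only the predicate.
    static <T> PatternSketch<T> whereLambda(SimplePredicate<T> predicate) {
        return where(new Condition<T>() {
            @Override
            boolean filter(T value) {
                return predicate.test(value);
            }
        });
    }

    boolean matches(T value) {
        return condition.filter(value);
    }
}
```

For example, `PatternSketch.where(v -> v.startsWith("A"))` is rejected by javac because `Condition` is not a functional interface, while `PatternSketch<String> p = PatternSketch.whereLambda(v -> v.startsWith("A"));` compiles.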
Re: Listening to timed-out patterns in Flink CEP
Hello,

It's been a while and I never replied on the list. In fact, the fix committed by Till does work. Thanks!

On Tue, Apr 25, 2017 at 9:37 AM, Moiz Jinia wrote:
> Hey David,
> Did that work for you? If yes, could you share an example? I have a similar
> use case: I need to get notified of an event NOT occurring within a specified
> time window.
>
> Thanks much!
>
> Moiz
Re: Listening to timed-out patterns in Flink CEP
Hi Till,

Excellent, I'll check out the current snapshot version! Thank you for taking the time to look into this.

Regards,
David

On Tue, Nov 8, 2016 at 3:25 PM, Till Rohrmann <trohrm...@apache.org> wrote:
> Hi David,
>
> sorry for my late reply. I just found time to look into the problem. You
> were right with your observation that the CEP operator did not behave as
> I've described it. The problem was that the time of the underlying NFA was
> not advanced if there were no events buffered in the CEP operator when a
> new watermark arrived. This was not intended and I opened a PR [1] to fix
> this problem. I've tested the fix with your example program and it seems to
> solve the problem that you don't see timeouts after the timeout interval
> has passed. Thanks for reporting this problem and please excuse my long
> response time.
>
> Btw, I'll merge the PR this evening, so it should be included in the
> current snapshot version by the end of tomorrow.
>
> [1] https://github.com/apache/flink/pull/2771
>
> Cheers,
> Till
>
> On Fri, Oct 14, 2016 at 11:40 AM, Till Rohrmann <trohrm...@apache.org> wrote:
>
>> Hi guys,
>>
>> I'll try to come up with an example illustrating the behaviour over the
>> weekend.
>>
>> Cheers,
>> Till
>>
>> On Fri, Oct 14, 2016 at 11:16 AM, David Koch <ogd...@googlemail.com> wrote:
>>
>>> Hello,
>>>
>>> Thanks for the code, Sameer. Unfortunately, it didn't solve the issue.
>>> Compared to what I did, the principle is the same: make sure that the
>>> watermark advances even without events present, so that timeouts in CEP
>>> patterns are triggered.
>>>
>>> If Till or anyone else could provide a minimal example illustrating the
>>> supposed behaviour of:
>>>
>>>> [CEP] timeout will be detected when the first watermark exceeding the
>>>> timeout value is received
>>>
>>> I'd very much appreciate it.
>>>
>>> Regards,
>>>
>>> David
>>>
>>> On Wed, Oct 12, 2016 at 1:54 AM, Sameer W <sam...@axiomine.com> wrote:
>>>
>>>> Try this. Your WMs need to move forward. Also, don't use the system
>>>> timestamp. Use the timestamp of the element seen as the reference, as the
>>>> elements are most likely lagging the system timestamp.
>>>>
>>>> DataStream<Event> withTimestampsAndWatermarks = tuples
>>>>     .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Event>() {
>>>>
>>>>         long waterMarkTmst;
>>>>         long lastEmittedWM = 0;
>>>>
>>>>         @Override
>>>>         public long extractTimestamp(Event element, long previousElementTimestamp) {
>>>>             if (element.tmst > lastEmittedWM) {
>>>>                 // Assumes increasing timestamps. Subtract 1 because more
>>>>                 // elements with the same timestamp might still arrive.
>>>>                 waterMarkTmst = element.tmst - 1;
>>>>             }
>>>>             return element.tmst;
>>>>         }
>>>>
>>>>         @Override
>>>>         public Watermark getCurrentWatermark() {
>>>>             if (lastEmittedWM == waterMarkTmst) {
>>>>                 // No new event seen: move the WM forward by the auto-watermark
>>>>                 // interval (watermarks only move forward in time).
>>>>                 waterMarkTmst = waterMarkTmst + 1000L;
>>>>             }
>>>>             lastEmittedWM = waterMarkTmst;
>>>>
>>>>             System.out.println(String.format("Watermark at %s",
>>>>                     new Date(waterMarkTmst)));
>>>>             // Until an event is seen, the WM starts at 0 and advances by 1000 ms.
>>>>             return new Watermark(waterMarkTmst);
>>>>         }
>>>>     }).keyBy("key");
>>>>
>>>> On Tue, Oct 11, 2016 at 7:29 PM, David Koch <ogd...@googlemail.com> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> I tried setting the watermark to System.currentTimeMillis() - 5000L;
>>>>> event timestamps are System.currentTimeMillis(). I do not observe the
>>>>> expected behaviour of the PatternTimeoutFunction firing once the watermark
>>>>> moves past the timeout "anchored" by a pattern match.
>>>>>
>>>>> Here
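Sameer's idea can be checked without a cluster. The plain-Java model below replays his two callbacks: timestamps come from events, and if no event arrived since the previous periodic call, the watermark is pushed forward by a fixed step. The class `IdleAdvancingWatermarks` and its methods are hypothetical and do not implement Flink's `AssignerWithPeriodicWatermarks`. It is a variant rather than a transcription: it tracks "event seen" with a flag and clamps with `Math.max` so the emitted watermark never moves backwards.

```java
// Plain-Java model (no Flink dependency) of an assigner whose watermark keeps
// advancing while the input is idle. All times are in milliseconds.
final class IdleAdvancingWatermarks {
    private final long idleStepMillis;     // assumed equal to the auto-watermark interval
    private long candidate;                // next watermark value to emit
    private long lastEmitted;              // last watermark returned
    private boolean sawEventSinceLastEmit; // any element since the previous periodic call?

    IdleAdvancingWatermarks(long idleStepMillis, long initialWatermark) {
        this.idleStepMillis = idleStepMillis;
        this.candidate = initialWatermark;
        this.lastEmitted = initialWatermark;
    }

    // Called once per element; returns the element's event timestamp unchanged.
    long extractTimestamp(long eventTimestamp) {
        // Minus 1 because more elements with the same timestamp may still arrive.
        candidate = Math.max(candidate, eventTimestamp - 1);
        sawEventSinceLastEmit = true;
        return eventTimestamp;
    }

    // Called periodically, e.g. once per auto-watermark interval.
    long getCurrentWatermark() {
        if (!sawEventSinceLastEmit) {
            // Idle input: push the watermark forward by a fixed step.
            candidate = lastEmitted + idleStepMillis;
        }
        // Watermarks must never move backwards.
        lastEmitted = Math.max(lastEmitted, candidate);
        sawEventSinceLastEmit = false;
        return lastEmitted;
    }
}
```

Replaying a short sequence also shows Till's caveat: after two idle periods and an event at 5000 ms, one more idle period moves the watermark to 5999, so a subsequent element stamped 5500 is already behind the watermark, i.e. late.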
Re: Listening to timed-out patterns in Flink CEP
Hello,

I tried setting the watermark to System.currentTimeMillis() - 5000L; event timestamps are System.currentTimeMillis(). I do not observe the expected behaviour of the PatternTimeoutFunction firing once the watermark moves past the timeout "anchored" by a pattern match.

Here is the complete test class source <http://pastebin.com/9WxGq2wv>, in case someone is interested. The timestamp/watermark assigner looks like this:

DataStream<Event> withTimestampsAndWatermarks = tuples
    .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Event>() {

        long waterMarkTmst;

        @Override
        public long extractTimestamp(Event element, long previousElementTimestamp) {
            return element.tmst;
        }

        @Override
        public Watermark getCurrentWatermark() {
            // Watermark trails the wall clock by 5 seconds.
            waterMarkTmst = System.currentTimeMillis() - 5000L;
            System.out.println(String.format("Watermark at %s", new Date(waterMarkTmst)));
            return new Watermark(waterMarkTmst);
        }
    }).keyBy("key");

withTimestampsAndWatermarks.getExecutionConfig().setAutoWatermarkInterval(1000L);

// Apply pattern filtering on stream.
PatternStream<Event> patternStream = CEP.pattern(withTimestampsAndWatermarks, pattern);

Any idea what's wrong?

David

On Tue, Oct 11, 2016 at 10:20 PM, Sameer W <sam...@axiomine.com> wrote:
> Assuming an element with a timestamp which is later than the last emitted
> watermark arrives, would it just be dropped because the PatternStream does
> not have a max allowed lateness method? In that case it appears that CEP
> cannot handle late events yet out of the box.
>
> If we do want to support late events, can we chain a keyBy().timeWindow().
> allowedLateness(x).map().assignTimestampsAndWatermarks().keyBy() again
> before handing it to the CEP operator? This way we may have the patterns
> fired multiple times, but it allows an event to be late and out of order. It
> looks like it will work, but is there a less convoluted way?
>
> Thanks,
> Sameer
>
> On Tue, Oct 11, 2016 at 12:17 PM, Till Rohrmann <till.rohrm...@gmail.com> wrote:
>
>> But then no element later than the last emitted watermark must be issued
>> by the sources. If that is the case, then this solution should work.
>>
>> Cheers,
>> Till
>>
>> On Tue, Oct 11, 2016 at 4:50 PM, Sameer W <sam...@axiomine.com> wrote:
>>
>>> Hi,
>>>
>>> If you know that the events are arriving in order and with a consistent lag,
>>> why not just increment the watermark time every time the
>>> getCurrentWatermark() method is invoked, based on the autoWatermarkInterval
>>> (or less, to be conservative)?
>>>
>>> You can check if the watermark has changed since the arrival of the last
>>> event and, if not, increment it in the getCurrentWatermark() method.
>>> Otherwise the watermark will never increase until an element arrives, and if
>>> the stream partition stalls for some reason the whole pipeline freezes.
>>>
>>> Sameer
>>>
>>> On Tue, Oct 11, 2016 at 6:04 AM, Till Rohrmann <till.rohrm...@gmail.com> wrote:
>>>
>>>> Hi David,
>>>>
>>>> the problem is still that there is no corresponding watermark saying
>>>> that 4 seconds have now passed. With your code, watermarks will be
>>>> periodically emitted, but the same watermark will be emitted until a new
>>>> element arrives, which will reset the watermark. Thus, until this watermark
>>>> is seen, the system can never know whether there will be an earlier event
>>>> or not. I fear that this is a fundamental problem with stream processing.
>>>>
>>>> You're right that the negation operator won't solve the problem. It
>>>> will indeed suffer from the same problem.
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Sun, Oct 9, 2016 at 7:37 PM, <lg...@yahoo.com> wrote:
>>>>
>>>>> >> FLINK-3320 <https://issues.apache.org/jira/browse/FLINK-3320> (CEP
>>>>> "not" operator) does not address this because, again, how would the "not
>>>>> match" be triggered if no event at all occurs?
>>>>>
>>>>> Good question.
>>>>>
>>>>> I'm not sure whether the following will work:
>>>>>
>>>>> This could be done by creating a CEP matching pattern that uses both
>>>>> of "notNext" (or "notFollowedBy") and "within" constructs. Something like
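Sameer's remark that one stalled partition can freeze the whole pipeline follows from how Flink combines watermarks: an operator with several input channels advances its event-time clock only to the minimum of the latest watermarks received on each channel. The sketch below is a simplified plain-Java model of that rule that ignores any idle-input handling; `MinWatermarkTracker` is a hypothetical name, not a Flink class.

```java
import java.util.Arrays;

// Simplified model: the operator's watermark is the minimum over the latest
// watermark seen on each input channel, and it never moves backwards.
final class MinWatermarkTracker {
    private final long[] channelWatermarks;
    private long combined = Long.MIN_VALUE;

    MinWatermarkTracker(int channels) {
        channelWatermarks = new long[channels];
        // A channel that has not reported yet holds everything back.
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    // Records a watermark from one channel; returns the operator's current watermark.
    long onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        long min = Long.MAX_VALUE;
        for (long w : channelWatermarks) {
            min = Math.min(min, w);
        }
        combined = Math.max(combined, min);
        return combined;
    }
}
```

With two channels, if channel 0 stalls at 1000 ms while channel 1 races ahead to 9000 ms, the operator stays at 1000 ms, so no CEP timeout past that point can fire until channel 0 advances again.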
Re: Listening to timed-out patterns in Flink CEP
I will give it a try. My current timestamp/watermark assigner extends AscendingTimestampExtractor, so I can't override the way it sets the watermark to the last seen event timestamp.

Thanks for your replies.

/David

On Tue, Oct 11, 2016 at 6:17 PM, Till Rohrmann <till.rohrm...@gmail.com> wrote:
> But then no element later than the last emitted watermark must be issued
> by the sources. If that is the case, then this solution should work.
>
> Cheers,
> Till
>
> On Tue, Oct 11, 2016 at 4:50 PM, Sameer W <sam...@axiomine.com> wrote:
>
>> Hi,
>>
>> If you know that the events are arriving in order and with a consistent lag,
>> why not just increment the watermark time every time the
>> getCurrentWatermark() method is invoked, based on the autoWatermarkInterval
>> (or less, to be conservative)?
>>
>> You can check if the watermark has changed since the arrival of the last
>> event and, if not, increment it in the getCurrentWatermark() method.
>> Otherwise the watermark will never increase until an element arrives, and if
>> the stream partition stalls for some reason the whole pipeline freezes.
>>
>> Sameer
>>
>> On Tue, Oct 11, 2016 at 6:04 AM, Till Rohrmann <till.rohrm...@gmail.com> wrote:
>>
>>> Hi David,
>>>
>>> the problem is still that there is no corresponding watermark saying
>>> that 4 seconds have now passed. With your code, watermarks will be
>>> periodically emitted, but the same watermark will be emitted until a new
>>> element arrives, which will reset the watermark. Thus, until this watermark
>>> is seen, the system can never know whether there will be an earlier event
>>> or not. I fear that this is a fundamental problem with stream processing.
>>>
>>> You're right that the negation operator won't solve the problem. It will
>>> indeed suffer from the same problem.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Sun, Oct 9, 2016 at 7:37 PM, <lg...@yahoo.com> wrote:
>>>
>>>> >> FLINK-3320 <https://issues.apache.org/jira/browse/FLINK-3320> (CEP
>>>> "not" operator) does not address this because, again, how would the "not
>>>> match" be triggered if no event at all occurs?
>>>>
>>>> Good question.
>>>>
>>>> I'm not sure whether the following will work:
>>>>
>>>> This could be done by creating a CEP matching pattern that uses both of
>>>> "notNext" (or "notFollowedBy") and "within" constructs. Something like
>>>> this:
>>>>
>>>> Pattern<Event, ?> pattern = Pattern.begin("first")
>>>>         .notNext("second")
>>>>         .within(Time.seconds(3));
>>>>
>>>> I'm hoping Flink CEP experts (Till?) will comment on this.
>>>>
>>>> Note: I have requested these negation patterns to be implemented in
>>>> Flink CEP, but notNext/notFollowedBy are not yet implemented in Flink.
>>>>
>>>> - LF
>>>>
>>>> --
>>>> *From:* David Koch <ogd...@googlemail.com>
>>>> *To:* user@flink.apache.org; lg...@yahoo.com
>>>> *Sent:* Sunday, October 9, 2016 5:51 AM
>>>> *Subject:* Re: Listening to timed-out patterns in Flink CEP
>>>>
>>>> Hello,
>>>>
>>>> Thank you for the explanation as well as the link to the other post.
>>>> Interesting to learn about some of the open JIRAs.
>>>>
>>>> Indeed, I was not using event time, but processing time. However, even
>>>> when using event time I only get notified of timeouts upon subsequent
>>>> events.
>>>>
>>>> The link <http://pastebin.com/x4m3RHQz> contains an example where I
>>>> read from a socket, wrap this in a custom "event" with
>>>> timestamp, key the resultant stream by and attempt to detect
>>>> instances no further than 3 seconds apart using CEP.
>>>>
>>>> Apart from the fact that results are only printed when I close the
>>>> socket (normal?), I don't observe any change in behaviour.
>>>>
>>>> So event-time/watermarks or not: SOME event has to occur for the
>>>> timeout to be triggered.
>>>>
>>>> FLINK-3320 <https://issues.apache.org/jira/browse/FLINK-3320> (CEP
>>>> "not"
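LF's "notNext ... within" idea and the fix Till later describes (operator time must advance on every watermark, even when no events are buffered) rely on the same mechanism: a pending partial match has to be able to time out on a watermark alone. The plain-Java sketch below is a simplified per-key model of "A not followed by B within W"; it is not Flink's NFA. The class `AbsenceDetector`, the type strings "A"/"B", and the boundary (timeout once `watermark - start >= W`) are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical model of "A not followed by B within windowMillis", per key.
final class AbsenceDetector {
    private final long windowMillis;
    // key -> event timestamp of a pending "A" still waiting for its "B"
    private final Map<String, Long> pendingStarts = new HashMap<>();

    AbsenceDetector(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    void onEvent(String key, String type, long timestamp) {
        if (type.equals("A")) {
            pendingStarts.putIfAbsent(key, timestamp);
        } else if (type.equals("B")) {
            Long start = pendingStarts.get(key);
            if (start != null && timestamp - start < windowMillis) {
                pendingStarts.remove(key); // B arrived in time: nothing to report
            }
        }
    }

    // Must run on EVERY watermark, even if no events arrived since the last one;
    // otherwise pending matches never time out while the input is quiet.
    List<String> onWatermark(long watermark) {
        List<String> timedOut = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = pendingStarts.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (watermark - entry.getValue() >= windowMillis) {
                timedOut.add(entry.getKey()); // no B can still arrive in the window
                it.remove();
            }
        }
        return timedOut;
    }
}
```

With a 4000 ms window, a user who sends "A" at 1000 ms and nothing else is reported on the first watermark at or beyond 5000 ms, without any further event for that user.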
Re: Listening to timed-out patterns in Flink CEP
Hello,

Thank you for the explanation as well as the link to the other post. Interesting to learn about some of the open JIRAs.

Indeed, I was not using event time, but processing time. However, even when using event time I only get notified of timeouts upon subsequent events.

The link <http://pastebin.com/x4m3RHQz> contains an example where I read from a socket, wrap this in a custom "event" with timestamp, key the resultant stream by and attempt to detect instances no further than 3 seconds apart using CEP.

Apart from the fact that results are only printed when I close the socket (normal?), I don't observe any change in behaviour.

So event-time/watermarks or not: SOME event has to occur for the timeout to be triggered. FLINK-3320 <https://issues.apache.org/jira/browse/FLINK-3320> (CEP "not" operator) does not address this because, again, how would the "not match" be triggered if no event at all occurs?

On Sat, Oct 8, 2016 at 12:50 AM, <lg...@yahoo.com> wrote:
> The following is a better link:
>
> http://mail-archives.apache.org/mod_mbox/flink-user/201609.mbox/%3CCAC27z%3DOTtv7USYUm82bE43-DkoGfVC4UAWD6uQwwRgTsE5be8g%40mail.gmail.com%3E
>
> - LF
>
> --
> *From:* "lg...@yahoo.com" <lg...@yahoo.com>
> *To:* "user@flink.apache.org" <user@flink.apache.org>
> *Sent:* Friday, October 7, 2016 3:36 PM
> *Subject:* Re: Listening to timed-out patterns in Flink CEP
>
> Won't the upcoming CEP negation (absence of an event) feature solve this
> issue?
>
> See this discussion thread:
> http://mail-archives.apache.org/mod_mbox/flink-user/201609.mbox/%3CCAC27z%3DOD%2BTq8twBw_1YKni5sWAU3g1S9WDpJw0DUwgiG9YX9Fg%40mail.gmail.com%3E
>
> // Atul
>
> --
> *From:* Till Rohrmann <trohrm...@apache.org>
> *To:* user@flink.apache.org
> *Sent:* Friday, October 7, 2016 12:58 AM
> *Subject:* Re: Listening to timed-out patterns in Flink CEP
>
> Hi David,
>
> in case of event time, the timeout will be detected when the first
> watermark exceeding the timeout value is received. Thus, it depends a
> little bit on how you generate watermarks (e.g. periodically, or one
> watermark per event).
>
> In case of processing time, the time is only updated whenever a new
> element arrives. Thus, if you have an element arriving 4 seconds after
> Event A, it should detect the timeout. If the next event arrives 20 seconds
> later, then you won't see the timeout until then.
>
> In the case of processing time, we could think about registering timeout
> timers for processing time. However, I would highly recommend using
> event time, because with processing time Flink cannot guarantee meaningful
> computations, since the events might arrive out of order.
>
> Cheers,
> Till
>
> On Thu, Oct 6, 2016 at 3:08 PM, David Koch <ogd...@googlemail.com> wrote:
>
>> Hello,
>>
>> With Flink CEP, is there a way to actively listen to pattern matches that
>> time out? I am under the impression that this is not possible.
>>
>> In my case I partition a stream containing user web navigation by "userId"
>> to look for sequences of Event A, followed by B within 4 seconds for each
>> user.
>>
>> I registered a PatternTimeoutFunction which, assuming a non-match, only
>> fires upon the first event after the specified timeout. For example, given
>> user X: Event A, then 20 seconds later Event B (or any other type of event).
>>
>> I'd rather have a notification fire directly upon the 4-second interval
>> expiring, since passive invalidation is not really applicable in my case.
>>
>> How, if at all, can this be achieved with Flink CEP?
>>
>> Thanks,
>>
>> David
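Till's distinction decides when David's 4-second timeout becomes visible: with processing time the operator's clock moves only when an element arrives, while with event time it moves with each watermark. The small sketch below takes the sequence of clock updates the operator sees and returns the first one at which the timeout is reached. `TimeoutDetection` is a hypothetical helper, and treating `clock - start >= window` as the trigger is an assumption about the boundary, which the thread does not pin down.

```java
// Sketch: at which clock update does a pending match time out?
// All times are in milliseconds; the ">=" boundary is an assumption.
final class TimeoutDetection {
    private TimeoutDetection() {}

    // First clock value with (clock - start) >= window, or -1 if none reaches it.
    static long firstDetection(long start, long window, long[] clockUpdates) {
        for (long clock : clockUpdates) {
            if (clock - start >= window) {
                return clock;
            }
        }
        return -1L;
    }

    // Periodic watermarks: first, first + interval, first + 2 * interval, ...
    static long[] periodic(long first, long interval, int count) {
        long[] out = new long[count];
        for (int i = 0; i < count; i++) {
            out[i] = first + i * interval;
        }
        return out;
    }
}
```

For David's example (Event A at 0 ms, next event at 20 000 ms), arrival-driven time surfaces the timeout only at 20 000 ms. A watermark every 1000 ms surfaces it at 4000 ms, but only if those watermarks keep advancing while no events arrive.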